Анализ сайта потребления сброса RocketMQ

исходный код RocketMQ
Анализ сайта потребления сброса RocketMQ

сброс потребления

Операция сброса места потребления по времени, похоже, не фигурирует в обычной логике выполнения Rocket, похоже, сработает только логика использования функции сброса из консоли.
Проанализируем процесс реализации ta.

Вход

Rocket的运维指令都包装成了SubCommand对象,而重置消费位点对应的是ResetOffsetByTimeCommand对象
Rocket的MQAdmin启动时,调用initCommand(),将命令对象实例化并注册到subCommandList中。

接收到对应的命令之后findSubCommand()可以定位到对应的SubCommand对象,
并调用该命令对象的buildCommandlineOptions()进行参数的解析与组装,最后返回一个参数集合对象Options

真正的逻辑处理需要执行命令对象的execute();

ResetOffsetByTimeCommand.execute()

首先要根据时间戳获取到ConsumeQueue中的偏移量

调用栈如下:

    ResetOffsetByTimeCommand.execute()
->  DefaultMQAdminExt.resetOffsetByTimestamp()
->  DefaultMQAdminExtImpl.resetOffsetByTimestamp()
->  MQClientAPIImpl.invokeBrokerToResetOffset()
->  RemotingClient.invokeSync()

假设没有服务端开发经验,也没有系统的研究过网络编程,调用链到此处就断开了。
想必你应该知道Rocket的各个组件通过Tcp建立通信连接,组件间的信息交换则是通过一个个RPC调用实现的。
Rocket组件间的交互信息的通讯协议专门定义了此数据帧的业务类型,所有类型在RequestCode中都有明确定义。
同样重置消费位点也有约定其业务类型枚举常量:RequestCode.INVOKE_BROKER_TO_RESET_OFFSET。
顺藤摸瓜你就可以找到此消息的处理器:
    (这里默认你已经理解了Rocket中处理数据包的那套逻辑,即使不理解也没问题,跟着文中思路也可以找到此处)
    AdminBrokerProcessor.processRequest()
->  AdminBrokerProcessor.resetOffset()
->  Broker2Client.resetOffset()
->  DefaultMessageStore.getOffsetInQueueByTime()
->  ConsumeQueue.getOffsetInQueueByTime()

根据时间戳获取到ConsumeQueue中的偏移量的实现就在这一个方法中ConsumeQueue.getOffsetInQueueByTime()

Broker2Client

После распаковки и разбора фрейма данных в нем содержится ключевая информация для сброса сайта потребления: топик, группа, отметка времени. TopicConfigManager сохраняет соответствующую конфигурацию каждой темы, поэтому можно легко получить соответствующую конфигурацию по названию темы.
Основная цель здесь — получить количество доступных для записи очередей для ta.
Фрагмент кода ключа:

    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        /* i即是QueueId,构造出MessageQueue对象 */
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);
        /* 根据时间戳计算出ConsumeQueue中的偏移量 */
        timeStampOffset = this.brokerController.getMessageStore()
            .getOffsetInQueueByTime(topic, i, timeStamp);
        
        offsetTable.put(mq, timeStampOffset);
    }

Реализация getOffsetInQueueByTime() также весьма замечательна и в конечном итоге будет вызываться для ConsumeQueue#getOffsetInQueueByTime() использует метод деления пополам для определения окончательного смещения.
На данный момент Брокер вычислил смещение указанной точки времени в каждой ConsumeQueu Темы, но у Клиента нет восприятия.
Таким образом, как и в предыдущей операции, Брокер выполняет вызов RPC, чтобы сообщить Клиенту, что определенной группе в теме необходимо обновить смещение потребления, которое зависит от данных, которые я передаю на этот раз.

Broker Rpc Client

выдержка некоторых ключевых кодов

public RemotingCommand resetOffset(String topic, String group, long timeStamp,
                                   boolean isForce, boolean isC) {
    /* 构造请求体 */                                   
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader
    );     
    ResetOffsetBody body = new ResetOffsetBody();
    body.setOffsetTable(offsetTable);
    request.setBody(body.encode());     

    /* 执行Rpc调用 */   
    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
   }

Client reset

Принцип тот же, что и выше, соответствующую логику обработки мы можем найти по RequestCode

public RemotingCommand resetOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    ResetOffsetRequestHeader requestHeader =
        (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
    /* 序列化数据得到Message、消费偏移量的映射关系 */
    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    if (request.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
        offsetTable = body.getOffsetTable();
    }

    /* 根据上述数据修改客户端本地消费进度 */
    this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
    return null;
}

MqClientFactory

MqClientFactory инкапсулирует API обработки сети Rocket и является сетевым каналом для обмена информацией между производителями сообщений, потребителями сообщений, NameServ и Broker.
Также содержит все экземпляры Consumer в экземпляре JVM.
Извлеките код ключа MqClientFactory:

resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    /* 根据Group从列表中搜索到对应的Consumer */
    MQConsumerInner impl = this.consumerTable.get(group);

    /* 如果是推模式则强转为DefaultMQPushConsumerImpl类型 */
    if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
        consumer = (DefaultMQPushConsumerImpl) impl;
    } else {
        /* 如果不是则直接结束 */
        return;
    }

    /* 暂停消费 */
    consumer.suspend();

    /* 循环更新消费进度 */
    while (iterator.hasNext()) {
        MessageQueue mq = iterator.next();
        Long offset = offsetTable.get(mq);
        if (topic.equals(mq.getTopic()) && offset != null) {
            try {
                consumer.updateConsumeOffset(mq, offset);
            } catch (Exception e) {
                log.warn("reset offset failed. group={}, {}", group, mq, e);
            }
        }
    }
        
    /* 恢复工作 */
    consumer.resume();
}
   

сомневаться:

Во время выполнения resetOffset обнаруживается, что тип потребителя не DefaultMQPushConsumerImpl, и он возвращает прямо заранее.Как сбросить сайт потребления в режиме pull?