Резюме предыдущей ситуации: эта статья не предназначена для того, чтобы углубляться в исходный код RocketMQ для анализа его принципа балансировки нагрузки, сосредоточившись на том, как инициировать балансировку нагрузки.
Вы, должно быть, поняли, что философия дизайна RocketMQ заключается в том, что одна и та же очередь может одновременно удерживаться только одним потребителем. Однако один и тот же потребитель может использовать несколько очередей одновременно.Для повышения эффективности модели подписки и потребления Rocket всегда надеется распределять очереди достаточно равномерно. В повседневном использовании онлайн и офлайн Потребителя, а также динамическое расширение и сжатие Очереди могут нарушить баланс распределения. Поэтому Rocket предоставляет полный механизм ребалансировки для описанной выше ситуации.
Условия срабатывания
总结下来Rebalance一共三个触发条件,两个主动,一个被动。满足任意一个都会触发
1.Consumer启动之时执行start方法主动执行负载均衡逻辑;
2.定时任务触发;
3.Broker下发通知告知Client需要进行负载均衡;
今天重新翻阅代码的时候发现
很巧合三个触发条件或多或少跟DefaultMQPushConsumerImpl.start()都有关系;
DefaultMQPushConsumerImpl.start()
Когда DefaultMQPushConsumerImpl создает экземпляр, он инициализирует переменную-член rebalanceImpl.
закрытый окончательный RebalanceImpl rebalanceImpl = новый RebalancePushImpl(this);
В этот момент объект rebalanceImpl не действует, потому что атрибут ключевого члена ta по-прежнему равен null, а за назначение отвечает начало ниже.
Ниже приведен отрывок из основного кода start():
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
/* 检查配置 */
this.checkConfig();
/* 构建 Topic 订阅信息——SubscriptionData,并添加至 RebalanceImpl 的订阅信息中 */
this.copySubscription();
/* 初始化 MQClientInstance */
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
/**
* 丰富 rebalanceImpl 对象属性,注意到了吗之前初始化的对象充血了
* 之前产生的 rebalanceImpl 对象直到此刻才算真正意义上的初始化完成
* rebalanceImpl就是负载均衡的相关实现
*/
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
/**
* 向 MQClientInstance 注册消费者,并启动 MQClientInstance
* 一个 JVM 中的所有消费者、生产者持有同一个 MQClientInstance,MQClientInstance 只会启动一次
*/
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
break;
case
...;
default:
break;
}
/* Consumer启动成功,立即向所有Broker发送心跳 */
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
/*
* 注意到了嘛,Consumer上线会立即触发一次负载均衡
* 但是这里并不是调用一下负载均衡的实现那么简单,这里其实是唤醒了相关服务线程
* 下文笔者会着重介绍
*/
this.mQClientFactory.rebalanceImmediately();
}
RebalanceImpl
Как следует из названия, все операции, связанные с балансировкой нагрузки Consumer, делегируются объекту RebalanceImpl.
Каждый объект Consumer содержит экземпляр RebalanceImpl, и каждый экземпляр RebalanceImpl обслуживает только одного Consumer.
Эти два являются взаимным владением, круговыми отношениями ссылки.
Давайте взглянем на ключевые свойства этого объекта:
RebalancePushImpl extends RebalanceImpl {
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64);
/* ConcurrentMap<topic, Set<MessageQueue>>, Topic与分给自己的MessageQueue信息 */
protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>();
/* ConcurrentMap<topic, SubscriptionData>, Topic与订阅数据 */
protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();
/* 负载算法的具体实现,究竟如何分配就是由这个总指挥决定的 */
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/* Consumer实例 */
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
}
Один из объектов ProcessQueue особенно примечателен, так как я не делал никаких замечаний.Если вы читали исходный код Rocket, то должны знать, что это Потребитель - чрезвычайно важная часть в процессе потребления Сообщения, и это очень важно Вы можете думать, что ta является носителем сообщения на стороне клиента. Поскольку это имеет мало общего со временем загрузки, здесь это повторяться не будет. (Автор подчеркивает, что это мало связано с таймингом загрузки, а напрямую связано с балансировкой нагрузки)
Именно doRebalance() отвечает за балансировку нагрузки, на самом деле реальная логика нагрузки это rebalanceByTopic();
RebalanceByTopic()
rebalanceByTopic() является конечным пунктом назначения балансировки нагрузки, то есть все вызовы в системе, требующие загрузки, в конечном итоге будут поступать сюда.
Здесь автор анализирует только реализацию в режиме потребления кластера
Выдержка из кода ключа:
private void rebalanceByTopic(String topic, boolean isOrder) {
/* 获取到该Topic下的所有队列 */
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
/* 获取该Topic下ConsumerGroup此消费组所有的消费者Id */
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<>(mqSet);
/* 这两个排序极其关键 */
Collections.sort(mqAll);
Collections.sort(cidAll);
/* 负载均衡算法 */
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult;
/* 调用具体的算法实现进行队列分配 */
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll
);
}
}
Это кажется преуменьшением, но две строки кода действительно важны:
Collections.sort(mqAll);
Collections.sort(cidAll);
Смысл этих двух предложений в том, что порядок в очереди и порядок идентификаторов потребителей, полученные всеми потребителями в ConsumerGroup, согласованы. Исходя из того, что непротиворечивость представления распределения гарантируется, алгоритм выделения тот же, так что, хотя потребители не обмениваются какой-либо информацией во время балансировки нагрузки, они могут распределять очереди упорядоченным образом, не мешая друг другу.
Конкретные сведения о том, как распределять, находятся в allocateMessageQueueStrategy. RocketMQ также по умолчанию поддерживает множественные алгоритмы распределения, что относительно просто, и я не хочу вдаваться в подробности.
После завершения реализации балансировки нагрузки, кто будет вызывать ta, как звонить ta и когда ta будет вызываться, каждый вопрос не дает нам покоя.До анализа вышеперечисленных проблем мы не можем обойти объект RebalanceService.
RebalanceService
В RocketMQ есть класс объектов, статус которых является отсоединенным и уникальным.Объект доминирует над полем, и они часто ограничены только операционной системой. Похоже, операционная система их тоже предпочитает, потому что им будут выделены временные отрезки и запланирован запуск напрямую (на самом деле причина очень проста, ответ будет позже). ta — это легендарный ServiceThread, и люди в реках и озерах часто называют их сервисными потоками.
public abstract class ServiceThread implements Runnable {
protected boolean isDaemon = false;
/* 这能不被钟爱吗,直接持有一个独立线程 */
private Thread thread;
/* 执行start的时候申请一个线程 */
public void start() {
/* 只允许申请一次 */
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
/* 设置为非守护线程 */
this.thread.setDaemon(isDaemon);
this.thread.start();
}
}
Семейство ServiceThread благополучно, помимо RebalanceService, отвечающего за балансировку нагрузки, есть еще много братьев и сестер:
FlushRealTimeService: поток службы асинхронной очистки
CommitRealTimeService: поток службы асинхронной кисти
GroupCommitService: поток службы синхронной кисти
......
Каждый из них является героем стабильной работы Rocket.Три вышеперечисленных на самом деле являются служебными потоками, связанными с очисткой диска.
(Это слишком широко, если кто-то хочет это увидеть, я попытаюсь проанализировать это)
С приведенным выше предзнаменованием следует хорошо понимать RebalanceService.Он держит один поток для балансировки нагрузки.Конечно, ta не выполняет бесконечную обработку нагрузки.
public class RebalanceService extends ServiceThread {
/* 负载均衡时间间隔,默认20s,支持配置 */
private final static long waitInterval =Long.parseLong(
System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")
);
private final MQClientInstance mqClientFactory;
public void run() {
/* 只要该线程未终止就会一直执行 */
while (!this.isStopped()) {
/* 喜欢才会放肆,但爱是克制,休息20s */
this.waitForRunning(waitInterval);
/* 执行负载均衡 */
this.mqClientFactory.doRebalance();
}
}
}
задача на время
理解了上面的RebalanceService,应该就理解了定时触发的逻辑,只需要定时唤醒服务线程即可
每个Java服务单点只会启动一个RebalanceService服务实例,同时也只会启动一个mqClientFactory实例
单点内所有的Consumer实例都会共用该实例对象。
每次定时触发mqClientFactory.doRebalance()都会对该JVM下持有的所有Consumer进行负载均衡
/**
* RebalanceService 线程默认每隔20s调用该方法
* ⚠️:每个 Java 服务单点只会启动一个 MQClientInstance 实例,单点内所有的 Consumer 实例都会持有该实例对象
* @see #registerConsumer Consumer 对象会将自己注册进 MQClientInstance
* @see #consumerTable Consumer对象注册表
*
* ⚠️:一个 Java 服务单点只有一个 RebalanceService 服务线程
* ⚠️:但每个 Consumer 实例都持有一个 RebalanceImpl 对象
*/
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
Активный триггер
写这篇记录之时我还在思考,为什么刚刚那个定时任务不是交给JDK中的ScheduledExecutorService
事实上,RocketMQ中的很多定时任务也都是这么做的。
可是直到刚刚我才明白,因为RebalanceService支持主动唤醒,提前执行任务。
Consumer上线时候触发主动负载均衡就是因为唤醒了RebalanceService线程,
start()最后会调用rebalanceImmediately()
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
Уведомление брокера
Брокер отправляет уведомление, чтобы проинструктировать Потребителя о необходимости балансировки нагрузки, что, очевидно, намного сложнее, но то же самое верно Это не более чем несколько вызовов Rpc и не более чем передача по сети.
每当DefaultMQPushConsumerImpl实例,调用start之后,总是会向Broker发送一个心跳
调用栈如下:
DefaultMQPushConsumerImpl.start()
-> MQClientInstance.sendHeartbeatToAllBrokerWithLock()
-> MQClientInstance.sendHeartbeatToAllBroker()
-> MQClientAPIImpl.sendHearbeat()
Сразу после запуска Потребителя отправляется пакет пульса для информирования Брокера.
public int sendHearbeat(
String addr, HeartbeatData heartbeatData, long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
/* 又是一次 Rpc 远程调用 */
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
RemotingCommand response = this.remotingClient.invokeSync(
addr, request, timeoutMillis
);
}
Согласно RequestCode.HEART_BEAT известно, что процессор этого Rpc — ClientManageProcessor. ClientManageProcessor.heartBeat() -> ConsumerManager.registerConsumer()
Извлекается только код ключа:
public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable
) {
/* 有新的Consumer上线则有新的SocketChannel建立 */
boolean r1 = consumerGroupInfo.updateChannel(
clientChannelInfo, consumeType, messageModel, consumeFromWhere
);
/* 判断订阅信息是否发生变化 */
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
/* 触发ConsumerGroupEvent.CHANGE事件 */
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
Вызов переходит к DefaultConsumerIdsChangeListener.handle().
Вы можете видеть, что если это событие CHANGE, вызывается Broker2Client.notifyConsumerIdsChanged().
public void notifyConsumerIdsChanged(Channel channel, String consumerGroup) {
/* 构造Rpc请求头 */
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
/* 构造Rpc请求对象 */
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader
);
/* 又是一次RPC */
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
}
Этот запрос RPC в конечном итоге будет передан ClientRemotingProcessor.notifyConsumerIdsChanged() для обработки.
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) {
NotifyConsumerIdsChangedRequestHeader requestHeader =(NotifyConsumerIdsChangedRequestHeader) request
.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
/* 调用负载均衡逻辑 */
this.mqClientFactory.rebalanceImmediately();
}