Подробная информация о RocketMQ для разработчиков Java (дизайн отправки и высокой доступности)

RocketMQ

image

предисловие

Цель этой статьи:

1. Анализ принципа синхронной отправки и асинхронной отправки

2. Говоря об архитектуре RocketMQ

содержание

Что такое RocketMQ?

Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.

Apache RocketMQ— это распределенная платформа обмена сообщениями и потоковой передачи с малой задержкой, высокой производительностью и надежностью, емкостью в триллион масштабов и гибкой масштабируемостью.

官方core图

  • Это промежуточное программное обеспечение сообщений модели очереди с высокой производительностью, высокой надежностью, высокими характеристиками реального времени и распределенными характеристиками.
  • Очереди производителя и потребителя могут быть распределены.
  • Источник отправляет сообщения в некоторые очереди по очереди. Набор очередей называется темой. Если Потребитель выполняет широковещательное потребление, один экземпляр Потребителя потребляет все очереди, соответствующие этой теме. Если это кластерное потребление, несколько экземпляров Потребителя будут потреблять набор очередей, соответствующих данной теме в среднем.
  • Возможность гарантировать строгий порядок сообщений
  • Обеспечивает расширенный режим извлечения сообщений
  • Эффективная горизонтальная масштабируемость абонента
  • Механизм подписки на сообщения в реальном времени
  • Возможность накопления сообщений на уровне миллиардов
  • менее зависимый

Что хорошего в RocketMQ

Высокая доступность и высокая производительность

Принципиальный анализ и знакомство с API (отправить)

DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 只需要在发送前初始化一次
producer.start();
// 构建消息实体
Message msg = new Message("SYNC_MSG_TOPIC", "TagA", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送同步消息
SendResult sendResult = producer.send(msg);

Какие важные вещи начал делать продюсер.старт?

блок-схема

start流程

пусковой механизм

проверка конфигурации checkConfig

主要校验producerGroup属性是否满足
  • changeInstanceNameToPID генерирует pid
  • getAndCreateMQClientInstance
创建MQClientlnstance实例

MQClientlnstance封装了RocketMQ网络处理API,是消息生产者( Producer)、消息消费者(Consumer)与NameServer、Broker打交道的网络通道
  • registerProducerЗарегистрировать производителя
将当前生产者加入到MQClientlnstance管理中,方便后续调用网络请求、进行心跳检测等
  • mQClientFactory.start() запускает различные службы через фабрику mQClient.
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

Эта часть делает еще несколько вещей.

 org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
 
 通过romoting (netty客户端的实现)去建立连接 (反正这块可以理解为通过这个操作,可以服务通信了)

Кроме того, мы понимаем большое сердце startScheduledTask.

private void startScheduledTask() {
    // 定时校验nameSrv 保证地址不为空来维持后续服务的可用性
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    // 定时刷新topic路由信息到客户端实例上
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    // 定时清理离线的broker 并发送心跳保活
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    // 定时获取所有消费进度
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    // 定时调整线程池
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

Затем запустите службу сообщений по запросу

通过pullRequestQueue队列来维护拉取的消息

this.rebalanceService.start

内部定时轮询做负载均衡
  • sendHeartbeatToAllBrokerWithLock
给所有broker发送心跳并且加锁
  • registerShutDownHook
注册相关的shutDown钩子

резюме

Основные этапы процесса start()

  • адресация nameSrv
  • Запланированные задачи: обновление информации о маршрутизации, очистка брокеров, отправка пульсаций для поддержания активности.
  • Балансировка нагрузки: с помощью алгоритма балансировки нагрузки циклического перебора все очереди в теме опрашиваются для достижения балансировки нагрузки на стороне отправителя.

Отправить синхронно

Блок-схема синхронной отправки

同步发送流程Зеленые блоки - это основные шаги, которые в основном объясняются вокруг этих основных блоков. Я не буду публиковать здесь конкретный код, чтобы не слишком долго.

tryToFindTopicPublishInfo находит маршрутизацию сообщений

Прежде чем сообщение будет отправлено, необходимо сначала получить информацию о маршрутизации темы.Только после получения этой информации мы можем знать, что сообщение должно быть отправлено на конкретный узел брокера.

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 从本地缓存读取尝试获取
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 通过topic获取配置
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 如果没有获取到配置,通过默认的topic去找路由配置信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

Действуйте следующим образом

imageФункция метода udateTopicRoutelnfoFromNameServer заключается в обновлении и обслуживании кэша маршрутизации производителем сообщений, который сравнивает информацию о маршруте с информацией о маршруте в локальном кэше, чтобы определить, нужно ли обновлять информацию о маршруте.

selectOneMessageQueue выберите очередь сообщений для отправки сообщения

На самом деле есть строчка кода перед этим тоже стоит обратить внимание.Этот блок отправляется синхронно.В самом mq есть настраиваемое количество повторов.По умолчанию х+1, а дальше повтор по мере необходимости по количеству раз отправлено. Если это не удается, продолжайте и войдите в цикл for.

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

Тогда давайте посмотрим, как таким образом выделяется очередь сообщений.

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //是否开启故障延时机制
    if (this.sendLatencyFaultEnable) {
        try {
            // 通过ThreadLocal保存上一次发送的消息队列下标
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // 循环topic下所有的消息队列 确保所在Broker是正常的
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 判断当前消息队列是否可用
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

Во-первых, в процессе отправки сообщения метод выбора очереди сообщений может выполняться несколько раз, а lastBrokerName — это последний выбранный Брокер, которому не удалось отправить сообщение.

Когда выбор очереди сообщений выполняется в первый раз, lastBrokerName имеет значение null.В это время sendWhichQueue используется для автоматического увеличения и последующего получения значения по модулю количества очередей сообщений в текущей таблице маршрутизации и возврата MessageQueue в это местоположение (метод selectOneMessageQueue()).Если сообщение Если отправка снова не удалась, избегайте брокера, в котором находится последняя очередь сообщений, при выборе очереди сообщений в следующий раз, в противном случае велика вероятность повторной ошибки.

Алгоритм может успешно избежать неисправного Брокера в процессе отправки сообщения, но если Брокер доступен, т.к. очередь сообщений в алгоритме маршрутизации сортируется Брокером, если последним выбранным по алгоритму маршрутизации является первый Брокер который не работает. Если есть очередь, то в следующий раз будет выбрана вторая очередь неработающего брокера. Отправка сообщения, скорее всего, завершится ошибкой, что приведет к повторной попытке и ненужной потере производительности. Так есть ли способ отправить сообщение после сбой? После этого почему Брокер должен быть временно исключен из диапазона выбора очереди сообщений? Некоторые друзья могут спросить, почему маршрутная информация Брокера все еще включается в маршрутную информацию после того, как Брокер недоступен? На самом деле, это нетрудно объяснить: во-первых, NameServer обнаруживает брокера. Доступен ли он с задержкой, самый короткий - это интервал обнаружения сердцебиения (1Os);

Во-вторых, NameServer не будет отправлять сообщения производителю сообщений сразу же после обнаружения того, что Брокер не работает, но производитель сообщений будет обновлять информацию о маршрутизации каждые 30 секунд, поэтому производителю сообщений требуется 30 секунд, чтобы воспринимать последнюю информацию о маршрутизации Брокера как как можно скорее. Если может быть введен механизм, во время простоя брокера, если сообщение не может быть отправлено, брокер может быть временно исключен из диапазона выбора очереди сообщений.

updateFaultItem механизм задержки обновления ошибки

/**
 * 更新故障延迟
 *
 * @param brokerName
 * @param currentLatency
 * @param isolation
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

/**
 * 计算不可用间隔时间
 *
 * @param currentLatency
 * @return
 */
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

Если изоляция имеет значение true, используйте 30 с в качестве параметра метода calculateNotAvailableDuration;

Если изоляция ложна, используйте эту задержку отправки сообщения в качестве параметра метода calculateNotAvailableDuration, тогда функция calculateNotAvailableDuration состоит в том, чтобы вычислить продолжительность времени, в течение которого брокеру следует избегать из-за сбоя отправки этого сообщения, то есть, как долго будет Брокер не будет использоваться в следующий раз?Участвовать в загрузке очереди отправки сообщений.

Конкретный алгоритм: начните поиск с конца массива latencyMax, найдите первый индекс меньше, чем currentLatency, затем получите длительность, которую следует избегать, из массива notAvailableDuration и, наконец, вызовите updateFaultltem для LatencyFaultTolerance.

Использование и меры предосторожности

DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 只需要在发送前初始化一次
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            // 构建消息实体
            Message msg = new Message("SYNC_MSG_TOPIC", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送同步消息
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
producer.shutdown();
  • Во-первых, во многих случаях нашу систему даже не волнует результат sendResult [неудивительно, в конце концов, фреймворки — это пустые методы], не говорите мне, что она не выйдет из строя при нормальных обстоятельствах, что, если это не удается, будут ли некоторые операции, выполняемые локально, вызывать грязные данные.
  • Перед отправкой бизнес-система должна выполнять постоянные операции, потому что, когда происходит сбой самого MQ, потери трудно измерить.Зачем вам нужно сохраняться?Постоянство предназначено для устранения неполадок.

резюме

    1. Механизм загрузки очереди сообщений

    Прежде чем производитель отправит сообщение, он сначала читает из локальной таблицы маршрутизации, если нет, он получает информацию о маршрутизации от сервера имен, обновляет локальную таблицу информации, и производитель синхронизирует информацию о маршрутизации с сервера имен каждые 30 с.

    1. Механизм исключения отправки сообщения

Есть два способа отправить высокую доступность:механизм повторной попытки,Механизм предотвращения сбоев

механизм повторной попыткиЭто количество повторных попыток x + 1, когда отправка не удалась, чтобы гарантировать, что сообщение может быть отправлено успешно в максимально возможной степени.

предотвращение ошибокТо есть, если в процессе отправки сообщения будет обнаружена ошибка, то брокер будет добавлен к времени уклонения.В течение этого времени брокер не будет выбран для отправки сообщений, чтобы повысить вероятность успешной отправки сообщений.

Асинхронная отправка

блок-схема

RocketMQ异步回调机制

Механизм реализации

  • Используется для регистрации методов обратного вызова SendCallBack OnSuccess, OnException
  • MQClientAPIImpl.sendMessageAsync запускает обратный вызов
  • Вы можете настроить специальный пул потоков для обработки ответа или использовать publicExecutor.Когда пул потоков отклоняет задачу, обратный вызов будет выполняться в текущем потоке (defaultEventExecutorGroup).
private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    // 这块执行的时候,优先去获取可配置的公用线程池,如果有可用的就使用,没有就跑在当前线程中
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}
  • Рекурсивно вызовите sendMessageAsync для повторной отправки сообщения.
 int tmp = curTimes.incrementAndGet();
        if (needRetry && tmp <= timesTotal) {
            String retryBrokerName = brokerName;//by default, it will send to the same broker
            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                retryBrokerName = mqChosen.getBrokerName();
            }
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
                    retryBrokerName);
            try {
                request.setOpaque(RemotingCommand.createNewRequestId());
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                        timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                        context, false, producer);
            } catch (RemotingConnectException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                        context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                        context, false, producer);
            } catch (RemotingException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                        context, true, producer);
            }
        } else {

            if (context != null) {
                context.setException(e);
                context.getProducer().executeSendMessageHookAfter(context);
            }

            try {
                sendCallback.onException(e);
            } catch (Exception ignored) {
            }
        }

Здесь легко понять: если есть исключение в remotingClient.invokeAsync, повторять рекурсивно и избегать недоступных брокеров

  • Семантика тайм-аута асинхронной отправки немного отличается от синхронной отправки.

  • Куда возвращается синхронизация?

Текущий проектный пул потоков возвращается при отправке задачи, что в конце концов логично, но по сравнению со старой версией 4.2, которую возвращает NettyRemotingClient.invokeAsync, строго говоря, такой дизайн старой версии уже нельзя назвать асинхронным в строгом смысле. ;

  • Куда возвращается асинхронность?
    • Запрос возвращает NettyClientHandler
    • Ошибка отправки запроса ChannelFutureListener NettyRemotingAbstract#requestFail
    • Соединение прервано NettyConnectManageHandler.close() NettyRemotingClient.NettyConnectManageHandler
    • Время ожидания запроса истекло NettyRemotingAbstract#scanResponseTable

использование

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        countDownLatch.countDown();
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        countDownLatch.countDown();
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

резюме

  • Появление асинхронности заключается в том, чтобы службы системы могли быстро реагировать, такие как вынос: заказ -> расчет -> доставка, нам нужно только синхронно звонить и отправлять отдельно, и заботиться о соответствующем обратном вызове;

Как отправитель сообщения, как сохранить сообщение от потери

Отправить не более одного раза Отправить хотя бы один раз Отправить ровно один раз

    1. Во-первых, отправитель выполняет операции сохранения, чтобы предотвратить сбои MQ, в результате чего сообщения не отправляются на сервер MQ, с сохранением удобно иметь дело с исправлением ошибок;
    1. Вызовите API отправки, нам нужно позаботиться о синхронном возврате результата;
    1. Рекомендуется использовать сообщения транзакций RocketMQ, чтобы обеспечить точную отправку сообщений и правильное потребление.

расширение повтора сообщения

    1. Управление потоком SYSTEM_BUSY не повторяет попытку, повторная попытка сообщения не всегда повторяется
    1. Чем больше попыток, тем лучше? Общее время ожидания = время ожидания * (x+1)
    1. Тайм-ауты могут происходить раньше сети, особенно с пользовательской логикой.
    1. Механизм задержки сбоя также можно использовать для сегментированного управления потоком и других сценариев.

Сообщение хранится локально

  • Повышение надежности сообщений
  • Используя встроенную базу данных KV BerkeleyDB, сообщения хранятся в локальных файлах.
  • Минимизируйте влияние на клиента: строго контролируйте количество и размер локального хранилища, предоставьте библиотеку асинхронной записи
  • Взаимодействие с пользователем: Предоставьте интерфейс правил, пользователи могут настраивать локальные правила и правила повторной доставки.
  • Нет гарантии, что сообщение не будет потеряно, но пользователь будет уведомлен о потере сообщения и причине потери.

Дизайнерское мышление и расширение высокой доступности

Схема архитектуры системы RocketMQ

image

  • Производитель обращается к серверу имен, чтобы найти брокера, и отправляет сообщение брокеру.
  • Потребитель переходит к серверу имен, чтобы найти брокера для получения или получения сообщений от брокера.
  • При запуске посредника регистрируется основная информация об очередях чтения и записи, а информация о маршрутизации регулярно синхронизируется.

Развертывание с несколькими мастерами для предотвращения единой точки отказа

Как namesrv управляет брокерами

  • Брокер начинает регистрировать информацию с nameServer org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
}

NameServer превращает данные, выбрасываемые брокером, в набор собственного обслуживания.

  • Какие наборы брокеров есть в каждом кластере
  • Какие брокеры включены в каждый набор брокеров, brokerId и ip:port брокера
  • Выживание каждого брокера, согласно информации, предоставленной каждым брокером, очистить брокеров, которые могут быть в автономном режиме.
  • Информация об очереди сообщений для каждой темы, несколько очередей чтения, несколько очередей записи

Когда производитель отправляет сообщение, он знает, какому брокеру оно отправляется (почему мастер)

Избыточность сообщений (структура ведущий-ведомый) для предотвращения потери сообщений

RocketMQ реализует избыточность сообщений через структуру ведущий-ведомый.Мастер получает сообщение, отправленное от производителя, а затем синхронизирует сообщение с подчиненным.В соответствии с ролью мастера, время синхронизации можно разделить на две разные ситуации:

  • SYNC_MASTER: Если мастер находится в этой роли, каждый раз, когда мастер записывает сообщение, отправленное производителем, в память (диск), он будет ждать, пока мастер синхронно передаст сообщение подчиненному.
  • ASYNC_MASTER: сообщения в этой роли будут асинхронно копироваться в ведомое устройство.

ASYNC_MASTER синхронизировать данные с ведомым

  • Ведомый подключается к ведущему и сообщает мастеру о текущем смещении ведомого.
  • После того, как мастер получает его, он подтверждает начальную позицию отправки данных слейву.
  • MappedFIle, соответствующий начальной позиции главного запроса.
  • Мастер отправляет найденные данные ведомому
  • После того, как ведомое устройство получает данные, оно сохраняет их в свой собственный CommitLog.

SYNC_MASTER синхронизирует данные с ведомым

Процесс передачи данных SYNC_MASTER и ASYNC_MASTER в ведомую одинаков, но время отличается. Когда SYNC_MASTER получает сообщение, отправленное производителем, он будет ожидать синхронной передачи сообщения подчиненному устройству.

  • Мастер формирует данные, которые необходимо передать подчиненному устройству, в виде запроса GroupCommitRequest и передает их службе GroupTransferService.
  • Разбудите поток, который передает данные (HAClient.run будет ждать новых сообщений, если больше нет данных для передачи)
  • Дождитесь завершения текущего запроса на передачу

надежность

  • Все сообщения, отправляемые брокеру, имеют синхронный и асинхронный механизмы сброса, в целом надежность очень высокая.
  • Когда диск сбрасывается синхронно, сообщение будет успешно возвращено только в физический файл, поэтому оно очень надежно.
  • Когда диск обновляется асинхронно, потеря сообщений происходит только тогда, когда компьютер не работает.Может произойти простой брокера, но простои компьютера и сбои случаются редко, если только не происходит внезапный сбой питания.

Расширение [Как RocketMQ поддерживает сотни тысяч параллелизма в секунду]

Технология кэширования страниц + последовательная запись на диск

Стартовая архитектура mq записывает напрямую на диск

写入模型一Интуитивно понятно, что производительность операций ввода-вывода относительно низкая.

Обнаружено, что на уровне операционной системы существует кэш страниц [или кэш ОС],

写入模型二На основе теста среды Linux версии ECS для бедных, без записи в кеш и кеша无cache写入cache写入

Для операции записи 819M операция кеша занимает всего 7 секунд, а операция без кеша занимает около 80 секунд, не говоря уже о том, что это конфигурация плохой машины, лучшая машина должна быть в сто раз другой;

Итак, если в соответствии с первой моделью записи, если одна запись занимает 1 мс => 1000/с, и используется кэш ОС, предполагается, что одна запись занимает 0,01 мс => 10 Вт/с, Таким образом, сотни тысяч в секунду поддерживаются.

кеш страницУзнать о ссылках

технология нулевого копирования

В первую очередь решается проблема с записью, так как читают потребители, по нашему опыту, это должен быть кеш, тогда давай

读取模型一

Источник данных: кэш чтения диска (кеш ОС) копия кеша -> кеш процесса -> копия кеша -> SOCKET

На самом деле производительность уже хорошая, но есть еще две ненужные копии, так что если нет двух шагов копирования, это непобедимо? Ответ: Да

Таким образом, внедрение технологии нулевого копирования

读取模型二Нетрудно понять, что кеш сокета предназначен только для копирования дескриптора данных, а затем данные отправляются непосредственно из кеша ОС на сетевую карту.Этот процесс значительно повышает производительность чтения файловых данных во время потребления данных.

Эпилог

Увидимся в следующий раз, если это возможно