Передовой путь Java-инженеров RocketMQ (1)

Java RocketMQ
Передовой путь Java-инженеров RocketMQ (1)

Учебная комната Cabbage Java охватывает основные знания

Передовой путь Java-инженеров RocketMQ (1)
Передовой путь для инженеров Java RocketMQ (2)

1. Введение в RocketMQ

RocketMQРанее называвшийся MetaQ, он был переименован в RocketMQ, когда MeataQ выпустил версию 3.0.Его идея дизайна похожа на Kafka по сути, но отличается от Kafka тем, что использует Java для разработки.Поскольку отечественная аудитория Java намного больше, чем Scala , Таким образом, RocketMQ является первым выбором многих компаний, использующих язык Java. Те же RocketMQ и Kafka оба являются проектами верхнего уровня в Apache Foundation, их сообщества очень активны, и итерация обновления проекта проходит очень быстро.

2. Схема архитектуры RocketMQ

Для схемы архитектуры RocketMQ в целом нет особых отличий от Kafka, но есть много отличий во многих деталях, которые будут описаны далее по порядку.

3. Глоссарий RocketMQ

В диаграмме архитектуры RocketMQ есть несколько производителей, множественные главные брокеры и несколько подчиненных брокеров. Каждый производитель может соответствовать нескольким темам, и каждый потребитель также может потреблять несколько тем.

Информация о посреднике будет передана серверу имен, а потребитель получит информацию о посреднике и теме с сервера имен.

  • Producer: производитель сообщений, клиент, который отправляет сообщения брокеру.

  • Consumer: потребитель сообщений, клиент, который читает сообщения от брокера.

  • Broker: Узел обработки в середине сообщения. В отличие от kafka, брокер kafka не имеет понятия «ведущий» и «ведомый» и может писать запросы и резервные данные других узлов. RocketMQ может писать только главный узел-брокер, и вообще чтение через главный узел.Когда главный узел имеет сбой или какие-то другие особые обстоятельства, для чтения будет использоваться подчиненный узел, что несколько похоже на архитектуру ведущий-подчиненный в mysql.

  • Topic: Тема сообщения, тип сообщения первого уровня, производитель отправляет ему сообщения, а потребитель читает его сообщения.

  • Group: Разделяется на ProducerGroup, ConsumerGroup, которые представляют определенный тип производителей и потребителей.Вообще говоря, одна и та же служба может использоваться как группа, и одна и та же группа обычно отправляет и потребляет одни и те же сообщения.

  • Tag: Нет такой концепции в Кафке. Тег - это вторичный тип сообщения. Вообще говоря, связанные с бизнесом могут использовать тот же тег, такой как очередь сообщений, используя TEAT_ORDER, тег можно разделить на заказ Tag_Food, порядок Tag_Clothing и т. Д. Ждать.

  • Queue: В кафке это называется Partition.Каждая очередь упорядочена внутри.В RocketMQ она делится на две очереди:чтение и запись.Вообще говоря количество очередей чтения и записи одинаковое.Если они несовместимы,будут много проблем.

  • NameServer: ZooKeeper используется в Kafka для сохранения адресной информации Брокера и избрания Лидера Брокера. В RocketMQ стратегия выбора Брокера не принята, поэтому для хранения используется NameServer без сохранения состояния. Поскольку NameServer не имеет состояния, узлы кластера Между ними нет связи, поэтому при загрузке данных их необходимо отправить на все узлы.

Многие друзья спрашивают, что такое безгражданство? Существование состояния на самом деле зависит от того, будут ли данные храниться. Если есть состояние, данные будут сохраняться. Службу без сохранения состояния можно понимать как службу памяти. Сам NameServer также является службой памяти. Все данные хранятся в памяти. После перезагрузки пропадет.

4. Тема и очередь RocketMQ

в RocketMQКаждое сообщение имеет тему, которая используется для различения разных сообщений.. У темы обычно есть несколько подписчиков сообщений.Когда производитель публикует сообщение в теме, все потребители, подписавшиеся на тему, могут получать новые сообщения, написанные производителем.

Разделено по темеМножественные очереди, которые на самом деле являются наименьшей единицей, которую мы отправляем/читаем в каналах сообщений., нам нужно указать определенную очередь для записи в сообщение при отправке сообщения, а также указать определенную очередь для извлечения при извлечении сообщения, чтобы наши последовательные сообщения могли поддерживать порядок в очереди на основе нашего измерения очереди. мы хотим добиться глобального упорядочения, тогда вам нужно установить размер очереди равным 1, чтобы все данные были упорядочены в очереди.

На приведенном выше рисунке наш продюсер выберет очередь с помощью некоторых стратегий:

  • непоследовательные сообщения: Непоследовательные сообщения обычно отправляются напрямую путем опроса.

  • последовательное сообщение: В соответствии с определенным ключом, таким как наш общий идентификатор заказа, идентификатор пользователя, выполните хеширование и поместите данные одного типа в одну и ту же очередь для обеспечения нашего заказа.

Наша же группа потребителей также выберет очередь в соответствии с некоторыми стратегиями, такими как среднее распределение или последовательное распределение хэша. Следует отметить, что когда Потребитель находится офлайн или онлайн, здесь нужно делать ребалансировку, то есть Rebalance.Механизм перебалансировки RocketMQ выглядит следующим образом.:

  1. Регулярно получайте самую свежую информацию о брокере и теме
  2. Ребаланс каждые 20 секунд
  3. Случайным образом выбрать главного брокера текущей темы.Здесь следует отметить, что все основные брокеры будут выбираться каждый раз при ребалансировке, потому что будет один брокер и несколько брокеров.
  4. Получите все текущие машины ID брокера, текущая потребительская группа.
  5. Затем выполните назначение политики.

Так как перебалансировка производится периодически, здесьВозможно, что очередь потребляется двумя потребителями одновременно, поэтому будет повторная доставка сообщений..

Механизм перебалансировки Kafka отличается от RocketMQ. Перебалансировка Kafka осуществляется через контакт между Потребителем и Координатором. Когда Координатор воспринимает изменение группы потребителей, он отправляет сигнал перебалансировки во время процесса сердцебиения, а затем Лидер-потребитель выполнит ребалансировку. Выберите, а затем координатор уведомит всех потребителей о результате.

Что делать, если количество операций чтения и записи в очереди несовместимо?

В RocketMQ Queue делится на два типа: чтение и запись.Когда я впервые связался с RocketMQ, я всегда думал, что не будет проблем с несогласованной настройкой количества очередей чтения и записи.Например, когда много Consumer машин, мы настраиваем множество очередей чтения, однако в реальном процессе обнаруживается, что сообщение не может быть потреблено и потребление сообщений отсутствует вообще.

  1. Когда количество очередей записи превышает количество очередей чтения, данные очередей записи с идентификаторами, превышающими эту часть очереди чтения, не могут быть использованы, поскольку они не будут выделены потребителям.
  2. Когда количество очередей чтения больше, чем количество очередей записи, никакие сообщения не будут доставлены для такого количества очередей.

5. Вводный пример RocketMQ

5.1. Производитель RocketMQ

Определите производителя напрямую, создайте сообщение и вызовите метод отправки.

public class Producer {

    public static void main(String[] args)
            throws MQClientException,
            InterruptedException {
        DefaultMQProducer producer = new
                DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes
                                    (RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
    
}

5.2 Потребитель RocketMQ

public class PushConsumer {

    public static void main(String[] args)
            throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere
                (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener
                (new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus
                    consumeMessage(List<MessageExt> msgs,
                                   ConsumeConcurrentlyContext context) {
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    
}

Передовой путь Java-инженеров RocketMQ (1)
Передовой путь для инженеров Java RocketMQ (2)