Учебная комната Cabbage Java охватывает основные знания
Spring Cloud Alibaba Actual Combat (1) Подготовка
Spring Cloud Alibaba Actual Combat (2) Nacos
Spring Cloud Alibaba Actual Combat (3) Sentinel
Весеннее облако Alibaba бой (4) Oauth2
Spring Cloud Alibaba Actual Combat (5) Zuul
Реальные бои Spring Cloud Alibaba (6) Статьи RocketMQ
Spring Cloud Alibaba Actual Combat (7) Seata
Spring Cloud Alibaba Actual Combat (8) SkyWalking
Адрес проекта на GitHub:GitHub.com/D2C-CAI/Хайер…
1. Введение в RocketMQ
RocketMQ — промежуточное ПО для распределенного обмена сообщениями с открытым исходным кодом от Alibaba. Поддержка сообщений о транзакциях, последовательных сообщений, пакетных сообщений, сообщений о времени, обратного отслеживания сообщений и т. д. В нем есть несколько понятий, отличных от стандартного промежуточного программного обеспечения сообщений, таких как группа, тема, очередь и т. д. Система состоит из производителя, потребителя, брокера, сервера имен и т. д.
Возможности RocketMQ
- Поддержка модели публикации/подписки (PUB/SUB) и двухточечной (P2P) передачи сообщений;
- Благодаря надежному принципу «первым поступил — первым обслужен» (FIFO) и строгой доставке заказов в очередь RocketMQ может гарантировать строгий порядок сообщений, в то время как ActiveMQ не может;
- Поддерживает два режима сообщений: Pull и Push; Push легко понять, например, установить обратный вызов Listener на стороне потребителя; и Pull, управление лежит в приложении, то есть приложение должно активно вызывать метод сообщения pull для получать сообщения от брокера. Существует проблема использования записей о местоположении (если не записать, это приведет к повторному использованию сообщений);
- Возможность накапливать миллионы сообщений в одной очереди RocketMQ дает возможность накапливать сотни миллионов сообщений Суть не в этом, а в сохранении низкой задержки записи после накопления сотен миллионов сообщений;
- Поддержка различных протоколов сообщений, таких как JMS, MQTT и т. д.;
- Распределенная архитектура развертывания с высокой доступностью удовлетворяет по крайней мере одной семантике доставки сообщений; RocketMQ изначально поддерживает распределенную, а ActiveMQ изначально имеет единую точку;
- Предоставлять образы докеров для изолированного тестирования и развертывания облачного кластера;
- Предоставляет многофункциональную панель инструментов для настройки, показателей и мониторинга.
Broker
Брокер на самом деле является сервером RocketMQ, отвечающим за хранение и пересылку сообщений. Брокер отвечает за получение и хранение сообщений, отправленных производителями в системе RocketMQ, и за подготовку запросов на вытягивание от потребителей. Брокеры также хранят метаданные, связанные с сообщениями, включая группы потребителей, смещения хода выполнения, а также темы и сообщения очередей.
Broker Server — это реальное бизнес-ядро RocketMQ, включающее в себя несколько важных подмодулей:
- модуль маршрутизации: Сущность всего Брокера, отвечающая за обработку запросов от клиентов.
- Управление клиентами: отвечает за управление клиентом (производитель/потребитель) и поддержание информации о подписке на тему потребителя.
- служба хранения: Предоставляет удобный и простой интерфейс API для обработки хранения сообщений на физическом жестком диске и функций запросов.
- Служба высокой доступности: Служба высокой доступности, обеспечивающая синхронизацию данных между главным брокером и подчиненным брокером.
- служба индексации сообщений: индексирует сообщения, доставленные брокеру, в соответствии с определенным ключом сообщения, чтобы обеспечить быстрый запрос сообщений.
NameServer
NameServer — это очень простой реестр маршрутизации темы, его роль аналогична zookeeper в Dubbo, и он поддерживает динамическую регистрацию и обнаружение брокеров.
В основном включает две функции:
- Брокерское управление: NameServer принимает регистрационную информацию кластера брокера и сохраняет ее в качестве основных данных маршрутной информации. Затем предоставьте механизм обнаружения сердцебиения, чтобы проверить, жив ли брокер;
- Управление маршрутной информацией: Предоставление услуг Производителю и Потребителю для получения списка Брокеров. Каждый NameServer будет содержать всю информацию о маршрутизации кластера Broker и информацию об очереди для клиентских запросов. Затем производитель и потребитель могут получить информацию о маршрутизации всего кластера брокера через сервер имен, чтобы доставлять и потреблять сообщения.
2. Используйте Docker для быстрой сборки RocketMQ 4.4.
Rocketmq необходимо развернуть брокера и сервер имен.Учитывая, что развернуть отдельно хлопотно, здесь будет использоваться docker-compose. Кроме того, необходимо построить консоль веб-визуализации, которая может отслеживать статус службы mq и потребление сообщений, здесь используется RocketMQ-консоль, и программа также будет установлена с помощью docker.
- Выберите и создайте каталог на сервере Linux;
mkdir rocketmq-docker
- Войдите в каталог RocketMQ-Docker и создайте файл конфигурации с именем Broker.conf со следующим содержимым:
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
# broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
# 0表示Master,大于0表示不同的slave
brokerId = 0
# 表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
# 在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
# 有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
# 刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
# brokerIP1 = 192.168.138.131
Уведомление: Обязательно установите здесь brokerIP1, иначе он станет внутренним IP-адресом контейнера докеров по умолчанию, что приведет к сбою внешней сети.
- Находясь в каталоге RocketMQ-Docker, создайте файл сценария с именем RocketMQ.yaml.;
Содержимое Rocketmq.yaml выглядит следующим образом:
version: '2'
services:
namesrv:
image: rocketmqinc/rocketmq
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- /docker/rocketmq/data/namesrv/logs:/home/rocketmq/logs
- /docker/rocketmq/data/namesrv/store:/home/rocketmq/store
command: sh mqnamesrv
broker:
image: rocketmqinc/rocketmq
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- /docker/rocketmq/data/broker/logs:/home/rocketmq/logs
- /docker/rocketmq/data/broker/store:/home/rocketmq/store
- /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c /opt/rocketmq-4.4.0/conf/broker.conf
depends_on:
- namesrv
environment:
- JAVA_HOME=/usr/lib/jvm/jre
console:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console-ng
ports:
- 8087:8080
depends_on:
- namesrv
environment:
- JAVA_OPTS= -Dlogging.level.root=info -Drocketmq.namesrv.addr=rmqnamesrv:9876
- Dcom.rocketmq.sendMessageWithVIPChannel=false
- Откройте порт брандмауэра, используемый брокером, чтобы облегчить последующее использование.:
firewall-cmd --zone=public --add-port=10909-10912/tcp --permanent
- Выполните сценарий sentinel-dashboard.yaml, чтобы запустить контейнер.:
docker-compose -f rocketmq.yaml up
- Зайдя в консоль Rocketmq, мы найдем похожий граф (конечно, вначале он должен быть пустым):
http://(安装RocketMQ机器的IP):8087
- Мы выбираем столбец «кластер», и когда мы входим, мы видим внешний сетевой IP-адрес установленного нами брокера.;
На данный момент мы теоретически завершили развертывание сервера RocketMQ, и теперь мы можем использовать клиент в проекте Spring.
3. Внедрите клиент RocketMQ в проект Spring.
- Добавить зависимости файла pom:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
- Добавьте конфигурацию в application.yml:
server:
port: 10801
spring:
application:
name: (项目名称)-service
rocketmq:
name-server: (安装RocketMQ机器的IP):9876
producer:
group: (项目名称)-group
- Создайте сообщение для отправки класса messageproducer в качестве производителя сообщения:
@Service
public class MessageProducer implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void run(String... args) throws Exception {
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
}
}
- Создайте новый класс приема сообщений MessageListener в качестве потребителя сообщений.:
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("received message: {}", message);
}
}
- Создайте новый класс контроллера вызовов для MessageProducer.:
@RestController
@RequestMapping
public class HelloController {
@Resource
private MessageProducer messageProducer;
@RequestMapping("/message")
public void message() throws Exception {
messageProducer.run("");
}
- Запускаем проект Spring, давайте протестируем простейшую отправку сообщений:
GET http://localhost:10801/message
Accept: */*
Cache-Control: no-cache
Мы также можем примерно увидеть потребление сообщений в консоли управления RocketMQ:
Фактический бой не объяснил здесь некоторые концепции, и все могут быть немного сбиты с толку.
Тема (topic) эквивалентна типу сообщения, например, можно указать Topic1 как бизнес по распространению, Topic2 как бизнес по купонам, а Group (группа) эквивалентна группе производителей и потребителей, тогда наш микро Сервис может быть как производителем, так и потребителем, например Group1 может быть настроена как микросервис для товаров, а Group2 — это микросервис для заказов, конечно, необходимо различать, является ли это группой производителей или группой потребителей. .
- Group: делится на ProducerGroup и ConsumerGroup, которые представляют определенный тип производителей и потребителей.Вообще говоря, одна и та же служба может использоваться как группа, и одна и та же группа обычно отправляет и потребляет одни и те же сообщения.
- Topic: Тема сообщения, тип сообщения первого уровня, производитель отправляет ему сообщения, а потребитель читает его сообщения.
- Queue: Разделены на две очереди, в целом сказали, что количество очередей чтения согласуется, и будет много проблем, если оно несовместимо.
Тема разделена на несколько очередей, которые на самом деле являются наименьшей единицей для отправки/чтения каналов сообщений.. Когда мы отправляем сообщения, нам нужно указать определенную Очередь для записи, а когда мы извлекаем сообщения, нам также нужно указать определенную Очередь для извлечения, чтобы наши последовательные сообщения могли поддерживать порядок в очереди на основе нашего измерения Очереди. затем вам нужно установить размер очереди на 1, чтобы все данные были упорядочены в очереди.
Spring Cloud Alibaba Actual Combat (1) Подготовка
Spring Cloud Alibaba Actual Combat (2) Nacos
Spring Cloud Alibaba Actual Combat (3) Sentinel
Весеннее облако Alibaba бой (4) Oauth2
Spring Cloud Alibaba Actual Combat (5) Zuul
Реальные бои Spring Cloud Alibaba (6) Статьи RocketMQ
Spring Cloud Alibaba Actual Combat (7) Seata
Spring Cloud Alibaba Actual Combat (8) SkyWalking
Адрес проекта на GitHub:GitHub.com/D2C-CAI/Хайер…