в предыдущем«Простая в освоении очередь сообщений»В этой статье мы узнали о роли, преимуществах и недостатках, а также сценариях использования очередей сообщений.Я думаю, вы уже имеете общее понятие об очередях сообщений.В конце статьи мы закопали для себя дыру и сказали, что мы в будущем напишу практический туториал, как раз сейчас стажировка закончилась. , Может я давно не писал практический туториал, так что я здесь, чтобы заполнить дыру.
Предварительное знание
Перед чтением этой статьи рекомендуется иметь некоторые предварительные знания, включая, помимо прочего:
- Общие команды Linux
- Знание очередей сообщений
- Основное использование Docker
- Основы docker-compose
- Основное использование SpringBoot
Без дальнейших церемоний, давайте начнем.
Код и файлы конфигурации, используемые в этой статье, можно получить, ответив на «rocketmq» в фоновом режиме общедоступной учетной записи WeChat «01 Binary».
Зачем использовать RocketMQ в качестве примера?
Эта статья предназначена в основном для того, чтобы интуитивно понять очереди сообщений на примерах. Затем возникает проблема, так много очередей сообщений (ActiveMQ, RabbitMQ, Kafka),яПочему стоит выбрать RocketMQ? Здесь мы не говорим о принципе, просто говорим об опыте,Это просто личный выбор, не распыляйте, если вам это не нравится.
- Полагаясь на Али, не глядя на оценку, известно, что его производительность находится в первой партии только из-за того, что он прошел множество проверок дважды-одиннадцать.
- Как Java-программист, если вы выберете программное обеспечение, написанное на чистом Java, вам будет намного легче читать его исходный код позже. (Нижний слой RabbitMQ — Erlang, а нижний слой kafka — Scala)
- Я всегда пользовался внутренней версией RocketMQ во время стажировки на Али, для меня более привычна RocketMQ.
Знакомство с RocketMQ
Прежде чем использовать очередь сообщений, нам нужно знать, что такое очередь сообщений.«Простая в освоении очередь сообщений», которые здесь повторяться не будут.
В этом разделе объясняются связанные концепции, связанные с RocketMQ.Давайте кратко рассмотрим официальную схему архитектуры RocketMQ.Из приведенного выше рисунка интуитивно видно, что полная архитектура RocketMQ состоит из четырех частей:NameServer, Брокер, Производитель и Потребитель.
- NameServer: в основном используется в качестве реестра для управления информацией о теме и информацией о маршрутизации.
- Брокер: отвечает за хранение, фильтрацию и пересылку тегов сообщений. Вам необходимо сообщить свою информацию в центр регистрации NameServer
- Продюсер: Продюсер
- Потребитель: потребитель
Понимание с точки зрения отправки письма
Приведенное выше объяснение может быть трудным для понимания, давайте рассмотрим ответственность следующих четырех частей на примере отправки письма.
- Производитель и потребитель Излишне говорить, что производитель и потребитель сообщения, производитель несет ответственность за доставку сообщения, потребитель отвечает за получение сообщения,приложение, которое мы собираемся написать. Может пониматься как отправитель и получатель.
- Брокер отвечает за хранение сообщений, принимая Тему в качестве измерения и сохраняя сообщения в виде очередей. Под ним можно понимать почтовый ящик, в котором специально хранятся письма, откуда получатель (Потребитель) может получать письма.
- NameServer отвечает за управление исходными данными, включая Topic и Broker. Под ним можно понимать почтовое отделение, отвечающее за управление распределением почты и поддержание статуса почтового ящика (Брокер).
Из функций вышеперечисленных частей нам нужно сначала установить и запустить NameServer, а затем запустить Broker для сборки RocketMQ.
Установить RocketMQ
Если вы уже настроили соответствующую среду RocketMQ на своем компьютере, вы можете пропустить эту главу.
Из приведенного выше введения мы можем знать, что перед созданием и потреблением сообщений нам необходимо установитьБрокеры и серверы имен.
Готов к работе
Для простоты развертывания я рекомендую использовать докер для создания сервисов. Кроме того, поскольку Rocketmq необходимо развернуть брокер и сервер имен отдельно, учитывая, что развернуть отдельно хлопотно, я буду использовать здесь docker-compose. Поэтому на вашем хосте должны быть установлены docker и docker-compose.
Кроме того, нам также необходимо построить консоль веб-визуализации для мониторинга состояния службы mq и потребления сообщений, здесь мы используем RocketMQ-консоль, и программа также будет установлена с помощью docker.
Если вы не знакомы с докером, вы можете сначала прочитать учебник по докеру из учебника для новичков 👉Учебник по докеру
Установить
Установить Докер
Линукс:
Выполните следующую команду
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
Мак:
Выполните следующую команду
brew cask install docker
Win:
Загрузите соответствующий установочный файл, а затем дважды щелкните, чтобы запустить установку. Адрес загрузки находится по адресу:Хубэй.docker.com/editions/co…
Учитывая, что для загрузки файла требуется научный доступ в Интернет, вы можете ответить на «docker» в фоновом режиме общедоступной учетной записи WeChat «01 binary», чтобы получить ссылку для загрузки установочного пакета Docker.
Если ваша система win10 может использовать winget, выполните следующую команду. (У Win наконец-то появился собственный инструмент управления пакетами 🙏)
winget install Docker.DockerDesktop
Иногда бывает сложно получить образы из DockerHub в Китае, но в это время вы можете настроить ускоритель образов. Учебное пособие по настройке может относиться к 👉Ускорение образа Docker
Установите образ RocketMQ
Мы можем сделать docker-образ ракеты mq самостоятельно, что подробно описано в официальной документации 👉apache/rocketmq-docker
Для удобства здесь мы напрямую используем зеркало, которое сделали другие, адрес зеркала 👉foxiswho/rocketmq
Создайте новый каталог для хранения связанных скриптов, а затем выполните следующие команды в терминале 👇
git clone https://github.com/foxiswho/docker-rocketmq.git
cd docker-rocketmq
cd rmq
chmod +x start.sh
./start.sh
Подождав некоторое время, заходим через браузерlocalhost:8180
Если вы видите следующую страницу, установка прошла успешно.
Парсинг скрипта установки
Установка в один клик с помощью скрипта действительно удобна, но если установка завершена, то все будет хорошо.В отношении обучения людей рыбной ловле давайте посмотрим, что находится в скрипте установки:
start.sh
Строки 4-7 создают каталог, а строки 10-13 устанавливают права доступа для только что созданного каталога, о причинах мы поговорим позже.
Мы видим, что строка 16 использует команду docker-compose для запуска контейнера и настроена на автоматический запуск в фоновом режиме, поэтому давайте взглянем на этот файл docker-compose.yml.
docker-compose.yml
version: "3.5"
services:
rmqnamesrv:
image: foxiswho/rocketmq:4.7.0
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./rmqs/logs:/opt/logs
- ./rmqs/store:/opt/store
environment:
JAVA_OPT_EXT: "-Duser.home=/opt -Xms512M -Xmx512M -Xmn128m"
command: ["sh", "mqnamesrv"]
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:4.7.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./rmq/logs:/opt/logs
- ./rmq/store:/opt/store
- ./rmq/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/opt -Xms512M -Xmx512M -Xmn128m"
command:
[
"sh",
"mqbroker",
"-c",
"/etc/rocketmq/broker.conf",
"-n",
"rmqnamesrv:9876",
"autoCreateTopicEnable=true",
]
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8180:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
Мы создали три службы, имена этих трех служб — rmqnamesrv, rmqbroker и rmqconsole, которые соответствуют серверу имен, брокеру и визуальной консоли, о которых мы упоминали ранее. И сделать разные сопоставления портов для разных сервисов, заодно смонтировать локально указанную файловую директорию в контейнер докера и подключиться к сети в виде бриджа.
кrmqnamesrv
Например, его базовое изображениеfoxiswho/rocketmq:4.7.0
, имя созданного контейнераrmqnamesrv
, и сопоставьте его внутренний порт 9876 с портом 9876 хоста и сопоставьте локальный./rmqs/logs
файл, смонтированный в докер-контейнере/opt/logs
в каталоге.
rmqnamesrv:
image: foxiswho/rocketmq:4.7.0
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./rmqs/logs:/opt/logs
- ./rmqs/store:/opt/store
Если вы не знакомы с docker-compose, вы можете обратиться к соответствующим учебным пособиям, чтобы узнать 👉Docker Compose
SpringBoot интегрирует небольшой экземпляр RocketMQ
После завершения относительно сложной установки и настройки мы, наконец, можем реализовать небольшую демонстрацию, чтобы пройти весь процесс.
Создавайте темы сообщений и группы подписки
При использовании RocketMQ для отправки сообщенийтема должна быть указана, есть переключатель для настройки темыautoCreateTopicEnable
, обычно втест на разработкуСреда будет использовать настройки по умолчаниюautoCreateTopicEnable = true
, но это приведет к тому, что настройка темы непроста для стандартизации управления, нет единого аудита и т. д., поэтому в формальной среде параметры будут установлены при запуске брокераautoCreateTopicEnable = false
. Таким образом, когда вам нужно добавить тему, вам нужно добавить ее в веб-интерфейс управления.
Способ добавления темы в веб-интерфейсе следующий:
Аналогично при получении сообщений нам также необходимо настроить группу подписки на сообщения, и есть переключатель для настроек подписки на сообщенияautoCreateSubscriptionGroup
, обычно в производственной среде нам нужно установитьautoCreateSubscriptionGroup=false
, который требует, чтобы администратор зашел в веб-интерфейс управления, чтобы создать группу подписки для получения сообщений.
Способ добавления группы подписки в веб-интерфейсе аналогичен, как показано на следующем рисунке:
Если это просто тестовая среда, мы можем включить эти два переключателя в файле конфигурации, а файл конфигурации находится в
rmq/rmq/brokerconf
Под содержанием
Написать код
Apache официально предоставил пусковое устройство Springboot, соответствующее RocketMQ, что значительно упрощает работу по настройке, которую нам нужно выполнить, поэтому нам нужно сначала создать новый проект Springboot, а затем приступить к его реализации следующим образом.
импортировать зависимости
Сначала импортируйте стартер, предоставленный apache, в pom.xml.
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
настроить application.yml
После импорта зависимости нам нужно настроить адрес сервера имен в application.yml, конкретное значение зависит от вашей машины.
rocketmq:
name-server: localhost:9876
producer:
group: myGroup
Создайте класс производителя
Производитель отправляет сообщение:
@RestController
public class RocketController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送给Broker,默认会自动创建topic,topic和tag用冒号分隔
@GetMapping("/rocket/send")
public String rocketSend() {
LocalDateTime currentTime = LocalDateTime.now();
rocketMQTemplate.convertAndSend("rocket-topic-2", currentTime.toString());
return currentTime.toString();
}
// 延时消息,RocketMQ支持这几个级别的延时消息,不能自定义时长
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@GetMapping("/rocket/delayMsg/send")
public String rocketDelayMsgSend() {
LocalDateTime currentTime = LocalDateTime.now();
rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentTime.toString()).build(), 2000, 3);
return currentTime.toString();
}
}
создать потребителя
Потребитель слушает сообщения:
@Component
@Slf4j
public class RokcetServiceListener {
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-2")
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer1 rocket收到消息:{}", s);
}
}
// RocketMQ支持两种消费方式,集器消费和广播消费
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-2",
selectorExpression = "tag2", messageModel = MessageModel.BROADCASTING)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer2 rocket收到消息:{}", s);
}
}
}
контрольная работа
Заходим в браузерlocalhost:8080/rocket/send
, вы можете увидеть возвращенную метку времени.
В то же время вы также можете видеть, что потребитель получил эту информацию на консоли.
Точно так же мы можем просмотреть соответствующие сообщения в визуальной консоли.
Мы также можем просматривать производство и потребление сообщений потребителями и производителями в визуальной консоли, которую читатель может изучить самостоятельно. На этом полный пример использования Docker для установки RocketMQ и его использования с SpringBoot закончен.
вопрос
Вопрос 1: Нет информации о маршруте этой темы: xxxxxx
Благодаря переводу мы можем узнать, что причина этой ошибки в том, что соответствующая очередь сообщений не создается.topic, так что нам нужно зайти в консоль и создать новую тему
Проблема 2: Ненормальное соединение
Если возникает ошибка, подобная следующему исключению подключения
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <172.0.0.120:10909> failed
Возможная причина в том, что вы не поместили проект в docker-контейнер, поэтому ваш код проекта не может быть доступен напрямую с помощью контейнера RocketMQ, поэтому нам нужно поместитьbroker.conf
середина#brokerIP1=xxxxx
Фронт#
убери номер и поставь следующееIP地址
меняй на свойrocketmq
узел контейнераIP地址
, файл конфигурации находится вrmq/rmq/brokerconf
Под содержанием.
В конце концов
Чтобы заполнить дыру, я выбрал в качестве объекта объяснения в примере RocketMQ и объяснил причины, по которым я использую RocketMQ в первом разделе, а затем объяснил несколько важных концепций RocketMQ, а затем использовал докер для быстрого развертывания и установки Rocketmq и проанализировал сценарий установки. Наконец, мы реализовали экземпляр производителя и потребителя через Springboot, текущую основную веб-инфраструктуру, и объяснили возможные проблемы и решения.
Выше приведено все содержание этой статьи. Если вы чувствуете, что это полезно для вас, не отпускайте, обратите внимание, поставьте лайк и поддержите волну. Ваша поддержка - самая большая мотивация для моего обновления.