1 Обзор
В этой статье мы познакомим вас сSpring Cloud Stream
, платформа для создания микросервисных приложений, управляемых сообщениями, на основе распространенного брокера обмена сообщениями, такого какRabbitMQ
,Apache Kafka
и т.д.) подключение.
Spring Cloud Stream
Опирайтесь на существующие фреймворки Spring, такие какSpring Messaging
иSpring Integration
) выше. Хотя эти фреймворки проверены в боевых условиях и работают очень хорошо, внедрение и использованиеmessage broker
Тесно связаны. Кроме того, масштабирование для определенных вариантов использования иногда затруднено.
Spring Cloud Stream
Идея очень типичнаSpring Boot
концепция--抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
.这意味着您可以通过更改依赖项和配置文件来更改message broker
. допустимыйздесьНайдите различные брокеры сообщений, которые в настоящее время поддерживаются.
В этой статье будет использоватьсяRabbitMQ
в видеmessage broker
. Перед этим давайте разбиратьсяbroker
Некоторые основные понятия (прокси) и зачем они нужны в архитектуре, ориентированной на микросервисы.
2. Сообщения в микросервисах
В архитектуре микросервисов у нас есть много небольших приложений, которые взаимодействуют друг с другом для выполнения запросов — одно из их основных преимуществ — улучшенная масштабируемость. Обычно запрос проходит от нескольких нижестоящих микросервисов до завершения. Например, предположим, что у нас естьService-A
Внутренний вызовService-B
иService-C
для выполнения запроса:
Да, будут другие компоненты, такие какSpring Cloud Eureka
,Spring Cloud Zuul
И так далее, но нас по-прежнему беспокоит сосредоточенность на вопросах, специфичных для этого типа архитектуры.
допустим по какой-то причинеService-B
Для ответа требуется больше времени. может быть, он выполняетсяI/O操作
или на длительный срокDB事务
, или дальнейшие призывы к другим причинамService-B
Более медленные услуги делают невозможным быть более эффективным.
Теперь мы можем начать большеService-B
Пример решить эту проблему, это нормально, ноService-A
на самом деле очень отзывчивый, нужно подождатьService-B
ответ для дальнейшей обработки. это приведет кService-A
Больше запросов не может быть получено, а значит, мы также должны начатьService-A
несколько экземпляров .
Другой способ справиться с подобной ситуацией — использовать архитектуру микросервисов, управляемую событиями. что в основном означаетService-A
Не напрямуюHTTP
перечислитьService-B
илиService-C
, но опубликуйте запрос или событие вmessage broker
(брокер сообщений).Service-B
иService-C
будетmessage broker
Подписчик на это событие в (брокере сообщений).
Это имеет ряд преимуществ по сравнению с традиционными микросервисными архитектурами, основанными на HTTP-вызовах:
- Улучшите масштабируемость и надежность — теперь мы знаем, какие сервисы являются истинными узкими местами во всем приложении.
- Поощряйте слабую связанность -
Service-A
не нужно знатьService-B
иService-C
. его просто нужно подключить кmessage broker
и публиковать события. Дальнейшая организация событий зависит от настроек прокси. Этим способом,Service-A
Может работать независимо, что является одной из основных концепций микросервисов. - Взаимодействие с устаревшими системами - часто мы не можем переместить все в новый стек технологий. Нам все еще нужно использовать устаревшие системы, которые медленно, но надежны.
3. RabbitMQ
高级消息队列协议(AMQP)
даRabbitMQ
Протокол передачи сообщений. Несмотря на то чтоRabbitMQ
Некоторые другие протоколы поддерживаются, ноAMQP
Более популярен из-за совместимости и большого количества функций, которые он предлагает.
3.1 Проект архитектуры RabbitMQ
Итак, издатель для публикации новостейRabbitMQ
позвонил вExchange
(Обменник).Exchange
(обмен) получает сообщения и направляет их одному или несколькимQueues
(очередь). Алгоритм маршрутизации зависит отExchange
(Обменник) тип иrouting
(маршрутизация) ключ/заголовок (передается вместе с сообщением). будетExchange
(переключатель) подключен кQueues
Эти правила (очереди) называютсяbindings
(привязка).
Привязки могут быть 4 видов:
-
Direct: он основан на
routing key
(ключ маршрутизации) будетExchange
(переключатель) для маршрутизации непосредственно к определенномуQueues
(очередь). -
Fanout: он направляет сообщение в привязку
Exchange
(обмен) все вQueues
(очередь). -
Topic: основывается на точном совпадении или частичных данных
routing key
(ключ маршрутизации) соответствует перенаправлению сообщения на (0, 1 или более)Queues
(очередь). -
Headers: это похоже на
Topic
(Тема) Тип биржи, но это базаrouting header
(заголовок маршрутизации) вместоrouting key
(ключ маршрутизации) для маршрутизации.
источник:www.cloudamqp.com/
пройти черезExchange
(Обменник) иQueues
(Очередь) весь процесс публикации и потребления сообщений осуществляется черезChannel
(Канал) готов.
Для получения дополнительной информации о маршрутизации, пожалуйста, посетите здесьСсылка на сайт.
3.2 Настройки RabbitMQ
3.2.1 Установка
мы можем начать сздесьЗагрузите и установите двоичные файлы на основе нашей операционной системы.
Однако в этой статье мы будем использоватьcloudamqp.com
Предусмотрена бесплатная облачная установка. Просто зарегистрируйтесь в сервисе и авторизуйтесь.
Нажмите на главную панель инструментов创建新实例
:
Затем дайте вашему экземпляру имя и перейдите к следующему шагу:
Затем выберите зону доступности:
Наконец, чтобы просмотреть информацию об экземпляре, нажмите в правом нижнем углу创建实例
:
Вот и все. Теперь запускаем один в облакеRabbitMQ
пример. Для получения дополнительной информации об экземплярах перейдите на панель инструментов и щелкните新创建的实例
:
Мы можем видеть хосты, на которых мы можем получить доступ к экземпляру RabbitMQ, например имя пользователя и пароль, необходимые для подключения из нашего проекта:
Мы будем использовать в приложении SpringAMQP URL
Подключитесь к этому экземпляру, так что запишите его где-нибудь.
Вы также можете нажать кнопкуRabbitMQ manager
Для просмотра консоли управления. Он будет использовать его для управления вашимRabbitMQ
пример.
Конфигурация проекта
Теперь, когда наша настройка готова, давайте создадим наш сервис:
- cloud-stream-producer-rabbitmq: как издатель, отправляйте сообщения на
RabbitMQ
- cloud-stream-consumer-rabbitmq: потребители потребляют сообщения
использоватьSpring Initializr
Создайте проект леса. Это будет нашеproducer
проект, мы будем использоватьREST
Конечная точка публикует сообщение.
выбери свой любимыйSpring Boot
версия, добавитьWeb
иCloud Stream
зависимости, сборкаMaven
проект:
Уведомление:
Пожалуйста, обрати вниманиеcloud-stream
зависимости. Это также требует что-то вродеRabbitMQ
,Kafka
Подождите, пока зависимости связующего заработают.
Так как мы будем использоватьRabbitMQ
, добавьте следующееMaven
Зависимости:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
В качестве альтернативы мы также можем использовать комбинацию двухspring-cloud-starter-stream-rabbit
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Используя тот же метод, создайте потребительский товар, но используйте толькоspring-cloud-starter-stream-rabbit
зависимости.
4. Создайте производителя
Как упоминалось ранее, весь процесс доставки сообщения от издателя в очередь выполняется через каналы. Итак, давайте создадимHelloBinding
Интерфейс, который содержит наш механизм сообщенияgreetingChannel
:
interface HelloBinding {
@Output("greetingChannel")
MessageChannel greeting();
}
Поскольку это опубликует сообщение, мы используем@Output
аннотация. Имя метода может быть каким угодно, конечно, в интерфейсе может быть больше одного метода.Channel
(ряд).
Теперь давайте создадимREST
, который отправляет сообщения на этотChannel
(ряд)
@RestController
public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}
@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}
Выше, мы создали одинProducerController
класс, который имеетMessageChannel
свойства типаgreet
. Это инициализируется в конструкторе с помощью метода, который мы объявили ранее.
Уведомление: Мы можем сделать то же самое в краткой форме, но мы используем разные имена, чтобы дать вам более четкое представление о том, как вещи связаны.
Тогда у нас есть простойREST
Интерфейс, который получаетPathVariable
изname
и использоватьMessageBuilder
СоздаватьString
тип сообщения. Наконец, мы используемMessageChannel
Вверх.send()
способ публикации сообщения.
Теперь мы добавим в основной класс@EnableBinding
Обратите внимание, входящийHelloBinding
РассказыватьSpring
нагрузка.
@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Наконец, мы должны сказатьSpring
Как подключиться кRabbitMQ
(через предыдущийAMQP URL
), и воляgreetingChannel
Подключиться к доступной потребителю.
Оба они находятся вapplication.properties
определено в:
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
5. Создайте потребителя
Теперь нам нужно прослушать канал, который мы создали ранее.greetingChannel
. Создадим для него привязку:
public interface HelloBinding {
String GREETING = "greetingChannel";
@Input(GREETING)
SubscribableChannel greeting();
}
Два очень характерных различия между привязкой производства. Потому что мы являемся потребителем, поэтому мы используемSubscribableChannel
и@Input
Аннотация связана сgreetingChannel
, сюда будут помещены данные сообщения.
Теперь давайте создадим методы для обработки данных:
@EnableBinding(HelloBinding.class)
public class HelloListener {
@StreamListener(target = HelloBinding.GREETING)
public void processHelloChannelGreeting(String msg) {
System.out.println(msg);
}
}
Здесь мы создаемHelloListener
класс, вprocessHelloChannelGreeting
добавлен метод@StreamListener
аннотация.这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加@EnableBinding
ВключеноHelloBinding
.
Опять же, мы используем здесь@EnableBinding
, а не основной класс, чтобы рассказать нам, как его использовать.
Взгляните на наш основной класс, у нас нет никаких модификаций:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
существуетapplication.properties
Файл конфигурации, нам нужно определить одно и то же свойство с производителями, в дополнение к изменению порта
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
6. Все тесты
Давайте запустим сервис производителя и потребителя одновременно. Во-первых, давайте поразим конечную точкуhttp://localhost:8080/greet/john
производить сообщения.
См. содержание сообщения в журнале потребителя:
Мы запускаем другой экземпляр службы потребителя (на другом порту (9091)) с помощью следующей команды:
$ mvn spring-boot:run -Dserver.port=9091
Теперь, когда мы нажимаем на производителяREST
Когда новости по производству конечных точек мы видели, что два потребителя получили сообщение:
Это может быть то, что мы хотим в некоторых случаях использования. Но что, если мы хотим, чтобы только один потребитель получил сообщение?application.properties
Создайте группу потребителей в . Конфигурационный файл потребителя:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
Теперь снова запустите 2 экземпляра потребителя на разных портах и снова посмотрите на производителя, создающего сообщения:
Все это тоже можноRabbitMQ
Консоль менеджера видит:
7. Заключение
В этой статье мы объяснили основные концепции обмена сообщениями, их роль в микросервисах и способы их использования.Spring Cloud Stream
Пойми. Мы используемRabbitMQ
в качестве брокера сообщений, но мы также можем использовать других популярных брокеров, таких как如Kafka
, просто измените конфигурацию и зависимости.
Как всегда, пример кода, использованный в этой статье, полностью доступен на GitHub.исходный код.
оригинал:stack abuse.com/spring - уродливо...
автор:Dhananjay Singh
Переводчик:Ли Донг