Интеллект-карта высокого разрешения была синхронизирована с Git: https://github.com/SoWhat1412/xmindfile, подпишитесь на общедоступную учетную запись sowhat1412, чтобы получить огромные ресурсы.
1. Какую проблему решает очередь сообщений
Промежуточное программное обеспечение для обмена сообщениями является популярным в настоящее время промежуточным программным обеспечением, среди которого RabbitMQ занимает определенную долю рынка и в основном используется для асинхронной обработки, развязки приложений, сглаживания пиков трафика, обработки журналов и так далее.
1. Асинхронная обработка
Пользователь входит на веб-сайт, чтобы зарегистрироваться, а затем система отправляет текстовые сообщения и электронные письма, чтобы сообщить об успешной регистрации.Как правило, есть три решения.
- Серийно-последовательное выполнение, проблема в том, что пользователь может им пользоваться после регистрации, нет необходимости ждать проверочный код и почту.
- После успешной регистрации электронная почта и проверочный код выполняются параллельно.Проблема в том, что электронная почта и проверочный код не являются важными задачами.Должна ли регистрация в системе ждать завершения этих двух?
- На основе обработки асинхронного MQ после успешной регистрации пользователя информация асинхронно напрямую отправляется в MQ, а затем почтовая система и система кодов проверки берут на себя инициативу по извлечению данных.
2. Разделение приложений
Например, у нас есть система заказов и система инвентаризации. Когда пользователь размещает заказ, ему нужно вызвать систему инвентаризации для его обработки. Что, если возникнут проблемы с системой инвентаризации?
3. Ограничение трафика
провестивсплеск активности, как лучше спроектировать? Сервисный уровень напрямую принимает мгновенный доступ к плотности и абсолютно не может, по крайней мере, присоединиться к MQ.
4. Обработка журнала
Когда пользователь отправляет запрос через доступ через WebUI, как серверная часть принимает и обрабатывает его?
2. Установка и настройка RabbitMQ
Официальный сайт:woohoo.rabbitcurrent.com/download.contract…Язык разработки:www.erlang.org/Официально установить и разрешить совместимость между версиями Erlang и RabbitMQ! Я здесь, чтобы избежать неприятностей и напрямую использовать Dockerвытащить изображение. скачать:На: Страница управления Учетная запись по умолчанию: guest Пароль по умолчанию: guest . При запуске Docker вы можете указать пароль учетной записи и внешний порт и
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
запускать:Пользователь добавил:Виртуальные хосты эквивалентны БД в mysql. Создайте виртуальные хосты, обычно начинающиеся с /.Авторизуйте пользователя, нажмите /vhost_mmr,Что касается WebUI, вы можете узнать о нем подробнее.
3. Настоящий бой
Официальный сайт RabbitMQ поддерживает режим задачи:ву ву ву.кролик в настоящее время.com/начать.…Создайте проект Maven для импорта необходимых зависимостей:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
0. Получить соединение MQ
package com.sowhat.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 连接器
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/vhost_mmr");
factory.setUsername("user_mmr");
factory.setPassword("sowhat");
Connection connection = factory.newConnection();
return connection;
}
}
1. Простая очередь
P:Producer Производитель сообщения Посередине: очередь сообщений очереди C:Consumer Потребитель сообщения
package com.sowhat.mq.simple;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取一个连接
Connection connection = ConnectionUtils.getConnection();
// 从连接获取一个通道
Channel channel = connection.createChannel();
// 创建队列声明
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello Simple";
// exchange,队列,参数,消息字节体
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("--send msg:" + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.simple;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者获取消息
*/
public class Recv {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
newApi();
oldApi();
}
private static void newApi() throws IOException, TimeoutException {
// 创建连接
Connection connection = ConnectionUtils.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 队列声明 队列名,是否持久化,是否独占模式,无消息后是否自动删除,消息携带参数
channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
// 定义消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override // 事件模型,消息来了会触发该函数
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("---new api recv:" + s);
}
};
// 监听队列
channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
}
// 老方法 消费者 MQ 在3。4以下 用次方法,
private static void oldApi() throws IOException, TimeoutException, InterruptedException {
// 创建连接
Connection connection = ConnectionUtils.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 定义队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(Send.QUEUE_NAME, true, consumer);
while (true) {
// 发货体
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String s = new String(body);
System.out.println("---Recv:" + s);
}
}
}
В правом верхнем углу вы можете установить частоту обновления страницы, а затем вы можете вручную использовать ее непосредственно в интерфейсе пользовательского интерфейса, как показано ниже:
简单队列的不足
: Связь слишком высока, и производители соответствуют потребителям один за другим. Если есть несколько потребителей, которые хотят потреблять информацию в очереди, это невозможно.
2. Рабочая очередь WorkQueue
В простой очереди между производством и потреблением существует только взаимно однозначное соответствие.В реальной разработке производителю очень просто отправить сообщение, а потребителю нужно интегрироваться с бизнесом.После того, как потребитель получит сообщение , требуется время для его обработки.В очереди может быть задержка сообщений. Так что, если несколько потребителей могут ускорить потребление.
1. круговой опрос
Код программирует одного производителя и двух потребителей:
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 声明队列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
Феномен:
Потребитель 1 и Потребитель 2 обрабатывают одинаковое количество данных:
Потребитель 1: обрабатывать четные числа
Потребитель 2: обработка нечетных чисел
Этот способ называется轮询分发(round-robin)
В результате, независимо от того, какой из двух потребителей занят,Данные всегда одни для вас и одни для меня, когда MQ отправляет данные двум потребителям, он не знает о производительности потребителя, и по умолчанию выставляется дождь и роса. На данный момент autoAck = true.
2. честная отправка
Если вы хотите достичь公平分发
, чтобы потребители уведомляли MQ после получения части данных, а затем позволяли MQ отправлять данные. Автоответчик отключить!
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// s声明队列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只发送一个消息
// 从而限制一次性发送给消费者到消息不得超过1个。
int perfetchCount = 1;
channel.basicQos(perfetchCount);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
结果
: достигается справедливое распределение, и потребитель 2 потребляет в два раза больше, чем потребитель 1.
3. публикация/подписка в режиме публикации-подписки
Как и при подписке и публикации публичных аккаунтов, указывать routingKey не нужно:
Интерпретация:
- Один производитель, много потребителей
- У каждого потребителя своя очередь
- Производитель не отправляет сообщение непосредственно в очередь, а отправляет его
交换机转化器(exchange)
. - Каждая очередь должна быть привязана к обмену.
- Сообщение, отправленное производителем, попадает в очередь через коммутатор, так что одно сообщение может быть использовано несколькими потребителями.
Режиссер:
package com.sowhat.mq.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分发= fanout
// 发送消息
String msg = "hello ps ";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("Send:" + msg);
channel.close();
connection.close();
}
}
Куда пропали новости? Забыли, в RabbitMQ только очереди имеют объем памяти,Поскольку в данный момент очередь не привязана к обмену, сообщение теряется.. потребитель:
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String QUEUE_NAME = "test_queue_fanout_email";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String QUEUE_NAME = "test_queue_fanout_sms";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保证一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
При этом вы также можете вручную добавить на биржу мониторинг очереди.
4. Шаблон маршрутизации с подстановочными знаками
АТС (коммутатор, повторитель):С одной стороны, он принимает сообщения производителя, а с другой стороны, отправляет сообщения в очередь..
Анонимная переадресация обозначается "", например, передняя очередь в простую очередь и WorkQueue.fanout
: не обрабатывать ключи маршрутизации.Нет необходимости указывать routingKey, нам просто нужно привязать очередь к обмену,Сообщение будет отправлено во все очереди.direct
: обрабатывать ключи маршрутизации,Необходимо указать routingKey, в это время производитель будет указывать ключ при отправке данных, и очередь задач тоже будет указывать ключ, в очередь будет отправлено только сообщение с таким же ключом. Как показано ниже
package com.sowhat.mq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg = "hello info!";
// 可以指定类型
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 绑定种类似 Key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
WebUI:
缺点
: ключ маршрутизации должен быть четким, и обычное нечеткое сопоставление не может быть достигнуто.
5. Темы
Сопоставьте ключ маршрутизации с шаблоном, # означает соответствие >= 1 символу, * означает соответствие одному. Производитель будет нести routingKey, но MQ потребителя будет нести нечеткий routingKey.Товары: публиковать, удалять, изменять, запрашивать.
package com.sowhat.mq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String msg = "商品!";
// 可以指定类型
String routingKey = "goods.find";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.topic;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.topic;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 此乃重点
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件触发机制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
6. Постоянство и непостоянство MQ
Поскольку сообщение находится в памяти, если MQ зависает, сообщение также теряется, поэтому следует учитывать сохраняемость MQ. MQ поддерживает постоянство,
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
boolean durable
Является ли указать, может ли она быть сохранена, если мы поместим программу вdurable = false
изменить наtrue
Это невозможно! потому что мы определилиtest_work_queue
, эта очередь была объявлена несохраненной.结论
: MQ не позволяет изменять существующий параметр очереди.
7. Сторона потребителя подтверждает сообщение вручную и автоматически
// 自动应答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
Когда MQ отправляет данные потребителю, потребитель должен ответить MQ на полученную информацию о паре.
еслиautoAck = true
выражатьрежим автоматического подтверждения, как только MQ отправит сообщение потребителю, оно удалит сообщение из памяти. Если потребитель получил сообщение, но не закончил его использование, а данные в MQ были удалены, это приведет к потере обрабатываемого сообщения.
еслиautoAck = false
выражатьРучной режим подтверждения, если потребитель вешает трубку, MQ может отправить информацию другим потребителям, поскольку он не получил информацию о получении.
MQ поддерживает ответ на сообщение (Message acknowledgement
), потребитель отправляет ответ на сообщение, чтобы сообщить MQ, что сообщение использовано, и MQ удаляет его из памяти. режим ответа на сообщениеЗначение по умолчанию — ложь.
8. Механизм подтверждения сообщения RabbitMQ на стороне производителя (транзакция + подтверждение)
В RabbitMQ мы можем решить проблему аномальной потери данных сервера MQ за счет сохраняемости, ноКак производитель гарантирует, что данные отправляются в MQ? Производитель также не знает по умолчанию. Такие каккак решитьШерстяная ткань?
1. AMQP-транзакция
Первый способ AMQP реализует механизм транзакций, аналогичный механизму транзакций MySQL. txSelect: пользователь устанавливает текущий канал в режим перехода. txCommit: используется для фиксации транзакций. txRollback: используется для отката транзакции.
Все вышеперечисленные операции над производителями.
package com.sowhat.mq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxSend {
public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx message";
try {
//开启事务模式
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int x = 1 / 0;
// 提交事务
channel.txCommit();
} catch (IOException e) {
// 回滚
channel.txRollback();
System.out.println("send message rollback");
} finally {
channel.close();
connection.close();
}
}
}
---
package com.sowhat.mq.tx;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxRecv {
public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
}
});
channel.close();
connection.close();
}
}
Недостаток заключается в том, что большое количество запросов пробуется, а затем завершается с ошибкой, а затем откатывается, что снижает пропускную способность MQ.
2. Подтвердите режим.
Сторона производителя подтверждает принцип реализацииПроизводитель устанавливает канал в режим подтверждения. Как только канал переходит в режим подтверждения, информации, опубликованной на канале, будет присвоен уникальный идентификатор (начиная с 1). После того, как сообщение будет доставлено во все соответствующие очереди, брокер вернет Отправить подтверждение для производителя (включая уникальный идентификатор сообщения), которое позволяет производителю узнать, что сообщение прибыло в очередь назначения правильно.Если сообщение и очередь являются постоянными, сообщение подтверждения будет отправлено после того, как сообщение будет записано на диск. Поле тега доставки в сообщении подтверждения, которое брокер отправляет обратно производителю, содержит сообщение подтверждения серийного номера.Кроме того, брокер также может установить множественное поле basic.ack, чтобы указать, что вся информация была обработана. перед серийным номером.
Самым большим преимуществом режима подтверждения является то, что он异步
из. После отправки первого сообщения нет необходимости ждать ответа перед отправкой второго сообщения.
Включите режим подтверждения:channel.confimSelect()
Режим программирования:
1. После отправки сообщения обычно waitForConfirms()
package com.sowhat.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send1 {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将channel模式设置为 confirm模式,注意设置这个不能设置为事务模式。
channel.confirmSelect();
String msg = "hello confirm message";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (!channel.waitForConfirms()) {
System.out.println("消息发送失败");
} else {
System.out.println("消息发送OK");
}
channel.close();
connection.close();
}
}
---
package com.sowhat.confirm;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
}
});
}
}
2. Отправить пакет данных пакетами waitForConfirms()
package com.sowhat.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send2 {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将channel模式设置为 confirm模式,注意设置这个不能设置为事务模式。
channel.confirmSelect();
String msg = "hello confirm message";
// 批量发送
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
// 确认
if (!channel.waitForConfirms()) {
System.out.println("消息发送失败");
} else {
System.out.println("消息发送OK");
}
channel.close();
connection.close();
}
}
---
接受信息跟上面一样
3. Асинхронный режим подтверждения, предоставляющий метод обратного вызова.
Channel
Метод обратного вызова ConfirmListener(), предоставляемый объектом, содержит толькоdeliveryTag
(включая порядковый номер отправленного в данный момент сообщения), нам нужно поддерживать по одному для каждого канала самостоятельноunconfirm
набор порядковых номеров сообщений, каждыйpublish
Часть данных, элементы в коллекции увеличиваются на 1, и каждый обратный вызов вызывается один разhandleAck
метод,unconfirm
Установить удалить один из ответов (multiple=false
) или несколько (multiple=true
) рекорды, с точки зрения операционной эффективности,unconfirm
Наборы лучше всего использовать отсортированные наборыSortedSet
структура хранения.
![Вставьте сюда описание изображения](https://img-blog.csdnimg.cn/2020071921400852.png#pic_ введите)
package com.sowhat.mq.confirm;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class Send3 {
public static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者调用confirmSelect
channel.confirmSelect();
// 存放未确认消息
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 添加监听通道
channel.addConfirmListener(new ConfirmListener() {
// 回执有问题的
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--handleNack---multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("--handleNack-- multiple false");
confirmSet.remove(deliveryTag);
}
}
// 没有问题的handleAck
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--handleAck---multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("--handleAck--multiple false");
confirmSet.remove(deliveryTag);
}
}
});
// 一般情况下是先开启 消费者,指定好 exchange跟routingkey,如果生产者等routingkey 就会触发这个return 方法
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("---- handle return----");
System.out.println("replyCode:" + replyCode );
System.out.println("replyText:" +replyText );
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
});
String msgStr = "sssss";
while(true){
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
confirmSet.add(nextPublishSeqNo);
Thread.sleep(1000);
}
}
}
总结
: режим AMQP имеет лучшую производительность, чем режим подтверждения, и рекомендуется использовать последний.
9. Очередь задержки RabbitMQ
Оплата заказа Taobao, проверочный код и другие услуги, ограниченные по времени.
Map<String,Object> headers = new HashMap<String,Object>();
headers.put("my1","111");
headers.put("my2","222");
AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();
10. SpringBoot Tpoic Demo
Карта требований:Создайте новый проект SpringBoot и добавьте следующие зависимости:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. Продюсер
application.yml
spring:
rabbitmq:
host: 127.0.0.1
username: admin
password: admin
Прецедент:
package com.sowhat.mqpublisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqpublisherApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
void userInfo() {
/**
* exchange,routingKey,message
*/
this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");
}
}
2. Потребители
application.xml
spring:
rabbitmq:
host: 127.0.0.1
username: admin
password: admin
# 自定义配置
mq:
config:
exchange_name: log.topic
# 配置队列名称
queue_name:
info: log.info
error: log.error
logs: log.logs
Три разных потребителя:
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value属性:用于绑定一个队列。@Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回
* type = ExchangeTypes.TOPIC 指定交换器类型。默认的direct交换器
*/
@Service
public class ErrorReceiverService {
/**
* 把一个方法跟一个队列进行绑定,收到消息后绑定给msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue_name.error}"),
exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
key = "*.log.error"
)
)
public void process(String msg) {
System.out.println(msg + " Logs...........");
}
}
---
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value属性:用于绑定一个队列。
* @Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回
*/
@Service
public class InfoReceiverService {
/**
* 添加一个能够处理消息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value ="${mq.config.queue_name.info}"),
exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
key = "*.log.info"
))
public void process(String msg){
System.out.println(msg+" Info...........");
}
}
--
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value属性:用于绑定一个队列。
* @Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回
*/
@Service
public class LogsReceiverService {
/**
* 添加一个能够处理消息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value ="${mq.config.queue_name.logs}"),
exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
key = "*.log.*"
))
public void process(String msg){
System.out.println(msg+" Error...........");
}
}
Для получения подробной информации об установке и коде см. справочную загрузку:
Суммировать
Если вам нужно указать режим, он обычно устанавливается на стороне потребителя, и настраивается гибкость.
модель | Очередь производителей | обмен производителями | Ключ маршрутизации производителя | потребительская биржа | потребительская очередь | routingKey |
---|---|---|---|---|---|---|
Простой (простой режим используется реже) | уточнить | Не указывать | Не указывать | Не указывать | уточнить | Не указывать |
WorkQueue (реже используется несколькими потребителями) | уточнить | Не указывать | Не указывать | Не указывать | уточнить | Не указывать |
разветвление (режим публикации/подписки) | Не указывать | уточнить | Не указывать | уточнить | уточнить | Не указывать |
прямой (режим маршрутизации) | Не указывать | уточнить | уточнить | уточнить | уточнить | Consumer routingKey точно указывает несколько |
тема (нечеткое соответствие тем) | Не указывать | уточнить | уточнить | уточнить | уточнить | Consumer routingKey может выполнять нечеткое сопоставление |
Ссылаться на
SpringBoot интегрирует RabbitMQ
Установка RabbitMQ и демонстрация интеграции SpringBoot
В этой статье используетсяmdniceнабор текста