Отправка сообщений в реальном времени на основе RabbitMQ
Несколько способов реализации push на стороне сервера
Веб-приложения основаны на режиме запроса/ответа протокола HTTP, который не может поддерживать длительное соединение, как протокол TCP, поэтому веб-приложениям трудно обеспечить отправку сообщений в реальном времени, как мобильный телефон. В настоящее время методы отправки сообщений веб-приложений в основном включают следующее:
1. Короткий опрос Ajax
Опрос Ajax в основном реализует загрузку данных через JS на стороне страницы для периодического асинхронного обновления задачи, но этот метод имеет плохой эффект в реальном времени и большую нагрузку на серверную сторону.
2. Долгий опрос
Долгий опрос в основном осуществляется с помощью механизма Ajax, но отличается от традиционных приложений Ajax тем, что серверная сторона длительного опроса блокирует запрос, когда нет данных, до тех пор, пока не будут сгенерированы новые данные или пока не истечет время запроса, а затем клиент повторно установит подключение для получения данных, конкретная реализация показана на рисунке 1. Однако сервер с длительным опросом будет занимать ресурсы в течение длительного времени, и если сообщения будут отправляться часто, это вызовет большую нагрузку на сервер.
Рисунок 1. Реализация длинного опроса
3. Двусторонняя связь WebSocket
WebSocket — это новый протокол связи в HTML5, который может реализовать полнодуплексную связь между браузером и сервером. Если и браузер, и сервер поддерживают протокол WebSocket, реализованный таким образом push сообщений, несомненно, является наиболее эффективным и лаконичным. А последние версии IE, Firefox, Chrome и других браузеров уже поддерживают протокол WebSocket, а версии после Apache Tomcat 7.0.27 тоже поддерживают WebSocket.
Введение в RabbitMQ
AMQP, Advanced Message Queuing Protocol, является открытым стандартом для протоколов прикладного уровня, разработанным для промежуточного программного обеспечения, ориентированного на сообщения. Промежуточное программное обеспечение сообщений в основном используется для разделения компонентов, отправителю сообщения не нужно знать о существовании потребителя сообщения, и наоборот. Основными функциями AMQP являются сообщения, очереди и маршрутизация, надежность и безопасность. RabbitMQ — это реализация AMQP с открытым исходным кодом, серверная часть написана на языке Erlang и поддерживает несколько клиентов, таких как: Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP. и т.д. Поддерживается Ajax. Он используется для хранения и пересылки сообщений в распределенных системах и хорошо работает с точки зрения простоты использования, масштабируемости и высокой доступности. В RabbitMQ есть несколько концепций, которые нам необходимо понять перед использованием, в основном это следующие: брокер, биржа, очередь, привязка, ключ маршрутизации, производитель, потребитель, канал.
1.Broker
Проще говоря, это сущность сервера очереди сообщений.
2.Exchange
Получите сообщение, перенаправьте его в связанную очередь и укажите, в какую очередь направляется сообщение в соответствии с какими правилами.
3.Queue
Носитель очереди сообщений используется для хранения сообщений.Очереди с одними и теми же атрибутами могут быть определены повторно, и каждое сообщение будет помещено в одну или несколько очередей.
4.Binding
Binding, его роль заключается в связывании Exchange и Queue в соответствии с правилами маршрутизации.
5.RoutingKey
Ключевое слово маршрутизации, Exchange выполняет доставку сообщений в соответствии с этим ключевым словом.
6.Producter
Производитель сообщений, программа, которая создает сообщения.
7.Consumer
Потребители сообщений, программы, получающие сообщения.
8.Channel
Канал сообщений.В каждом соединении клиента может быть установлено несколько каналов, и каждый канал представляет сеанс.
Установите службу RabbitMQ
Решение, предложенное в этой статье, основано на сервере сообщений RabbitMQ, поэтому в начале необходимо установить службу RabbitMQ и соответствующие подключаемые модули. RabbitMQ разработан на основе языка Erlang, поэтому сначала необходимо установить среду выполнения Erlang. Ниже в качестве примера используется 64-разрядный сервер CentOS6.5 для описания процесса установки всей службы:
1. Скачайте erlang-R15B-02.1.el5.x86_64.rpm и установите
# rpm -ivh erlang-R15B-02.1.el5.x86_64.rpm
2. Скачайте rabbitmq-server-3.2.1-1.noarch.rpm и установите
# rpm -ivh rabbitmq-server-3.2.1-1.noarch.rpm
3. Включите связанные плагины
# rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
4. Перезапустите службу RabbitMQ.
# service rabbitmq-server restart
5. Убедитесь, что установка прошла успешно
В настоящее время мы можем проверить рабочее состояние RabbitMQ через веб-браузер.Введите http://{server_ip}:15672 в браузере, войдите в систему с пользователем по умолчанию и паролем guest/guest, а затем проверьте рабочее состояние. из RabbitMQ.
Отправка сообщений в реальном времени на основе RabbitMQ
RabbitMQ имеет множество сторонних плагинов, которые могут создавать множество расширенных приложений на основе протокола AMQP. Подключаемый модуль Web STOMP представляет собой подключаемый модуль текстового протокола STOMP, основанный на AMQP. WebSocket может легко реализовать обмен сообщениями в реальном времени между браузерами и серверами. Конкретная реализация показана на рисунке 2 ниже:
Рисунок 2. Реализация обмена сообщениями в реальном времени между браузером и сервером
отправитель сообщения
Давайте возьмем пример, чтобы проиллюстрировать весь процесс, используя Java в качестве отправителя сообщений клиента RabbitMQ и веб-браузер в качестве получателя сообщений.
Листинг 1. Сторонний код Java
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Program {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("rabbitmq", "fanout");
String routingKey = "rabbitmq_routingkey";
String message = "{\"name\":\"Welcome to RabbitMQ message push!\"}";
channel.basicPublish("rabbitmq", routingKey,null, message.getBytes());
System.out.println("[x] Sent Message:"+message);
channel.close();
connection.close();
}
}
Здесь мы используем клиентскую библиотеку Java, официально предоставленную RabbitMQ, для отправки сообщений Процесс использования очереди сообщений примерно показан на рисунке 3:
Рисунок 3. Поток сообщений о доставке клиента
После того, как обмен получит сообщение, он направит сообщение в соответствии с ключом сообщения и установленной привязкой и, наконец, доставит его в одну или несколько очередей для обработки сообщения. RabbitMQ предварительно устанавливает некоторые обмены.Если клиент не объявляет обмен, RabbitMQ будет использовать обмен по умолчанию в соответствии с типом обмена.Подробности см. в Таблице 1.
Таблица 1. Предустановленные имена обмена
Name | Default pre declared names |
---|---|
Direct exchange | amq.direct |
Fanount exchange | amq.fanout |
Topic exchange | amq.topic |
Heades exchange | amq.headers |
Тип обмена
Существуют следующие виды обмена:
1.Direct exchange
Прямой обмен полностью доставляется в соответствии с ключом. Только сообщение, чей ключ точно совпадает с ключом маршрутизации на момент привязки, получит сообщение. См. рисунок 4 на официальном сайте для более интуитивного понимания прямого обмена. .
Рисунок 4. Прямой обмен
2.Fanount exchange
Fanunt вообще не заботится о ключе и напрямую использует широковещательный метод для доставки сообщения.Все очереди, привязанные к коммутатору, получат сообщение.Подробности см. на рисунке 5 на официальном сайте.
Рисунок 5. Обмен фанунтами
3.Topic exchange
Тематический обмен выполнит сопоставление с шаблоном в соответствии с ключом, а затем доставит его. Только очередь, соответствующая заданному ключу маршрутизации, может получить сообщение.
4.Headers exchange
Обмен заголовками использует заголовок сообщения вместо ключа маршрутизации в качестве ключевого слова для маршрутизации, но этот тип обмена редко используется в практических приложениях.
сохранение сообщения
RabbitMQ поддерживает сохраняемость сообщений, т. е. сохраняет данные сообщения на диск. Если сервер сообщений отключается на полпути, постоянное сообщение будет повторно отправлено при его следующем открытии. Сохранение очереди сообщений должно обеспечивать обмен (укажите durable = 1), очередь (укажите durable=1) и три части сообщения (delivery_mode=2) являются постоянными. По соображениям безопасности данных общие сообщения сохраняются.
получатель сообщения
Листинг 2. Код JavaScript
// Stomp.js boilerplate
if (location.search == '?ws') {
var ws = new WebSocket('ws://192.168.1.102:15674/ws');
} else {
var ws = new SockJS('http://192.168.1.102:15674/stomp');
}
// Init Client
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// Declare on_connect
var on_connect = function(x) {
client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) {
print_first(d.body);
});
};
// Declare on_error
var on_error = function() {
console.log('error');
};
// Conect to RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');
Подключаемый модуль RabbitMQ Web STOMP можно рассматривать как мост между HTML5 WebSocket и протоколом STOMP, и его цель также состоит в том, чтобы позволить браузерам использовать RabbitMQ. Когда на сервере сообщений RabbitMQ включены подключаемые модули STOMP и Web STOMP, браузер может легко использовать клиент WebSocket или SockerJS для связи с сервером RabbitMQ.
RabbitMQ Web STOMP — это мост к протоколу STOMP, поэтому его синтаксис также полностью соответствует протоколу STOMP. STOMP — это протокол на основе фреймов, аналогичный фрейму HTTP. Фрейм содержит команду, ряд необязательных заголовков и тело. Пользовательский агент клиента STOMP может, конечно, играть две роли одновременно: как производитель, отправляющий сообщения на сервер через кадр SEND; как потребитель, отправляющий кадры SUBCRIBE получателю и через кадр MESSAGE. Получить сообщения с сервера.
Чтобы использовать протокол STOMP с WebSocket на веб-странице, вам нужно всего лишь загрузить stomp.js.Учитывая, что старая версия браузера не поддерживает WebSocket, SockJS обеспечивает поддержку имитации WebSocket. Протокол STOMP, используемый на веб-странице, подробно описан в следующем списке кодов:
Листинг 3. Код JavaScript
// 初始化 ws 对象
if (location.search == '?ws') {
var ws = new WebSocket('ws://192.168.1.102:15674/ws');
} else {
var ws = new SockJS('http://192.168.1.102:15674/stomp');
}
// 建立连接
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// 定义连接成功回调函数
var on_connect = function(x) {
console.log('connect successfully');
// 发送消息
client.send(destination,head,body);
// 发送消息
client.subcribe(destination,callback);
// 默认主动 ACK,手动 ACK
client.subcribe(destination,function(message){
Message.ack();
},{ack:'client'});
// 事务支持
var tx = client.begin();
client.send(destination,head,body);
tx.commit();
};
// 定义连接失败回调函数
var on_error = function(error) {
console.log(error.headers.message);
};
// 连接消息服务器
client.connect(login, password, on_connect, on_error, '/');
Упомянутое выше назначение определено в RabbitMQ Web STOM, В соответствии с различными сценариями использования в основном существуют следующие четыре типа:
1./exchange/<exchangeName>
Для кадра SUBCRIBE назначение обычно имеет форму /exchange/
Для кадра SEND назначение обычно имеет вид /exchange/
2./queue/<queueName>
Для кадра SUBCRIBE пункт назначения определяет общую очередь и реализует подписку сообщений на очередь.
Для кадра SEND пункт назначения определяет общую очередь только при первой отправке сообщения. Сообщение будет отправлено на обмен по умолчанию, а routingKey — .
3./amq/queue/<queueName>
В этом случае ни кадр SUBCRIBE, ни кадр SEND не будут генерировать очередь. Но если очередь не существует, кадр SUBCRIBE сообщит об ошибке.
Для кадра SUBCRIBE пункт назначения реализует подписку на сообщения в очереди .
Для кадров SEND сообщение будет отправлено непосредственно в очередь через обмен по умолчанию.
4./topic/<topicName>
Для фрейма SUBCRIBE пункт назначения создает автоматически удаляемую временную очередь и привязывает ее к обмену amq.topic в соответствии с ключом маршрутизации как
Для кадра SEND сообщение будет отправлено на обмен amq.topic с ключом маршрутизации
Рисунок 6. Сторона Java отправляет сообщение
Рисунок 7. Ответ в реальном времени на стороне JavaScript
Нажмите, чтобы увеличить изображение
резюме
В качестве нового поколения асинхронного метода связи клиент-сервер, предоставляемого HTML5, WebSocket может легко обеспечить двустороннюю связь между интерфейсом и сервером. Служба RabbitMQ предоставляет подключаемый модуль STOMP, который может соединяться с WebSocket, что позволяет не только реализовать активную отправку сообщений, но и реализовать асинхронную обработку сообщений. В традиционной веб-разработке существует много требований к изменению состояния в реальном времени, например, после того, как ресурс занят, его состояние в реальном времени необходимо транслировать, а с помощью решения, предложенного в этой статье, его можно легко запушить всем. слушающих клиентов. Поэтому в новом проекте разработки J2EE рекомендуется использовать решение, предложенное в этой статье, для замены исходного метода опроса ajax для обновления состояния.
Скачать ресурсы
- образец кода (stomp.zip | 59k)
связанная тема
- Ссылаться наВнедрение службы push-уведомлений в режиме реального времени на уровне миллионов в Worktileчтобы узнать о других решениях для отправки сообщений.
- Ссылаться наДлительное соединение и долгий опрос веб-общения, чтобы ознакомиться с основными принципами постоянных веб-соединений.
- Ссылаться наНачало работы с AMQP и RabbitMQчтобы узнать больше о протоколе AMQP.
- Ссылаться наОфициальный сайт RabbitMQчтобы узнать больше о сервере сообщений RabbitMQ.
- Ссылаться наComet: технология «Server Push», основанная на постоянном соединении HTTP, чтобы увидеть, как Comet реализует принудительную отправку длинных соединений.
- темы о технологиях с открытым исходным кодом на developerWorks: Найдите множество практических сведений, инструментов и обновлений проекта, которые помогут вам освоить и использовать технологии с открытым исходным кодом с продуктами IBM.