Серия RabbitMQ (2) Глубокое понимание принципа работы и простое использование RabbitMQ

Java RabbitMQ

Введение в RabbitMQ

Прежде чем представить RabbitMQ, давайте представим MQ Что такое MQ?

Полное название MQ — Message Queue, что можно понимать как значение очереди сообщений, Проще говоря, сообщения доставляются в конвейере.

RabbitMQ — это служба очереди сообщений, реализующая расширенный протокол очереди сообщений AMQP (Advanced Message Queuing Protocol) на языке Erlang.

сцены, которые будут использоваться

Когда мы спешим покупать товары за считанные секунды, система напомнит нам подождать в очереди, а не застрять на странице или выдать пользователю ошибку, как несколько лет назад.

Этот вид расчетов по очереди использует механизм очереди сообщений, которые помещаются в канал для обработки расчетов по одному, а не внезапный наплыв крупномасштабных запросов в определенное время.Новое дополнение обрушит базу данных, поэтому RabbitMQ по существу играет роль Роль состоит в том, чтобы срезать пики и заполнять долины для сопровождения бизнеса.

Почему стоит выбрать RabbitMQ

Сегодня на рынке существует множество вариантов MQ, таких как ActiveMQ, ZeroMQ, Appche Qpid.Вопрос в том, почему стоит выбрать RabbitMQ?

  1. Помимо Qpid, RabbitMQ — единственный сервер сообщений, реализующий стандарт AMQP;
  2. Надежность, поддержка сохраняемости RabbitMQ обеспечивает стабильность сообщений;
  3. Высокий уровень параллелизма, RabbitMQ использует язык разработки Erlang.Эрланг — это язык, разработанный для телефонных коммутаторов.Он создан с функциями высокого параллелизма и высокой доступности;
  4. Развертывание кластера простое, и именно Erlang делает развертывание кластера RabbitMQ очень простым;
  5. Сообщество очень активно.Согласно онлайн-информации, RabbitMQ также является первым выбором;

Рабочий механизм

Производители, потребители и агенты

Прежде чем понять передачу сообщений, мы должны сначала понять 3 концепции: производитель, потребитель и агент.

Производитель: создатель сообщения, ответственный за создание и отправку данных на сервер сообщений;

Потребитель: получатель сообщения, используемый для обработки данных и подтверждения сообщения;

Брокер: Это сам RabbitMQ, который используется в роли "курьера", он не выдает сообщения сам по себе, а только играет роль "курьера".

принцип отправки сообщения

Прежде всего, вам нужно подключиться к Rabbit, чтобы публиковать и получать сообщения, так как же вы подключаетесь и отправляете сообщения?

Между вашим приложением и сервером Rabbit создается TCP-соединение. После того, как TCP открыт и аутентифицирован, аутентификация представляет собой информацию о соединении с сервером Rabbit, а также имя пользователя и пароль, которые вы отправили перед попыткой подключения к Rabbit, немного похоже на программу, подключающуюся к база данных с использованием Java. Существует два метода аутентификации соединения, которые будут подробно описаны далее в коде.После прохождения аутентификации между вашим приложением и Rabbit создается канал AMQP (Channel).

Каналы — это виртуальные соединения, созданные на "реальном" TCP. Команды AMQP отправляются через каналы. Каждый канал будет иметь уникальный идентификатор. Публикация сообщений, подписка на очереди или ввод сообщений осуществляется через каналы.

Почему бы просто не отправить команду напрямую через TCP?

Для операционной системы создание и уничтожение сеансов TCP является очень дорогостоящим. Операционная система также ограничена в количестве TCP, которые она может создавать в секунду, поэтому она может быстро столкнуться с узким местом в системе.

Если мы используем TCP-соединение для каждого запроса, это не только отвечает требованиям производительности, но и обеспечивает конфиденциальность каждого соединения, поэтому вводится понятие канала.

Кролик, ты должен знать

Чтобы по-настоящему понять Кролика, вам необходимо знать некоторые термины.

В том числе: ConnectionFactory (диспетчер соединений), Channel (канал), Exchange (обмен), Queue (очередь), RoutingKey (ключ маршрутизации), BindingKey (ключ привязки).

**ConnectionFactory (менеджер соединений): ** Менеджер, устанавливающий соединение между приложением и Кроликом, используется в программном коде;

**Канал (канал): **Канал, используемый для отправки сообщения;

**Exchange (Обмен): ** используется для приема и распространения сообщений;

Очередь: используется для хранения сообщений производителя;

Ключ маршрутизации: используется для распространения данных генератора на биржу;

BindingKey: используется для привязки сообщения обмена к очереди;

Увидев приведенное выше объяснение, наиболее сложно понять ключи маршрутизации и привязки ключей, а затем, как они работают, см. следующий рисунок:

Мы поговорим о других переключателях позже.

сохранение сообщения

Очереди и обмены Rabbit имеют невыразимый секрет, то есть перезапуск сервера по умолчанию приведет к потере сообщений, так как же сделать так, чтобы Rabbit не потерялся при перезапуске? Ответ — постоянство сообщения.

Когда вы отправляете сообщение на сервер Rabbit, вам нужно выбрать, хотите ли вы сохранить или нет, но это не гарантирует, что Rabbit сможет восстановиться после сбоя.Для восстановления сообщений Rabbit должны быть выполнены три условия:

  1. При доставке сообщения для параметра durable установлено значение true, а сообщение является постоянным Код: channel.queueDeclare(x, true, false, false, null), для параметра 2 установлено значение true для сохранения;
  2. Установите для режима доставки deliveryMode значение 2 (постоянный), код: channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x), параметр 3 настроен на сохранение простого текста на диск;
  3. Сообщение поступило на обмен постоянством;
  4. Сообщение достигло постоянной очереди;

Как работает настойчивость

Rabbit запишет ваше постоянное сообщение в постоянный файл журнала на диске.После того, как сообщение будет использовано, Rabbit пометит сообщение как ожидающее сборки мусора.

Недостатки настойчивости

Преимущество сохранения сообщений очевидно, но недостаток также очевиден, и это производительность, потому что запись на диск намного менее производительна, чем запись в память, что снижает пропускную способность сервера, хотя ситуацию можно облегчить с помощью SSD-дисков. производительность Rabbit по-прежнему хромает, когда на диск записываются тысячи сообщений.

Поэтому пользователи должны выбирать метод, который подходит им в зависимости от их собственной ситуации.

веб хостинг

Каждый Rabbit может создавать много виртуальных хостов, которые мы называем виртуальными хостами.Каждый виртуальный хост на самом деле является мини-версией RabbitMQ со своими собственными очередями, переключателями и привязками, а также собственным механизмом разрешений.

функции виртуального хоста

  1. Виртуальным хостом RabbitMQ по умолчанию является "/" из коробки;

  2. Несколько виртуальных хостов изолированы, несколько виртуальных хостов не могут обмениваться данными, и нет необходимости беспокоиться о конфликтах имен (очереди, обмены и привязки), достигая многоуровневого разделения;

  3. Вы должны указать vhost при создании пользователя;

виртуальная операция

Его можно создать с помощью команды утилиты rabbitmqctl:

rabbitmqctl add_vhost[vhost_name]

Удалить виртуальный хост:

rabbitmqctl delete_vhost[vhost_name]

Посмотреть все виртуальные хосты:

rabbitmqctl list_vhosts

Строительство окружающей среды

В предыдущей статье мы представили шаги по сборке RabbitMQ в Ubuntu:Среда RabbitMQ построена на Ubuntu

Если вы устанавливаете на Windows 10, будет проще, сначала укажите адрес загрузки:

Ссылка на сетевой диск Erlang/Rabbit Server Baidu:disk.baidu.com/yes/1TN K DV-Z U…Пароль: wct9

Конечно, вы также можете зайти на официальный сайт Erlang и Rabbit, но скорость относительно невысокая. My Baidu Cloud Rabbit последняя версия: 3.7.6, версия Erlang: 20.2, Примечание: Не скачивайте последнюю версию Erlang, есть проблема с открытием расширения на Windows 10, его невозможно открыть.

  1. установить Эрланг;

  2. Установить сервер кроликов;

  3. Перейдите в каталог установки \sbin и используйте команду «rabbitmq-plugins enable rabbitmq_management», чтобы запустить плагин веб-управления;

  4. Перезапустите службу Rabbit;

Используйте: http://localhost:15672 для тестирования, учетная запись по умолчанию: guest, а пароль: guest

Яма многократной установки Rabbit Server

Если вы не первый раз устанавливаете Rabbit Server на Windows, обязательно удалите Rabbit и Erlang и найдите реестр: HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv Удалите все пункты под ним.

В противном случае возникнет ситуация, когда Rabbit не запустится после установки.По идее, порядок удаления тоже Rabbit first в Erlang.

Код

Реализована java версия, с помощью проекта maven, творение можно посмотреть:Настройки кряка MyEclipse2017 и построение проекта maven

После успешного создания проекта добавьте jar-пакет Rabbit Client, который нужно только настроить в pom.xml следующим образом:

 <dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.2.0</version>
</dependency>

Код реализации Java разделен на два класса: первый предназначен для создания подключения Rabbit, а второй — класс приложения для публикации и потребления сообщений самым простым способом.

Подключение кролика, два способа:

метод первый:

public static Connection GetRabbitConnection() {
	ConnectionFactory factory = new ConnectionFactory();
	factory.setUsername(Config.UserName);
	factory.setPassword(Config.Password);
	factory.setVirtualHost(Config.VHost);
	factory.setHost(Config.Host);
	factory.setPort(Config.Port);
	Connection conn = null;
	try {
		conn = factory.newConnection();
	} catch (Exception e) {
		e.printStackTrace();
	}
	return conn;
}

Способ второй:

public static Connection GetRabbitConnection2() {
	ConnectionFactory factory = new ConnectionFactory();
	// 连接格式:amqp://userName:password@hostName:portNumber/virtualHost
	String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,
			Config.VHost);
	Connection conn = null;
	try {
		factory.setUri(uri);
		factory.setVirtualHost(Config.VHost);
		conn = factory.newConnection();
	} catch (Exception e) {
		e.printStackTrace();
	}
	return conn;
}

Часть 2. Классы приложений, публикация и использование сообщений самым простым способом

public static void main(String[] args) {
	Publisher(); // 推送消息

	Consumer(); // 消费消息
}

/**
 * 推送消息
 */
public static void Publisher() {
	// 创建一个连接
	Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
	if (conn != null) {
		try {
			// 创建通道
			Channel channel = conn.createChannel();
			// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
			channel.queueDeclare(Config.QueueName, false, false, false, null);
			String content = String.format("当前时间:%s", new Date().getTime());
			// 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】
			channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
			System.out.println("已发送消息:" + content);
			// 关闭连接
			channel.close();
			conn.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

/**
 * 消费消息
 */
public static void Consumer() {
	// 创建一个连接
	Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
	if (conn != null) {
		try {
			// 创建通道
			Channel channel = conn.createChannel();
			// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
			channel.queueDeclare(Config.QueueName, false, false, false, null);

			// 创建订阅器,并接受消息
			channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					String routingKey = envelope.getRoutingKey(); // 队列名称
					String contentType = properties.getContentType(); // 内容类型
					String content = new String(body, "utf-8"); // 消息正文
					System.out.println("消息正文:" + content);
					channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】
				}
			});

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

К коду написаны очень подробные комментарии, и я не буду вводить его здесь слишком подробно.

Эффект выполнения показан на рисунке: