Основные базовые знания RabbitMQ

RabbitMQ

Многие известные интернет-компании сейчас используют RabbitMQ. Его производительность и масштабируемость заставляют многие крупные компании использовать его. Однако, если вы хотите в полной мере использовать RabbitMQ, вам необходимо освоить некоторые из его основных концепций. Вот что вам нужно, чтобы освоить RabbitMQ Необходимые знания

Производитель и потребитель

Производитель: программа, которая создает сообщение и отправляет его брокеру (RabbitMQ).

Потребитель: подключитесь к прокси-серверу и подпишитесь на очередь для получения сообщений

поток сообщений

Протокол AMQP предусматривает, что сообщения AMQP должны состоять из трех частей: обменов, очередей и привязок. Производитель отправляет сообщение на биржу, а отношения связывания между биржой и очередью определяют, как сообщение направляется в конкретную очередь и, наконец, принимается потребителем.

如图

Примечание:Сообщения не могут попасть в очередь напрямую

выключатель

Сообщение фактически доставляется на коммутатор, и конкретная маршрутизация в эту очередь выполняется коммутатором в соответствии с ключом маршрутизации.

  • Когда вы отправляете сообщение брокеру, даже если ключ маршрутизации пуст, RabbitMQ сопоставит его с используемым ключом маршрутизации. Если маршрутизируемое сообщение не соответствует ни одному шаблону привязки, оно попадет в черную дыру.

Переключатель играет роль среднего уровня между очередью и сообщением.С помощью переключателя мы можем добиться более гибких функций.В RabbitMQ есть три часто используемых типа переключателей:

  • прямой: если ключ маршрутизации совпадает, сообщение доставляется в соответствующую очередь
  • fanout: доставлять сообщения во все очереди, привязанные к текущему коммутатору
  • тема: Позволяет реализовать интересные сценарии обмена сообщениями, позволяя сообщениям из 5 разных источников попадать в одну и ту же очередь. Имя очереди тем содержит два специальных ключевых слова.
    • *можно заменить слово
    • #может заменить все слова

Можно понять, что директ — это 1v1, fanout принадлежит 1v, топик более гибкий, а 1v может быть произвольным.

image.png

веб хостинг

Каждый виртуальный хост (vhost) эквивалентен мини-версии сервера RabbitMQ со своими собственными очередями, переключателями и привязками, разрешениями... Это позволяет RabbitMQ обслуживать множество приложений, не конфликтуя друг с другом.

Виртуальный хост по умолчанию для rabbitMQ: "/" Обычно, когда мы создаем пользователя Rabbit, мы назначаем пользователю виртуальный хост.

Для работы с виртуальными хостами в дополнение к командной строке есть страница веб-управления.

#创建虚拟主机
rabbitmqctl add vhost [vhost_name]
#删除虚拟主机
rabbitmqctl delete vhost [vhost_name]
#列出虚拟主机
rabbitmqctl list_vhosts

image.png

стратегия доставки сообщений

По умолчанию очереди и обмены RabbitMQ исчезают после перезапуска сервера RabbitMQ из-за устойчивого свойства очередей и обменов, которое по умолчанию равно false.

Сообщение, которое можно восстановить после сбоя сервера AMQP, называется постоянным сообщением.Если вы хотите восстановиться после сбоя, сообщение должно

  • Режим доставки установлен на 2, чтобы пометить сообщение как постоянное
  • отправить на постоянный обмен
  • в постоянную очередь

Недостаток: производительность записи сообщений на диск намного хуже. если только особо критические сообщения не будут использовать

Ключевой API

Все вышеперечисленное является концептуальным содержанием. На самом деле, нам все еще нужно программировать для достижения нашей цели. Клиентский API RabbitMQ предоставляет множество функций. Глядя на код, мы можем понять его мощь.

перед основными шагамиНачало работы с RabbitMQКак уже упоминалось, класс Channel является ключевой частью: он содержит множество функций, которые нам нужны.

image.png

подтверждение сообщения

Генератор может добавлять события прослушивателя:

channel.addConfirmListener(new ConfirmListener() {
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------no ack!-----------");
			}
			
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------ack!-----------");
			}
		});

Потребитель может подтвердить статус сообщения:

public class MyConsumer extends DefaultConsumer {


	private Channel channel ;
	
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("body: " + new String(body));
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if((Integer)properties.getHeaders().get("num") == 0) {
			channel.basicNack(envelope.getDeliveryTag(), false, true);
		} else {
			channel.basicAck(envelope.getDeliveryTag(), false);
		}
	}
}

Последний параметр channel.basicAck и basicNack указывает, будет ли сообщение повторно поставлено в очередь.

Прослушивание недоступных сообщений

Наш производитель сообщений отправляет сообщение в очередь, указав коммутатор и ключ маршрутизации, но иногда указанный ключ маршрутизации не существует или коммутатор не существует, тогда сообщение вернется, мы можем добавить прослушиватель возврата для достижения:

channel.addReturnListener(new ReturnListener() {
			@Override
			public void handleReturn(int replyCode, String replyText, String exchange,
					String routingKey, BasicProperties properties, byte[] body) throws IOException {
				
				System.err.println("---------handle  return----------");
				System.err.println("replyCode: " + replyCode);
				System.err.println("replyText: " + replyText);
				System.err.println("exchange: " + exchange);
				System.err.println("routingKey: " + routingKey);
				System.err.println("properties: " + properties);
				System.err.println("body: " + new String(body));
			}
		});
		

		channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

Обязательный параметр в basicPublish должен иметь значение true, чтобы он вступил в силу, иначе брокер удалит сообщение.

Ограничение тока на стороне потребителя

Предположим, что когда MQ-сервер накопил тысячи сообщений, потребитель в это время внезапно подключается, тогда огромное количество сообщений все проталкивается, но клиент не может обработать столько данных за один раз.

В случае высокого параллелизма мгновенно генерируемый трафик очень большой, а сообщение очень большое.Важная роль MQ заключается в ограничении тока, что и делает потребитель.

RabbitMQ предоставляет функцию QoS (Quality of Service), то есть, исходя из предпосылки неавтоматического подтверждения сообщений, новые сообщения не будут потребляться до тех пор, пока не будет использовано определенное количество сообщений.

// prefetchSize消息的限制大小,一般设置为0,在生产端限制
// prefetchCount 我们一次最多消费多少条消息,一般设置为1
// global,一般设置为false,在消费端进行限制
channel.basicQos(int prefetchSize, int prefetchCount, boolean global) 

// 使用
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));    

Примечание:Если для параметра autoAck установлено значение false, вы должны подписать сообщение вручную.

Очередь недоставленных писем (DLX)

Когда сообщение становится недоставленным в очереди и нет потребителя для потребления, сообщение может быть повторно опубликовано в другой очереди, которая является очередью недоставленных сообщений.

Следующие условия вызывают попадание сообщений в очередь недоставленных сообщений:

  • Когда basic.reject/basic.nack и requeue имеют значение false (не возвращать в очередь), сообщение является мертвой буквой

  • Срок жизни сообщения истек

  • Очередь достигает максимальной длины

Очередь недоставленных сообщений — это тоже обычный Exchange, который ничем не отличается от обычного Exchange, но требуется небольшая операция.

Настройка очереди недоставленных сообщений включает в себя:

  • Установите Exchange (имя dlx.exchange необязательно), установите Queue (dlx.queue), установите RoutingKey (#)
  • Создайте нормальный обмен, очередь, привязку, просто добавьте параметр arguments.put("x-dead-letter-exchange","dlx.exchange")
      // 这就是一个普通的交换机 和 队列 以及路由
		String exchangeName = "test_dlx_exchange";
		String routingKey = "dlx.#";
		String queueName = "test_dlx_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		
		Map<String, Object> agruments = new HashMap<String, Object>();
		agruments.put("x-dead-letter-exchange", "dlx.exchange");
		//这个agruments属性,要设置到声明队列上
		channel.queueDeclare(queueName, true, false, false, agruments);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//要进行死信队列的声明:
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare("dlx.queue", true, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");

В конце концов

Здесь мы в основном говорим о некоторых концепциях, которые часто используются при использовании RabbitMQ.Если вы понимаете концепции, вы не будете путаться при их применении. Затем перечислите несколько важных API-интерфейсов Java-клиента MQ.

Ссылаться на

  • "Битва RabbitMQ"
  • Промежуточное ПО сообщений RabbitMQ продолжает развиваться