и т.д! Мы наступили на эти две ямы Spring-RabbitMQ для вас

задняя часть Python Spring RabbitMQ
и т.д! Мы наступили на эти две ямы Spring-RabbitMQ для вас

Ян Джи. Он хорошо разбирается в Python и JAVA. В настоящее время он является старшим инженером Python в команде логистики Ele.me. Он отвечает за изменение основных связей и сосредоточение внимания на системном бизнес-анализе и построении стабильности.

В прошлый раз мы поделились нашей командойОпыт пребывания в яме Java-приложений Докеризованное развертывание GC становится длиннее, и обнаружил, что это действительно помогло многим учащимся решать одни и те же задачи в своих проектах. Это действительно большой стимул для нас, поэтому мы решили время от времени делиться опытом нашей команды по преодолению пит-шагов.Сегодня мы делимся двумя ямами потребителя Spring-RabbitMQ, надеясь помочь всем.

Яма № 1: потребительская анимация, отсутствие реальной покупательной способности

задний план

версия spring-rabbit изменена на 1.6.2.RELEASE

Феномен

Количество потребителей в норме, пульт управления mqprefetchПараметр всегда равен 1, сообщение не может быть подтверждено нормально, очередь находится в приостановленном состоянии, Система сообщает об исключенииorg.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException

效果图

[2018-09-09 10:31:27.27]RuntimeException-org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException [ ERROR] [ Elog ]
Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpIllegalStateException: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method.
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:291)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 more

Ненормальная причина

ListenerExecutionFailedException

в соответствии софициальная документацияЭто означает, что у потребителя есть исключение при потреблении сообщения.По умолчанию сообщение будет отклонено и возвращено в очередь, но если исключение произойдет до того, как исключение достигнет пользовательского кода, сообщение всегда будет доставлено.

官方文档

org.springframework.amqp.AmqpIllegalStateException

Это исключение выбрасывается, и RabbitMQ не может получить ответ на сообщение и всегда будет находиться в состоянии ожидания.

No default listener method specified: Either specify a non-null value ...

потребитель не найденOnMessageметод, догадался, что это была ошибка загрузки класса, поэтому я снова начал проверять определение потребительского класса и, наконец, обнаружил, чтоrabbit:listenerID реального потребителя конфликтует с ID реального потребителя, что делает реальный bean-потребитель недействительным и не может найти способ принять сообщение Rabbitmq остановит потребление, когда во время связи с клиентом возникнет исключение.

<bean id="consumer_1" class="me.ele.Consumer1"/>
<rabbit:listener-container connection-factory="galaxyConnectionFactory" acknowledge="manual" concurrency="16" >
        <rabbit:listener queues="queue_1" ref="consumer_1" id="consumer_1" />
    </rabbit:listener-container>

Решение

Удалитьrabbit:listenerидентификатор атрибута

Яма №2: количество потребителей ненормальное

задний план

Исходный сервис имеет 6 очередей, по 10 потребителей в каждой.task-executorПул потоков был установлен на 90. Позже из-за увеличения объема данных было обнаружено, что мощности потребления недостаточно, было принято решение увеличить количество потребителей, настроить параллелизм слушателя-контейнера на 20. , и перезапустите сервер.

Феномен

Количество потребителей в некоторых очередях недостаточно, а отсутствующие элементы всегда являются очередями, объявленными позже в xml.

效果图

Аномальное местоположение

Параметр резервного параллелизма равен 10, а аномальное исчезновение Наблюдайте за аномальной сценой и обнаружите, что очередь исчезновений потребителей имеет последовательный порядок, а верхний предел количества потребителей равен 90. Предполагается, что параметр ограничен. параметры конфигурации следующим образом:

    <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
      id="taskExecutor">
        <!--核心线程数 -->
        <property name="corePoolSize" value="16"/>
        <!--最大线程数 -->
        <property name="maxPoolSize" value="16"/>
        <property name="queueCapacity" value="500"/>
        <!--线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="60"/>
        <!--线程池对拒绝任务(无线程可用)的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
        <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
    </bean>
    
    <rabbit:listener-container connection-factory="monitorConnectionFactory" 
            acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20">
        <rabbit:listener queues="queue_1" ref="consumer_1"/>
        <rabbit:listener queues="queue_2" ref="consumer_2"/>
        <rabbit:listener queues="queue_3" ref="consumer_3"/>
        <rabbit:listener queues="queue_4" ref="consumer_4"/>
        <rabbit:listener queues="queue_5" ref="consumer_5"/>
        <rabbit:listener queues="queue_6" ref="consumer_6"/>
    </rabbit:listener-container>
    

Ненормальная причина

После исследования и сравнения установлено, что значение верхнего предела совпадает с параметром ThreadPoolTaskExecutor исполнителя задачи.corePoolSize,maxPoolSizeЭто очень близко.После запроса соответствующих данных обнаруживается, что потребители нескольких очередей будут совместно использовать количество пулов потоков taskExecutor.Если количество пулов потоков недостаточно, потребители не могут быть созданы. позже узналофициальная документацияЭто было ясно сказано.

官方文档

Решение

Увеличить исполнителя задачcorePoolSizeиmaxPoolSizeЗначение 200, перезапустить службу, решить.

Суммировать

  1. В управлении контейнерами Spring идентификатор — это ссылка на объект, который должен иметь только один неповторяющийся идентификатор, включая bean-компоненты и другие пользовательские теги (например,rabbit:listener), даже в разных файлах нужно внимание.
  2. spring-rabbit task-exeutor используется по умолчаниюSimpleAsyncTaskExecutorКак асинхронный пул потоков, каждый раз, когда запрашивается новый поток, не устанавливается максимальное количество потоков.Это не настоящий пул потоков, этот класс не использует потоки повторно, и для каждого вызова создается новый поток. Вместо этого можно использоватьThreadPoolTaskExecutorАльтернативный вариант, но необходимый при наличии нескольких очередей.ThreadPoolTaskExecutorПула потоков достаточно, особенно для нескольких контейнеров-слушателей, совместно использующих исполнителя задач.




Недостаточно читать блоги?

Вы можете отсканировать QR-код, добавив группового помощника, присоединиться к группе общения, обсудить технические вопросы, связанные с ведением блога, и больше взаимодействовать с блогерами.

По вопросам перепечатки блога, офлайн-мероприятий и сотрудничества обращайтесь по адресуshadowfly_zyl@hotmail.comобщаться