1 Обзор
в этой статьеSpring Boot Series Thirteen Spring Boot интегрирует RabbitMQВ этой статье представим, как Spring Boot использует Rabbitmq, эта статья из исходного кода для анализа того, как Spring Boot Integrate Integrate Rabbitmq.
2. Вход
В spring.factories в spring-boot-autoconfigure.jar есть следующее определение, которое означает, что при запуске spring будет выполнена инициализация RabbitAutoConfiguration
…
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\
…
3. RabbitProperties
файлы свойств application_*.yml
spring:
# 配置rabbitMQspring:
rabbitmq:
host: 10.240.80.134
username: spring-boot
password: spring-boot
virtual-host: spring-boot-vhost
Приведенный выше файл свойств будет внедрен в свойство RabbitProperties.
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
…
}
4. RabbitAutoConfiguration
4.1 Анализ аннотаций по классам:
Это класс конфигурации. Упомянутый выше объект RabbitProperties будет инициализирован при запуске, а затем он представит другой класс конфигурации, RabbitAnnotationDrivenConfiguration. Этот класс конфигурации связан с мониторингом сообщений. Мы представим этот класс позже. Есть 3 внутренних класса, и все они являются классом конфигурации, этот класс конфигурации инициализирует класс, требуемый RabbitMQ, в соответствии с условиями
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
// 会初始化RabbitProperties.class
@EnableConfigurationProperties(RabbitProperties.class)
// 引入@Configuration类RabbitAnnotationDrivenConfiguration
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
…
}
4.2 Внутренний класс RabbitConnectionFactoryCreator
Внутренний класс RabbitConnectoreForceReatorCreator будет инициализировать экземпляр CachingConnectionFactory (это подкласс соединения) в соответствии с параметрами, настроенными RabbitProperties, который является соединительным пулом, соединяющим Rabbitmq.
Экземпляр CachingConnectionFactory является официальным пакетом com.rabbitmq.client.ConnectionFactory и com.rabbitmq.client.Channel для RabbitMQ для кэширования этих двух ресурсов. CachingConnectionFactory имеет два режима кэширования.
1. Если выбран режим кэширования CacheMode#CHANNEL, когда мы вызываем метод createConnection(), каждый раз возвращается одно и то же соединение. По умолчанию создается только одно соединение и создается только один канал (можно создать и кэшировать несколько каналов, настроив количество создаваемых каналов). То есть можно создать несколько каналов, но все каналы используют одно и то же соединение.
2. Если вы выберете режим кэширования CacheMode#CONNECTION, вы сможете одновременно настроить количество создаваемых подключений и данные канала. при звонке
При createConnection() доступное Connection берется из кеша, если его нет и количество созданных подключений не достигает верхнего предела, создается новое Connection. Аналогично Канал
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
throws Exception {
// 根据RabbitProperties 配置RabbitMQ的连接工厂类
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
if (config.determineHost() != null) {
factory.setHost(config.determineHost());
}
…
factory.afterPropertiesSet();
// 连接缓存类
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
factory.getObject());
connectionFactory.setAddresses(config.determineAddresses());
connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
connectionFactory.setPublisherReturns(config.isPublisherReturns());
…
return connectionFactory;
}
}
4.3 Внутренний класс RabbitTemplateConfiguration
Внутренний класс RabbitTemplateConfiguration присваивает параметры, настроенные RabbitProperties и MessageConverter, соответствующим переменным-членам класса через конструктор класса, а затем создает RabbitTemplate и RabbitAdmin в методе rabbitTemplate() в соответствии с экземпляром CachingConnectionFactory, созданным RabbitConnectionFactoryCreator.
@Configuration
// 引入RabbitConnectionFactoryCreator
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
private final ObjectProvider<MessageConverter> messageConverter;
private final RabbitProperties properties;
// 注入MessageConverter和RabbitProperties
public RabbitTemplateConfiguration(
ObjectProvider<MessageConverter> messageConverter,
RabbitProperties properties) {
this.messageConverter = messageConverter;
this.properties = properties;
}
// 初始化RabbitTemplate
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// 创建RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
// 配置MessageConverter
rabbitTemplate.setMessageConverter(messageConverter);
}
// 其它参数配置略
…
return rabbitTemplate;
}
// 初始化AmqpAdmin
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
// 创建RabbitAdmin
return new RabbitAdmin(connectionFactory);
}
}
4.4 Внутренние классы конфигурации :. MessagingTemPlateconFiguration
Класс внутренней конфигурации: MessagingTemplateConfiguration Внедряет экземпляр RabbitTemplate, созданный выше, и создает RabbitMGingTempla методом RabbitMPLateConfiguration().
@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
// 引入RabbitTemplateConfiguration配置类
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {
// 生成实例RabbitMessagingTemplate, 其中RabbitTemplate 由RabbitTemplateConfiguration实例化
@Bean
@ConditionalOnSingleCandidate(RabbitTemplate.class)
public RabbitMessagingTemplate rabbitMessagingTemplate(
RabbitTemplate rabbitTemplate) {
return new RabbitMessagingTemplate(rabbitTemplate);
}
}
Инициализация bean-компонента, связанного с отправителем RabbitMQ, завершена с помощью приведенной выше конфигурации, мы можем использовать RabbitTemplate и RabbitAdmin для отправки сообщений. Если вы хотите прослушивать сообщения RabbitMQ, вам также потребуется следующая конфигурация, которая является более сложной.
5. RabbitAnnotationDrivenConfiguration
Этот класс представлен в RabbitAutoConfiguration, который создает bean-компоненты, связанные с прослушиванием сообщений. Разберем этот класс подробно.
5.1 Конструктор класса:
Для входящего мониторинга требуются экземпляры MessageConverter, экземпляры MessageRecoverer и экземпляры RabbitProperties в качестве переменных-членов класса.
@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<MessageRecoverer> messageRecoverer;
private final RabbitProperties properties;
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<MessageRecoverer> messageRecoverer,
RabbitProperties properties) {
this.messageConverter = messageConverter;
this.messageRecoverer = messageRecoverer;
this.properties = properties;
}
…
}
5.2. Метод RabbitListenerContainerFactoryConfigurer() класса
Создайте объект SimpleRabbitListenerContainerFactoryConfigurer, который сохраняет экземпляр MessageConverter, экземпляр MessageRecoverer и экземпляр RabbitProperties, необходимые для создания RabbitListenerContainer.
// 实例SimpleRabbitListenerContainerFactoryConfigurer 对象,设置MessageConverter、MessageRecovere、RabbitMQ的属性
@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRabbitProperties(this.properties);
return configurer;
}
5.3. Метод rabbitListenerContainerFactory() в классе
Создайте экземпляр SimpleRabbitListenerContainerFactory (подкласс RabbitListenerContainerFactory), где SimpleRabbitListenerContainerFactoryConfigurer происходит из приведенного ниже метода, а ConnectionFactory — из RabbitAutoConfiguration, описанного выше.
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
5.4. Запустите @EnableRabbit
Этот внутренний класс в основном зависит от его аннотации @EnableRabbit, которая использует параметр RabbitListenerContainer, создает другие связанные экземпляры Bean и прослушивает сообщения. В следующем разделе подробно описывается @EnableRabbit.
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}
6. @EnableRabbit
Ввести класс конфигурации RabbitBootstrapConfiguration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 引入配置类RabbitBootstrapConfiguration
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
7. RabbitBootstrapConfiguration
Создайте rabritListenerannationationBeanPostProcessor и rabbitListenerendPointegistry в этом классе конфигурации.
@Configuration
public class RabbitBootstrapConfiguration {
// 创建RabbitListenerAnnotationBeanPostProcessor ,@RabbitListener+@RabbitHandler注解的方法,当收到监听消息分发到这些方法进行处理
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
}
// 创建RabbitListenerEndpointRegistry,供监听节点的注册
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}
8. RabbitListenerAnnotationBeanPostProcessor
Унаследован BeanPostProcessor, после того как Spring создает объект, перехватываются все аннотации @ Rabbitner + @ RabbitLer
8.1 Метод afterSingletonInstantiated() класса
При создании экземпляра класса выполняется инициализация, важные операции
1. Настройте получение экземпляра RabbitListenerEndpointRegistry и установите экземпляр RabbitListenerEndpointRegistrar.
2. Задайте для имени containerFactoryBeanName значение rabbitListenerContainerFactory в RabbitListenerEndpointRegistrar.
3. Вызовите RabbitListenerEndpointRegistrar.afterPropertiesSet() для инициализации, содержание этого метода будет представлено позже.
// 创建实例
private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
@Override
public void afterSingletonsInstantiated() {
…
// 设置获取RabbitListenerEndpointRegistry实例,并设置实例到RabbitListenerEndpointRegistrar中
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
// 在RabbitListenerEndpointRegistrar中设置containerFactoryBeanName名称为rabbitListenerContainerFactory
if (this.containerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners,初始化RabbitListenerEndpointRegistrar
this.registrar.afterPropertiesSet();
}
8.2. Метод postProcessAfterInitialization()
После инициализации объекта будет выполнен метод postProcessAfterInitialization(), который перехватит все методы, аннотированные @RabbitListener и @RabbitHandler.
1. Если метод @RabbitListener аннотирован, будет вызван метод processAmqpListener(), а вызывающий метод будет инкапсулирован с помощью MethodRabbitListenerEndpoint.
2. Если @RabbitListener аннотирован в классе, и класс имеет методы, аннотированные @RabbitHandler, будет вызываться processMultiMethodListeners(), и в это время будет использоваться MultiMethodRabbitListenerEndpoint.
инкапсулировать вызывающий метод
И MethodRabbitListenerEndpoint, и MultiMethodRabbitListenerEndpoint являются подклассами MethodRabbitListenerEndpoint.
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
…
// 处理所有被@RabbitListener注解的方法
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
// 处理所有被@RabbitHandler注解的方法
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}
private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
Object bean, String beanName) {
…
for (RabbitListener classLevelListener : classLevelListeners) {
// 创建处理有多个监听方法的类
MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
}
}
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
// 创建处理单个监听方法的类
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
8.3. Метод processListener()
Оба метода processMultiMethodListeners() и processMultiMethodListeners() войдут в processListener(), здесь сделайте следующее:
1. Первый шаг — установить очередь, приоритет и монопольное ожидание для мониторинга с помощью MethodRabbitListenerEndpoint в соответствии с параметрами конфигурации @RabbitListener в методе прослушивания.
2. Второй шаг — получить экземпляр rabbitAdmin и установить для него значение MethodRabbitListenerEndpoint.
середина
3. Шаг 3 Получите RabbitListenerContainerFactory в соответствии со значением, настроенным с помощью containerFactory() @RabbitListener, значение по умолчанию пусто.
4. Четвертый шаг вызовет класс инструментов RabbitListenerEndpointRegistrar для регистрации RabbitListenerEndpoint в RabbitListenerEndpointRegistry. Этот класс RabbitListenerEndpointRegistrar будет объяснен позже.
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object adminTarget, String beanName) {
// 这里有设置MethodRabbitListenerEndpoint endpoint的要监听的队列、优先级、排他性等待
…
// 获取rabbitAdmin实例,并设置到MethodRabbitListenerEndpoint 中
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
}
}
// 根据@RabbitListener的containerFactory()配置的值获取RabbitListenerContainerFactory
RabbitListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "] for bean " + beanName + ", no " +
RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
containerFactoryBeanName + "' was found in the application context", ex);
}
}
// 调用工具类RabbitListenerEndpointRegistrar将RabbitListenerEndpoint注册到RabbitListenerEndpointRegistry。RabbitListenerEndpointRegistra下面会解释这个类
this.registrar.registerEndpoint(endpoint, factory);
}
9. RabbitListenerEndpointRegistrar
Зарегистрируйтесь над классом RabbitListenerEndpoint, чтобы работать с RabbitListenerEndpointRegistry.
9.1. Операция инициализации APTOPROPERTISTISESET ()
Операция инициализации, этот метод вызывается методом afterSingletonsInstantiated() RabbitListenerAnnotationBeanPostProcessor для запуска инициализации. Основное содержание следующее:
- Шаг 1: зациклить AmqpListenerEndpointDescriptor, зарегистрировать экземпляры MethodRabbitListenerEndpoint и RabbitListenerContainerFactory для вызова RabbitListenerEndpointRegistry в этом классе
- Шаг 2: Установите startImmediately в true, указывая, что
private RabbitListenerEndpointRegistry endpointRegistry;...
// 初始化操作
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {、
# AmqpListenerEndpointDescriptor是保存RabbitListenerEndpoint和RabbitListenerContainerFactory实例
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
# 将MethodRabbitListenerEndpoint endpoint + RabbitListenerContainerFactory注册到endpointRegistry上
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
# 设置值为true
this.startImmediately = true; // trigger immediate startup
}
}
# 获取RabbitListenerContainerFactory实例
/**
如果endpoint 结点注册是有RabbitListenerContainerFactory,则使用这个值(实际来自@RabbitListener的containerFactory()值)。
如果没有则使用默认的RabbitListenerContainerFactory,如果没有,则从spring容器中获取名称为containerFactoryBeanName值的RabbitListenerContainerFactory对象并设置为默认值
在之前我们已经知道这个值被RabbitListenerAnnotationBeanPostProcessor在afterSingletonsInstantiated()中设置为rabbitListenerContainerFactory
**/
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
if (descriptor.containerFactory != null) {
return descriptor.containerFactory;
}
else if (this.containerFactory != null) {
return this.containerFactory;
}
else if (this.containerFactoryBeanName != null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
this.containerFactory = this.beanFactory.getBean(
this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
return this.containerFactory; // Consider changing this if live change of the factory is required
}
else {
throw new IllegalStateException("Could not resolve the " +
RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
descriptor.endpoint + "] no factory was given and no default is set.");
}
}
9.2 Метод registerEndpoint()
Этот метод вызывается методом processListener() компонента RabbitListenerAnnotationBeanPostProcessor. Основное содержание таково: В предыдущем анализе мы изначально установили startImmediately=true, а теперь анализируем только истинный случай. На этом этапе вызовите метод registerListenerContainer() RabbitListenerEndpointRegistry и передайте параметр startImmediately=true
// 在 RabbitListenerAnnotationBeanPostProcessor调用此方法进行注册
private RabbitListenerEndpointRegistry endpointRegistry;
public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
…
// Factory may be null, we defer the resolution right before actually creating the container
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
# 在之前的分析中我们发现startImmediately=true,现在只分析true情况
# 调用endpointRegistry的方法注册到上面this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
10. RabbitListenerEndpointRegistry
Функция: создать экземпляр MessageListenerContainer для зарегистрированной точки RabbitListenerEndpoint, выполнить инициализацию MessageListenerContainer и, наконец, выполнить метод start() этого объекта. Этот класс также управляет жизненным циклом прослушивающего контейнера.
10.1 Метод registerListenerContainer():
- Шаг 1. Создайте прослушиватель MessageListenerContainer для RabbitListenerEndpoint на основе RabbitListenerContainerFactory.
- Шаг 2: Из вышеизложенного мы знаем, что наше значение startImmediately равно true, и startIfNecessary() будет выполняться для запуска MessageListenerContainer.
// 设置RabbitListenerEndpoint 创建一个监听容器
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
# 为endpoint根据factory创建一个监听器,方法详细见下边
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
# Group??
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
# 启动容器MessageListenerContainer,已知传入的值为true
startIfNecessary(container);
}
}
}
10.2 Метод createListenerContainer()
- Первый шаг: создайте MessageListenerContainer с помощью метода createListenerContainer() RabbitListenerContainerFactory, сконфигурированного RabbitListenerEndpoint. По умолчанию используется SimpleRabbitListenerContainerFactory для создания экземпляра SimpleMessageListenerContainer.
- Шаг второй: Потому что MessagelistenerContainer наследует инициализируйтеБеобразные, все будут выполнять инициализацию SIMPLEMESSAGELISTERERCONTAINER. Мы вернемся к инициализации класса
# 为endpoint根据factory创建一个监听器
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
# 使用endpoint配置的RabbitListenerContainerFactory创建MessageListenerContainer 。默认是使用SimpleRabbitListenerContainerFactory创建的实例为SimpleMessageListenerContainer
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
# 初始化容器
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
…
return listenerContainer;
}
10.3 Запуск контейнера
Вызовите метод start() класса MessageListenerContainer, чтобы запустить контейнер.
// 启动容器
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// 容器默认值为自动启动,所有会执行一下操作
listenerContainer.start();
}
}
11. класс SimpleRabbitListenerContainerFactory
Этот класс является подклассом RabbitListenerContainerFactory.
11.1. Метод createContainerInstance()
Создать экземпляр SimpleMessageListenerContainer
// 创建SimpleMessageListenerContainer实例
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
}
11.2 Методы createContainerInstance()
Этот метод: вызовите createContainerInstance(), чтобы создать экземпляр SimpleMessageListenerContainer, и установите параметры, содержащиеся в RabbitListenerEndpoint, для созданного экземпляра, и, наконец, вызовите initializeContainer(), чтобы инициализировать экземпляр SimpleMessageListenerContainer.
@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
// 创建实例
C instance = createContainerInstance();
// 以下设置容器的初始值
if (this.connectionFactory != null) {
instance.setConnectionFactory(this.connectionFactory);
}
// 其他根据本对象的成员变量配置RabbitListenerEndpoint 代码略
…
instance.setListenerId(endpoint.getId());
endpoint.setupListenerContainer(instance);
// 初始化容器
initializeContainer(instance);
return instance;
}
11.3. Метод initializeContainer()
Инициализируйте только что созданный экземпляр SimpleMessageListenerContainer и настройте переменные-члены в этом объекте на экземпляр SimpleMessageListenerContainer.
// 根据容器工厂初始化容器值
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance) {
super.initializeContainer(instance);
if (this.applicationContext != null) {
instance.setApplicationContext(this.applicationContext);
}
if (this.taskExecutor != null) {
instance.setTaskExecutor(this.taskExecutor);
}
if (this.transactionManager != null) {
instance.setTransactionManager(this.transactionManager);
}
if (this.txSize != null) {
instance.setTxSize(this.txSize);
}
// 其他根据本对象的成员变量配置SimpleMessageListenerContainer 代码略
…
}
12. SimpleMessageListenerContainer
Метод start() класса SimpleMessageListenerContainer вызывается в RabbitListenerEndpointRegistry.
12.1 Метод start()
Если не инициализирован, выполните инициализацию этого класса
// 初始操作,主要操作见doStart()
@Override
public void start() {
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
this.initialized = true;
}
}
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Rabbit listener container.");
}
# 调用子类方法
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}
12.2 Метод doStart():
Основные операции следующие:
Шаг 1: Получите экземпляр RabbitAdmin
Шаг 2: Инициализация rabbitAdmin: объявление новых очередей, обменов и привязок на RabbitMQ
Шаг 3: вызовите метод doStart() родительского класса
Шаг 4. Запустите программу сообщений в пуле потоков для обработки сообщений.
Шаг 5: Следите за тем, успешно ли запущен мессенджер, и выдавайте исключение, если он не работает.
protected void doStart() throws Exception {
if (getMessageListener() instanceof ListenerContainerAware) {
// 验证当前监听的队列是否和容器相同
….
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
// 获取rabbitAdmin值
if (this.rabbitAdmin == null && this.getApplicationContext() != null) {
Map<String, RabbitAdmin> admins = this.getApplicationContext().getBeansOfType(RabbitAdmin.class);
if (admins.size() == 1) {
this.rabbitAdmin = admins.values().iterator().next();
}
else {
….
}
}
// rabbitAdmin初始化:对新增的队列、交换机、绑定在RabbitMQ上进行声明
checkMismatchedQueues();
// 调用父类方法
super.doStart();
synchronized (this.consumersMonitor) {
// 初始化消息者,此方法会创建消息者BlockingQueueConsumer
int newConsumers = initializeConsumers();
….
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
// 在线程池中启动这个消息者,进行消息消费:AsyncMessageProcessingConsumer 是个线程类,它调用BlockingQueueConsumer的start()方法接收消息并进行处理。
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 在线程池中执行 AsyncMessageProcessingConsumer
this.taskExecutor.execute(processor);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
// 监控消息者是否启动成功,如果失败则抛出异常
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
}
13. КроликАдмин:
Инициализация RabbitAdmin: объявление новых очередей, обменов и привязок на RabbitMQ
public void initialize() {
…
// 获取所有的队列、交换机、绑定
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue> contextQueues = new LinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding> contextBindings = new LinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
…
// 在RabbitMQ上进行声明,创建对应的队列、交换机、绑定
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
}
});
}
14. AsyncMessageProcessingConsumer
Наследуйте Runnable, вызовите метод start() BlockingQueueConsumer в методе run() класса
private final class AsyncMessageProcessingConsumer implements Runnable {
private final BlockingQueueConsumer consumer;
private volatile FatalListenerStartupException startupException;
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
this.start = new CountDownLatch(1);
}
@Override
public void run() {
….
try {
if (SimpleMessageListenerContainer.this.autoDeclare) {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
}
// 启动BlockingQueueConsumer 的start()方法
this.consumer.start();
..
}
…
...
15. BlockingQueueConsumer
BlockingQueueConsumer: вызовите метод basicConsume в начале, чтобы подписаться на сообщение. Здесь мы видим, что spring начинает вызывать класс (.channel.basicConsume) в пакете Jar, предоставленном RabbitMQ, указывая на то, что мы проследовали до конца, и на этом весь анализ исходного кода заканчивается.
15.1 Метод start() потока
void start() throws AmqpException {
try {
for (String queueName : this.queues) {
// 循环订阅所有的消息
if (!this.missingQueues.contains(queupublic eName)) {
consumeFromQueue(queueName);
}
}
}
catch (IOException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
}
private void consumeFromQueue(String queue) throws IOException {
// 调用basicConsume方法定阅消息。在这里我们可以看到spring开始调用RabbitMQ提供的Jar包里的类(.channel.basicConsume),
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), false, this.exclusive,
this.consumerArgs, this.consumer);
if (consumerTag != null) {
this.consumerTags.put(consumerTag, queue);
}
...
}
}