Inventory MQ: взгляд на Кафку

Java
Inventory MQ: взгляд на Кафку

Общая документация:Каталог статей
Github : github.com/black-ant

Введение

доRabbitMQПосле простого анализа в этой статье будут рассмотрены связанные операции Kafka.

2. Основное использование

kafka — очень часто используемая высокопроизводительная очередь сообщений, давайте сначала рассмотрим ее основное использование.

Основные зависимости Maven

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

информация о конфигурации

spring.kafka.bootstrap-servers=127.0.0.1:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=ant
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 配置消费者消息的key和value的编解码方式-consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.listener.missing-topics-fatal=false

новости производства

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce() {
        kafkaTemplate.send("start", "one", "are you ok?" + "----" + i);
    }

Использование сообщений

@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
    logger.info("------> this is in listerner 0:{}<-------", record.value());
}

Суммировать

  • KafkaTemplate публикует сообщения
  • KafkaListener прослушивает сообщения

В целом, это то же самое, что и предыдущее использование Rabbit, и конкретное внутреннее использование будет подробно рассмотрено в последующем исходном коде.

3. Очки базовых знаний

ссылка на базовый членочередь сообщенийДокументация.

4. Список исходного кода инструмента

мы вращаемсяKafkaTemplateа такжеKafkaListenerАнализируются 2 класса.

4.1 KafkaTemplate

происхождение отправки

Видно что получается через БУДУЩЕЕ получить результат

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    final Producer<K, V> producer = getTheProducer();
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    if (this.autoFlush) {
        flush();
    }
    return future;
}

Создание сообщения

Основная логика — завершить отправку через объект org.apache.kafka.clients.producer.Producer, который является нативным пакетом Kafka и принадлежит kafka-client.

// 第一步 : 消息的构建
C0- KafkaProducer
    M_01- send(ProducerRecord<K, V> record, Callback callback)
        - 首先调用 ProducerInterceptors 构建一个 ProducerRecord -> PS:M_01_01
            ?- 注意 , 这里主要是通过拦截器添加额外的功能 , 会内部进行一个 For 循环
            |- For 许纳湖List<ProducerInterceptor<K, V>>
                - 调用对应的 interceptor.onSend(interceptRecord) , 替换之前的 ProducerInterceptors
                - 如果集合为空 , 则直接返回正确的
        - 调用 doSend 主流程 -> M_02
    M_02- doSend
        1- 先判断 Sender 对象是否存在且运行 , 否则抛出异常 -> PS:M_02_02
        2- 调用 waitOnMetadata , 判断集群元数据可用
        3- 构建 Cluster 对象
        4- 构建序列化 serializedKey , serializedValue 
        5- 生成 partition  -> PS:M_02_03 
        6- 生成最终发送对象 TopicPartition
        7- 构建 Header 和 设置 readOnly 属性
        8- 确定消息size
        9- 构建 callback 拦截器  -> PS:M_02_05  
        10- 发送消息 , 获得一个 RecordAccumulator.RecordAppendResult 用于异步获取结果
            ?- 其中包含一个 future 对象
            ?- RecordAccumulator append 发起发送
        

// M_02 代码
 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // 首先,确保主题的元数据可用
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // 内部判断逻辑 : 
                // 构建元数据 , 发起 metadata.fetch() ,判断超时时间
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException....;
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw ....;
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw ....;
            }
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            // 生产者回调将确保调用'回调'和拦截回调
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            // 事务管理
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            // 发送消息且返回一个 RecordAppendResult 对象
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (Exception e) {
             // ... 省略异常处理 , 主要有以下操作
             // ApiException / InterruptedException / BufferExhaustedException / KafkaException / Exception
             // this.errors.record();
             // this.interceptors.onSendError(record, tp, e);
        }
    }
    
    
// Step 2 : 消息的发送 -- 见后文     

PS: роль объекта M_01_01 ProducerRecord

Функция: каждая запись ProducerRecord представляет собой сообщение, объект содержит тему, раздел, заголовки, используемые для сопоставления логики отправки.

public class ProducerRecord<K, V> {

    // Topic 类型
    private final String topic;
    
    // 如果指定了一个有效的分区号,该分区将在发送记录时使用
    // 如果没有分区 , 但指定了一个键,则使用该键的哈希值来选择分区
    // 如果键和分区都没有 , 则分区将以循环方式分配
    private final Integer partition;
    private final Headers headers;
    
    // 监听的 Id
    private final K key;
    // 发送的消息体
    private final V value;
    
    // 如果用户没有提供时间戳,生产者将记录当前时间。
    // Kafka最终使用的时间戳取决于为 Topic 配置的时间戳类型。
    private final Long timestamp;
    //.................
}

image.png

PS: M_02_02 Объект-отправитель

эффект:основной объект отправки, для отправки сообщений, для обработки кластера
Владение пакетом: org.apache.kafka.clients.producer.internals

TODO

PS:M_02_03 Генерация основной логики раздела

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }


PS: функция M_02_03 InterceptorCallback

отправка сообщений

В общем, для отправки сообщения есть 3 строки:

  • Строка 1: при запуске проекта запуск KafkaMessageListenerContainer зацикливается.
    • Вызовите pollSelectionKeys, чтобы завершить процесс записи канала.
  • Строка вторая: установить отправить
    • Sender # runOnce sendProducerData(currentTimeMs)Установите ClientRequest в
  • Строка 3: Получить Отправить Отправить
    • Sender # runOnce client.poll(pollTimeout, currentTimeMs)Инициировать выполнение опроса

// PS : 消息的产生和发送是异步的 , 消息的发送主要依赖于 Sender

public class Sender implements Runnable{.........}

// Step 1 : Thread run 方法一览
C2- Sender
    M2_01- runOnce()
        - client.poll(pollTimeout, currentTimeMs) 发送消息
    M2_02- sendProduceRequest : 核心发送方法
        - 构建了一个 RequestCompletionHandler . 其中有个 onComplete 用于后续回调
        - 构建一个 ClientRequest 
        - 调用 ClientRequest.send 发送消息
            ?-NetworkClient.doSend -> PS:M2_02_01
            
            
// PS:M2_02_01 底层一览
    public void send(Send send) {
        String connectionId = send.destination();
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // 确保通知通过'断开',离开通道在状态中关闭被触发
            this.failedSends.add(connectionId);
        } else {
            try {
                // 此处使用 KafkaChannel 进行处理
                channel.setSend(send);
            } catch (Exception e) {
                // 更新状态以保持一致性,通道关闭后将被丢弃
                channel.state(ChannelState.FAILED_SEND);
                // 确保在下一次轮询中处理' failedsent '时通过' disconnected '通知
                this.failedSends.add(connectionId);
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    throw e;
                }
            }
        }
    }
       
// 隐藏方法 Selector  , 在上文中 M2_02 中设置了 send , 此处进行处理
C- Selector
    M- pollSelectionKeys
        -  send = channel.write()
       

обратный вызов сообщения

обратный вызов основан наSender # handleProduceResponseинициирован, методsendProduceRequestНабор методов


// Start : 请求的返回处理
C- Sender # handleProduceResponse
// 核心代码一览
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
    TopicPartition tp = entry.getKey();
    ProduceResponse.PartitionResponse partResp = entry.getValue();
    ProducerBatch batch = batches.get(tp);
    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());

// End : template 回调
// 在 KafkaTemplate.doSend 中设置
C-KafkaTemplate
    M- doSend 设置
        - producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    M- buildCallback
        - 

// PS : 请求到 Callback 的流转
C- Sender # handleProduceResponse
C- Sender # completeBatch
C- ProduceBatch # done
C- ProduceBatch # completeFutureAndFireCallbacks
C- InterceptorCallback # onCompletion 
// 此处就会调用 End : template 回调

Управление транзакциями

TransactionSynchronizationManager

4.2 KafkaListener

Ядром kafka Listener является KafkaListenerAnnotationBeanPostProcessor, а инициализация выполняется на основе postProcessAfterInitialization класса BeanPostProcessor.

Процесс потребления Кафки имеет следующие маршруты:

  • регистрация контейнера
  • Мониторинг цикла контейнера
  • потребление сообщений

KafkaMQ.png

регистрация контейнера

C3- KafkaListenerAnnotationBeanPostProcessor
    M3_01- postProcessAfterInitialization
        ?- 这里来自于Bean 加载的 applyBeanPostProcessorsAfterInitialization 方法 , 处理Bean 的前置扩展
        - 获取 Class 级别的 KafkaListener
        - 通过 MethodIntrospector 反射获取各方法的 KafkaListener
            - 注意 , 这里是一个 set 集合 , 意味着他允许一个方法上面编注多个 @KafkaListener -> PS:M3_01_01
        FOR- 循环处理 KafkaListener
            - processKafkaListener 循环处理
   M3_02- processKafkaListener
       - 判断是否为代理方法
       - 创建一个 MethodKafkaListenerEndpoint 节点 , 将当前方法设置到 MethodKafkaListenerEndpoint 节点中
       - 为节点设置相关的属性
           ?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions 
           ?- Topics / TopicPattern / ClientIdPrefix
       - 调用 KafkaListenerEndpointRegistrar 注册当前节点
    M3_03- Method checkProxy(Method methodArg, Object bean) 
        - 获取当前 method
        - 返回由AOP代理代理的接口
        - 从代理接口中获取对应的 method 
    M3_04- processListener
        - 继续构建 MethodKafkaListenerEndpoint 
            ?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions / Topics /TopicPattern / ClientIdPrefix
            ?- 根据条件处理 concurrency / autoStartup / Group / 
         - 通过 BeanFactory 构建 KafkaListenerContainerFactory
         - 为 MethodKafkaListenerEndpoint 设置属性
            ?- beanFactory / ErrorHandler
         - 调用 registrar.registerEndpoint 注册当前 endpoint
            ?- 该对象会在 afterSingletonsInstantiated 中被消费
    M3_05- afterSingletonsInstantiated
        - 为 registrar 注册各种属性
            ?- BeanFactory / EndpointRegistry / ContainerFactoryBeanName
         - 调用 registerAllEndpoints 处理 registrar -> M4_06
    M3_07- resolveContainerFactory
         - 返回一个 KafkaListenerContainerFactory
         
         
// M3_01 核心代码
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            // 获取 Class 级别的 KafkaListener
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            // 获取方法级别注解
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        // 核心处理逻辑
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
         
         
// M3_03 核心伪代码 : 
private Method checkProxy(Method methodArg, Object bean) {
	Method method = methodArg;
	if (AopUtils.isJdkDynamicProxy(bean)) {
		method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
		Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
		for (Class<? iface : proxiedInterfaces) {
			method = iface.getMethod(method.getName(), method.getParameterTypes());
			break;
		}
	}
	return method;
}

// 手动注入 Listener
// 无意中在代码中发现了一个 KafkaListenerConfigurer 接口 , 该接口可以用于手动注入 Listener
I- KafkaListenerConfigurer    
    M- configureKafkaListeners(KafkaListenerEndpointRegistrar registrar)
    	- 构建了一个 MessageListenerContainer 


// M4_01 的触发流程 : afterPropertiesSet , 再往上为 DefaultListableBeanFactory # afterSingletonsInstantiated

C4- KafkaListenerEndpointRegistrar
    F4_01- List<KafkaListenerEndpointDescriptor> endpointDescriptors  
    F4_02- Map<String, MessageListenerContainer> listenerContainers    
    M4_01- registerEndpoint
        - 构建一个 KafkaListenerEndpointDescriptor
        - 如果startImmediately 初始化就启动 , synchronized 上锁注册到容器中
        - 否则先添加到 List<KafkaListenerEndpointDescriptor> 集合中 -> M4_02
    M4_02- registerAllEndpoints
        - synchronized 上锁注册到容器中 
            - 注册容器 -> M4_04
    M4_03- resolveContainerFactory : 返回一个 KafkaListenerContainerFactory
    M4_04- registerListenerContainer
        - 构建一个 MessageListenerContainer , 并且加入 Map<String, MessageListenerContainer> 
            ?- createListenerContainer 时会注册 Method
        - 如果包含 Group , 则加入List<MessageListenerContainer> 集合 
            ?- 该集合会在 start(继承于Lifecycle)方法中调用 -> M4_05
    M4_05- startIfNecessary
        - 调用 MessageListenerContainer start 方法
    M4_06- createListenerContainer
        - 
    M4_07- start
        FOR- getListenerContainers 获取所有的 MessageListenerContainer , 依次调用 startIfNecessary
            ?- M4_05
        
        
C05- MessageListenerContainer
    M5_01- start()
        -
            
C06- AbstractMessageListenerContainer
    M6_01- doStart()
        - 最终调用 KafkaMessageListenerContainer 
    M6_02- run()
        - 这里通过一个 While 方法进行循环处理 -> 
    M6_03- pollAndInvoke()
        - invokeListener(records) 反射处理
        
C07- KafkaMessageListenerContainer
    M7_01- doStart()
        - 准备 ContainerProperties 
        - 校验 AckMode 模式
        - 构建 GenericMessageListener
        - 构建 ListenerConsumer
        - 修改容器状态
            ?- 这里和后面的循环监听形成前后关联
        - 这里还用了一个 CountDownLatch 来等待执行
        - 开始线程的执行 , 这里会执行 listenConsumer 的 run 方法 -> PS:M7_01_02
    M7_02- doInvokeOnMessage()
        - 调用对应 MessageListener 的 onMessage , 最终调用 M8_1
    M7_03- onMessage()
        - 
        
// M7_01 代码
protected void doStart() {
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand-alone container
        checkTopics();
    }
    // 准备 ContainerProperties 
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);

    Object messageListener = containerProperties.getMessageListener();
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    ListenerType listenerType = determineListenerType(listener);
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    // 修改容器状态
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    // 此处开始现场的执行
    this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
    try {
        if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
            publishConsumerFailedToStart();
        }
    }catch (@SuppressWarnings(UNUSED) InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
        
        
C8- ConcurrentMessageListenerContainer
    M8_01-  doStart()
        - 校验 topic
        - 启动所有的 KafkaMessageListenerContainer
        
        
    
C10- MethodKafkaListenerEndpoint
    M10_01- createMessageListener
        - messageListener.setHandlerMethod 注册 Method
        
        
        


PS: M3_01_01 больше аннотаций Kafkalistener

kafkaTemplate.send("start", "one", "are you ok one?" + "----");
kafkaTemplate.send("topic1", "two", "are you ok two?" + "----");


@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
@KafkaListener(id = "two", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
    logger.info("------> this is in listerner 0:{}<-------", record.value());
}

// 结果
2021-05-04 22:04:35.094  INFO 20012 --- [      two-0-C-1] c.g.k.demo.service.KafkaConsumerService  : ------> this is in listerner 0:are you ok two?----<-------
2021-05-04 22:04:35.098  INFO 20012 --- [      one-0-C-1] c.g.k.demo.service.KafkaConsumerService  : ------> this is in listerner 0:are you ok one?----<-------


image.png

PS: поток исполнения M7_01_02

image.png

Мониторинг цикла контейнера

Основная логика находится во внутреннем классе KafkaMessageListenerContainer ListenerConsumer.

  • C- ConsumerListener # run
  • C- KafkaMessageListenerContainer # pollAndInvoke
  • C- KafkaMessageListenerContainer # invokeOnMessage
  • C- KafkaMessageListenerContainer # doInvokeOnMessage
    • Соответствующий MessagingMessageListenerAdapter вызывается в этом классе для окончательного выполнения.

// Step 1 : 起点
C- KafkaMessageListenerContainer.ListenerConsumer
    public void run() {
        publishConsumerStartingEvent();
        this.consumerThread = Thread.currentThread();
        if (this.consumerSeekAwareListener != null) {
            this.consumerSeekAwareListener.registerSeekCallback(this);
        }
        KafkaUtils.setConsumerGroupId(this.consumerGroupId);
        this.count = 0;
        this.last = System.currentTimeMillis();
        initAssignedPartitions();
        publishConsumerStartedEvent();
        while (isRunning()) {
            try {
                // 执行 invoke 逻辑
                pollAndInvoke();
            }catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // ... 省略异常处理
            }
        }
        wrapUp();
    }
    
// Step 2 : 循环中 poll 对象 
C- ListenerConsumer
    M- pollAndInvoke() : 核心方法 , poll 获取并且映射到相关方法
        - ConsumerRecords<K, V> records = doPoll()
            |- this.consumer.poll(this.pollTimeout)
        - invokeListener(records)
    M- doPoll
        
    
// Step 3 :在 doInvokeWithRecords 中 , 会对消息进行迭代处理
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
    doInvokeRecordListener(record, null, iterator)
}


// Step 4 : 在 doInvokeOnMessage 中 , 调用对应的 ListenerAdapter 完成消息处理
// PS : 该 在 KafkaMessageListenerContainer 中通过构造器生成
C- KafkaMessageListenerContainer.ListenerConsumer

// 通过四种监听类型 , 选择处理的 listener 
// ACKNOWLEDGING_CONSUMER_AWARE,CONSUMER_AWARE,ACKNOWLEDGING,SIMPLE

Окончательная обработка выполнения

Обработчик в конечном итоге будет выполнен из MessagingMessageListerAdAppter, обработчик вводится в кафкалистеэнердендеренпистрар

 // 最终执行类
 
C08- RecordMessagingMessageListenerAdapter
    M8_1- onMessage
        - 获取对应的 Method . 反射调用
            ?- 该 HandlerAdapter 在 MethodKafkaListenerEndpoint # createMessageListener 是处理
 
C10- RecordMessagingMessageListenerAdapter
    M10_1- onMessage


5. Кафка указывает на глубину

TODO : Последующие статьи анализируют Kafka узел за узлом, если он реализует связанные функции.

FAQ

java.nio.file.FileSystemException -> Этот файл используется другой программой, и процесс не может получить к нему доступ

Детали проблемы:D:\tmp\kafka-logs\topic_1-0\00000000000000000000.timeindex.deleted: этот файл используется другой программой, и процесс не может получить к нему доступ
решение :Вручную удалите файлы журналов в \kafka-logs и перезапустите kafka.

Суммировать

В этой статье в основном рассказывается о двух частях, отправке и мониторинге, отправка в основном основана на отправке client.poll, а ядро ​​​​мониторинга в основном заключается в сканировании и циклическом извлечении опроса потребителей.

Поняв эту часть процесса, вы можете начать детально разбираться в том, как работать и настраивать его в дополнительных функциях, таких как кластеризация.