Общая документация:Каталог статей
Github : github.com/black-ant
Введение
доRabbitMQПосле простого анализа в этой статье будут рассмотрены связанные операции Kafka.
2. Основное использование
kafka — очень часто используемая высокопроизводительная очередь сообщений, давайте сначала рассмотрим ее основное использование.
Основные зависимости Maven
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
информация о конфигурации
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=ant
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 配置消费者消息的key和value的编解码方式-consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.listener.missing-topics-fatal=false
новости производства
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void produce() {
kafkaTemplate.send("start", "one", "are you ok?" + "----" + i);
}
Использование сообщений
@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
logger.info("------> this is in listerner 0:{}<-------", record.value());
}
Суммировать
- KafkaTemplate публикует сообщения
- KafkaListener прослушивает сообщения
В целом, это то же самое, что и предыдущее использование Rabbit, и конкретное внутреннее использование будет подробно рассмотрено в последующем исходном коде.
3. Очки базовых знаний
ссылка на базовый членочередь сообщенийДокументация.
4. Список исходного кода инструмента
мы вращаемсяKafkaTemplateа такжеKafkaListenerАнализируются 2 класса.
4.1 KafkaTemplate
происхождение отправки
Видно что получается через БУДУЩЕЕ получить результат
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
final Producer<K, V> producer = getTheProducer();
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
return future;
}
Создание сообщения
Основная логика — завершить отправку через объект org.apache.kafka.clients.producer.Producer, который является нативным пакетом Kafka и принадлежит kafka-client.
// 第一步 : 消息的构建
C0- KafkaProducer
M_01- send(ProducerRecord<K, V> record, Callback callback)
- 首先调用 ProducerInterceptors 构建一个 ProducerRecord -> PS:M_01_01
?- 注意 , 这里主要是通过拦截器添加额外的功能 , 会内部进行一个 For 循环
|- For 许纳湖List<ProducerInterceptor<K, V>>
- 调用对应的 interceptor.onSend(interceptRecord) , 替换之前的 ProducerInterceptors
- 如果集合为空 , 则直接返回正确的
- 调用 doSend 主流程 -> M_02
M_02- doSend
1- 先判断 Sender 对象是否存在且运行 , 否则抛出异常 -> PS:M_02_02
2- 调用 waitOnMetadata , 判断集群元数据可用
3- 构建 Cluster 对象
4- 构建序列化 serializedKey , serializedValue
5- 生成 partition -> PS:M_02_03
6- 生成最终发送对象 TopicPartition
7- 构建 Header 和 设置 readOnly 属性
8- 确定消息size
9- 构建 callback 拦截器 -> PS:M_02_05
10- 发送消息 , 获得一个 RecordAccumulator.RecordAppendResult 用于异步获取结果
?- 其中包含一个 future 对象
?- RecordAccumulator append 发起发送
// M_02 代码
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// 首先,确保主题的元数据可用
ClusterAndWaitTime clusterAndWaitTime;
try {
// 内部判断逻辑 :
// 构建元数据 , 发起 metadata.fetch() ,判断超时时间
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException....;
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw ....;
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw ....;
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
// 生产者回调将确保调用'回调'和拦截回调
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// 事务管理
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 发送消息且返回一个 RecordAppendResult 对象
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (Exception e) {
// ... 省略异常处理 , 主要有以下操作
// ApiException / InterruptedException / BufferExhaustedException / KafkaException / Exception
// this.errors.record();
// this.interceptors.onSendError(record, tp, e);
}
}
// Step 2 : 消息的发送 -- 见后文
PS: роль объекта M_01_01 ProducerRecord
Функция: каждая запись ProducerRecord представляет собой сообщение, объект содержит тему, раздел, заголовки, используемые для сопоставления логики отправки.
public class ProducerRecord<K, V> {
// Topic 类型
private final String topic;
// 如果指定了一个有效的分区号,该分区将在发送记录时使用
// 如果没有分区 , 但指定了一个键,则使用该键的哈希值来选择分区
// 如果键和分区都没有 , 则分区将以循环方式分配
private final Integer partition;
private final Headers headers;
// 监听的 Id
private final K key;
// 发送的消息体
private final V value;
// 如果用户没有提供时间戳,生产者将记录当前时间。
// Kafka最终使用的时间戳取决于为 Topic 配置的时间戳类型。
private final Long timestamp;
//.................
}
PS: M_02_02 Объект-отправитель
эффект:основной объект отправки, для отправки сообщений, для обработки кластера
Владение пакетом: org.apache.kafka.clients.producer.internals
TODO
PS:M_02_03 Генерация основной логики раздела
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
PS: функция M_02_03 InterceptorCallback
отправка сообщений
В общем, для отправки сообщения есть 3 строки:
- Строка 1: при запуске проекта запуск KafkaMessageListenerContainer зацикливается.
- Вызовите pollSelectionKeys, чтобы завершить процесс записи канала.
- Строка вторая: установить отправить
- Sender # runOnce sendProducerData(currentTimeMs)Установите ClientRequest в
- Строка 3: Получить Отправить Отправить
- Sender # runOnce client.poll(pollTimeout, currentTimeMs)Инициировать выполнение опроса
// PS : 消息的产生和发送是异步的 , 消息的发送主要依赖于 Sender
public class Sender implements Runnable{.........}
// Step 1 : Thread run 方法一览
C2- Sender
M2_01- runOnce()
- client.poll(pollTimeout, currentTimeMs) 发送消息
M2_02- sendProduceRequest : 核心发送方法
- 构建了一个 RequestCompletionHandler . 其中有个 onComplete 用于后续回调
- 构建一个 ClientRequest
- 调用 ClientRequest.send 发送消息
?-NetworkClient.doSend -> PS:M2_02_01
// PS:M2_02_01 底层一览
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// 确保通知通过'断开',离开通道在状态中关闭被触发
this.failedSends.add(connectionId);
} else {
try {
// 此处使用 KafkaChannel 进行处理
channel.setSend(send);
} catch (Exception e) {
// 更新状态以保持一致性,通道关闭后将被丢弃
channel.state(ChannelState.FAILED_SEND);
// 确保在下一次轮询中处理' failedsent '时通过' disconnected '通知
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
throw e;
}
}
}
}
// 隐藏方法 Selector , 在上文中 M2_02 中设置了 send , 此处进行处理
C- Selector
M- pollSelectionKeys
- send = channel.write()
обратный вызов сообщения
обратный вызов основан наSender # handleProduceResponseинициирован, методsendProduceRequestНабор методов
// Start : 请求的返回处理
C- Sender # handleProduceResponse
// 核心代码一览
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
// End : template 回调
// 在 KafkaTemplate.doSend 中设置
C-KafkaTemplate
M- doSend 设置
- producer.send(producerRecord, buildCallback(producerRecord, producer, future));
M- buildCallback
-
// PS : 请求到 Callback 的流转
C- Sender # handleProduceResponse
C- Sender # completeBatch
C- ProduceBatch # done
C- ProduceBatch # completeFutureAndFireCallbacks
C- InterceptorCallback # onCompletion
// 此处就会调用 End : template 回调
Управление транзакциями
TransactionSynchronizationManager
4.2 KafkaListener
Ядром kafka Listener является KafkaListenerAnnotationBeanPostProcessor, а инициализация выполняется на основе postProcessAfterInitialization класса BeanPostProcessor.
Процесс потребления Кафки имеет следующие маршруты:
- регистрация контейнера
- Мониторинг цикла контейнера
- потребление сообщений
регистрация контейнера
C3- KafkaListenerAnnotationBeanPostProcessor
M3_01- postProcessAfterInitialization
?- 这里来自于Bean 加载的 applyBeanPostProcessorsAfterInitialization 方法 , 处理Bean 的前置扩展
- 获取 Class 级别的 KafkaListener
- 通过 MethodIntrospector 反射获取各方法的 KafkaListener
- 注意 , 这里是一个 set 集合 , 意味着他允许一个方法上面编注多个 @KafkaListener -> PS:M3_01_01
FOR- 循环处理 KafkaListener
- processKafkaListener 循环处理
M3_02- processKafkaListener
- 判断是否为代理方法
- 创建一个 MethodKafkaListenerEndpoint 节点 , 将当前方法设置到 MethodKafkaListenerEndpoint 节点中
- 为节点设置相关的属性
?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions
?- Topics / TopicPattern / ClientIdPrefix
- 调用 KafkaListenerEndpointRegistrar 注册当前节点
M3_03- Method checkProxy(Method methodArg, Object bean)
- 获取当前 method
- 返回由AOP代理代理的接口
- 从代理接口中获取对应的 method
M3_04- processListener
- 继续构建 MethodKafkaListenerEndpoint
?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions / Topics /TopicPattern / ClientIdPrefix
?- 根据条件处理 concurrency / autoStartup / Group /
- 通过 BeanFactory 构建 KafkaListenerContainerFactory
- 为 MethodKafkaListenerEndpoint 设置属性
?- beanFactory / ErrorHandler
- 调用 registrar.registerEndpoint 注册当前 endpoint
?- 该对象会在 afterSingletonsInstantiated 中被消费
M3_05- afterSingletonsInstantiated
- 为 registrar 注册各种属性
?- BeanFactory / EndpointRegistry / ContainerFactoryBeanName
- 调用 registerAllEndpoints 处理 registrar -> M4_06
M3_07- resolveContainerFactory
- 返回一个 KafkaListenerContainerFactory
// M3_01 核心代码
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
// 获取 Class 级别的 KafkaListener
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
// 获取方法级别注解
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
// 核心处理逻辑
processKafkaListener(listener, method, bean, beanName);
}
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
// M3_03 核心伪代码 :
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<? iface : proxiedInterfaces) {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
}
return method;
}
// 手动注入 Listener
// 无意中在代码中发现了一个 KafkaListenerConfigurer 接口 , 该接口可以用于手动注入 Listener
I- KafkaListenerConfigurer
M- configureKafkaListeners(KafkaListenerEndpointRegistrar registrar)
- 构建了一个 MessageListenerContainer
// M4_01 的触发流程 : afterPropertiesSet , 再往上为 DefaultListableBeanFactory # afterSingletonsInstantiated
C4- KafkaListenerEndpointRegistrar
F4_01- List<KafkaListenerEndpointDescriptor> endpointDescriptors
F4_02- Map<String, MessageListenerContainer> listenerContainers
M4_01- registerEndpoint
- 构建一个 KafkaListenerEndpointDescriptor
- 如果startImmediately 初始化就启动 , synchronized 上锁注册到容器中
- 否则先添加到 List<KafkaListenerEndpointDescriptor> 集合中 -> M4_02
M4_02- registerAllEndpoints
- synchronized 上锁注册到容器中
- 注册容器 -> M4_04
M4_03- resolveContainerFactory : 返回一个 KafkaListenerContainerFactory
M4_04- registerListenerContainer
- 构建一个 MessageListenerContainer , 并且加入 Map<String, MessageListenerContainer>
?- createListenerContainer 时会注册 Method
- 如果包含 Group , 则加入List<MessageListenerContainer> 集合
?- 该集合会在 start(继承于Lifecycle)方法中调用 -> M4_05
M4_05- startIfNecessary
- 调用 MessageListenerContainer start 方法
M4_06- createListenerContainer
-
M4_07- start
FOR- getListenerContainers 获取所有的 MessageListenerContainer , 依次调用 startIfNecessary
?- M4_05
C05- MessageListenerContainer
M5_01- start()
-
C06- AbstractMessageListenerContainer
M6_01- doStart()
- 最终调用 KafkaMessageListenerContainer
M6_02- run()
- 这里通过一个 While 方法进行循环处理 ->
M6_03- pollAndInvoke()
- invokeListener(records) 反射处理
C07- KafkaMessageListenerContainer
M7_01- doStart()
- 准备 ContainerProperties
- 校验 AckMode 模式
- 构建 GenericMessageListener
- 构建 ListenerConsumer
- 修改容器状态
?- 这里和后面的循环监听形成前后关联
- 这里还用了一个 CountDownLatch 来等待执行
- 开始线程的执行 , 这里会执行 listenConsumer 的 run 方法 -> PS:M7_01_02
M7_02- doInvokeOnMessage()
- 调用对应 MessageListener 的 onMessage , 最终调用 M8_1
M7_03- onMessage()
-
// M7_01 代码
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
// 准备 ContainerProperties
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
// 修改容器状态
setRunning(true);
this.startLatch = new CountDownLatch(1);
// 此处开始现场的执行
this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
publishConsumerFailedToStart();
}
}catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}
C8- ConcurrentMessageListenerContainer
M8_01- doStart()
- 校验 topic
- 启动所有的 KafkaMessageListenerContainer
C10- MethodKafkaListenerEndpoint
M10_01- createMessageListener
- messageListener.setHandlerMethod 注册 Method
PS: M3_01_01 больше аннотаций Kafkalistener
kafkaTemplate.send("start", "one", "are you ok one?" + "----");
kafkaTemplate.send("topic1", "two", "are you ok two?" + "----");
@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
@KafkaListener(id = "two", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
logger.info("------> this is in listerner 0:{}<-------", record.value());
}
// 结果
2021-05-04 22:04:35.094 INFO 20012 --- [ two-0-C-1] c.g.k.demo.service.KafkaConsumerService : ------> this is in listerner 0:are you ok two?----<-------
2021-05-04 22:04:35.098 INFO 20012 --- [ one-0-C-1] c.g.k.demo.service.KafkaConsumerService : ------> this is in listerner 0:are you ok one?----<-------
PS: поток исполнения M7_01_02
Мониторинг цикла контейнера
Основная логика находится во внутреннем классе KafkaMessageListenerContainer ListenerConsumer.
- C- ConsumerListener # run
- C- KafkaMessageListenerContainer # pollAndInvoke
- C- KafkaMessageListenerContainer # invokeOnMessage
- C- KafkaMessageListenerContainer # doInvokeOnMessage
- Соответствующий MessagingMessageListenerAdapter вызывается в этом классе для окончательного выполнения.
// Step 1 : 起点
C- KafkaMessageListenerContainer.ListenerConsumer
public void run() {
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
if (this.consumerSeekAwareListener != null) {
this.consumerSeekAwareListener.registerSeekCallback(this);
}
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
publishConsumerStartedEvent();
while (isRunning()) {
try {
// 执行 invoke 逻辑
pollAndInvoke();
}catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ... 省略异常处理
}
}
wrapUp();
}
// Step 2 : 循环中 poll 对象
C- ListenerConsumer
M- pollAndInvoke() : 核心方法 , poll 获取并且映射到相关方法
- ConsumerRecords<K, V> records = doPoll()
|- this.consumer.poll(this.pollTimeout)
- invokeListener(records)
M- doPoll
// Step 3 :在 doInvokeWithRecords 中 , 会对消息进行迭代处理
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
doInvokeRecordListener(record, null, iterator)
}
// Step 4 : 在 doInvokeOnMessage 中 , 调用对应的 ListenerAdapter 完成消息处理
// PS : 该 在 KafkaMessageListenerContainer 中通过构造器生成
C- KafkaMessageListenerContainer.ListenerConsumer
// 通过四种监听类型 , 选择处理的 listener
// ACKNOWLEDGING_CONSUMER_AWARE,CONSUMER_AWARE,ACKNOWLEDGING,SIMPLE
Окончательная обработка выполнения
Обработчик в конечном итоге будет выполнен из MessagingMessageListerAdAppter, обработчик вводится в кафкалистеэнердендеренпистрар
// 最终执行类
C08- RecordMessagingMessageListenerAdapter
M8_1- onMessage
- 获取对应的 Method . 反射调用
?- 该 HandlerAdapter 在 MethodKafkaListenerEndpoint # createMessageListener 是处理
C10- RecordMessagingMessageListenerAdapter
M10_1- onMessage
5. Кафка указывает на глубину
TODO : Последующие статьи анализируют Kafka узел за узлом, если он реализует связанные функции.
FAQ
java.nio.file.FileSystemException -> Этот файл используется другой программой, и процесс не может получить к нему доступ
Детали проблемы:D:\tmp\kafka-logs\topic_1-0\00000000000000000000.timeindex.deleted: этот файл используется другой программой, и процесс не может получить к нему доступ
решение :Вручную удалите файлы журналов в \kafka-logs и перезапустите kafka.
Суммировать
В этой статье в основном рассказывается о двух частях, отправке и мониторинге, отправка в основном основана на отправке client.poll, а ядро мониторинга в основном заключается в сканировании и циклическом извлечении опроса потребителей.
Поняв эту часть процесса, вы можете начать детально разбираться в том, как работать и настраивать его в дополнительных функциях, таких как кластеризация.