Эта статья участвует в "Java Theme Month - Java Development Practice", подробнее см.Ссылка на мероприятие
Это первый день моего участия в Gengwen Challenge, смотрите подробности мероприятия:Обновить вызов
Внедрение Kafka в проект Spring очень удобно, с помощьюkafkaTemplate
(Шаблон производителя)+@KafkaListener
(Слушатель потребителя) может завершить разработку кода производителя-потребителя Я считаю, что студенты, которые его использовали, очень понятны, я не буду слишком много объяснять здесь Spring-Kafka. Сегодня мы в основном обсуждаем, как улучшить потребляемую мощность kafka.
1. Простые потребители
1.1 Настройка фабрики потребителей
Сначала настройте свойства потребителя
@Bean(BeanNameConstant.CONSUMER_FACTORY)
public ConsumerFactory<String, String> consumerFactory() {
final StringDeserializer stringDeserializer = new StringDeserializer();
Map<String, Object> props = new HashMap<>(10);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//设置是否自动提交offset 2.3 版本以后默认为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(props, stringDeserializer, stringDeserializer);
return consumerFactory;
}
1.2 Настройка KafkaListenerContainerFactory
Основная инкапсуляция потребителя находится в ConcurrentKafkaListenerContainerFactory. Сам KafkaConsumer является небезопасным для потоков и не может работать одновременно. Здесь Spring оборачивает другой слой для создания нескольких параллельных контейнеров KafkaMessageListenerContainers в соответствии с настроенным spring.kafka.listener.concurrency.
@Bean(BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY)
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
(@Qualifier(BeanNameConstant.CONSUMER_FACTORY) ConsumerFactory<String, String> consumerFactory) {
//构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
//可通过注解的方式进行设置
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
//手动ack
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckOnError(false);
//设置ack模型机制 当发生error时 不同处理机制针对与offset有不同处理机制
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return concurrentKafkaListenerContainerFactory;
}
1.3 Потребители
@KafkaListener(
topics = "${kafka-topic.demo}",
containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY,
concurrency = "1"
)
public void loadListener(ConsumerRecord<?, ?> record, Acknowledgment ack){
try{
//业务方法
dealMessage(JsonUtil.readValue(String.valueOf(record.value()),Demo.class));
}catch (Exception e){
log.error("消费失败");
}finally {
//手动提交ack
ack.acknowledge();
}
}
ConsumerRecord
Класс содержит информацию о разделе, заголовок сообщения, тело сообщения и т. д. Если бизнесу необходимо получить эти параметры, используйтеConsumerRecord
будет хорошим выбором. Более удобно использовать определенный тип для получения тела сообщения, например тип String для получения тела сообщения. Обычно мы использовали быConsumerRecord
потреблять.
Скажи это здесь,Если подтверждение не отправлено, потребитель перезапускается, и потребитель повторно извлекает последнее смещение из раздела после перебалансировки. Возможно дублирование потребления.
2. Потребление несколькими потребителями
Как вы можете видеть выше, в @KafkaListener есть свойствоconcurrency
этоConcurrentKafkaListenerContainerFactory
Переменная-член, мы можем установить ее в конфигурации KafkaListenerContainerFactory или перезаписать конфигурацию в каждом KafkaListener.
/**
* Specify the container concurrency.
* @param concurrency the number of consumers to create.
* @see ConcurrentMessageListenerContainer#setConcurrency(int)
*/
public void setConcurrency(Integer concurrency) {
this.concurrency = concurrency;
}
Что он делает, так это создает nKafkaMessageListenerContainer
Экземпляр, то есть n kafkaconumser. Это ключ к реализации множественного потребительского потребления.
@KafkaListener(
topics = "${kafka-topic.demo}",
containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY,
concurrency = "12"
)
Значит, нам просто нужно изменить количество параллелизма, чтобы добиться многопоточного потребления?Так просто? ответ отрицательный.
2.1 Как настроить параллелизм
Настройка concurrency зависит от данных раздела kafka, то есть объема данных раздела. из-заKafkaMessageListenerContainer
Будет использован только один раздел.
Если в вашей теме всего 8 разделов, то параллелизм может работать должным образом только с 8 разделами.Обратите внимание, что если это распределенная система, то также * количество узлов. Если у вас есть два узла, то параллелизм каждого узла должен быть установлен на 4
3. Многопоточное пакетное потребление
Благодаря настройке параллелизма мы действительно добились многопоточного потребления, которое стало быстрее, чем раньше. Если мы хотим быть быстрыми, мы не можем бесконечно увеличивать количество разделов. Расширение раздела не может быть уменьшено. Так что найди другой способ.
Официальный пакетный интерфейс такой. То есть мы привыклиConsumerRecord
класс для получения сообщений, теперь замененный наList<ConsumerRecord>
Вот и все.
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
Перед этим нам нужно настроить конфигурацию потребителя
3.1 Настройка конфигурации
3.1.1 Настройка потребительской фабрики
//最大拉取条数2000 最大拉取时间1200s
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1200000);
-
max.poll.records контролирует количество извлекаемых записей каждый раз.
-
max.poll.interval.ms Максимальный временной интервал для каждого опроса.
max.poll.interval.ms 这个参数对于批量消费很重要
. Если параметр слишком короткий, потребитель будет снова запрашивать пакет данных при отправке смещения без завершения бизнес-обработки, вызывая повторную балансировку потребителя. В результате сообщения, которые были обработаны ранее, распределяются среди других потребителей для повторного использования. Затем войдите в бесконечный цикл. Все потребители потребляли данные этого смещения. Вызывает сжатие данных и повторное потребление.
Что касается моей практики, установка особенно большого значения не имеет никакого эффекта. Это не означает, что данные опроса поступают только после времени max.poll.interval.ms или что если после этого времени опроса не будет, потребитель вернется и запустит его один раз.
3.1.2 Настройка KafkaListenerContainerFactory
//构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
……
//是否并发消费
concurrentKafkaListenerContainerFactory.setBatchListener(true);
3.2 Реализация кода
// 每个线程处理的最大数量
private static final int MAX_NUM = 100;
@KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
topics = "${kafka-topic}",concurrency="3")
public void loadListener(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
final long startTime = System.currentTimeMillis();
int batchSize = record.size() / MAX_NUM;
//该消费者每次消费时都积压了大量消息,提交offset时要保证所有异步线程处理完毕
if (batchSize == 0) {
// 消息小于100
this.dealMessage(record);
} else {
LinkedList<Future> futures = new LinkedList<>();
for (int i = 0; i < batchSize; i++) {
List<ConsumerRecord<?, ?>> records;
if (i == batchSize - 1) {
//需要把余数加上
records = record.subList(i * MAX_NUM, record.size());
} else {
records = record.subList(i * MAX_NUM, (i + 1) * MAX_NUM);
}
final Future<?> submit = executorService.submit(() -> {
this.dealMessage(records);
});
futures.add(submit);
}
//等待线程全部执行完
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
// ignore
} catch (ExecutionException e) {
// ignore
}
}
}
log.info("批量处理完成,处理数量={},耗时={}ms", record.size(), System.currentTimeMillis() - startTime);
//手动确定 提交offset
ack.acknowledge();
}
/**
* 批量处理
* @param record
*/
public void dealMessage(List<ConsumerRecord<?, ?>> record) {
for (ConsumerRecord<?, ?> consumerRecord : record) {
//业务逻辑
this.dealMessage(consumerRecord);
}
}
Моя идея кода состоит в том, чтобы получать сообщения пакетами и использовать их в нескольких потоках. Если вас больше беспокоит безопасность и точность данных, мы можем подождать, пока этот пакет данных будет обработан, прежде чем отправлять компенсацию. Если вас это не волнует, вы также можете бросить его в пул асинхронных потоков и медленно обрабатывать его, а также напрямую отправлять смещение.
public void loadListener(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
final long startTime = System.currentTimeMillis();
//不用考虑消息丢失
for (ConsumerRecord<?, ?> consumerRecord : record) {
executorService.execute(()->{
dealMessage(consumerRecord);
});
}
log.info("批量处理完成,处理数量={},耗时={}ms", record.size(), System.currentTimeMillis() - startTime);
//手动确定 提交offset
ack.acknowledge();
}
В любом случае улучшение производительности будет заметно. Однако следует отметить, что количество протягиваний и настройка нитей должны основываться на реальной ситуации. Во время теста обращайте внимание на мониторинг процессора/памяти и БД и вовремя корректируйте его.Не гонитесь за возможностями обработки данных вслепую и не убивайте другие предприятия. Предпосылкой оптимизации производительности должно быть обеспечение стабильности системы. Удачи~