Недавно было обнаружено, что асинхронная отправка Kafka-продюсера в некоторых случаях блокирует основной поток, позже, в процессе устранения неполадок и решения проблемы, выяснилось, что это можно расценивать как некорректное описание Kafka.
постановка задачи
Во многих сценариях мы будем использовать асинхронный режим для отправки сообщений Kafka и будем использовать следующие методы в KafkaProducer:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {}
согласно сОписание документаЭто асинхронный метод отправки.В принципе, он не должен блокировать основной поток.Однако в некоторых случаях возникает блокирующий поток, например, некорректно работает брокер, не создана тема и т. д. Иногда мы делаем не нужно Отправляемый результат гарантирован, но если есть блокировка, это повлияет на другую бизнес-логику.
проблема
С объявлением метода отправки KafkaProducer проблем нет, поэтому давайте взглянем на ее конкретную реализацию:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); //出现问题的地方
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
...
} catch (ApiException e) {
...
}
}
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Cluster cluster = metadata.fetch();
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
metadata.add(topic);
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
//一直获取topic的元数据信息,直到获取成功,若获取时间超过maxWaitMs,则抛出异常
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs) { //判断执行时间是否大于maxWaitMs
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
metadata.maybeThrowException();
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
Из его реализации видно, что причиной блокировки потока является следующая логика:
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException
В процессе выполнения отправки через KafkaProducer сначала необходимо получить метаданные, и это непрерывная циклическая операция до тех пор, пока получение не будет успешным или не будет выдано исключение.
На самом деле, нет никаких проблем с первоначальным намерением Кафки достичь этого, потому что предпосылкой отправки сообщения является получение информации о границе и теме.Проблема в том, что метод отправки открыт для внешнего мира, но внутренняя реализация заблокирован, поэтому в некоторых случаях эта ситуация не принимается во внимание в то время, как только возникает исключение границы или темы, системный поток будет заблокирован, в результате чего система будет медленно реагировать, пока не произойдет сбой.
задача решена
На самом деле решение этой проблемы очень простое — создать несколько потоков отдельно для отправки сообщения, чтобы даже при возникновении непредвиденной ситуации блокировались только несколько потоков, что не вызовет большой площади системные потоки блокируются и становятся недоступными Конкретная реализация:
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
class ProducerF[K,V](kafkaProducer: KafkaProducer[K,V]) {
val executor: ExecutorService = Executors.newScheduledThreadPool(1)
def sendAsync(producerRecord: ProducerRecord[K,V], callback: Callback) = {
executor.submit(new Callable[RecordMetadata]() {
def call = kafkaProducer.send(producerRecord, callback).get()
})
}
}
Это метод реализации. Конечно, вы также можете поддерживать версию Kafka самостоятельно, но это может быть немного проблематично. Конкретный метод выбора зависит от вашего собственного сценария.
Пример использования:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
object FixExample extends App {
val props = new Properties()
props.put("max.block.ms", "3000")
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "ProducerSendFixExample")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val topic = "topic-trace-one"
val userId = "godpan"
val msg = "login wechat"
val data = new ProducerRecord[String, String](topic, userId, msg)
val startTime = System.currentTimeMillis()
val producerF = new ProducerF(producer)
producerF.sendAsync(data,(metadata: RecordMetadata, exception: Exception) => {
println(s"[producerF-sendAsync] data producerRecord: ${data}, exception: ${exception}")
})
// 如果想要得到发送结果,可以线程等待4s
// Thread.sleep(4000)
System.exit(0)
}
Соответствующий код загружен на github по адресу:kafka-send-async-bug-fix.