Асинхронная отправка производителя Kafka в некоторых случаях блокирует основной поток.

Kafka

Недавно было обнаружено, что асинхронная отправка Kafka-продюсера в некоторых случаях блокирует основной поток, позже, в процессе устранения неполадок и решения проблемы, выяснилось, что это можно расценивать как некорректное описание Kafka.

постановка задачи

Во многих сценариях мы будем использовать асинхронный режим для отправки сообщений Kafka и будем использовать следующие методы в KafkaProducer:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {}

согласно сОписание документаЭто асинхронный метод отправки.В принципе, он не должен блокировать основной поток.Однако в некоторых случаях возникает блокирующий поток, например, некорректно работает брокер, не создана тема и т. д. Иногда мы делаем не нужно Отправляемый результат гарантирован, но если есть блокировка, это повлияет на другую бизнес-логику.

проблема

С объявлением метода отправки KafkaProducer проблем нет, поэтому давайте взглянем на ее конкретную реализацию:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
  * Implementation of asynchronously send a record to a topic.
  */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);  //出现问题的地方
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        ...
    } catch (ApiException e) {
        ...
    }
}

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    Cluster cluster = metadata.fetch();

    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);

    metadata.add(topic);

    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    
    //一直获取topic的元数据信息,直到获取成功,若获取时间超过maxWaitMs,则抛出异常
    do {
        if (partition != null) {
            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
        } else {
            log.trace("Requesting metadata update for topic {}.", topic);
        }
        metadata.add(topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        try {
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException(
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs));
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) {  //判断执行时间是否大于maxWaitMs
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        metadata.maybeThrowException();
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);
}

Из его реализации видно, что причиной блокировки потока является следующая логика:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException

В процессе выполнения отправки через KafkaProducer сначала необходимо получить метаданные, и это непрерывная циклическая операция до тех пор, пока получение не будет успешным или не будет выдано исключение.

На самом деле, нет никаких проблем с первоначальным намерением Кафки достичь этого, потому что предпосылкой отправки сообщения является получение информации о границе и теме.Проблема в том, что метод отправки открыт для внешнего мира, но внутренняя реализация заблокирован, поэтому в некоторых случаях эта ситуация не принимается во внимание в то время, как только возникает исключение границы или темы, системный поток будет заблокирован, в результате чего система будет медленно реагировать, пока не произойдет сбой.

задача решена

На самом деле решение этой проблемы очень простое — создать несколько потоков отдельно для отправки сообщения, чтобы даже при возникновении непредвиденной ситуации блокировались только несколько потоков, что не вызовет большой площади системные потоки блокируются и становятся недоступными Конкретная реализация:

import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

class ProducerF[K,V](kafkaProducer: KafkaProducer[K,V]) {

  val executor: ExecutorService = Executors.newScheduledThreadPool(1)

  def sendAsync(producerRecord: ProducerRecord[K,V], callback: Callback) = {
    executor.submit(new Callable[RecordMetadata]() {
      def call = kafkaProducer.send(producerRecord, callback).get()
    })
  }
}

Это метод реализации. Конечно, вы также можете поддерживать версию Kafka самостоятельно, но это может быть немного проблематично. Конкретный метод выбора зависит от вашего собственного сценария.

Пример использования:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

object FixExample extends App {

  val props = new Properties()
  props.put("max.block.ms", "3000")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("client.id", "ProducerSendFixExample")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val topic = "topic-trace-one"
  val userId = "godpan"
  val msg = "login wechat"
  val data = new ProducerRecord[String, String](topic, userId, msg)

  val startTime = System.currentTimeMillis()
  val producerF = new ProducerF(producer)
  producerF.sendAsync(data,(metadata: RecordMetadata, exception: Exception) => {
    println(s"[producerF-sendAsync] data producerRecord: ${data}, exception: ${exception}")
  })

  // 如果想要得到发送结果,可以线程等待4s
  // Thread.sleep(4000)
  System.exit(0)

}

Соответствующий код загружен на github по адресу:kafka-send-async-bug-fix.