Troubleshooting a Kafka consumer whose offset lag keeps growing

Java backend · Spring · Kafka

Preface

This article analyzes a problem in which a Kafka consumer's offset lag keeps growing.

Checking the consumption status

Group       Topic        Pid    Offset     logSize         Lag             Owner
demo-group demo-topic     0    9678273         9858394         180121          xxx-service-dpqpc-1510557406684-e2171bd6-0
demo-group demo-topic     1    9689443         9873522         184079          xxx-service-dpqpc-1510557406684-e2171bd6-1
demo-group demo-topic     2    9676875         9855874         178999          xxx-service-q7vch-1510557399475-b1d7d22c-0
demo-group demo-topic     3    9683393         9864518         181125          xxx-service-q7vch-1510557399475-b1d7d22c-1

The consumer Offset is far behind logSize: the Lag (logSize − Offset, e.g. 9858394 − 9678273 = 180121) exceeds 100,000 on every partition.

Normal situation

Group           Topic         Pid   Offset          logSize         Lag             Owner
demo-group      demo-topic    0     9860587         9860587         0               demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group      demo-topic    1     9875814         9875814         0               demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group      demo-topic    2     9858213         9858214         1               demo-group_tomcat2-1512984437115-fc1ee57b-1
demo-group      demo-topic    3     9866744         9866744         0               demo-group_tomcat2-1512984437115-fc1ee57b-2

A lag this small is normal.

Checking the topic partitions

    Topic:demo-topic    PartitionCount:4    ReplicationFactor:2 Configs:
    Topic: demo-topic   Partition: 0    Leader: 3   Replicas: 3,4   Isr: 4,3
    Topic: demo-topic   Partition: 1    Leader: 4   Replicas: 4,1   Isr: 1,4
    Topic: demo-topic   Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: demo-topic   Partition: 3    Leader: 2   Replicas: 2,3   Isr: 2,3

The topic has 4 partitions and 4 consumer threads, so the assignment itself looks fine. The problem is more likely that the consumers are either consuming too slowly or have stopped consuming altogether.

Troubleshooting

jstack -l pid

2017-12-27 04:06:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):

"Attach Listener" #12286 daemon prio=9 os_prio=0 tid=0x00007f2920001000 nid=0x3087 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-3" #9263 prio=5 os_prio=0 tid=0x00007f287400d800 nid=0x2440 waiting on condition [0x00007f285e6eb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048874b0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-4" #9262 prio=5 os_prio=0 tid=0x00007f28740c2800 nid=0x243f waiting on condition [0x00007f291950d000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048086d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-service-dpqpc-1510557406684-e2171bd6-leader-finder-thread" #9261 prio=5 os_prio=0 tid=0x0000000002302800 nid=0x243e waiting on condition [0x00007f28bd1df000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000703d06518> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007f28f8e86000 nid=0x51 waiting on condition [0x00007f28bd3e1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007f28f8e84800 nid=0x50 waiting on condition [0x00007f28bd4e2000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-service-dpqpc-1510557406684-e2171bd6_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007f28fb685800 nid=0x4e waiting on condition [0x00007f28bd8e4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048878d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

The consume-1 and consume-2 threads above are the actual business threads that consume from Kafka.

Error log

2017-12-16 12:53:34.257  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], begin rebalancing consumer xxx-service-q7vch-1510557399475-b1d7d22c try #1
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping leader finder thread
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping all fetchers
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] All connections stopped
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared all relevant queues for this fetcher
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared the data chunks in all the consumer message iterators
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Committing all offsets after clearing the fetcher queues

The log shows almost no trace of messages being consumed, yet the lag really is large.

I first went through the error log, found the entries above, and then took the jstack dump shown earlier. Seeing the ConsumerFetcherThread blocked in PartitionTopicInfo.enqueue, I suspected a deadlock, or blocking caused by a rebalance (the first jstack was taken without -l, so no deadlock information was visible). Searching online turned up "ConsumerFetcherThread deadlock?", which describes a similar problem, but that is a post from 2014 and the issue should already have been fixed in Kafka 0.8.2.2. The thread also contains this reply:

The fetchers are blocked on the queue since it is full, is your consumer iterator stopped and hence not getting more data from it?

That made me wonder whether my business thread had hit an uncaught exception and died, so that nothing was being consumed any more. After restarting the application, the log showed messages being consumed again. I then took another jstack for comparison:

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-3" #81 prio=5 os_prio=0 tid=0x00007fe39c004000 nid=0x63 waiting on condition [0x00007fe3931f4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007822ac4e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-4" #80 prio=5 os_prio=0 tid=0x00007fe39c003000 nid=0x62 waiting on condition [0x00007fe3926ea000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007821c9a68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-376jt-1514353818187-b37be1c0-leader-finder-thread" #79 prio=5 os_prio=0 tid=0x0000000001f5a000 nid=0x61 waiting on condition [0x00007fe3920e7000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782154c30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007fe48da13800 nid=0x51 runnable [0x00007fe392ff1000]
   java.lang.Thread.State: RUNNABLE
    //......
    at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1948)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1768)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1751)
    at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:625)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:590)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:582)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007fe48e310000 nid=0x50 runnable [0x00007fe3930f2000]
   java.lang.Thread.State: RUNNABLE
    //...
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1157)
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:463)
    at org.springframework.data.mongodb.core.MongoTemplate.doUpdate(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.upsert(MongoTemplate.java:1099)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-376jt-1514353818187-b37be1c0_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007fe48fe7c000 nid=0x4e waiting on condition [0x00007fe3934f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782155248> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

Comparing the two dumps shows that the initially suspected ConsumerFetcherThread is blocked in PartitionTopicInfo.enqueue even after the restart, when consumption works, so this by itself is probably normal (as the reply quoted above suggests, the fetcher simply blocks while its internal queue is full).

Comparing consume-1 and consume-2, however, reveals the real difference: in the problematic dump their stacks contain no business method at all (they sit idle in ThreadPoolExecutor.getTask), while after the restart the business method KafkaStreamProcessor.process is on the stack. The cause was becoming clear: the business code did not catch its exceptions.

The business method

The original business method looked roughly like this:

@Async
public void process(KafkaStream<byte[], byte[]> stream){
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
    }
}
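
For context, the stream passed to this method presumably comes from the old high-level consumer API, which matches the ZookeeperConsumerConnector and ConsumerFetcherThread frames in the dumps. A minimal sketch of the typical wiring is shown below; the class name, the property values and the stream count of 2 (chosen to mirror the consume-1/consume-2 threads) are illustrative assumptions, not the project's actual code.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaStreamBootstrap {

    private final KafkaStreamProcessor kafkaStreamProcessor;   // the @Async bean shown above

    public KafkaStreamBootstrap(KafkaStreamProcessor kafkaStreamProcessor) {
        this.kafkaStreamProcessor = kafkaStreamProcessor;
    }

    public void start() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181");   // placeholder address
        props.put("group.id", "demo-group");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Two streams per instance, mirroring the two consume-N threads in the dumps.
        Map<String, Integer> topicCountMap = Collections.singletonMap("demo-topic", 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);

        // Each stream is handed to the @Async method; the call returns immediately and
        // the iteration then runs on the "consume-N" executor threads.
        for (KafkaStream<byte[], byte[]> stream : streams.get("demo-topic")) {
            kafkaStreamProcessor.process(stream);
        }
    }
}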

One thing puzzled me here: if the thread's exception is not caught, the replacement thread should, in theory, get an incremented number in its name. Yet experiments showed that the thread name stays unchanged even after the async method throws an exception.

spring-core-4.3.13.RELEASE-sources.jar!/org/springframework/util/CustomizableThreadCreator.java

public class CustomizableThreadCreator implements Serializable {
    private final AtomicInteger threadCount = new AtomicInteger(0);
    /**
     * Template method for the creation of a new {@link Thread}.
     * <p>The default implementation creates a new Thread for the given
     * {@link Runnable}, applying an appropriate thread name.
     * @param runnable the Runnable to execute
     * @see #nextThreadName()
     */
    public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }

    /**
     * Return the thread name to use for a newly created {@link Thread}.
     * <p>The default implementation returns the specified thread name prefix
     * with an increasing thread count appended: e.g. "SimpleAsyncTaskExecutor-0".
     * @see #getThreadNamePrefix()
     */
    protected String nextThreadName() {
        return getThreadNamePrefix() + this.threadCount.incrementAndGet();
    }
    //...
}

threadCount is never decremented here, so if a worker thread dies abnormally, the newly created replacement thread should, in theory, get a higher number in its name.
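
That expectation can be checked in isolation. The sketch below is only an illustrative experiment with a plain Spring ThreadPoolTaskExecutor (the dumps merely show that some ThreadPoolExecutor with a "consume-" prefix is underneath, so the exact executor type and the sizing here are assumptions): when an exception escapes the Runnable given to execute(), the worker dies and its replacement, created through createThread()/nextThreadName(), gets the next number.

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ThreadNameIncrementDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("consume-");   // same prefix style as in the dumps
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.initialize();

        executor.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));   // expected: consume-1

        // The exception escapes task.run(), so this worker thread dies
        // (the default uncaught-exception handler prints the stack trace).
        executor.execute(() -> { throw new RuntimeException("boom"); });

        Thread.sleep(500);   // give the pool time to replace the dead worker

        executor.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));   // expected: consume-2

        Thread.sleep(500);
        executor.shutdown();
    }
}

With this setup the last task prints a higher-numbered name; the rest of the article explains why that renaming was not observed with the @Async method.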

/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/src.zip!/java/util/concurrent/ThreadPoolExecutor.java

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }    

Debugging showed that completedAbruptly is false, i.e. no exception ever escapes task.run() in the worker thread, which resolves the apparent contradiction: the worker is not replaced, so the thread name does not change. Then I remembered that @Async methods are intercepted, and things started to fall into place.

AsyncExecutionInterceptor

spring-aop-4.3.13.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

@Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

The @Async annotation is intercepted by AsyncExecutionInterceptor, which wraps the invocation in a Callable that catches Throwable and routes it to handleError, so the exception never escapes into the thread pool's worker. That is why completedAbruptly stays false and the thread names never change, while the while (it.hasNext()) loop in process() has already terminated, the KafkaStream is never read again, and the lag keeps growing.
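
For a void @Async method, handleError hands the exception to an AsyncUncaughtExceptionHandler, and the default handler only writes a log entry, which is easy to miss. One way to make such swallowed failures more visible is to register a custom handler through AsyncConfigurer. The sketch below is illustrative only: the executor sizing, the "consume-" prefix and the stderr "alert" are placeholder assumptions, not the project's configuration.

import java.lang.reflect.Method;
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("consume-");
        executor.setCorePoolSize(2);   // illustrative sizing
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                // handleError() ends up here for void @Async methods; without a custom handler
                // the failure is merely logged and consumption stops silently.
                System.err.println("@Async method " + method.getName() + " failed: " + ex);
            }
        };
    }
}

Note that this only surfaces the failure; the consumption loop is still gone once it throws, so the try/catch recommended in the summary below is the actual fix.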

Summary

  • When consuming from Kafka, monitor the consumer's offset lag in (near) real time to make sure consumption keeps up with production.
  • The consumer thread that iterates a KafkaStream must catch exceptions itself; otherwise a single uncaught exception terminates the loop and consumption silently stops (see the sketch below).
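
A minimal hardened version of the business method, assuming the same KafkaStream-based consumer as above (the logger, the handleMessage placeholder and the decision to skip a bad message are illustrative choices): a catch inside the loop keeps the iterator draining when a single message fails, and a catch around the loop at least leaves a clear trace if the iteration itself dies.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class KafkaStreamProcessor {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamProcessor.class);

    @Async
    public void process(KafkaStream<byte[], byte[]> stream) {
        try {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String message = new String(it.next().message());
                try {
                    handleMessage(message);   // placeholder for the real business logic
                } catch (Throwable e) {
                    // Log and skip the bad message so the loop, and therefore consumption, keeps running.
                    log.error("failed to process message: {}", message, e);
                }
            }
        } catch (Throwable e) {
            // If the iteration itself breaks, at least leave an unmistakable trace in the log.
            log.error("kafka stream consumption terminated unexpectedly", e);
        }
    }

    private void handleMessage(String message) {
        // business processing (MongoDB lookups/upserts in the original service) goes here
    }
}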
