Очередь сообщений | RocketMq некоторые очереди блокируют и не потребляют

RocketMQ

Добро пожаловать в официальную учетную запись [sharedCode], посвященную анализу исходного кода основного промежуточного программного обеспечения, вы можете связаться со мной напрямую.

Личный сайт блогера:www.shared-code.com

Мелкие партнеры компании сообщили, что в кластере RocketMq, за который они отвечали, внезапно появились две очереди, которые не потреблялись, а накопление сообщений достигло более 10 000. Это точно ненормально.

Ниже приводится фактическая ситуация с потреблением группы потребителей в то время.

Как видно из рисунка выше, две очереди серьезно заблокированы, а о смещениях давно не сообщается.

По моему собственному опыту в прошлом, в целом RocketMq часть потребления очереди терпит неудачу, в основном по следующим трем причинам.

  1. Есть проблема с количеством кластеров консьюмеров.Например,я уже писал статью ранее,в которой также было представлено,что некоторые очереди RocketMq не могут потребляться нормально.Если вам интересно,можете глянуть.Устранение проблемы, связанной с невозможностью использования некоторых очередей RocketMq.
  2. Неизбирательное использование потребительской группы потребителей означает, что вы не потребляете эту тему, но имя вашей потребительской группы такое же, как и у обычного потребления других людей.В это время некоторые очереди будут выделены вашему узлу, но эти узлы ваши собственные.Если нет потребления, это вызовет хаос потребления
  3. Потребительская нить заблокирована

Три пункта, перечисленные выше, в основном перечисляют ситуации, с которыми я столкнулся, и то, о чем я могу думать. Вы можете добавить их!

После проверки по порядку почти отсутствует возможность первого пункта и второго пункта.Все используют его относительно стандартно, поэтому третий пункт вызывает подозрения.

Лучший способ определить, заблокирован ли поток, — пройтиjstackКоманда для просмотра информации обо всем стеке потоков.

Здесь я используюgceasy.io/сделать анализ треда

первая тяга

второй рывок

Извлекая информацию о стеке потоков дважды, мы ясно видим, что оба раза выполняется 5 потоков-потребителей RocketMQ, что является большой проблемой. Далее посмотрите на код, исполняемый стеком,

ConsumeMessageThread_20
priority:5 - threadId:0x00007fcbbdca7000 - nativeId:0x3675 - nativeId (decimal):13941 - state:RUNNABLE
stackTrace:
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
at sun.security.ssl.InputRecord.read(InputRecord.java:532)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
- locked <0x00000000cceb4ab8> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000cceb6098> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000cceb6070> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x00000000ccecbe30> (a sun.net.www.MeteredStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.xxx.utils.GaodeUtil.readFileByUrl(GaodeUtil.java:87)
at com.xxx.utils.GaodeUtil.getDirection(GaodeUtil.java:73)
at com.xxx.utils.GaodeUtil.getDistanceByLngLat(GaodeUtil.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Информация о стеке 5 потоков почти одинакова, и видно, что все они заблокированы.GaodeUtil.readFileByUrlВыше этого метода нижний уровень окончательно блокируется на socketRead.

Бизнес-код легко найти, перейдите непосредственно к коду, чтобы просмотреть код.

private String readFileByUrl(String urlString) {
        try {
            StringBuffer html = new StringBuffer();
            URL url = new URL(urlString);
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            InputStreamReader isr = new InputStreamReader(conn.getInputStream(), "UTF-8");
            BufferedReader br = new BufferedReader(isr);
            String temp;
            while ((temp = br.readLine()) != null) {
                html.append(temp).append("\n");
            }
            br.close();
            isr.close();
            return html.toString();
        } catch (Exception e) {
            log.error("GaodeUtil.readFileByUrl error:", e);
            return null;
        }
    }

readFileByUrlКод как выше.Эти сволочи не очень хорошо подумали над написанием кода.В этом явно проявляется проблема.При инициации http соединения нет таймаута.

HttpURLConnection основан на протоколе HTTP, а его нижний уровень реализуется посредством связи через сокеты. Если тайм-аут не установлен, в случае ненормальной работы сети программа может зависнуть и больше не выполняться.

Хорошо, проблема прояснена, потому что HTTP-запрос инициируется, когда служба обработки сообщений потребляется, а время ожидания не установлено, что приводит к зависанию программы непосредственно в случае сетевой аномалии.

Решение состоит в том, чтобы перезагрузить машину (для восстановления бизнес-операций), изменить код и увеличить период ожидания.