Добро пожаловать в официальную учетную запись [sharedCode], посвященную анализу исходного кода основного промежуточного программного обеспечения, вы можете связаться со мной напрямую.
Личный сайт блогера:www.shared-code.com
Мелкие партнеры компании сообщили, что в кластере RocketMq, за который они отвечали, внезапно появились две очереди, которые не потреблялись, а накопление сообщений достигло более 10 000. Это точно ненормально.
Ниже приводится фактическая ситуация с потреблением группы потребителей в то время.
Как видно из рисунка выше, две очереди серьезно заблокированы, а о смещениях давно не сообщается.
По моему собственному опыту в прошлом, в целом RocketMq часть потребления очереди терпит неудачу, в основном по следующим трем причинам.
- Есть проблема с количеством кластеров консьюмеров.Например,я уже писал статью ранее,в которой также было представлено,что некоторые очереди RocketMq не могут потребляться нормально.Если вам интересно,можете глянуть.Устранение проблемы, связанной с невозможностью использования некоторых очередей RocketMq.
- Неизбирательное использование потребительской группы потребителей означает, что вы не потребляете эту тему, но имя вашей потребительской группы такое же, как и у обычного потребления других людей.В это время некоторые очереди будут выделены вашему узлу, но эти узлы ваши собственные.Если нет потребления, это вызовет хаос потребления
- Потребительская нить заблокирована
Три пункта, перечисленные выше, в основном перечисляют ситуации, с которыми я столкнулся, и то, о чем я могу думать. Вы можете добавить их!
После проверки по порядку почти отсутствует возможность первого пункта и второго пункта.Все используют его относительно стандартно, поэтому третий пункт вызывает подозрения.
Лучший способ определить, заблокирован ли поток, — пройтиjstack
Команда для просмотра информации обо всем стеке потоков.
Здесь я используюgceasy.io/сделать анализ треда
первая тяга
второй рывок
Извлекая информацию о стеке потоков дважды, мы ясно видим, что оба раза выполняется 5 потоков-потребителей RocketMQ, что является большой проблемой. Далее посмотрите на код, исполняемый стеком,
ConsumeMessageThread_20
priority:5 - threadId:0x00007fcbbdca7000 - nativeId:0x3675 - nativeId (decimal):13941 - state:RUNNABLE
stackTrace:
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
at sun.security.ssl.InputRecord.read(InputRecord.java:532)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
- locked <0x00000000cceb4ab8> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000cceb6098> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000cceb6070> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x00000000ccecbe30> (a sun.net.www.MeteredStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.xxx.utils.GaodeUtil.readFileByUrl(GaodeUtil.java:87)
at com.xxx.utils.GaodeUtil.getDirection(GaodeUtil.java:73)
at com.xxx.utils.GaodeUtil.getDistanceByLngLat(GaodeUtil.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Информация о стеке 5 потоков почти одинакова, и видно, что все они заблокированы.GaodeUtil.readFileByUrl
Выше этого метода нижний уровень окончательно блокируется на socketRead.
Бизнес-код легко найти, перейдите непосредственно к коду, чтобы просмотреть код.
private String readFileByUrl(String urlString) {
try {
StringBuffer html = new StringBuffer();
URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
InputStreamReader isr = new InputStreamReader(conn.getInputStream(), "UTF-8");
BufferedReader br = new BufferedReader(isr);
String temp;
while ((temp = br.readLine()) != null) {
html.append(temp).append("\n");
}
br.close();
isr.close();
return html.toString();
} catch (Exception e) {
log.error("GaodeUtil.readFileByUrl error:", e);
return null;
}
}
readFileByUrl
Код как выше.Эти сволочи не очень хорошо подумали над написанием кода.В этом явно проявляется проблема.При инициации http соединения нет таймаута.
HttpURLConnection основан на протоколе HTTP, а его нижний уровень реализуется посредством связи через сокеты. Если тайм-аут не установлен, в случае ненормальной работы сети программа может зависнуть и больше не выполняться.
Хорошо, проблема прояснена, потому что HTTP-запрос инициируется, когда служба обработки сообщений потребляется, а время ожидания не установлено, что приводит к зависанию программы непосредственно в случае сетевой аномалии.
Решение состоит в том, чтобы перезагрузить машину (для восстановления бизнес-операций), изменить код и увеличить период ожидания.