Эта статья из:Технологическое сообщество PerfMa
В последние две недели я начал брать на себя работу apache flink по полному мониторингу данных, включая логику статистики индикаторов, сопоставление бизнес-правил и т. д., а результаты вычислений записывались в elasticsearch в режиме реального времени. Я столкнулся с проблемой, из-за которой производственная среда не могла нормально перезапуститься. Я несу ответственность за эту проблему. Проведите расследование и последующие действия.
Первый шаг, базовое исследование
Во-первых, я получил логи jobmanager и taskmanager.Я быстро нашел два основных типа ошибок из журнала taskmanager, одна была npe, а другая была исключением, что не удалось найти индекс.
Приемник Elasticsearch предоставляет интерфейс обратного вызова до и после записи данных, чтобы позволить разработчикам заданий обрабатывать исключения или успешные записи.Если во время обработки исключения возникнет исключение, платформа завершит задачу и вызовет перезапуск задания.
npe легко исправить, index not found — это небольшая ошибка в сервисе, создавшем индекс, это мелкие проблемы.
Дело в том, что в логе вижу другую ошибку:
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.<init>(RecordWriter.java:122)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.createRecordWriter(RecordWriter.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriter(StreamTask.java:1202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1170)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:52)
at sun.reflect.GeneratedConstructorAccessor4.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
at java.lang.Thread.run(Unknown Source)
Такого рода исключения обычно вызваны слишком маленьким параметром nproc или исчерпанием физической памяти.После проверки ulimit и memory обнаруживается, что они в норме, что довольно странно.
Второй шаг, анализ jstack и jmap
У Perfma есть продукт под названием xland, которым я тоже пользуюсь впервые.Должен сказать, что он действительно потрясающий и простой в использовании! Во-первых, создайте дамп информации о стеке потоков и памяти рассматриваемого диспетчера задач.Конкретная команда:
jstatck pid > 生成的文件名
jmap -dump:format=b,file=生成的文件名 进程号
Затем импортируйте эти два файла в xland, и xland может напрямую видеть общее количество потоков, которое может легко искать и подсчитывать количество потоков, количество экземпляров и т. д.
Первая проблема, которую я обнаружил, заключается в том, что общее количество потоков в этом диспетчере задач составляет 17000+.Это число, очевидно, немного велико.На данный момент я хочу посмотреть, какой тип потоков больше.Xland может легко искать и считать.В в этот раз обращаю внимание Много тем одного типа, всего 15520
Не могу посмотреть информацию о звонках верхнего слоя, только с http клиента apache.По джобфлоу первое что приходит в голову, это то, что RestHighLevelClient стокера es использует эту штуку.
Затем мы считаем количество объектов RestHighLevelClient в xland и обнаруживаем, что их сотни.Очевидно, здесь есть проблема.
Третий шаг – найти конкретную проблему
С помощью предыдущего xland мы можем легко определить проблему с esclient. В нашей домашней работе мы используем es-клиент в двух местах: одно — es-поглотитель, который используется es-поглотителем — RestHighLevelClient, а другое — es-клиент, написанный нашими одноклассниками, который также использует RestHighLevelClient, который используется отдельно в ElasticsearchSinkFunction of es sinker.Конструкция, используемая для поиска чего-то для слияния перед записью es, а также для создания кеша
1. Подозрение на ошибку RestHighLevelClient
Мы проходим тест, чтобы убедиться, что это проблема с RestHighLevelClient
Запустите задание, которое просто использует ES Sinker, отрегулируйте параллелизм и обратите внимание, что есть больше Количество потоков диспетчера ввода-вывода и, наконец, выяснилось, что один приемник es также будет иметь 240+ Потоки диспетчера ввода-вывода, настраивая параллелизм, все диспетчеры задач Общее количество потоков диспетчера ввода-вывода в основном пропорционально параллелизму. Прекратите писать задания es. В настоящее время все диспетчеры задач не имеют потоков диспетчера ввода-вывода.
Кажется, что диспетчер ввода-вывода имеет большое количество потоков, что кажется «нормальным».
2. Убейте задание и посмотрите, нормально ли перерабатывается поток.Убить задание, поток диспетчера ввода/вывода становится 0, кажется, что использование приемника es нормально
На данный момент в основном можно судить, что это проблема клиента es, который мы написали сами. В чем проблема?
Давайте проведем еще один тест, чтобы подтвердить
3. Запустите проблемное задание, завершите задание и наблюдайте за количеством потоков диспетчера ввода-вывода.Перезапустите все диспетчеры задач flink, задайте «чистую» среду и обнаружите, что после завершения задания все еще есть потоки диспетчера ввода-вывода. Этот тест может судить о том, что в нашем клиенте es есть утечка потока.
В-четвертых, принцип
По сути, es-поглотитель представляет собой RichSinkFunction, а RichSinkFunction имеет методы open и close.В методе close es-поглотитель корректно закрывает http-клиент.
@Override
public void close() throws Exception {
if (bulkProcessor != null) {
bulkProcessor.close();
bulkProcessor = null;
}
if (client != null) {
client.close();
client = null;
}
callBridge.cleanup();
// make sure any errors from callbacks are rethrown
checkErrorAndRethrow();
}
И наш клиент es не закрывается должным образом.
Конкретный принцип должен быть следующим: когда в приемнике es возникает исключение, такое как npe или запись es reject, задание будет перезапущено с помощью flink. и освобождаем некоторые ресурсы, и пишем In ElasticsearchSinkFunction, es-клиент не будет заботиться фреймворком, и мы не можем предопределить логику закрытия клиента после перезапуска таким образом.
Если при построении используется синглтон, теоретически должна быть возможность избежать утечек потоков и утечек памяти, вызванных непрерывным построением клиента es при многократном перезапуске задания, но возникает проблема с записью синглтонов. проверьте, volatile не добавляется, и при этом Блокировка именно эта, не класс.
V. Резюме
1. xland действительно прост в использовании, и он очень полезен для решения проблем.
2. Внешние клиенты, используемые в заданиях flink, не должны создаваться отдельно. Вместо этого используйте метод, аналогичный RichFunction, чтобы предоставить методы открытия и закрытия, чтобы обеспечить правильное освобождение ресурсов с помощью flink.
3. Лучше всего называть используемые объекты, созданные потоки, пулы потоков и т. д., чтобы облегчить использование xland для последующего устранения неполадок.Если у вас есть опыт, вы должны подсчитать объект класса-оболочки, используемый для построения клиента es из начало.число.
Давайте учиться вместе: