Запишите процесс устранения неполадок для исключений заданий Flink.

Flink

Эта статья из:Технологическое сообщество PerfMa

Официальный сайт PerfMa (Benma Network)

В последние две недели я начал брать на себя работу apache flink по полному мониторингу данных, включая логику статистики индикаторов, сопоставление бизнес-правил и т. д., а результаты вычислений записывались в elasticsearch в режиме реального времени. Я столкнулся с проблемой, из-за которой производственная среда не могла нормально перезапуститься. Я несу ответственность за эту проблему. Проведите расследование и последующие действия.

Первый шаг, базовое исследование

Во-первых, я получил логи jobmanager и taskmanager.Я быстро нашел два основных типа ошибок из журнала taskmanager, одна была npe, а другая была исключением, что не удалось найти индекс.

Приемник Elasticsearch предоставляет интерфейс обратного вызова до и после записи данных, чтобы позволить разработчикам заданий обрабатывать исключения или успешные записи.Если во время обработки исключения возникнет исключение, платформа завершит задачу и вызовет перезапуск задания.

npe легко исправить, index not found — это небольшая ошибка в сервисе, создавшем индекс, это мелкие проблемы.

Дело в том, что в логе вижу другую ошибку:

java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Unknown Source)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.<init>(RecordWriter.java:122)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.createRecordWriter(RecordWriter.java:321)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriter(StreamTask.java:1202)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1170)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:52)
	at sun.reflect.GeneratedConstructorAccessor4.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
	at java.lang.reflect.Constructor.newInstance(Unknown Source)
	at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
	at java.lang.Thread.run(Unknown Source)

Такого рода исключения обычно вызваны слишком маленьким параметром nproc или исчерпанием физической памяти.После проверки ulimit и memory обнаруживается, что они в норме, что довольно странно.

Второй шаг, анализ jstack и jmap

У Perfma есть продукт под названием xland, которым я тоже пользуюсь впервые.Должен сказать, что он действительно потрясающий и простой в использовании! Во-первых, создайте дамп информации о стеке потоков и памяти рассматриваемого диспетчера задач.Конкретная команда:

jstatck pid > 生成的文件名
jmap -dump:format=b,file=生成的文件名 进程号

Затем импортируйте эти два файла в xland, и xland может напрямую видеть общее количество потоков, которое может легко искать и подсчитывать количество потоков, количество экземпляров и т. д.

Первая проблема, которую я обнаружил, заключается в том, что общее количество потоков в этом диспетчере задач составляет 17000+.Это число, очевидно, немного велико.На данный момент я хочу посмотреть, какой тип потоков больше.Xland может легко искать и считать.В в этот раз обращаю внимание Много тем одного типа, всего 15520

image.png

Не могу посмотреть информацию о звонках верхнего слоя, только с http клиента apache.По джобфлоу первое что приходит в голову, это то, что RestHighLevelClient стокера es использует эту штуку.

Затем мы считаем количество объектов RestHighLevelClient в xland и обнаруживаем, что их сотни.Очевидно, здесь есть проблема.

Третий шаг – найти конкретную проблему

С помощью предыдущего xland мы можем легко определить проблему с esclient. В нашей домашней работе мы используем es-клиент в двух местах: одно — es-поглотитель, который используется es-поглотителем — RestHighLevelClient, а другое — es-клиент, написанный нашими одноклассниками, который также использует RestHighLevelClient, который используется отдельно в ElasticsearchSinkFunction of es sinker.Конструкция, используемая для поиска чего-то для слияния перед записью es, а также для создания кеша

1. Подозрение на ошибку RestHighLevelClient

Мы проходим тест, чтобы убедиться, что это проблема с RestHighLevelClient

Запустите задание, которое просто использует ES Sinker, отрегулируйте параллелизм и обратите внимание, что есть больше Количество потоков диспетчера ввода-вывода и, наконец, выяснилось, что один приемник es также будет иметь 240+ Потоки диспетчера ввода-вывода, настраивая параллелизм, все диспетчеры задач Общее количество потоков диспетчера ввода-вывода в основном пропорционально параллелизму. Прекратите писать задания es. В настоящее время все диспетчеры задач не имеют потоков диспетчера ввода-вывода.

Кажется, что диспетчер ввода-вывода имеет большое количество потоков, что кажется «нормальным».

2. Убейте задание и посмотрите, нормально ли перерабатывается поток.Убить задание, поток диспетчера ввода/вывода становится 0, кажется, что использование приемника es нормально

На данный момент в основном можно судить, что это проблема клиента es, который мы написали сами. В чем проблема?

Давайте проведем еще один тест, чтобы подтвердить

3. Запустите проблемное задание, завершите задание и наблюдайте за количеством потоков диспетчера ввода-вывода.Перезапустите все диспетчеры задач flink, задайте «чистую» среду и обнаружите, что после завершения задания все еще есть потоки диспетчера ввода-вывода. Этот тест может судить о том, что в нашем клиенте es есть утечка потока.

В-четвертых, принцип

По сути, es-поглотитель представляет собой RichSinkFunction, а RichSinkFunction имеет методы open и close.В методе close es-поглотитель корректно закрывает http-клиент.

@Override
	public void close() throws Exception {
		if (bulkProcessor != null) {
			bulkProcessor.close();
			bulkProcessor = null;
		}

		if (client != null) {
			client.close();
			client = null;
		}

		callBridge.cleanup();

		// make sure any errors from callbacks are rethrown
		checkErrorAndRethrow();
	}

И наш клиент es не закрывается должным образом.

Конкретный принцип должен быть следующим: когда в приемнике es возникает исключение, такое как npe или запись es reject, задание будет перезапущено с помощью flink. и освобождаем некоторые ресурсы, и пишем In ElasticsearchSinkFunction, es-клиент не будет заботиться фреймворком, и мы не можем предопределить логику закрытия клиента после перезапуска таким образом.

Если при построении используется синглтон, теоретически должна быть возможность избежать утечек потоков и утечек памяти, вызванных непрерывным построением клиента es при многократном перезапуске задания, но возникает проблема с записью синглтонов. проверьте, volatile не добавляется, и при этом Блокировка именно эта, не класс.

V. Резюме

1. xland действительно прост в использовании, и он очень полезен для решения проблем.

2. Внешние клиенты, используемые в заданиях flink, не должны создаваться отдельно. Вместо этого используйте метод, аналогичный RichFunction, чтобы предоставить методы открытия и закрытия, чтобы обеспечить правильное освобождение ресурсов с помощью flink.

3. Лучше всего называть используемые объекты, созданные потоки, пулы потоков и т. д., чтобы облегчить использование xland для последующего устранения неполадок.Если у вас есть опыт, вы должны подсчитать объект класса-оболочки, используемый для построения клиента es из начало.число.

Давайте учиться вместе:

Параметры JVM серии PerfMa KO [Память]

Анализ исходного кода и принципа работы FutureTask от JCU