Предпосылка: этот пример подходит для тем сообщений, в которых нет требований к порядку.
Благодаря ряду оптимизаций скорость записи и чтения Kafka может достигать десятков тысяч записей в секунду. Увеличивая количество разделов, можно увеличить параллельное потребление за счет развертывания нескольких потребителей. Тем не менее, есть еще много случаев, когда скорость выполнения некоторых сервисов слишком низкая.В настоящее время нам нужно использовать многопоточность, чтобы потреблять и повышать коэффициент использования машин приложений, вместо того, чтобы слепо увеличивать нагрузку на Kafka.
Создать потребителя kafka с помощью Spring очень просто. Путь, который мы выбираем, это унаследовать Кафку.ShutdownableThread
, а затем реализовать егоdoWork
метод.
Ссылаться на:GitHub.com/Apache/Кафка…
Многопоточное потребление данных в разделе
Даже если используется многопоточность, нам необходимо создать новый пул потоков.
Мы создали пул потоков с максимальной емкостью 20, и есть два параметра, на которые следует обратить внимание. (Ссылаться на"Сценарии многопоточного использования JAVA и меры предосторожности, краткая версия").Мы использовали нулевую мощностьSynchronousQueue
, один вход и один выход, чтобы избежать буферизации данных в очереди, чтобы при аварийном завершении работы системы можно было исключить возможность потери сообщений из-за блокировки очередей.
а затем использовалCallerRunsPolicy
Стратегия насыщения позволяет блокировать потребляющий поток Kafka, когда многопоточность не может быть обработана.
Затем закладываем реальную бизнес-логику в задачу для многопоточного выполнения, после каждого выполнения один раз вручную коммитимack
, что указывает на то, что я обработал сообщение. Так как эти задачи заявлены пулом потоков, последовательность не может быть гарантирована, некоторые задачи могут не выполняться, а последующие задачи уже отправили свои смещения.o.O
Но это пока не важно, просто пусть сначала работает параллельно.
К сожалению, когда мы запускаем программу, сразу выбрасывается исключение, и мы не можем продолжить.В программе прямо сказано:KafkaConsumer is not safe for multi-threaded access
Очевидно, что потребительская сторона Kafka не является потокобезопасной, она отказывается называть API таким образом. Первоначальный замысел kafka хорош, я хочу избежать некоторых проблем с параллельной средой, но мне действительно нужно использовать многопоточность.
Потребители Kafka определяют, инициирован ли запрос внешним потоком, сравнивая идентификатор потока вызывающего объекта.
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
да, толькоcommitSync
Функция размещается вне потока, и сначала отправляется подтверждение, а затем выполняется задача.
Присоединяйтесь к конвейеру
Сообщения, которые мы получаем, могут быть отфильтрованы до того, как они будут фактически выполнены, например, некоторые нулевые значения или суждения о конкретных условиях. Хотя его можно запускать прямо в потоке-потребителе, он очень грязный, и можно добавить модель производитель-потребитель (можно подумать, что это лишнее). Используемая здесь очередь блокировки по-прежнемуSynchronousQueue
, который действует как конвейер.
После того, как мы поставили задачу в пайплайн, мы сразу же ее фиксируем. Если пул потоков заполнен, он будет блокироваться в потоке-потребителе до тех пор, пока не появится вакансия. Затем мы запускаем отдельный поток для получения этих данных, и код, отправленный в эту часть, выглядит так.
Приложение может быть запущено, а скорость потребления очень быстрая.
конфигурация параметров
У Kafka много параметров, и нас больше волнуют следующие параметры.
max.poll.records
Вызовите опрос один раз и верните максимальное количество записей. Если это значение установлено большим, обработка будет медленной, и ее легко превысить.max.poll.interval.ms
Значение (по умолчанию 5 минут) приводит к отключению потребителя. В очень трудоемком потреблении необходимо обратить особое внимание.
enable.auto.commit
Включить ли автоматическую отправку (смещение) Если включено, информация о смещении, которую потребляет потребитель, будет периодически отправляться в Kafka (постоянное хранилище).
Когда включено автоматическое представление смещения, частота времени отправки запроса определяется параметромauto.commit.interval.ms
контроль.
fetch.max.wait.ms
Если объем данных, возвращаемых брокером, недостаточен (fetch.min.bytes), максимальное время ожидания запроса на выборку. Если объем данных соответствует потребностям, он вернется немедленно.
session.timeout.ms
Время ожидания сеанса потребителя. Если сервер не получил никаких запросов (включая обнаружение пульса) от потребителя в течение этого времени, сервер определит, что потребитель находится в автономном режиме. Чем больше значение, тем дольше сервер ожидает отказа потребителя и времени перебалансировки.
heartbeat.interval.ms
Интервал времени для обнаружения пульса между координатором потребителей и кластером kafka. Кластер kafka оценивает активность сеанса потребителя по пульсу, чтобы определить, находится ли потребитель в сети.Если он находится в автономном режиме, раздел, зарегистрированный потребителем, будет назначен другим потребителям в той же группе. Это значение должно быть меньше, чем «session.timeout.ms», то есть время истечения сеанса должно быть больше, чем интервал обнаружения пульса, обычно треть от session.timeout.ms, иначе обнаружение пульса будет бессмысленным.
В этом примере наши параметры просто устанавливаются следующим образом, в основном регулируя количество баров, получаемых каждый раз, и время обнаружения. Все остальное по умолчанию.
гарантия сообщения
Внимательные студенты могут заметить, что наш код все еще не полностью безопасен. Это потому, что мы отправили подтверждение заранее. При нормальной работе программы все в порядке. Однако, когда приложение аварийно закрывается, исполняемые сообщения, скорее всего, будут потеряны.Для приложений с очень высокими требованиями к согласованности нам нужно обеспечить это двумя способами.
использовать хук выключения
Первый - рассмотреть случай убийства-15. Этот метод относительно прост, до тех пор, пока включен метод отключения ShutdownableThread, приложение будет иметь возможность выполнить задачу в пуле резьбы, убедитесь, что приложение снова закрыто.
@Override
public void shutdown() {
super.shutdown();
executor.shutdown();
}
Использовать обработку журнала
Примените oom или просто убейте -9, и все станет грязным.
Поддерживайте отдельный файл журнала (или локальную базу данных), записывайте журнал перед фиксацией, а затем записывайте соответствующий журнал после фактического выполнения. Когда система запустится, прочитайте эти файлы журналов, найдите задачи, которые не были выполнены успешно, и выполните их снова.
Если вы хотите эффективности и надежности, вам придется много работать.
Обработка с помощью Redis
Этот метод похож на метод журнала, но поскольку Redis очень эффективен (до десятков тысяч) и удобен, он лучше, чем метод журнала.
Вы можете использовать структуру Hash для записи в Redis при отправке задачи, удалить это значение после выполнения задачи, а остальное — сообщение о проблеме.
Когда система запустится, сначала проверьте, нет ли ненормальных данных в Redis. Если есть, сначала обработайте данные, а затем используйте их в обычном режиме.End
Многопоточность — для повышения эффективности, Redis и т. д. — для повышения надежности. Бизнес-код очень легко написать, и вы сможете получить большую его часть после того, как поймете логику; бизнес-код иногда сложен, и вам нужно написать множество вспомогательных функций, чтобы повысить его эффективность и позаботиться о его границах.
С точки зрения программиста, наиболее конкурентоспособный код должен заботиться о граничных исключениях, которые возникают с малой вероятностью.
У Kafka есть различные компромиссы с точки зрения пропускной способности и надежности, многие из которых связаны с отношениями между рыбой и медвежьей лапой. Нам не нужно беспокоиться об этом, мы можем использовать внешние инструменты для получения большей выгоды. В этом случае вероятность краха Redis и краха приложения одновременно относительно мала. Сообщение 5 9 гарантированно достижимо, а остальные сообщения о несовершенных проблемах, почему вы не находите их в логе?
Дальнейшее чтение:
1, многопоточные сценарии использования JAVA и соображения простой версии
2. Индекс базовых знаний Кафки
3. Тест на 360 градусов: потеряет ли KAFKA данные? Соответствует ли его высокая доступность спросу?