предисловие
Изначально серия серий обновлялась по всеобщей просьбе, но вроде с эффектом кафки первой статьи все в порядке, так что чем больше серий будет усерднее (в конце концов, кто не хочет смотреть на себя, то лайков становится больше ххх🤣 ), в прошлой статье было сказано одно Дело для иллюстрации того, как рассматривать развертывание кластера kafka.Его можно считать эталоном.Ведь у всех работающих в разных компаниях обязательно будет свой набор планов внедрения.
На этот раз мы вернемся к основной проблеме, на этот раз мы продолжим стиль первой статьи и познакомим вас с диаграммой шаг за шагом. расслабиться и быть счастливым
1. Продюсерский принцип Кафки
Во-первых, у нас должен быть кластер, а затем в кластере несколько серверов, каждый сервер мы называем Broker, который на самом деле является процессом Kafka.
если ты помнишьпервый разСодержимое догадаться несложно, и обязательно будет контроллер, несколько фолловеров и кластер zookeeper.В начале наши брокеры будут зарегистрированы на нашем кластере zookeeper.
Затем контроллер также будет отслеживать изменения кластера zookeeper и изменять свои собственные метаданные при изменении кластера. И последователи также будут обращаться к своему главному контроллеру для синхронизации информации метаданных, чтобы информация метаданных на всех серверах в кластере Kafka была согласованной.
После того, как вышеуказанные приготовления завершены, мы официально запускаем контент нашего производителя.
① Существительное 1 --- ProducerRecord
Прежде чем производителю потребуется отправить сообщение в кластер, он должен сначалаКаждое сообщение инкапсулируется в объект ProducerRecord., что делается внутри производителя. Затем он пройдет через процесс сериализации. Как упоминалось в нескольких предыдущих столбцах, все данные, которые необходимо передать по сети, представляют собой двоичные байтовые данные, которые необходимо сериализовать перед передачей.
В этот момент будет проблема, нам нужно отправить сообщение в топик под топикомleader partition, а как производитель доберется до того, какой раздел в рамках этой темы является разделом-лидером?
Некоторые друзья, возможно, забыли, напомните мне, что контроллер может рассматриваться как лидер брокера, отвечающий за управление метаданными кластера, а лидерный раздел используется для балансировки нагрузки, и они будут храниться на разных серверах в распределенным образом. И производственные данные, и данные о потреблении в кластере обрабатываются для ведущего раздела.
② существительное 2 --- разделитель
Как узнать, какой раздел является ведущим, просто нужно получить метаданные.
Получить метаданные несложно, если вы найдете сервер в кластере (поскольку метаданные каждого сервера в кластере одинаковы)
③ Существительное 3 --- Буфер
В это время производитель не торопится отправлять сообщение, а сначала кладет его в буфер
④ Существительное 4 --- Отправитель
После помещения сообщения в буфер одновременно будетНезависимый отправитель потока для упаковки сообщений в пакеты в пакетах, не сложно сообразить, что если Кафка - это действительно посообщенная передача, а сообщение - сетевое соединение, то производительность будет тянуть очень плохо. Для повышения пропускной способности применяется пакетный подход.
После завершения каждого пакета он начинает отправляться на соответствующий хост. На данный момент, после модели в дизайне сети Kakfa, упомянутой в первой статье, она записывается в кэш ОС, а затем на диск.
На следующем рисунке показана модель проектирования сети Kafka, которую мы объяснили в то время.
⑤ Код производителя
1. Раздел настройки параметров
// 创建配置文件对象
Properties props = new Properties();
// 这个参数目的是为了获取kafka集群的元数据
// 写一台主机也行,多个更加保险
// 这里使用的是主机名,要根据server.properties来决定
// 使用主机名的情况需要配置电脑的hosts文件(重点)
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
// 这个就是负责把发送的key从字符串序列化为字节数组
// 我们可以给每个消息设置key,作用之后再阐述
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 这个就是负责把你发送的实际的message从字符串序列化为字节数组
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 以下属于调优,之后再解释
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
2. Создайте экземпляр производителя
// 创建一个Producer实例:线程资源,跟各个broker建立socket连接资源
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
3. Создайте сообщение
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "test-value");
Конечно, вы также можете указать ключ, который будет объяснен позже.
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "test-key", "test-value");
4. Отправить сообщение
С функцией обратного вызова, если нет исключения, она вернет сообщение, отправленное успешно.
// 这是异步发送的模式
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
// 消息发送成功
System.out.println("消息发送成功");
} else {
// 消息发送失败,需要重新发送
}
}
});
Thread.sleep(10 * 1000);
// 这是同步发送的模式(是一般不会使用的,性能很差,测试可以使用)
// 你要一直等待人家后续一系列的步骤都做完,发送消息之后
// 有了消息的回应返回给你,你这个方法才会退出来
producer.send(record).get();
5. Закройте соединение
producer.close();
Во-вторых, время галантереи: код тюнинговой части
Часть, которая отличает прилежную машинистку, на самом деле является частью настройки, которая не упоминалась в 1, и они объясняются один за другим, что является следующей серией.
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 32384);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
① подтверждение подтверждения сообщения
props.put("acks", "-1");
acks | Сообщение успешно отправлено |
---|---|
-1 | лидер и все последователи получают |
1 | лидер получает |
0 | сообщение можно отправить |
Параметр acks имеет3 значения, -1,0,1, установка этих 3-х разных значений станет кафкойОснова для оценки того, успешно ли отправлено сообщение. Разделы в Kafka имеют реплики.если акс равен -1, тогда сообщение находится вПосле записи в ведущий раздел раздела, эти сообщения также требуютСинхронизируется со всеми остальными репликами этого разделаЗадний,быть успешно отправленным(соответствующий код для вывода System.out.println("сообщение успешно отправлено")), в это время снижается производительность отправки данных
еслиУстановите количество подтверждений на 1, пока отправляемое сообщение записывается в раздел-лидер, оно считается успешно отправленным, но этот метод имеет риск потери данных.Например, после того, как сообщение только что успешно отправлено в раздел-лидер, Раздел лидера немедленно отключается, а остальные последователи Независимо от того, кто выбирает лидера, только что отправленное сообщение не существует.
еслиустановить акки на 0, пока сообщение отправлено, по умолчанию оно будет отправлено успешно. Это не имеет значения.
② повторов количество повторов (важно)
Этот параметр по-прежнему очень важен. Это параметр, который необходимо установить в производственной среде. Он используется для установки количества повторных отправок сообщения.
props.put("retries", 3);
В kafka могут встречаться различные исключения (вы можете сразу перейти к дополнительным типам исключений ниже), но независимо от того, какое исключение встречается, в это время возникает проблема с отправкой сообщения, особенно если возникла внезапная проблема с сети, однако кластер не может каждый раз выбрасывать исключение, и сеть может восстановиться в следующую секунду, поэтому нам нужно настроить механизм повторных попыток.
Вот дополнение: После настройки ретритов 95% аномалий в кластере слетят сами, я не шучу 🤣
Я настраивал в коде 3 раза.На самом деле разумно ставить от 5 до 10 раз.Добавлю еще одно пояснение.Если нам нужно задать как часто повторять попытку один раз,то там тоже есть параметры.Если я правильно помню, это retry.backoff.ms. Я установлю его ниже. Повторять попытку каждые 100 миллисекунд, что составляет 0,1 секунды.
props.put("retry.backoff.ms",100);
③ размер партии размер партии
Размер пакета по умолчанию — 16 КБ, здесь установлено значение 32 КБ. Установка большего размера может немного улучшить пропускную способность. Установка размера этого пакета также связана с размером сообщения. Предположим, что размер сообщения составляет 16 КБ, а партия также 16 К. В этом случае партии не имеют смысла. Поэтому нам необходимо заранее оценить размер сообщений в кластере, обычно размер устанавливается несколько раз.
props.put("batch.size", 32384);
④ ограничение времени отправки linger.ms
Например, у меня сейчас установлен размер пакета 32К, а сообщение 2К.На данный момент отправлено 3 сообщения, а общий размер 6К.Однако сообщения от производителя нет, значит не хватает 32К.В таком случае он не отправляется на прошлый кластер? Явно нет, linger.ms устанавливается на фиксированное время, даже если пакет не полный, он будет отправлен, ниже я ставлю 100 миллисекунд, поэтому даже если мой пакет не будет полным 32К, пакет будет отправлен в кластер через 100 миллисекунд.
props.put("linger.ms", 100);
⑤ buffer.memory размер буфера
Когда наш поток Sender очень медленный, а производственные данные очень быстрые, если емкости буфера в нашей середине недостаточно, производитель больше не может продолжать производить данные, поэтому нам необходимо немного увеличить буферную память. , Размер буфера по умолчанию составляет 32 МБ, что в принципе разумно.
props.put("buffer.memory", 33554432);
Итак, как мы должны проверить, что мы должны настроить размер буфера в это время?Мы можем использовать общий метод Java для вычисления времени окончания минус время начала для тестирования.Когда время окончания минус время начала больше 100 мс, мы думаем, что поток Sender обрабатывается в это время, скорость низкая, и размер буфера необходимо увеличить.
Конечно, в обычных условиях нам не нужно устанавливать этот параметр, и 32M достаточно, чтобы справиться с ним в нормальных условиях.
Long startTime=System.currentTime();
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
// 消息发送成功
System.out.println("消息发送成功");
} else {
// 消息发送失败,需要重新发送
}
}
});
Long endTime=System.currentTime();
If(endTime - startTime > 100){//说明内存被压满了
说明有问题
}
⑦ метод сжатия Compression.type
сжатие.тип, по умолчанию нет, без сжатия, но можно использовать сжатие lz4, эффективность все равно хорошая, после сжатия можно уменьшить объем данных и улучшить пропускную способность, но увеличит нагрузку на ЦП производителя боковая сторона
props.put("compression.type", lz4);
⑧ макс.блок.мс
Оставьте это исходному коду, чтобы объяснить, что он должен установить время блокировки определенных методов.
props.put("max.block.ms", 3000);
⑨ max.request.size максимальный размер сообщения
max.request.size: этот параметр используется для управления размером отправляемого сообщения. Значение по умолчанию – 1 048 576 байт, что составляет 1 М. Как правило, это слишком мало. Многие сообщения могут превышать размер 1 МБ, поэтому вам необходимо оптимизировать и настраивайте сами.Выставляется побольше (предприятия вообще ставят на 10М), а то программа работает нормально и вдруг приходит сообщение 2М, и система сообщает об ошибке, тогда выигрыш перевешивает проигрыш.
props.put("max.request.size", 1048576);
⑩ request.timeout.ms время ожидания запроса
request.timeout.ms: Это означает, что после отправки запроса у него есть ограничение по таймауту, по умолчанию 30 секунд, если в течение 30 секунд не получено ответа (т. подумает об исключении и сгенерирует исключение TimeoutException для нас. Если сеть компании плохая, отрегулируйте этот параметр соответствующим образом.
props.put("request.timeout.ms", 30000);
Добавлено: Исключения в kafka.
Независимо от того, является ли он асинхронным или синхронным, он может позволить вам обрабатывать исключения.
1)LeaderNotAvailableException: это означает, что если машина зависнет, ведущая копия в это время будет недоступна, что приведет к сбою записи. Прежде чем продолжить запись, вы должны дождаться, пока другие ведомые копии переключятся на ведущую копию. В это время вы можете повторите отправку. Если вы обычно перезапускаете процесс брокера kafka, это обязательно приведет к переключению лидеров, что обязательно заставит вас написать ошибку, которая является LeaderNotAvailableException
2)NotControllerException: Это тоже самое.Если Брокер, где находится Контроллер, зависнет, то в это время будет проблема, и вам нужно дождаться переизбрания Контролера. То же самое и в это время, просто попробуйте еще раз .
3)NetworkException: Сеть не в порядке, попробуйте еще раз Ранее мы настроили параметр retries, который будет автоматически повторять попытку, но если после нескольких повторных попыток все равно произойдет сбой, нам будет предоставлено исключение. Параметры: повторы. Значение по умолчанию — 3. Параметры: retry.backoff.ms Интервал времени между попытками
finally
Приведенный выше анализ процесса от производителя, создающего сообщение, до отправки приводит к следующим различным настройкам параметров всего процесса.Если вы можете четко понять эти базовые знания, я считаю, что это будет вам полезно. Позже мы рассмотрим еще один случай производителя и потребителя. Заинтересованные друзья могут обратить на это внимание, спасибо.