схема
У Kafka есть проблема с потерей сообщений.Потеря сообщений бывает трех типов: брокер, производитель и потребитель.
Broker
Потеря сообщений брокером вызвана самой Kafka.Чтобы получить более высокую производительность и пропускную способность, Kafka хранит данные на диске асинхронно в пакетах. В процессе сброса сообщений, чтобы повысить производительность и уменьшить количество сбросов, Kafka применяет метод пакетного сброса. То есть диск сбрасывается по определенному количеству сообщений и временным интервалам. Этот механизм также определяется операционной системой Linux. Когда данные хранятся в операционной системе Linux, они сначала будут храниться в кэше страниц, а диск будет очищаться (из кэша страниц в файл) в зависимости от времени или других условий, или диск будет принудительно очищен через команда fsync. Когда данные находятся в кэше страниц, в случае зависания системы данные будут потеряны.
На приведенном выше рисунке кратко описан процесс записи и синхронизации данных брокером. Брокер записывает данные только в PageCache, а pageCache находится в памяти. Эта часть данных будет потеряна после сбоя питания. Данные pageCache сбрасываются программой флешера Linux. Есть три триггерных условия для чистки зубов:
- Активно вызывать функцию синхронизации или fsync
- Доступная память ниже порогового значения
- Время грязных данных достигает порога. Грязный — это идентификатор кэша страниц. Когда данные записываются в кэш страниц, кэш страниц помечается как грязный. После сброса данных флаг загрязнения сбрасывается.
Брокер настраивает механизм сброса, вызывая функцию fsync, чтобы взять на себя действие сброса. С точки зрения отдельного Брокера данные pageCache будут потеряны.
Kafka не предоставляет возможности синхронного обновления диска. Синхронная очистка реализована в RocketMQ.Принцип реализации заключается в том, чтобы заблокировать процесс асинхронной очистки и дождаться ответа, аналогично обратному вызову ajax или java future. Ниже приведен исходный код RocketMQ.
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
- service.putRequest(request);
- логическое значение flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // сбросить
То есть теоретически Kafka не может полностью гарантировать, что ни один брокер не потеряет сообщения, эту ситуацию можно только смягчить, настроив параметры механизма сброса. Например, уменьшите интервал очистки и уменьшите размер данных очистки. Чем короче время, тем хуже производительность и выше надежность (насколько это возможно). Это вопрос с множественным выбором.
Чтобы решить эту проблему, Kafka использует производителя и брокера для совместной работы в случае отсутствия параметров у одного брокера. Как только производитель обнаружит, что сообщение брокера потеряно, он может автоматически повторить попытку. Сообщение не будет потеряно, если количество повторных попыток не превысит пороговое значение (настраиваемое). В этом случае клиент-производитель должен справиться с ситуацией вручную. Так как же производитель обнаруживает потерю данных? Это через механизм подтверждения, аналогичный трехстороннему рукопожатию http.
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0
If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and theretries
конфигурация не вступит в силу (поскольку клиент, как правило, не узнает ни о каких сбоях). Смещение, возвращаемое для каждой записи, всегда будет равно -1.acks=1
This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.acks=all
This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
Приведенная выше ссылка является официальной кафкой для параметров.acks
Пояснение (в старых версиях этот параметр былrequest.required.acks
).
- acks=0, производитель не ждет ответа брокера, что является наиболее эффективным, но сообщение, скорее всего, будет потеряно.
- acks=1, после получения сообщения ведущим брокером он возвращает подтверждение, не дожидаясь ответа от других последователей. Также можно понять, что номер подтверждения равен 1. В этот момент, если ведомый не получил сообщение, синхронизированное лидером, и лидер завис, сообщение будет потеряно. Согласно примеру на рисунке выше, если лидер получает сообщение и успешно записывает его в PageCache, он возвращает ack.В это время производитель считает, что сообщение было успешно отправлено. Но в это время, судя по приведенному выше рисунку, данные не были синхронизированы с ведомым устройством. Если лидер потеряет власть в это время, данные будут потеряны.
- acks=-1, после того как ведущий брокер получает сообщение, он приостанавливается и ждет, пока все последователи в списке ISR вернут результат, а затем возвращает подтверждение. -1 эквивалентно
all
. В этой конфигурации только лидер записывает данные в кэш страниц и не будет возвращать подтверждение, а все ISR должны возвращать «успех», чтобы активировать подтверждение. Если в это время отключится питание, производитель может узнать, что сообщение не было отправлено успешно, и отправит его повторно. Если ведомое устройство успешно возвращает подтверждение после получения данных, ведущее устройство отключается, а данные сохраняются в исходном ведомом устройстве. После переизбрания новый лидер будет владеть этой частью данных. Синхронизация данных от ведущего к ведомому требует 2 шагов:-
Данные сбрасываются из pageCache на диск. Потому что только данные на диске могут быть синхронизированы с репликой.
-
Данные синхронизируются с репликой, и реплика успешно записывает данные в PageCache. После того, как производитель получит подтверждение, даже если все машины будут отключены, данные по крайней мере будут существовать на диске лидера.
-
В третьем пункте выше упоминался повторитель списка ISR, который необходимо сопоставить с другим параметром, чтобы лучше обеспечить достоверность ack. ISR — это «список надежных подписчиков», поддерживаемый брокером, список синхронизированных реплик, а конфигурация брокера содержит параметр:min.insync.replicas
. Этот параметр указывает минимальное количество реплик в ISR. Если это значение не установлено, список последователей в ISR может быть пустым. Это эквивалентно acks=1.
Как показано выше:
- acks=0, общее время f(t) = f(1).
- acks=1, общее время f(t) = f(1) + f(2).
- acks=-1, общее время f(t) = f(1) + max(f(A), f(B)) + f(2).
Производительность снижается, а надежность возрастает.
Producer
Производитель теряет сообщения, что происходит на стороне клиента производителя.
Чтобы повысить эффективность и сократить количество операций ввода-вывода, производитель может объединять несколько запросов и отправлять их при отправке данных. Объединенный запрос отправляется в буфер первой строки в локальном буфере. Метод кеширования аналогичен описанному выше сбросу: производитель может упаковывать запрос в «блоки» или отправлять данные в буфер в соответствии с временным интервалом. Через буфер мы можем преобразовать производителя в асинхронный способ, что может повысить нашу эффективность отправки.
Однако данные в буфере опасны. В обычных условиях асинхронный вызов клиента может обработать сбой или тайм-аут отправки сообщения через обратный вызов, однако, как только производитель будет незаконно остановлен, данные в буфере будут потеряны, и брокер не сможет получить эту часть сообщения. данные. Или, когда клиент-производитель исчерпает память, если выбранная стратегия заключается в отбрасывании сообщений (другая стратегия — блокировка блоков), сообщения также будут потеряны. Или генерация сообщения (асинхронная генерация) выполняется слишком быстро, что приводит к слишком большому количеству приостановленных потоков и нехватке памяти, что приводит к сбою программы и потере сообщения.
В соответствии с приведенным выше рисунком можно придумать несколько решений:
- Отправляйте сообщения асинхронно, а не синхронно. Или когда служба генерирует сообщение, используется заблокированный пул потоков, а количество потоков имеет определенный верхний предел. Общая идея состоит в том, чтобы контролировать скорость генерации сообщений.
- Разверните конфигурацию емкости Buffer. Этот метод может облегчить возникновение ситуации, но не предотвратить ее.
- Сервис не отправляет сообщение напрямую в буфер (память), а записывает сообщение на локальный диск (базу данных или файл) и отправляет сообщение другим (или небольшим количеством) производственных потоков. Это эквивалентно добавлению буферного слоя с большим пространством между буфером и сервисом.
Consumer
Потребители потребляют сообщения на следующих этапах:
- получить сообщение
- обработать сообщение
- Отзыв "обработан" (зафиксирован)
Существует два основных способа потребления потребителей:
- Автоматическая фиксация смещения, Автоматическая фиксация смещения
- Отправить смещение вручную, ручное управление смещением
Механизм автоматической фиксации Потребителя заключается в фиксации полученного сообщения в соответствии с определенным интервалом времени. Процесс фиксации и процесс потребления сообщений являются асинхронными. Другими словами, может быть неудачный процесс потребления (например, создание исключения), и сообщение фиксации было отправлено. В этот момент сообщение теряется.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 自动提交开关
props.put("enable.auto.commit", "true");
// 自动提交的时间间隔,此处是1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
// 调用poll后,1000ms后,消息状态会被改为 committed
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
insertIntoDB(record); // 将消息入库,时间可能会超过1000ms
}
Приведенный выше пример является примером автоматической фиксации. Если в это время,insertIntoDB(record)
В случае возникновения исключения сообщение будет потеряно. Вот пример ручной фиксации:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 关闭自动提交,改为手动提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
// 调用poll后,不会进行auto commit
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 所有消息消费完毕以后,才进行commit操作
consumer.commitSync();
buffer.clear();
}
}
После изменения типа фиксации на ручной сообщение гарантированно будет «использовано хотя бы один раз». Однако в это время может произойти повторное потребление, и повторное потребление не относится к сфере действия этой статьи.
В приведенных выше двух примерах напрямую используется API высокого уровня потребителя, а клиент прозрачен для смещения и других элементов управления. Вы также можете использовать низкоуровневый API для ручного управления смещением, что также может гарантировать, что сообщение не будет потеряно, но это будет сложнее.
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 精确控制offset
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
Автор: информация Связь:blog.dogchao.cn/?p=305
Наконец, как обычно, у Amway есть волна нашего публичного аккаунта: «Отдел исследований и разработок терминала». В настоящее время мы рекомендуем каждый день высококачественную статью, связанную с технологиями, в основном делясь технологиями, связанными с Java, и навыками интервью. Наша цель — узнать что это такое и почему , Заложи хорошую основу и сделай все хорошо!Этот публичный аккаунт главной креативной технологии супер достоин всеобщего внимания.