предисловие
······
1. Маленький кейс Kafka Producer
Предположим, теперь у нас есть система электронной коммерции. Все пользователи, которые могут войти в систему, являются ее участниками. Ценность участников отражается в том, сколько денег они тратят, и будут накапливаться соответствующие баллы. Баллы можно обменять на подарочные наборы, купоны... и т.д.
Пришло время снова рисовать 👌. Прежде всего, мы должны сначала прийти к системе заказов, тогда в этой системе заказов обязательно будут генерироваться логи данных.Теперь нужно записать эти логи в Kafka, и мы используем json для записи логов. Выписка на рисунке представляет собой статус заказа, который на данный момент оплачен.
В настоящее время нашим потребителем должна быть система членства, которая должна накапливать баллы для этого члена, чей идентификатор равен 1. Конечно, ситуация, которую необходимо учитывать, заключается в том, что участник также может выполнить операцию возврата, и соответствующие баллы также будут уменьшены. заявление отменено в это время
мыпоследняя лекцияВ параметрах настройки в , упоминается, что мы можем установить ключ для каждого сообщения, или мы не можем его указать.Этот ключ связан с тем, в какой раздел какой темы мы хотим отправить это сообщение. Например, у нас сейчас есть топик с названием tellYourDream, под топиком два раздела, и в двух разделах две реплики (в это время на фолловера не обращаем внимания, т.к. его данные являются лидером синхронизации)
Topic:tellYourDream
p0:leader partition <- follower partition
p1:leader partition <- follower partition
еслине указывать ключкогда сообщение отправленоОн будет отправлен в раздел в режиме опроса. То есть, например, у меня первое сообщение одно, потом это одно отправляется на p0, второе — два, оно отправляется на p1, следующие три — p0, четвертое — p1... и так далее.
Если указать ключ, например мой ключ message1, Kafka получит хеш-значение этого ключа, а затем полученное число будет по модулю количества наших партиций, а потом какая партиция будет определяться по значению по модулю (например, теперь мы p0 , два раздела p1, значение по модулю будет только 0, 1), если по модулю равно 0, оно будет отправлено в p0, а если по модулю равно 1, он будет отправлен на p1, таким образомГарантируется, что сообщения с одним и тем же ключом будут отправлены в один и тот же раздел(Вы также можете использоватьЭта функция указывает, что определенные сообщения должны быть отправлены в указанный раздел). Похож ли этот подход на перетасовку в MapReduce, поэтому эти платформы больших данных действительно обладают хорошей функциональной совместимостью.
Для только что упомянутой системы членства, если сообщение, когда пользователь размещает заказ, отправляется на p0, а сообщение о возмещении отправляется на p1, неизбежно произойдет, что потребитель сначала потребляет сообщение в p1.1000 баллов были вычтены до этого. очки увеличились, и будут проблемы с отображением. Итак, чтобы гарантировать, что сообщения одного и того же пользователя отправляются в один и тот же раздел, нам нужно указать ключ.
раздел кода
Так какПринцип производителя Кафки и описание важных параметровМы уже объяснили все настройки prop.put ниже, поэтому на этот раз мы будем напрямую использовать Ctrl+C и Ctrl+V. По сути, это извлечение кода создания производителя в этот момент в метод createProducer().
public class OrderProducer {
public static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 33554432);
props.put("compression.type", "lz4");
props.put("batch.size", 32768);
props.put("linger.ms", 100);
props.put("retries", 10);//5 10
props.put("retry.backoff.ms", 300);
props.put("request.required.acks", "1");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
return producer;
}
Вот только кусок кода сообщения, который выдает формат JSON, и он тоже извлекается в метод.
public static JSONObject createRecord() {
JSONObject order=new JSONObject();
order.put("userId", 12344);
order.put("amount", 100.0);
order.put("statement", "pay");
return order;
}
Здесь прямое создание производителей и сообщений.В это время ключ может использовать userId или id заказа, что не является большой проблемой.
public static void main(String[] args) throws Exception {
KafkaProducer<String, String> producer = createProducer();
JSONObject order=createRecord();
ProducerRecord<String, String> record = new ProducerRecord<>(
"tellYourDream",order.getString("userId") ,order.toString());
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
System.out.println("消息发送成功");
} else {
//进行处理
}
}
});
Thread.sleep(10000);
producer.close();
}
}
В это время, если сообщение все еще является ненормальным после выполнения механизма повторной попытки, более строгие проекты компании будут иметь резервные ссылки, такие как хранение данных в MySQL, Redis и т. д., чтобы гарантировать, что сообщение не будет потеряно.
Дополнение: нестандартный раздел (сами понимаете)
Поскольку механизм, предоставленный самой Kafka, в основном удовлетворил использование в производственной среде, поэтому этот раздел не будет подробно объясняться. Кроме того, есть кастомная сериализация и кастомные перехватчики, которые нечасто используются в работе, если вы ими пользуетесь, то наверняка сможете научиться на Baidu.
Например, в записях о звонках записи обращений в службу поддержки должны храниться в одном разделе, а остальные записи равномерно распределяться по остальным разделам. Мы продемонстрируем этот случай.Если вы хотите настроить ситуацию, вы должны реализовать интерфейс Partition, а затем реализовать три метода.Сказано реализовать три.На самом деле, основной метод заключается в реализации метода partition().
package com.bonc.rdpe.kafka110.partitioner;
import java.util.List;import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
/**
* @Title PhonenumPartitioner.java
* @Description 自定义分区器
* @Date 2018-06-25 14:58:14
*/
public class PhonenumPartitioner implements Partitioner{
@Override
public void configure(Map<String, ?> configs) {
// TODO nothing
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 得到 topic 的 partitions 信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 模拟某客服
if(key.toString().equals("10000") || key.toString().equals("11111")) {
// 放到最后一个分区中
return numPartitions - 1;
}
String phoneNum = key.toString();
return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1);
}
@Override
public void close() {
// TODO nothing
}
}
Используйте пользовательский разделитель
package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title PartitionerProducer.java
* @Description 测试自定义分区器
* @Date 2018-06-25 15:10:04
*/public class PartitionerProducer {
private static final String[] PHONE_NUMS = new String[]{
"10000", "10000", "11111", "13700000003", "13700000004",
"10000", "15500000006", "11111", "15500000008",
"17600000009", "10000", "17600000011"
};
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
// 设置分区器
props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
int count = 0;
int length = PHONE_NUMS.length;
while(count < 10) {
Random rand = new Random();
String phoneNum = PHONE_NUMS[rand.nextInt(length)];
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
RecordMetadata metadata = producer.send(record).get();
String result = "phonenum [" + record.value() + "] has been sent to partition " + metadata.partition();
System.out.println(result);
Thread.sleep(500);
count++;
}
producer.close();
}
}
Результат пользовательского раздела:
Во-вторых, анализ потребительских принципов Кафки.
1. смещение смещения
В это время, пожалуйста, покиньте наш кластер kafka снова, есть несколько потребителей, которые одновременно потребляют информацию в кластере.
Если программа выполнялась стабильно, проблем со всем нашим процессом не будет, но что, если программа перестанет выполняться? Возможно, в программе есть ошибка, а может быть, мы вручную остановили программу из-за нашей модификации. Когда следующее восстановление, где потребители должны начать потреблять?
Topic:tellYourDream ConsumerA
tellYourDream:p0(10000)
tellYourDream:p1(10001)
Смещение похоже на понимание индексов массива, например, почему индексы массивов должны начинаться с 0, исходя из модели памяти массивов. Он определяется расстоянием между позицией массива и первым адресом.array[0] — это позиция со смещением 0, которое является первым адресом. array[k] — позиция, смещенная на k. Точно так же понимается и смещение в kafka.Это смещение фактически используется для записи позиции. Используется для определения того, что потребитель использовал это местоположение на этот раз.
В kafka kafka не помогает поддерживать это смещение смещения, это смещение должно поддерживаться самим потребителем. Kafka предоставляет два параметра о смещении, один из которыхenable_auto_commit, когда для этого параметра установлено значение true, все данные будут повторно потребляться при каждом перезапуске kafka. другойauto_commit_interval_ms, это временной интервал для каждого смещения фиксации.
Место хранения этого смещения хранилось в zookeeper до версии 0.8 (опять же, старайтесь не использовать kafka до версии 0.8). Этот дизайн явно проблематичен. Весь кластер kafka имеет много тем, и в системе есть тысячи потребителей, которые их потребляют. Если смещение хранится в zookeeper, потребители должны каждый раз отправлять это значение в zookeeper. , Может ли zookeeper встать к этому? Если вы думаете, что сейчас со студентами проблем нет, значит, вы просто плохо учились.Интерлюдия: практика развертывания, эксплуатации и обслуживания кластера Kafka3.4 --- Информация о потреблении в , идите и быстро просмотрите ее🤣.
После версии 0.8 Kafka сохранила это смещение во внутренней теме с именем Consumer_offset. Эта внутренняя тема по умолчанию имеет 50 разделов, и мы знаем, что группы потребителей имеют для них group.id. Отправляя прошлое,ключ это group.id+topic+номер раздела(это дляУбедитесь, что смещения данных одного и того же раздела в кластере Kakfa передаются в один и тот же раздел Consumer_offset.). Это предложение немного искажено, но, пожалуйста, прочтите его.
value — это значение текущего смещения. Время от времени Kafka внутренне сжимает эту тему. То есть каждый номер group.id+topic+partition содержит самую последнюю часть данных. И поскольку этот Consumer_offsets может получать много одновременных запросов, раздел по умолчанию равен 50, поэтому, если ваша kafka развертывает большой кластер, например, 50 машин, вы можете использовать 50 машин, чтобы противостоять давлению запросов, отправленному смещением, намного лучше.
2.Coordinator
Каждая группа потребителей выберет одинbrokerКак ваш собственный координатор,Отвечает за мониторинг сердцебиения каждого потребителя в этой группе потребителей и оценку того, не работает ли он, а затем включает перебалансировку., В соответствии с внутренним механизмом отбора будет выбран соответствующий Брокер.Кафка будет равномерно распределять каждую группу потребителей для каждого Брокера в качестве координатора для управления.Каждый потребитель в группе потребителей будет соответствовать выбранной группе потребителей сразу после ее запуска. брокер, где находится координатор, связывается, а затем координатор выделяет разделы потребителю для потребления. Координатор будет распределять разделы как можно более равномерно для каждого потребителя для потребления.
2.1 Как выбрать координатора?
Сначала хешируйте groupId группы потребителей, а затем возьмите по модулю количество разделов в Consumer_offsets.По умолчанию 50. Вы можете установить его через offsets.topic.num.partitions, чтобы узнать, какой раздел Consumer_offsets имеет смещение ваша группа потребителей должна быть представлена. Например: groupId, "membership-consumer-group" -> хеш-значение (число) -> по модулю 50 (результат может быть только от 0 до 49, что является той же процедурой в прошлом) -> знать, что группа потребителей в разделе Когда все потребители отправляют смещения, в какой раздел они отправляют смещения, найдите раздел Consumer_offsets (количество реплик разделов Consumer_offset равно 1 по умолчанию, и есть только один лидер), а затем найдите брокера, в котором соответствующий лидер расположен для этого раздела, брокер является координатором группы потребителей, и тогда потребитель будетПоддерживайте соединение Socket для связи с брокером.
На самом деле, простое объяснение состоит в том, чтобы найти число в Consumer_offsets и соответствующий ему раздел. После того, как модуль равен 2, найдите второй раздел в 50 разделах Consumer_offsets, который равен p1. После того, как модуль равен 10, найдите десятый раздел из 50 разделов Consumer_offsets, который равен p9.
2.2 Какую работу выполняет координатор
Затем координатор выберет потребителя-лидера (кто зарегистрируется первым, тот и будет лидером), в это время координатор также доложит потребителю-лидеру ситуацию по всей теме, а потребитель-лидер сформулирует план потребления. После этого будет отправлен запрос SyncGroup для возврата плана потребления координатору.
Подытожу короткой фразой:
Во-первых, есть группа потребителей.Эта группа потребителей будет иметь свой номер group.id.На основании этого можно вычислить, какой брокер является их координатором.После определения координатора все потребители отправят запрос на вступление в группу для регистрации. После этого координатор выберет первого зарегистрированного потребителя ведущим потребителем по умолчанию и сообщит ведущему потребителю ситуацию по всей теме. После этого ведущий потребитель сформулирует план потребления в соответствии с идеей балансировки нагрузки и вернет его координатору.После того, как координатор получит план, он будет разослан всем потребителям для завершения процесса.
Потребитель отправит тактовый сигнал координатору.Можно считать, что потребитель является подчиненным узлом, а координатор — ведущим узлом. Когда потребитель перестает поддерживать связь с координатором в течение длительного времени, задача, поставленная перед потребителем, будет выполняться повторно. Если потребитель лидера будет прерван, будет переизбран новый лидер, а затем будут выполнены только что упомянутые шаги.
2.3 Балансировка нагрузки схемы разделов
Если потребитель присоединяется или временно уходит, ведущий потребитель должен сформулировать новый план потребления.
Например, тема, которую мы используем, состоит из 12 разделов: р0, р1, р2, р3, р4, р5, р6, р7, р8, р9, р10, р11
Предположим, у нас есть три потребителя в нашей группе потребителей.
2.3.1 диапазонная стратегия
Стратегия диапазона заключается в следовании диапазону порядковых номеров раздела.
p0~3 consumer1
p4~7 consumer2
p8~11 consumer3
2.3.2 Стратегия циклического перебора
consumer1:0,3,6,9
consumer2:1,4,7,10
consumer3:2,5,8,11
Но есть проблема с предыдущими двумя решениями: Предположим, завис потребительr1: p0-5 назначен потребителю2, p6-11 назначен потребителю3 В этом случае разделы p6 и p7, изначально расположенные на узле Consumer2, будут выделены узлу Consumer3.
2.3.3.липкая стратегия
Последняя липкая стратегия состоит в том, чтобы обеспечить как можно больше при ребалансировке, пусть оригинал принадлежит этому потребителю. Разделы по-прежнему принадлежат им, Затем распределите избыточные разделы равномерно, чтобы максимально сохранить исходную стратегию распределения разделов.
consumer1:0-3
consumer2: 4-7
consumer3: 8-11
假设consumer3挂了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11
2.3.4 Механизм генерации ребаланса
Во время перебалансировки вы могли использовать данные раздела 3. В результате часть данных была использована, а смещение не было отправлено. В результате перебалансировка назначит раздел 3 другому потребителю. Если вы отправите смещение данных раздела 3, Может ли это работать? Это не должно работать, поэтому каждый ребаланс будет запускать генерацию группы потребителей, генерацию, каждое поколение будет увеличиваться на 1, и тогда вы не сможете отправить смещение предыдущего поколения, этот раздел может больше не принадлежать вам, все следуют ему Новая схема выделения разделов повторно потребляет данные.
Вышеперечисленное — более важные вещи, а затем пришло время для расслабленного и счастливого кода.
В-третьих, часть потребительского кода
На самом деле нельзя сказать, что они точно такие же, как у производителя, но структура точно такая же, поэтому будет короче, чем у производителя. Поскольку эти вещи уже известны, многие вещи несложно решить с помощью поисковых систем.
public class ConsumerDemo {
private static ExecutorService threadPool = Executors.newFixedThreadPool(20);
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> consumer = createConsumer();
//指定消费的主题
consumer.subscribe(Arrays.asList("order-topic"));
try {
while(true) {
//这里设置的是一个超时时间
ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
//对消费到的数据进行业务处理
for(ConsumerRecord<String, String> record : records) {
JSONObject order = JSONObject.parseObject(record.value());
threadPool.submit(new CreditManageTask(order));
}
}
} catch(Exception e) {
e.printStackTrace();
consumer.close();
}
}
private static KafkaConsumer<String, String> createConsumer() {
//设置参数的环节
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("heartbeat.interval.ms", 1000); // 这个尽量时间可以短一点
props.put("session.timeout.ms", 10 * 1000); // 如果说kafka broker在10秒内感知不到一个consumer心跳
props.put("max.poll.interval.ms", 30 * 1000); // 如果30秒才去执行下一次poll
// 就会认为那个consumer挂了,此时会触发rebalance
// 如果说某个consumer挂了,kafka broker感知到了,会触发一个rebalance的操作,就是分配他的分区
// 给其他的cosumer来消费,其他的consumer如果要感知到rebalance重新分配分区,就需要通过心跳来感知
// 心跳的间隔一般不要太长,1000,500
props.put("fetch.max.bytes", 10485760);
props.put("max.poll.records", 500); // 如果说你的消费的吞吐量特别大,此时可以适当提高一些
props.put("connection.max.idle.ms", -1); // 不要去回收那个socket连接
// 开启自动提交,他只会每隔一段时间去提交一次offset
// 如果你每次要重启一下consumer的话,他一定会把一些数据重新消费一遍
props.put("enable.auto.commit", "true");
// 每次自动提交offset的一个时间间隔
props.put("auto.commit.ineterval.ms", "1000");
// 每次重启都是从最早的offset开始读取,不是接着上一次
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
return consumer;
}
static class CreditManageTask implements Runnable {
private JSONObject order;
public CreditManageTask(JSONObject order) {
this.order = order;
}
@Override
public void run() {
System.out.println("对订单进行积分的维护......" + order.toJSONString());
// 就可以做一系列的数据库的增删改查的事务操作
}
}
}
3.1 Основные параметры потребителей
3.1.1 [сердцебиение.интервал.мс]
Время пульса потребителя, вы должны сохранить пульсацию, чтобы узнать, неисправен ли потребитель, а затем, если произойдет ошибка, команда перебалансировки будет выдана другим потребителям через пульсацию, чтобы уведомить их о выполнении операции перебалансировки.
3.1.2 [сеанс.время ожидания.мс]
Сколько времени нужно kafka, чтобы воспринять потребителя как неудачника, по умолчанию 10 секунд
3.1.3 [макс.интервал опроса.мс]
Если это время будет превышено между двумя операциями опроса, то будет считаться, что вычислительная мощность потребления слишком слаба, и оно будет выброшено из группы потребления, а раздел будет выделен другим для потребления, что сочетается с вашей собственной бизнес-обработкой.
3.1.4 [выборка.макс.байт]
Получить максимальное количество байтов сообщения, обычно рекомендуется устанавливать большее число
3.1.5 [макс.записей опроса]
Максимальное количество сообщений, возвращаемых одним опросом, по умолчанию 500.
3.1.6 [connection.max.idle.ms]
Если сокетное соединение между потребителем и посредником не используется более определенного времени, соединение будет автоматически восстановлено в это время, но сокетное соединение должно быть восстановлено для следующего потребления.Эта рекомендация имеет значение -1. , не перерабатывать
3.1.7 [авто.смещение.сброс]
earliest:
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
topicA -> partition0:1000
partitino1:2000
latest:
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费
none:
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
Примечание. Обычно мы устанавливаем самые последние версии нашего производства.
3.1.8 【enable.auto.commit】
Это единственный способ включить автофиксацию
3.1.9 [auto.commit.ineterval.ms]
Это относится к тому, как часто условие смещается
4. Время приема пищи: дополнить содержание, не упомянутое в первой статье.
журнал двоичного поиска
На самом деле, его также можно назватьРазреженный индекс. Это также структура, похожая на таблицу пропуска.. Откройте раздел под темой, мы можем увидеть некоторые файлы, подобные этому
00000000000000000000.index(偏移量的索引)
00000000000000000000.log(日志文件)
00000000000000000000.timeindex(时间的索引)
Для файлов сегментов журнала файл .log соответствует двум индексным файлам, .index и .timeindex. Когда kafka записывает файлы журналов, она также записывает индексные файлы, а именно .index и .timeindex, один из которых является индексом смещения, а другой — индексом отметки времени.
По умолчанию есть параметр log.index.interval.bytes который ограничивает сколько данных пишется в лог файл.В файле индекса надо писать индекс, по умолчанию 4кб, писать 4кб данных и потом писать индекс в индексе, поэтому сам индекс представляет собой индекс в разреженном формате, а не один индекс для каждой части данных. Более того, данные в индексном файле сортируются в порядке возрастания по смещению и отметке времени, поэтому при поиске индекса Kafka будет использовать бинарный поиск.Временная сложность O(logN).Если индекс найден, его можно находится в файле .log data.
Вышеуказанные 0, 2039... Они представляют физические местоположения. Почему разреженный индекс быстрее, чем чтение напрямую по одному?Он не записывает каждую часть данных, это способ записи нескольких частей данных, но, например, если вы хотите потреблять данные со смещением 7, просто сначала посмотрите на этот разреженный.Запись в индексе, когда найдено 6, 7 больше, чем 6, затем непосредственно посмотрите на следующие данные, найдите 8, 8 больше, чем 7, а затем посмотрите назад, определите, что 7 между 6 и 8, а физическое местоположение 6 находится в Физическое местоположение 9807,8 находится в 12345, найдите его прямо посередине из них. Это увеличивает скорость нахождения физических местоположений. Это похоже на бинарный поиск в обычных случаях.
ISR-механизм
Просто полагаясь на механизм множественного копирования, можно обеспечить высокую доступность Kafka, но может ли он гарантировать, что данные не будут потеряны? Нет, потому что если лидер отключен, но данные лидера не были синхронизированы с ведомым, даже если ведомый выбран новым лидером, предыдущие данные будут потеряны.
ISR — это синхронизированная реплика, т.Не отставайте от лидера разделаколичество подчиненных разделов,Только последователи в списке ISR могут быть избраны новым лидером после того, как лидер выйдет из строя., потому что данные, представляющие его в этом списке ISR, синхронизируются с лидером.
Если вы хотите, чтобы данные, записанные в kafka, не были потеряны, вам во-первых нужно убедиться, что в ISR есть хотя бы один фолловер, а во-вторых, после того, как кусок данных будет записан в раздел лидера, его необходимо скопировать чтобы все последующие разделы в ISR представляли эти данные.Отправлено и никогда не будет потеряно, это обещание, данное Кафкой
При каких обстоятельствах реплика будет выкинута из ISR?Если реплика не синхронизирует данные с лидером более 10 секунд, то она будет выкинута из списка ISR. Но если эта проблема будет решена (дрожание сети или полный gc и т. д.), ведомый снова синхронизируется с лидером, и лидер будет иметь суждение.Если разница в данных небольшая, ведомый будет воссоединен, так что разница?Она только небольшая, давайте оставим исходный код для объяснения.
finally
На этот раз места очень и очень много, а мест для понимания много.На самом деле в ядре kafka на самом деле есть принцип HW&LEO, но мне лень продолжать писать ххх. Давайте поговорим об этом в следующей статье исходного кода.