Kafka — это распределенная, разделяемая, реплицируемая, публикуемая/подписываемая система обмена сообщениями, Kafka в основном используется в больших полях данных, разумеется, в распределенных системах. В настоящее время популярная на рынке очередь сообщений RocketMQ основана на принципе Ali Kafka и разработана с использованием Java.
Kafka подходит для потребления сообщений в автономном и онлайн-режиме, и ее сообщения хранятся на диске.
Kafka суммирует сообщения в блоках тем, производители отправляют (Push) сообщения в темы, а потребители (извлекают) сообщения, подписанные на темы.
Базовые концепты
Основные понятия очередей сообщений особенно важны.После глубокого понимания основных понятий принципы и общие проблемы очередей сообщений станут более очевидными.
-
Брокер: один сервер Kafka является брокером. Основная задача брокера — получать сообщения от производителей, распределять смещения и затем сохранять упакованные данные на диск; кроме того, брокеры также получают сообщения от потребителей и других брокеров. обрабатывается соответственно типу запроса, а затем возвращается ответ. Несколько брокеров могут быть объединены в кластер (кластер) для предоставления внешних услуг. Каждый кластер выберет брокера, который будет выступать в качестве контроллера. Контроллер является командным центром кластера Kafka, а другие брокеры следуют команде контроллера для реализации соответствующих функций. . Контроллер отвечает за управление состоянием раздела, управление состоянием реплики каждого раздела, мониторинг изменений данных в zookeeper и т. д. Контроллер также является реализацией одного ведущего и нескольких ведомых.Все брокеры будут следить за состоянием ведущего контроллера.В случае сбоя ведущего контроллера он переизберет нового ведущего контроллера.
-
Сообщение. Сообщение — это основная единица сообщения в Kafka. Сообщение состоит из строки байтов, которые в основном состоят из ключа и значения, оба из которых являются массивами байтов. Основная функция ключа — направить сообщение в указанный раздел в соответствии с определенной стратегией, чтобы гарантировать, что все сообщения, содержащие один и тот же ключ, будут записаны в один и тот же раздел.
-
Тема: Тема — это логическое понятие для хранения сообщений.Тема может рассматриваться как набор сообщений. В каждой теме может быть несколько производителей, отправляющих в нее сообщения, или несколько потребителей, извлекающих в нее сообщения.
-
Раздел: Каждая тема может быть разделена на несколько разделов (каждая тема имеет как минимум один раздел), и разные разделы будут выделены для разных брокеров, чтобы расширить Kafka по горизонтали, чтобы увеличить возможности параллельной обработки Kafka. Разные разделы одной темы содержат разные сообщения. Когда каждое сообщение добавляется в раздел, ему будет присвоено смещение, которое является уникальным номером сообщения в этом разделе. Кроме того, Kafka использует смещение для обеспечения порядка сообщений в разделе. Последовательность смещений не меняется. кросс-разделы, то есть сообщения в одном и том же разделе Kafka упорядочены, а сообщения в разных разделах не могут быть упорядочены. Концептуальная карта перегородок
-
Журнал: раздел логически соответствует журналу.Когда производитель записывает сообщение в раздел, оно фактически записывается в журнал. Журнал — это логическое понятие, соответствующее папке на диске. Журнал состоит из нескольких сегментов, и каждый сегмент соответствует файлу журнала и индексному файлу.
-
Реплика: Kafka создает избыточные резервные копии сообщений, каждый раздел может иметь несколько реплик, и сообщения, содержащиеся в каждой реплике, одинаковы (но не гарантируется, что они будут точно такими же в одно и то же время). Типы реплик делятся на Ведущих и Последователей, Когда в разделе есть только одна реплика, реплика принадлежит Лидеру, а реплики нет. Последователь. Реплика Kafka имеет определенный механизм синхронизации.В каждом наборе реплик реплика будет выбрана в качестве ведущей реплики.Кафка будет использовать разные стратегии выбора в разных сценариях. Все запросы на чтение и запись в Kafka обрабатываются выбранной копией-лидером, а остальные используются в качестве копий-последователей, копия-последователь только подтягивает данные из копии-лидера в локальную и синхронно обновляет их в собственный журнал.
Копия раздела:
-
Производитель: производители в основном создают сообщения и отправляют сообщения в тематические разделы в соответствии с определенными правилами.
-
Потребитель: Потребители в основном извлекают сообщения из тем и потребляют их. Потребитель хранит информацию о том, какую позицию (значение смещения) потребитель потребляет в разделе. **В Kafka несколько потребителей могут образовывать группу потребителей, а потребитель может принадлежать только к одной группе потребителей. Группа потребителей гарантирует, что каждый раздел в теме, на которую она подписана, назначается для обработки только одному потребителю в группе потребителей, поэтому, если необходимо реализовать широковещательное потребление сообщений, потребители могут быть помещены в несколько разных групп потребителей. **За счет динамического добавления соответствующего количества потребителей в группу потребителей операция перебалансировки Kafka может использоваться для перераспределения соответствующих отношений между разделами и потребителями, тем самым реализуя возможность горизонтального масштабирования.
-
Набор ISR: набор ISR представляет собой набор реплик, который в настоящее время доступен (действующий) и имеет объем сообщений, аналогичный объему сообщений лидера, то есть подмножество всего набора реплик. Узлы, на которых расположены реплики в наборе ISR, все подключены к ZK, кроме того, разница между смещением последнего сообщения реплики и смещением последнего сообщения реплики-лидера не может превышать заданный порог. Ведущая реплика каждого раздела поддерживает набор ISR для этого раздела. Как упоминалось выше, копия-лидер сделала запрос на запись сообщения, а копия-последователь вытянет написанное сообщение от лидера.Во втором процессе будет состояние, когда количество сообщений в копии-последователе меньше, чем ведущая копия, если разница меньше указанного порога, то реплика, установленная в это время, является набором ISR.
основное использование
Начать Кафку
Вот начальная установка одного экземпляра Kafka:
- Загрузка с официального сайта
kafka_2.11-1.0.0.tgz
, затем распаковать - Запустите службу zk, которая поставляется с Kafka,
./bin/windows/zookeeper-server-start.bat ./conf/zookeeper.properties
- Запустите сервер Кафка,
./bin/windows/kafka-server-start.bat ./conf/server.properties
Командная строка использует Kafka (например, Windows, Linux нужно использовать только соответствующий файл .sh для выполнения команды):
- Создайте тему с именем demo, укажите количество разделов как 1 и количество фабрик реплик как 1.
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
- Список всех тем:
kafka-topics.bat --list --zookeeper localhost:2181
- Чтобы отправить сообщение в указанную тему, сначала войдите в командный терминал:
kafka-console-producer.bat --broker-list localhost:9092 --topic demo
, затем введите сообщение в командном терминалеHello World!
- Укажите, чтобы начать потребление сообщений из головы очереди сообщений:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo --from-beginning
Java вызывает API с помощью Kafka:
/**
* @Name: ProducerDemo
* @Description: Kafka服务端进行消息的Push
* @Author: BeautifulSoup
* @Date: 2018年2月1日 下午11:24:39
*/
public class ProducerDemo {
public static void main(String[] args) {
//构造Kafka的配置项
Properties properties=new Properties();
//定义Kafka服务端的主机名和端口号
properties.put("bootstrap.servers", "localhost:9092");
//定义客户端的ID
properties.put("client.id", "DemoProducer");
//定义消息的key和value的数据类型都是字节数组
properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者的核心类
KafkaProducer producer=new KafkaProducer<>(properties);
//指定topic的名称
String topic = "demo";
//定义消息的key
int messageNo=1;
while(true){
//定义消息的value
String messageStr="Message_"+messageNo;
long startTime=System.currentTimeMillis();
//异步的发送消息
producer.send(new ProducerRecord<>(topic, messageNo,messageStr,new Callback() {
//消息发送成功之后收到了Kafka服务端发来的ACK确认消息之后,就回调下面的方法
//metadata保存着生产者发送过来的消息的元数据,如果消息的发送过程中出现了异常,则改参数的值为null
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime=System.currentTimeMillis()-startTime;
if(null!=metadata){
System.out.println("消息发送给的分区是:"+metadata.partition()+",消息的发送一共用了:"+elapsedTime+"ms");
}else{
exception.printStackTrace();
}
}
}));
}
}
}
/**
* @Name: ConsumerDemo
* @Description: Kafka客户端进行消息的Pull
* @Author: BeautifulSoup
* @Date: 2018年2月10日 下午11:24:58
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties properties=new Properties();
properties.put("bootstrap.servers","localhost:9092");
//指定Consumer Group的id
properties.put("group.id", "BeautifulSoup");
//自动提交offset
properties.put("enable.auto.commit", "true");
//自动提交offset的时间间隔
properties.put("auto.commit.interval.ms","1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer=new KafkaConsumer<>(properties);
//指定消费者订阅的topic
consumer.subscribe(Arrays.asList("demo","test"));
try{
while(true){
//从服务端开始拉取消息,每次的poll都会拉取多个消息
ConsumerRecords<String, String> records=consumer.poll(100);
for (ConsumerRecord<String,String> consumerRecord : records) {
System.out.println("消息记录的位置:"+consumerRecord.offset()+",消息的键:"+consumerRecord.key()+",消息的值:"+consumerRecord.value());
}
}
}finally{
//关闭consumer
consumer.close();
}
}
}