Первый разговор о Кафке

Java задняя часть сервер Kafka

Kafka — это распределенная, разделяемая, реплицируемая, публикуемая/подписываемая система обмена сообщениями, Kafka в основном используется в больших полях данных, разумеется, в распределенных системах. В настоящее время популярная на рынке очередь сообщений RocketMQ основана на принципе Ali Kafka и разработана с использованием Java.

Kafka подходит для потребления сообщений в автономном и онлайн-режиме, и ее сообщения хранятся на диске.

Kafka суммирует сообщения в блоках тем, производители отправляют (Push) сообщения в темы, а потребители (извлекают) сообщения, подписанные на темы.

Базовые концепты

Основные понятия очередей сообщений особенно важны.После глубокого понимания основных понятий принципы и общие проблемы очередей сообщений станут более очевидными.

  1. Брокер: один сервер Kafka является брокером. Основная задача брокера — получать сообщения от производителей, распределять смещения и затем сохранять упакованные данные на диск; кроме того, брокеры также получают сообщения от потребителей и других брокеров. обрабатывается соответственно типу запроса, а затем возвращается ответ. Несколько брокеров могут быть объединены в кластер (кластер) для предоставления внешних услуг. Каждый кластер выберет брокера, который будет выступать в качестве контроллера. Контроллер является командным центром кластера Kafka, а другие брокеры следуют команде контроллера для реализации соответствующих функций. . Контроллер отвечает за управление состоянием раздела, управление состоянием реплики каждого раздела, мониторинг изменений данных в zookeeper и т. д. Контроллер также является реализацией одного ведущего и нескольких ведомых.Все брокеры будут следить за состоянием ведущего контроллера.В случае сбоя ведущего контроллера он переизберет нового ведущего контроллера.

  2. Сообщение. Сообщение — это основная единица сообщения в Kafka. Сообщение состоит из строки байтов, которые в основном состоят из ключа и значения, оба из которых являются массивами байтов. Основная функция ключа — направить сообщение в указанный раздел в соответствии с определенной стратегией, чтобы гарантировать, что все сообщения, содержащие один и тот же ключ, будут записаны в один и тот же раздел.

  3. Тема: Тема — это логическое понятие для хранения сообщений.Тема может рассматриваться как набор сообщений. В каждой теме может быть несколько производителей, отправляющих в нее сообщения, или несколько потребителей, извлекающих в нее сообщения.

  4. Раздел: Каждая тема может быть разделена на несколько разделов (каждая тема имеет как минимум один раздел), и разные разделы будут выделены для разных брокеров, чтобы расширить Kafka по горизонтали, чтобы увеличить возможности параллельной обработки Kafka. Разные разделы одной темы содержат разные сообщения. Когда каждое сообщение добавляется в раздел, ему будет присвоено смещение, которое является уникальным номером сообщения в этом разделе. Кроме того, Kafka использует смещение для обеспечения порядка сообщений в разделе. Последовательность смещений не меняется. кросс-разделы, то есть сообщения в одном и том же разделе Kafka упорядочены, а сообщения в разных разделах не могут быть упорядочены. Концептуальная карта перегородок

    Aaron Swartz

  5. Журнал: раздел логически соответствует журналу.Когда производитель записывает сообщение в раздел, оно фактически записывается в журнал. Журнал — это логическое понятие, соответствующее папке на диске. Журнал состоит из нескольких сегментов, и каждый сегмент соответствует файлу журнала и индексному файлу.

  6. Реплика: Kafka создает избыточные резервные копии сообщений, каждый раздел может иметь несколько реплик, и сообщения, содержащиеся в каждой реплике, одинаковы (но не гарантируется, что они будут точно такими же в одно и то же время). Типы реплик делятся на Ведущих и Последователей, Когда в разделе есть только одна реплика, реплика принадлежит Лидеру, а реплики нет. Последователь. Реплика Kafka имеет определенный механизм синхронизации.В каждом наборе реплик реплика будет выбрана в качестве ведущей реплики.Кафка будет использовать разные стратегии выбора в разных сценариях. Все запросы на чтение и запись в Kafka обрабатываются выбранной копией-лидером, а остальные используются в качестве копий-последователей, копия-последователь только подтягивает данные из копии-лидера в локальную и синхронно обновляет их в собственный журнал.

    Копия раздела:

    Aaron Swartz

  7. Производитель: производители в основном создают сообщения и отправляют сообщения в тематические разделы в соответствии с определенными правилами.

  8. Потребитель: Потребители в основном извлекают сообщения из тем и потребляют их. Потребитель хранит информацию о том, какую позицию (значение смещения) потребитель потребляет в разделе. **В Kafka несколько потребителей могут образовывать группу потребителей, а потребитель может принадлежать только к одной группе потребителей. Группа потребителей гарантирует, что каждый раздел в теме, на которую она подписана, назначается для обработки только одному потребителю в группе потребителей, поэтому, если необходимо реализовать широковещательное потребление сообщений, потребители могут быть помещены в несколько разных групп потребителей. **За счет динамического добавления соответствующего количества потребителей в группу потребителей операция перебалансировки Kafka может использоваться для перераспределения соответствующих отношений между разделами и потребителями, тем самым реализуя возможность горизонтального масштабирования.

  9. Набор ISR: набор ISR представляет собой набор реплик, который в настоящее время доступен (действующий) и имеет объем сообщений, аналогичный объему сообщений лидера, то есть подмножество всего набора реплик. Узлы, на которых расположены реплики в наборе ISR, все подключены к ZK, кроме того, разница между смещением последнего сообщения реплики и смещением последнего сообщения реплики-лидера не может превышать заданный порог. Ведущая реплика каждого раздела поддерживает набор ISR для этого раздела. Как упоминалось выше, копия-лидер сделала запрос на запись сообщения, а копия-последователь вытянет написанное сообщение от лидера.Во втором процессе будет состояние, когда количество сообщений в копии-последователе меньше, чем ведущая копия, если разница меньше указанного порога, то реплика, установленная в это время, является набором ISR.

основное использование

Начать Кафку

Вот начальная установка одного экземпляра Kafka:

  1. Загрузка с официального сайтаkafka_2.11-1.0.0.tgz, затем распаковать
  2. Запустите службу zk, которая поставляется с Kafka,./bin/windows/zookeeper-server-start.bat ./conf/zookeeper.properties
  3. Запустите сервер Кафка,./bin/windows/kafka-server-start.bat ./conf/server.properties

Командная строка использует Kafka (например, Windows, Linux нужно использовать только соответствующий файл .sh для выполнения команды):

  1. Создайте тему с именем demo, укажите количество разделов как 1 и количество фабрик реплик как 1.kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
  2. Список всех тем:kafka-topics.bat --list --zookeeper localhost:2181
  3. Чтобы отправить сообщение в указанную тему, сначала войдите в командный терминал:kafka-console-producer.bat --broker-list localhost:9092 --topic demo, затем введите сообщение в командном терминалеHello World!
  4. Укажите, чтобы начать потребление сообщений из головы очереди сообщений:kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo --from-beginning

Java вызывает API с помощью Kafka:

		/**
		 * @Name: ProducerDemo
		 * @Description: Kafka服务端进行消息的Push 
		 * @Author: BeautifulSoup
		 * @Date: 2018年2月1日 下午11:24:39
		 */
		public class ProducerDemo {
			public static void main(String[] args) {
				//构造Kafka的配置项
				Properties properties=new Properties();
				//定义Kafka服务端的主机名和端口号
				properties.put("bootstrap.servers", "localhost:9092");
				//定义客户端的ID
				properties.put("client.id", "DemoProducer");
				//定义消息的key和value的数据类型都是字节数组
				properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
				properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
				//创建生产者的核心类
				KafkaProducer producer=new KafkaProducer<>(properties);
				//指定topic的名称
				String topic = "demo";
				//定义消息的key
				int messageNo=1;
				while(true){
					//定义消息的value
					String messageStr="Message_"+messageNo;
					long startTime=System.currentTimeMillis();
					//异步的发送消息
					producer.send(new ProducerRecord<>(topic, messageNo,messageStr,new Callback() {
						//消息发送成功之后收到了Kafka服务端发来的ACK确认消息之后,就回调下面的方法
						//metadata保存着生产者发送过来的消息的元数据,如果消息的发送过程中出现了异常,则改参数的值为null
						@Override
						public void onCompletion(RecordMetadata metadata, Exception exception) {
							long elapsedTime=System.currentTimeMillis()-startTime;
							if(null!=metadata){
								System.out.println("消息发送给的分区是:"+metadata.partition()+",消息的发送一共用了:"+elapsedTime+"ms");
							}else{
								exception.printStackTrace();
							}
						}
					}));
				}
				
			}
		}
	

		/**
		 * @Name: ConsumerDemo
		 * @Description: Kafka客户端进行消息的Pull 
		 * @Author: BeautifulSoup
		 * @Date: 2018年2月10日 下午11:24:58
		 */
		public class ConsumerDemo {
			public static void main(String[] args) {
				Properties properties=new Properties();
				properties.put("bootstrap.servers","localhost:9092");
				//指定Consumer Group的id
				properties.put("group.id", "BeautifulSoup");
				//自动提交offset
				properties.put("enable.auto.commit", "true");
				//自动提交offset的时间间隔
				properties.put("auto.commit.interval.ms","1000");
				properties.put("session.timeout.ms", "30000");
				properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
				properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
				KafkaConsumer consumer=new KafkaConsumer<>(properties);
				//指定消费者订阅的topic
				consumer.subscribe(Arrays.asList("demo","test"));
				try{
					while(true){
						//从服务端开始拉取消息,每次的poll都会拉取多个消息
						ConsumerRecords<String, String> records=consumer.poll(100);
						for (ConsumerRecord<String,String> consumerRecord : records) {
							System.out.println("消息记录的位置:"+consumerRecord.offset()+",消息的键:"+consumerRecord.key()+",消息的值:"+consumerRecord.value());
						}
					}
				}finally{
					//关闭consumer
					consumer.close();
				}
			}
		}