что такое кафка
Apache Kafka® — это платформа для распределенной обработки потоков.
Выше приведено введение официального веб-сайта.По сравнению с общей системой обработки сообщений разница заключается в следующем:
- kafka — это распределенная система, которую легко масштабировать.
- Он обеспечивает высокую пропускную способность как для публикации, так и для подписки.
- Он поддерживает несколько подписчиков и автоматически балансирует потребителей в случае сбоя.
- Сохранение сообщений
Сравнение с другими системами обмена сообщениями:
Индикатор сравнения | kafka | activemq | rabbitmq | rocketmq |
---|---|---|---|---|
задний план | Kafka — это высокопроизводительная распределенная система обмена сообщениями, разработанная LinkedIn. Она широко используется в таких сценариях, как сбор журналов, потоковая обработка данных, онлайн- и офлайн-рассылка сообщений и т. д. | ActiveMQ — это промежуточное программное обеспечение с открытым исходным кодом, ориентированное на сообщения (MOM), которое реализует спецификацию JMS1.1, обеспечивая эффективную, масштабируемую, стабильную и безопасную передачу сообщений на уровне предприятия для приложений. | RabbitMQ — это реализация протокола AMQP (Advanced Message Queue) с открытым исходным кодом, разработанная erlang. | RocketMQ — промежуточное ПО для обмена сообщениями с открытым исходным кодом от Alibaba, выпущенное в 2012 году. Оно было передано в дар Apache Foundation и в ноябре 2016 года стало инкубационным проектом Apache. |
Язык разработки | Ява, Скала | Java | Erlang | Java |
Поддержка протокола | самореализуемый набор | JMS-протокол | AMQP | JMS, MQTT |
Упорство | служба поддержки | служба поддержки | служба поддержки | служба поддержки |
отказоустойчивость производителя | Параметр конфигурации acks предоставляется в kafka, acks=0 Производитель не будет ждать ответа от сервера, прежде чем успешно написать сообщение acks=1 Пока ведущий узел кластера получает сообщение, производитель будет получать сообщение от сервера Успешный ответ acks=all Производитель получит успешный ответ от сервера только тогда, когда все узлы, участвующие в репликации, получат сообщение, этот режим является самым безопасным | Повторите попытку после отправки | Есть акк модели. Модель подтверждения может дублировать сообщения, а модель транзакций обеспечивает полную согласованность | похож на кафку |
пропускная способность | Kafka обладает высокой пропускной способностью, внутренней пакетной обработкой сообщений, механизмом нулевого копирования, хранением и сбором данных являются последовательные пакетные операции на локальном диске со сложностью O(1), а эффективность обработки сообщений очень высока. | RabbitMQ немного уступает Kafka по пропускной способности.Отправные точки у них разные. RabbitMQ поддерживает надежную доставку сообщений, поддерживает транзакции и не поддерживает пакетные операции; исходя из требований надежности хранилища, для место хранения. | Kafka имеет более высокую пропускную способность, чем RocketMq, когда количество тем небольшое, а RocketMq выше, чем kafka, когда количество тем большое. | |
балансировки нагрузки | Kafka использует zookeeper для управления брокерами и потребителями в кластере и может регистрировать темы в zookeeper; с помощью механизма координации zookeeper производитель сохраняет информацию брокера о соответствующей теме, которая может быть отправлена брокеру случайным образом или путем опроса; и производитель может быть указан на основе семантики Фрагментация, сообщение отправляется фрагменту брокера | Для балансировки нагрузки RabbitMQ требуется отдельный балансировщик нагрузки для поддержки | NamerServer для балансировки нагрузки |
Схема архитектуры:
Пример использования:
режиссер:
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
* non-null.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
}
consumer:
public class Consumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
public Consumer(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
@Override
public void run() {
while (true) {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1).getSeconds());
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
}
properties:
public class KafkaProperties {
public static final String TOPIC = "topic1";
public static final String KAFKA_SERVER_URL = "localhost";
public static final int KAFKA_SERVER_PORT = 9092;
public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
public static final int CONNECTION_TIMEOUT = 100000;
public static final String TOPIC2 = "topic2";
public static final String TOPIC3 = "topic3";
public static final String CLIENT_ID = "SimpleConsumerDemoClient";
private KafkaProperties() {}
}
Родственные существительные:
- Производитель: производитель сообщений, клиент, который отправляет сообщения брокеру.
- Потребитель : потребитель сообщения, клиент, который читает сообщение от брокера, потребитель
- брокер : узел обработки промежуточного программного обеспечения сообщений, узел Kafka является брокером, и один или несколько брокеров могут формировать кластер Kafka.
- тема : тема, Kafka классифицирует сообщения в соответствии с темой, и каждое сообщение, опубликованное в кластере Kafka, должно указывать тему.
- Раздел: раздел, физическая концепция, тема может быть разделена на несколько разделов, каждый раздел упорядочен внутри, Kafka по умолчанию определяет, что сообщения отправляются в определенный раздел в соответствии с ключом% partithon.
- ConsumerGroup : каждый потребитель принадлежит к определенной группе потребителей, сообщение может быть отправлено нескольким различным группам потребителей, но только один потребитель в группе потребителей может использовать сообщение.
Тема и раздел
- Сообщение в теме будет отправлено в определенный раздел по заданным правилам (по умолчанию стоит хэш-значение ключа % от количества разделов, конечно, можно и настроить);
- Каждый раздел представляет собой последовательную неизменяемую очередь сообщений, которую можно добавлять непрерывно. Сообщениям в разделе присваивается порядковый номер, называемый смещением, который уникален для каждого раздела.
- Метаданные, хранящиеся у потребителя, представляют собой это смещение, которое является позицией потребителя в этом журнале (разделе). Это смещение контролируется потребителем: обычно, когда потребитель получает сообщение, смещение также увеличивается линейно.
Потребитель и раздел
- Вообще говоря, модель сообщений можно разделить на два типа: очередь и публикация-подписка. Метод обработки очереди заключается в том, что группа потребителей извлекает данные с одного конца очереди, и после потребления данные исчезают. В модели «публикация-подписка» сообщение рассылается всем потребителям, и потребители, получившие сообщение, могут его обработать. Абстрактное в модели Кафки: группа потребителей (consumer group)
- Группа потребителей: в каждой группе несколько потребителей. Если все потребители находятся в группе, то это становится моделью очереди; если потребители находятся в разных группах, это становится моделью публикации-подписки.
- Данные в разделе будут обрабатываться только потребителями в одной группе, а другие потребители в той же группе не будут обрабатываться повторно.
- Количество потребителей в группе потребителей
В конце концов
У Little Tail есть волна, добро пожаловать, чтобы обратить внимание на мой публичный аккаунт и время от времени делиться своими мыслями о программировании, инвестициях и жизни :)