Начало работы с Кафкой

задняя часть ZooKeeper Kafka RabbitMQ

что такое кафка

Apache Kafka® — это платформа для распределенной обработки потоков.

Выше приведено введение официального веб-сайта.По сравнению с общей системой обработки сообщений разница заключается в следующем:

  1. kafka — это распределенная система, которую легко масштабировать.
  2. Он обеспечивает высокую пропускную способность как для публикации, так и для подписки.
  3. Он поддерживает несколько подписчиков и автоматически балансирует потребителей в случае сбоя.
  4. Сохранение сообщений

Сравнение с другими системами обмена сообщениями:

Индикатор сравнения kafka activemq rabbitmq rocketmq
задний план Kafka — это высокопроизводительная распределенная система обмена сообщениями, разработанная LinkedIn. Она широко используется в таких сценариях, как сбор журналов, потоковая обработка данных, онлайн- и офлайн-рассылка сообщений и т. д. ActiveMQ — это промежуточное программное обеспечение с открытым исходным кодом, ориентированное на сообщения (MOM), которое реализует спецификацию JMS1.1, обеспечивая эффективную, масштабируемую, стабильную и безопасную передачу сообщений на уровне предприятия для приложений. RabbitMQ — это реализация протокола AMQP (Advanced Message Queue) с открытым исходным кодом, разработанная erlang. RocketMQ — промежуточное ПО для обмена сообщениями с открытым исходным кодом от Alibaba, выпущенное в 2012 году. Оно было передано в дар Apache Foundation и в ноябре 2016 года стало инкубационным проектом Apache.
Язык разработки Ява, Скала Java Erlang Java
Поддержка протокола самореализуемый набор JMS-протокол AMQP JMS, MQTT
Упорство служба поддержки служба поддержки служба поддержки служба поддержки
отказоустойчивость производителя Параметр конфигурации acks предоставляется в kafka, acks=0 Производитель не будет ждать ответа от сервера, прежде чем успешно написать сообщение acks=1 Пока ведущий узел кластера получает сообщение, производитель будет получать сообщение от сервера Успешный ответ acks=all Производитель получит успешный ответ от сервера только тогда, когда все узлы, участвующие в репликации, получат сообщение, этот режим является самым безопасным Повторите попытку после отправки Есть акк модели. Модель подтверждения может дублировать сообщения, а модель транзакций обеспечивает полную согласованность похож на кафку
пропускная способность Kafka обладает высокой пропускной способностью, внутренней пакетной обработкой сообщений, механизмом нулевого копирования, хранением и сбором данных являются последовательные пакетные операции на локальном диске со сложностью O(1), а эффективность обработки сообщений очень высока. RabbitMQ немного уступает Kafka по пропускной способности.Отправные точки у них разные. RabbitMQ поддерживает надежную доставку сообщений, поддерживает транзакции и не поддерживает пакетные операции; исходя из требований надежности хранилища, для место хранения. Kafka имеет более высокую пропускную способность, чем RocketMq, когда количество тем небольшое, а RocketMq выше, чем kafka, когда количество тем большое.
балансировки нагрузки Kafka использует zookeeper для управления брокерами и потребителями в кластере и может регистрировать темы в zookeeper; с помощью механизма координации zookeeper производитель сохраняет информацию брокера о соответствующей теме, которая может быть отправлена ​​​​брокеру случайным образом или путем опроса; и производитель может быть указан на основе семантики Фрагментация, сообщение отправляется фрагменту брокера Для балансировки нагрузки RabbitMQ требуется отдельный балансировщик нагрузки для поддержки NamerServer для балансировки нагрузки

Схема архитектуры:

Пример использования:

режиссер:

public class Producer extends Thread {

    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }

    class DemoCallBack implements Callback {

        private final long startTime;
        private final int key;
        private final String message;

        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }

        /**
         * A callback method the user can implement to provide asynchronous handling of request completion. This method will
         * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
         * non-null.
         *
         * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
         *                  occurred.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println(
                        "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                                "), " +
                                "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }
    }
}

consumer:

public class Consumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        while (true) {
            consumer.subscribe(Collections.singletonList(this.topic));
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1).getSeconds());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }
}

properties:

public class KafkaProperties {
    public static final String TOPIC = "topic1";
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final String TOPIC2 = "topic2";
    public static final String TOPIC3 = "topic3";
    public static final String CLIENT_ID = "SimpleConsumerDemoClient";

    private KafkaProperties() {}
}

Родственные существительные:

  1. Производитель: производитель сообщений, клиент, который отправляет сообщения брокеру.
  2. Потребитель : потребитель сообщения, клиент, который читает сообщение от брокера, потребитель
  3. брокер : узел обработки промежуточного программного обеспечения сообщений, узел Kafka является брокером, и один или несколько брокеров могут формировать кластер Kafka.
  4. тема : тема, Kafka классифицирует сообщения в соответствии с темой, и каждое сообщение, опубликованное в кластере Kafka, должно указывать тему.
  5. Раздел: раздел, физическая концепция, тема может быть разделена на несколько разделов, каждый раздел упорядочен внутри, Kafka по умолчанию определяет, что сообщения отправляются в определенный раздел в соответствии с ключом% partithon.
  6. ConsumerGroup : каждый потребитель принадлежит к определенной группе потребителей, сообщение может быть отправлено нескольким различным группам потребителей, но только один потребитель в группе потребителей может использовать сообщение.

Тема и раздел

  • Сообщение в теме будет отправлено в определенный раздел по заданным правилам (по умолчанию стоит хэш-значение ключа % от количества разделов, конечно, можно и настроить);
  • Каждый раздел представляет собой последовательную неизменяемую очередь сообщений, которую можно добавлять непрерывно. Сообщениям в разделе присваивается порядковый номер, называемый смещением, который уникален для каждого раздела.
  • Метаданные, хранящиеся у потребителя, представляют собой это смещение, которое является позицией потребителя в этом журнале (разделе). Это смещение контролируется потребителем: обычно, когда потребитель получает сообщение, смещение также увеличивается линейно.

Потребитель и раздел

  • Вообще говоря, модель сообщений можно разделить на два типа: очередь и публикация-подписка. Метод обработки очереди заключается в том, что группа потребителей извлекает данные с одного конца очереди, и после потребления данные исчезают. В модели «публикация-подписка» сообщение рассылается всем потребителям, и потребители, получившие сообщение, могут его обработать. Абстрактное в модели Кафки: группа потребителей (consumer group)
  • Группа потребителей: в каждой группе несколько потребителей. Если все потребители находятся в группе, то это становится моделью очереди; если потребители находятся в разных группах, это становится моделью публикации-подписки.
  • Данные в разделе будут обрабатываться только потребителями в одной группе, а другие потребители в той же группе не будут обрабатываться повторно.
  • Количество потребителей в группе потребителей

В конце концов

У Little Tail есть волна, добро пожаловать, чтобы обратить внимание на мой публичный аккаунт и время от времени делиться своими мыслями о программировании, инвестициях и жизни :)