Введение в Kafka и интеграция с SpringBoot

Spring Boot Kafka

В последнее время я работаю над интеграцией SOFA и SpringCloud. Я надеюсь помочь вам лучше использовать SOFA и SpringCloud с помощью серии DEMO-проектов. В то же время я также надеюсь, что все будут участвовать в совместном строительстве и звезде.

Портал GitHub:spring-cloud-sofastack-samples

Знакомство с Кафкой

Официальный сайт: https://kafka.apache.org/

img

предоставляемая функция

Apache Kafka™ — это платформа с распределенными потоками данных, из пояснений в официальной документации ее функции примерно следующие:

  • Публикуйте потоки записей и подписывайтесь на них, подобно очереди сообщений или корпоративной системе обмена сообщениями. Публикуйте потоки данных и подписывайтесь на них, подобно очередям сообщений или корпоративным системам обмена сообщениями.
  • Храните потоки записей отказоустойчивым надежным способом. Поток данных аварийно-устойчивого хранилища
  • Обрабатывайте потоки записей по мере их возникновения. Своевременная обработка потоков данных.

В качестве внутреннего драйвера в большинстве случаев Kafka используется в качестве распределенной очереди сообщений.Распределенные очереди сообщений могут предоставлять такие функции, как развязка приложений, снижение пикового трафика и распределение сообщений, которые уже невозможны для масштабных архитектур интернет-сервисов. , Также отсутствует базовая настройка.

Базовые концепты

темы и разделы

Основная абстракция, предоставляемая Kafka для данных, тема — это категория или имя публикуемого потока данных. Топик в Kafka поддерживает несколько подписчиков, то есть в топике может быть ноль, один или несколько потребителей, подписанных на данные, записываемые в соответствующий топик. Для каждой темы кластер Kafka поддерживает секционированный журнал, подобный следующему:

img
Каждый раздел представляет собой упорядоченную, неизменяемую и постоянно добавляемую последовательность записей, известную как структурированный журнал фиксации. Чтобы обеспечить уникальную идентификацию каждой записи данных в разделе, каждой записи в разделе присваивается идентификационный номер, называемый последовательностью смещения. С настраиваемым периодом хранения кластер Kafka сохраняет все опубликованные данные независимо от того, были ли они обработаны потребителями. Например, если срок хранения установлен на два дня, данные могут использоваться в течение двух дней после публикации записи, после чего они будут удалены для освобождения места. Производительность Kafka не зависит от размера данных, поэтому хранение данных в течение длительного времени не является проблемой.

img
На самом деле, единственные метаданные, хранящиеся у каждого потребителя, — это позиция смещения потребителя в журнале, и это смещение контролируется потребителем: обычно потребитель линейно увеличивает свое значение смещения по мере чтения записи (offset++), но на практике, поскольку положение смещения контролируется потребителем, он может обрабатывать записи данных в любом порядке. Например, потребители могут вернуться к более старому смещению, чтобы повторно обработать данные из прошлого, или пропустить предыдущие записи и начать потребление «сейчас». Эта комбинация функций означает, что потребители Kafka очень легкие и могут включаться и выключаться по желанию, не оказывая большого влияния на других потребителей.

Разделы в журнале служат нескольким целям:

  • Для обеспечения масштабируемости лога размер топика не ограничен размером одного сервера. Каждый отдельный раздел должен быть меньше размера диска сервера, на котором он размещен, но тема может иметь много разделов, поэтому она может обрабатывать любое количество массивных данных.
  • как блок параллельной обработки (Ноу-раздел: Kafka может разделить тему на несколько разделов и будет выбирать, в каком разделе хранить сообщения, в соответствии с правилами раздела. Пока правила раздела установлены разумно, все сообщения будут равномерно распределены по разным разделам, чтобы достичь нагрузки. балансировка и горизонтальное расширение. Кроме того, несколько подписчиков могут одновременно получать данные из одного или нескольких разделов для поддержки больших возможностей обработки данных)

Почему тема в кафке разбита на разделы?

Исходный пост:Почему тема в кафке разбита на разделы?, Поскольку его нельзя воспроизвести, исходный текст здесь не приводится ~

режиссер

Производители публикуют данные в теме по своему выбору, и производитель несет ответственность за выбор раздела в теме для назначения данных. Это можно сделать просто путем циклической балансировки нагрузки или путем разделения в соответствии с некоторой семантикой (например, на основе определенных ключевых слов в данных).

потребитель

Потребители используют группы потребителей (consumer group) имя для маркировки себя, несколько потребителей разделяют группу, и все данные, опубликованные в теме, будут доставлены каждой группе потребителей (consumer group) в потребительском экземпляре. Экземпляры-потребители могут находиться в разных процессах или на разных машинах.

Если все экземпляры-потребители имеют одну и ту же группу потребителей, записи будут эффективно сбалансированы по нагрузке для всех экземпляров-потребителей.

Если все экземпляры потребителей имеют разные группы потребителей, то каждая запись будет транслироваться всем процессам-потребителям, и все данные будут отправляться всем потребителям.

img

Иллюстрация выше объяснена изВведение в «Официальную документацию Kafka»:

Как показано на рисунке выше, кластер Kafka с двумя серверными узлами содержит 4 раздела (P0–P3) и разделен на две группы потребителей: группа потребителей A имеет 2 экземпляра потребителей, а группа потребителей B — 4. мы обнаруживаем, что темы имеют небольшое количество потребительских групп, каждая из которых представляет «логического подписчика». Каждая группа состоит из множества экземпляров потребителей, что гарантирует масштабируемость и отказоустойчивость. Можно сказать, что это имеет семантику «публикация-подписка», но пользователь — это группа потребителей, а не отдельный процесс. Способ, которым потребление реализовано в Kafka, заключается в разделении разделов в журнале поровну между экземплярами-потребителями, так что каждый экземпляр является единственным потребителем «части соответствующего размера» раздела в любое время. Процесс поддержания членства в группе потребителей, динамически управляемый протоколом Kafka. Если к группе присоединятся новые экземпляры, они возьмут на себя некоторые разделы от других членов группы; если экземпляр исчезнет, ​​его разделы будут распределены между оставшимися экземплярами. Кафка предоставляет только одинвнутри перегородкиПорядок записей, а не общий порядок между разными разделами в теме. Для большинства приложений достаточно сортировки по разделам в сочетании с разделением по ключам. Однако, если вам нужно использовать общий порядок, вы можете сделать это с темой только с одним разделом, хотя это означает только один процесс-потребитель на группу потребителей.

Kafka как система обмена сообщениями

Системы обмена сообщениями традиционно имеют два режима:очередьипубликовать-подписываться.

  • В режиме очереди пул потребителей может читать с сервера, и каждая запись будет потребляться только определенным потребителем.
    • Позволяет распределять обработку данных между несколькими экземплярами потребителей, но как только данные потребляются, данные исчезают.
  • В модели публикации-подписки записи рассылаются всем потребителям.
    • Позволяет транслировать данные нескольким процессам, но не может масштабироваться и масштабироваться, поскольку каждое сообщение отправляется каждому подписчику.

В этой статье представлены только некоторые основные концепции Kafka как очереди сообщений.официальная документация.

Кафка инсталляция

Вот как установить kafka, адрес загрузки: https://kafka.apache.org/downloads. Версия, используемая в этой статье,kafka_2.12-1.1.1.

  • получить файл пакета

    > wget http://mirrors.shu.edu.cn/apache/kafka/1.1.1/kafka_2.12-1.1.1.tgz
    
  • Разархивируйте сжатый пакет

    > tar -zxvf kafka_2.12-1.1.1.tgz
    
  • Изменить файл конфигурации

    > cd kafka_2.12-1.1.1/config
    > vim server.properties
    

    Мои основные модификации здесь включают следующее:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    listeners=PLAINTEXT://192.168.0.1:9092
    
    advertised.listeners=PLAINTEXT://192.168.0.1:9092
    # zookeeper 地址,可以多个
    zookeeper.connect=192.168.0.6:2181
    

    Запуск службы Kafka должен полагаться на Zookeeper, поэтому вам нужно указать адрес кластера Zookeeper в файле конфигурации. Собственный установочный пакет Kafka включает Zookeeper после распаковки.Вы можете запустить экземпляр Zookeeper с одним узлом следующими способами:

    > sh zookeeper-server-start.sh -daemon config/zookeeper.properties
    

    Здесь я указал машину ZK, которая была развернута ранее, поэтому я могу напрямую указать адрес ZK на развернутый адрес. Установка Zookeeper может означать:Установите Zookeeper под Linux

    С помощью вышеуказанных операций следующее может напрямую запустить службу Kafka:

    > sh kafka-server-start.sh config/server.properties
    

SpringBoot интегрирует Kafka

Создайте простую зависимость инструмента Kafka Producer

  • импорт зависимостей
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>1.3.5.RELEASE</version><!--$NO-MVN-MAN-VER$-->
</dependency>
  • producer

Чтобы обеспечить упаковку Kafka для других модулей, вы можете использовать механизм автоматической настройки SpringBoot для упаковки классов инструментов Kafka на стороне производства следующим образом:

@Configuration
public class KafkaProducerAutoConfiguration {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Bean
    public KafkaSender kafkaSender(){
        return new KafkaSender(kafkaTemplate);
    }
}
  • KafkaSender
public class KafkaSender {
    private KafkaTemplate<String, String> kafkaTemplate;
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    /**
     * send message
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
  • Автоматическая конфигурация
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.sofastack.cloud.core.kafka.configuration.KafkaProducerAutoConfiguration

Инженерные модули следующие: изображение-20190306151759441.png

кейс тест

Введите зависимость в тестовый проект, который упакован указанным выше проектом:

<dependency>
	<groupId>io.sofastack.cloud</groupId>
	<artifactId>sofastack-cloud-core-kafka</artifactId>
</dependency>
  • Создайте новый файл конфигурации application.properties в каталоге ресурсов.
#============== kafka ===================
# 指定kafka 代理地址,可以多个,这里的192.168.0.1是上面Kafka 启动配置文件中对应的
# 注:网上一些帖子中说 Kafka 这里的配置只能是主机名,不支持 ip,没有验证过,
# 如果您在验证时出现问题,可以尝试本机绑定下 host
spring.kafka.bootstrap-servers= 192.168.0.1:9092
#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.application.name=kafka-test
logging.path=./logs
  • Имитация отправки сообщения в классе запуска
@SpringBootApplication
@PropertySource("classpath:application-kafka.properties")
public class ProviderApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(ProviderApplication.class, args);
        // 这里通过容器获取,正常使用情况下,可以直接使用 Autowired 注入
        KafkaSender bean = run.getBean(KafkaSender.class);
        for (int i = 0; i < 3; i++) {
            //调用消息发送类中的消息发送方法
            bean.sendMessage(KafkaContants.TRADE_TOPIC, "send a test message");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • Напишите потребителя, в проекте SpringBoot реализация потребителя очень проста
@Component
public class KafkaReceiver {
    // 配置监听的主体,groupId 和配置文件中的保持一致
    @KafkaListener(topics = { KafkaContants.TRADE_TOPIC }, groupId = "test-consumer-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println(message);
        }
    }
}

После запуска проекта вы можете просмотреть информацию, напечатанную потребителем в консоли:

Поддерживайте нормальную работу приложения здесь, а затем вручную отправляйте сообщения через сервер и убедитесь, что текущий потребитель может правильно отслеживать соответствующую тему и использовать ее.

> sh kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic trading

После выполнения вышеуказанной команды командная строка будет ожидать ввода, здесь введите glmapper и диван:

Затем посмотрите на результаты ввода консоли приложения: изображение-20190306153452565.png

Ссылаться на