Начните работу с Kafka для небольших партнеров компании уже сегодня, хотите знать?

Kafka

Промежуточное ПО/система обмена сообщениями

промежуточное ПО

Промежуточное ПО — это тип компьютерного программного обеспечения, которое соединяет программные компоненты и приложения и включает в себя набор служб. Для облегчения взаимодействия нескольких программ, работающих на одной или нескольких машинах по сети.

Функциональная совместимость, обеспечиваемая этой технологией, привела к развитию согласованных распределенных архитектур, которые часто используются для поддержки и упрощения сложных распределенных приложений, включая веб-серверы, мониторы транзакций ипрограмма очереди сообщений.

очередь сообщений

В информатике, очередь сообщений является методом межпроцессной связи или связи между различными потоками того же процесса, очередь программного обеспечения, используемого для обработки ряд входов, обычно от пользователей.

Очередь сообщений обеспечивает асинхронный протокол связи. Записи в каждой очереди содержат подробные данные, включая время возникновения, тип устройства ввода и конкретные входные параметры, то есть отправитель и приемник сообщения не подключены. Нужно Взаимодействовать с очередью сообщения одновременно. Сообщение хранится в очереди, пока приемник не извлекает его.

Очереди сообщений часто хранятся в структуре связанного списка. Привилегированный процесс может записывать или читать сообщения в очередь сообщений.

--Выше из Википедии

Разберемся на примере курьера, доставляющего курьера.

Сначала его доставляли до двери.Иногда дома не было, и приходилось ждать следующего дня.Сейчас там новобранец.Брату-курьеру нужно только положить его внутрь,и можно идти на станция, чтобы забрать его, если у вас есть экспресс.

Курьеру не нужно напрямую связываться с таким количеством клиентов, просто бросьте посылку в гостиницу, а затем мы идем в гостиницу, чтобы забрать ее, даже если иногда мы не пусты, мы можем оставить посылку в гостинице. еще на несколько дней, что является его емкостью штабелирования.

Почему мы используем очереди сообщений

Асинхронная обработка

Предположим, у вас есть ссылка на системный вызов, системе A требуется 50 мс для вызова системы B, 200 мс для системы B для вызова системы C и 2 с для системы C для вызова системы D, которая должна выполнить операцию тайм-аута, как показано на рисунке. на следующем рисунке:

Сейчас самая большая проблема в том, что при запросе пользователя время вызова всего линка составляет 50мс+200мс+2000мс=2250мс, а это больше 2-х секунд.

Фактически, в цепочке вызовов системе А требуется всего 250 мс для вызова системы Б и системе В для вызова системы С, но системе С требуется 2 с для вызова системы D.

Именно связь добавления системы C к вызову системы D приводит к увеличению времени отклика системы с 250 мс до 2250 мс, что в 10 раз меньше.

Если мы уберем систему D из связи и позволим системе C вызывать D асинхронно, то вызов C в системе B, C обрабатывает и завершает свою собственную логику и отправляет асинхронный запрос на вызов системы D, не дожидаясь, пока система D ответит и вернет . Это намного лучше?

В качестве примера возьмем наш обычный вынос:

Обычно мы заканчиваем прием пищи, платим деньги, а затем зачисляем их в дебетовую систему, создаем заказы, уведомляем предприятия о приготовлении блюд.

Далее, вам нужно найти всадника, который доставит вам еду? Процесс поиска гонщика требует сложного алгоритма для составления расписания, что отнимает много времени.

Тогда можем ли мы предпринять шаги по поиску райдера для доставки вам еды из линка и сделать его асинхронным, даже если он задерживается на десятки секунд, но до тех пор, пока мы найдем райдера для вас в определенные временные рамки Питание доставка в порядке.

Это делает вас очень быстрым, чтобы заказать еду на вынос? После успешного платежа можно напрямую создать заказ, списать средства со счета и уведомить продавца о немедленном приготовлении еды для вас.Этот процесс может занять сотни миллисекунд. Затем асинхронность в фоновом режиме может занять десятки секунд, чтобы найти водителя, который доставит вам еду через алгоритм планирования, но этот шаг не влияет на наш быстрый заказ.

Таким образом, приведенная выше ссылка также одинакова.Если бизнес-процесс поддерживает асинхронность, можете ли вы рассмотреть возможность извлечения вызова из системы C в систему D, чтобы сделать его асинхронным, вместо того, чтобы помещать его в ссылку и вызывать его синхронно и последовательно.

Весь процесс выглядит следующим образом:

разделение сообщений

Если вы в настоящее время системная система A, эта система выводит основные данные, а также системы B и C нужны эти данные.

Затем мы обычно делаем прямой вызов системы B и системы C и отправляем данные в прошлое.

Процесс выглядит следующим образом:

Через несколько дней другим бизнес-системам D и E тоже нужны эти данные, а потом становится так

Что, если в будущем появится система? Вы не должны менять свой код до смерти. . .

В этом случае системная связь очень серьезная, что делать, если вы не можете отправить системный вызов?

В ответ на вышеуказанные проблемы мы можем использовать очереди сообщений для достижения разделения системы.

Система А отправляет данные в очередь сообщений, а другие системы, которым они нужны, идут в очередь сообщений и получают их сами.

Снижение пикового трафика и сбор журналов

Предположим, у вас есть система, которая может обрабатывать сотни запросов в секунду в обычное время.Система развернута на 8-ядерной машине 16G, нормальная обработка в порядке, а сотням запросов в секунду можно легко сопротивляться.

Однако, как показано на рисунке ниже, во время пикового периода внезапно пришли тысячи запросов в секунду, и был мгновенный пик трафика.В это время ваш выбор состоит в том, чтобы построить 10 машин, чтобы выдержать мгновенный пик тысяч запросов. в секунду?

Тогда если мгновенный пик всего полчаса в день, а то он сразу сводится к сотням запросов в секунду.Если развернуть много машин онлайн, то каждая машина может обрабатывать десятки запросов в секунду.Не мало ли пустой траты машинных ресурсов?

Большую часть времени сотни запросов в секунду, достаточно одной машины, но чтобы выдержать мгновенный пик каждый день деплоят 10 машин, которые пригодятся на полчаса каждый день, а в остальное время это пустая трата времени. Ресурсы.

На этом этапе мы можем использовать очередь сообщений, чтобы устранить пики. Слой MQ развернут перед всеми машинами, и каждый может легко получать сообщения с сотнями запросов в секунду.

Как только достигается мгновенный пиковый период, тысячи запросов в секунду могут накапливаться в MQ, а затем машина будет медленно обрабатывать и потреблять.

После завершения пикового периода и после периода потребления резерв данных в MQ будет использован.

Как показано ниже:

Вышеупомянутое содержание взято из [Заметок об архитектуре Ши Шаня], изображение нарисовано мной, это мое собственное понимание.

Чтобы узнать о нем больше, перейдите наТалант /user/208432…

Какая очередь сообщений

Четыре часто используемые очереди сообщенийActiveMQ, RabbitMQ, RocketMQ, Кафка

nuggets.capable/post/684490…

Что такое Кафка

Kafka — это высокопроизводительная распределенная система обмена сообщениями с публикацией и подпиской, написанная на Scala.

Для студентов, знакомых со спецификацией JMS (Java Message Service), система сообщений не является новой концепцией (например, ActiveMQ, RabbitMQ и т. д.).

KAFKA имеет функции, которые он должен иметь в качестве системы обмена сообщениями, но у него есть уникальный дизайн. Можно сказать, что кафка заимствует идеи спецификации JMS, но не полностью следит за спецификацией JMS.

Kafka — это распределенная, разделенная служба сообщений (официально называемая журналом фиксации). Он обеспечивает функциональность, которой должна обладать система обмена сообщениями, но имеет уникальный дизайн. Во-первых, давайте рассмотрим основную терминологию, связанную с сообщениями:

  • Topic: Kafka поддерживает сообщения в соответствии с классификацией тем.
  • Producer: Мы называем процесс публикации сообщения в теме производителем
  • Consumer: Процесс подписки на тему и обработки сообщений в теме мы называем потребителем
  • Broker: Kafka работает в кластере, и каждый сервер в кластере называется брокером.

Следовательно, на высоком уровне производители отправляют сообщения в кластер Kafka по сети, а потребители затем их потребляют, как показано на следующем рисунке:

Связь между сервером (Hrokers) и клиентом, потребитель завершен через протокол TCP. Мы предлагаем Java Client для Kafka, но вы также можете использовать клиенты, написанные на других языках.

Схема архитектуры Кафки

Основные концепции Кафки

Тема и журнал

Давайте сначала глубоко поймем, что Кафка предлагает абстракцию высокого уровня — Тему.

Можно понять, что Тема — это название категории, и все сообщения отправляются в Теме. Для каждой темы кластер kafka поддерживает раздел (раздел, который можно понимать как очередь) файл журнала следующим образом:

Раздел — это упорядоченная последовательность сообщений, которые последовательно добавляются в файл, называемый журналом фиксации. Сообщения в каждом разделе имеют уникальный номер, называемыйoffset, используемый для уникальной идентификации сообщения в разделе.

Раздел поддерживает чтение смещения сообщений, а перемещением сообщений управляет сам потребитель, как показано на следующем рисунке:

Как видно из рисунка выше, разные потребители не мешают друг другу при чтении сообщений из одного раздела.Потребители могут контролировать данные, которые они хотят получить, задавая смещение сообщения.Например, они могут читать с начала и чтение последних данных, выборка, повторное чтение и другие функции.

Distribution

Разделы журнала распределены по разным брокерам в кластере kafka, и каждый брокер может запросить резервное копирование данных разделов на других брокерах. Кластер kafka поддерживает настройку количества резервных копий разделов. Для каждого раздела есть один посредник, который действует как «лидер», и ноль или более других посредников действуют как «последователи».

Лидер обрабатывает все запросы на чтение и запись для этого раздела, в то время как последователи пассивно копируют результаты лидера. Если лидер терпит неудачу, один из последователей автоматически становится новым лидером.

Каждый брокер является лидером раздела, управляемого сама по себе, и также является последователем разделов, управляемых другими брокерами. Кафка достигает балансировки нагрузки таким образом.

Producers

Производитель отправляет сообщение в тему и отвечает за выбор раздела темы для отправки сообщения. Простая балансировка нагрузки с помощью циклического перебора. Также его можно отличить по определенному ключевому слову в сообщении. Обычно чаще используется второй метод.

Consumers

Существует два традиционных режима доставки сообщений: очередь (очередь) и (публикация-подписка).

В режиме очереди несколько потребителей считывают данные с сервера, и сообщения доходят только до одного потребителя.

В модели публикации-подписки сообщения рассылаются всем потребителям.

Kafka предоставляет потребительскую абстракцию, основанную на этих двух режимах:consumer group

Каждый потребитель должен указать, к какой группе потребителей он принадлежит. Опубликованное в теме сообщение сообщение доставляется потребителю экземпляра группы потребителей. потребительские экземпляры могут работать в разных процессах, могут быть на разных физических машинах.

Если все потребители находятся в одной группе потребителей, это похоже на традиционный режим очереди, и балансировка нагрузки выполняется между многими экземплярами потребителей.

Если у всех потребителей есть своя уникальная группа потребителей, это похоже на традиционную модель публикации-подписки.

В более общем случае тема обычно имеет несколько групп потребителей, и каждая группа потребителей является логическим подписчиком. Каждая группа потребителей состоит из нескольких экземпляров потребителей для обеспечения масштабируемости и устойчивости к сбоям. В этом нет ничего особенного, он просто заменяет потребителей в потребителях, работающих в одном процессе в модели публикации-подписки, группой потребителей. Как показано ниже:

Описание: Кластер kafka, состоящий из 2 брокеров, имеет в общей сложности 4 раздела (P0-P3). Этот кластер состоит из 2 групп потребителей, A имеет 2 экземпляра потребителя, а B имеет 4.

порядок потребления

Kafka имеет более сильные гарантии упорядочения, чем традиционные системы обмена сообщениями. В традиционном случае сервер резервирует сообщения в очереди по порядку.Если есть несколько потребителей, получающих сообщения в очереди, сервер будет предоставлять сообщения в том порядке, в котором они были получены.

Однако, хотя сообщение для того, чтобы предоставить сервер, ноДоставка сообщений каждому потребителю асинхронна, что может привести к тому, что потребителю, который использует первым, потребуется больше времени для получения сообщения, чем потребителю, который использует позже, что приведет к невозможности гарантировать порядок.

Это говорит о том, что сообщения могут потерять порядок для нескольких потребителей при параллельном использовании.

Системы обмена сообщениями обычно принимают "exclusive consumer«Концепция обеспечения того, чтобы только один потребитель мог получать из очереди одновременно, но на самом деле это означает, что параллелизм не поддерживается в процессе обработки сообщений.

Кафка в этом лучше.По концепции разделения параллелизма, то есть разбиения, конвета Кафки может быть предоставлена ​​и множество последовательного потребителя, когда потребитель при балансировании нагрузки в то время как. Принцип реализации заключается в назначении раздела в теме другому экземпляру потребителя в группе потребителей.

Таким образом, мы можем гарантировать, что раздел имеет только один экземпляр потребителя в сообщении одновременно, тем самым обеспечивая порядок.

Несмотря на то, что в теме есть несколько разделов, в группе потребителей также есть несколько экземпляров потребителей, и балансировка нагрузки по-прежнему может быть гарантирована за счет разумного распределения.

Следует отметить, что количество экземпляров потребителей в группе потребителей не может быть больше, чем количество разделов в топике. Если их слишком много, он не сможет выделить сообщения раздела.

Kafka гарантирует только локальный порядок потребления сообщений в рамках раздела и не может гарантировать общий порядок потребления в нескольких разделах одной темы. Как правило, этого достаточно для большинства приложений.

Однако, если есть необходимость гарантировать порядок потребления в целом, то мы можем установить количество экземпляров потребителей в группе потребителей равным 1, установив количество разделов топика равным 1.

Но при этом пропускная способность Kafka снизится.

Установка и использование кафки

Подготовка окружающей среды перед установкой

Поскольку Kafka разработана на Scala и работает на JVM, перед установкой Kafka необходимо установить JDK.

# yum install java-1.8.0-openjdk* -y

Kafka полагается на zookeeper, поэтому вам нужно сначала установить zookeeper.

# wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
# tar -zxvf zookeeper-3.4.12.tar.gz
# cd zookeeper-3.4.12
# cp conf/zoo_sample.cfg conf/zoo.cfg 启动zookeeper
# bin/zkServer.sh start
# bin/zkCli.sh
# ls /   #查看zk的根目录相关节点

Шаг 1: Загрузите установочный пакет

Загрузите версию 1.1.0 и разархивируйте ее:

# wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
# tar -xzf kafka_2.11-1.1.0.tgz
# cd kafka_2.11-1.1.0

Шаг 2. Запустите службу

Теперь запустите службу kafka: Синтаксис сценария запуска:kafka-server-start.sh [-daemon] server.properties

Видно, что конфигурационный путь server.properties является обязательным параметром, -daemon означает запуск в качестве фонового процесса, иначе служба будет остановлена ​​после выхода ssh-клиента. (Обратите внимание, что IP-адрес, связанный с именем хоста Linux, будет использоваться при запуске kafka, поэтому вам необходимо настроить сопоставление имени хоста и IP-адреса Linux с локальным хостом, используйте vim /etc/hosts)

# bin/kafka-server-start.sh -daemon config/server.properties

Входим в каталог zookeeper для просмотра дерева каталогов zookeeper через клиент zookeeper

# bin/zkCli.sh
# ls /   #查看zk的根目录kafka相关节点
# ls /brokers/ids #查看kafka节点

Шаг 3: Создайте тему

Теперь давайте создадим тему с именем «Тест», в этой теме есть только один раздел, а коэффициент резервного копирования также установлен равным 1:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Теперь мы можем просмотреть темы, существующие в настоящее время в kafka, с помощью следующей команды

# bin/kafka-topics.sh --list --zookeeper localhost:2181

Помимо создания темы вручную, мы можем настроить брокера на автоматическое создание темы, когда производитель публикует сообщение с указанной темой, но тема не существует.

Шаг 4: Отправьте сообщение

Kafka поставляется с командным клиентом производителя, который может считывать содержимое из локальных файлов, или мы можем напрямую вводить содержимое из командной строки и отправлять содержимое в кластер Kafka в виде сообщений. По умолчанию каждая строка обрабатывается как отдельное сообщение.

Сначала нам нужно запустить скрипт, который публикует сообщение, а затем ввести содержимое сообщения для отправки в команду:

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a msg
>this is a another msg

Шаг пятый: потребительские новости

Для потребителя kafka также имеет клиент командной строки, который будет выводить полученный контент в команду:

# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test   --from-beginning #老版本

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1  --topic test    #新版本

Если вы запустите приведенную выше команду из другого окна терминала, вы увидите, что вы набрали в терминале производителя, что вскоре отобразится в окне терминала потребителя.

Некоторые команды для практики Кафки

启动zk 
bin/zkServer.sh start
启动kafka
bin/kafka-server-start.sh config/server.properties &
停止kafka 如果不管用 就是用kill -9 
bin/kafka-server-stop.sh
1.创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.列出主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.生产消息
bin/kafka-console-producer.sh --broker-list localhost:9092 -topic test
4.消费消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 -topic test --from-beginning
5.删除主题
 1. 删除 kafka 主题 
    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic sceniccenter-base-ticket
 2. 在kafka 数据目录删除主题文件夹
 3. 删除 zookeeper 上的 记录
    1)登录zookeeper客户端:命令:./zkCli.sh 
        2)找到topic所在的目录:ls /brokers/topics
        3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。    
    另外被标记为 marked for deletion 的topic你可以在zookeeper客户端中通过命令获得: ls /admin/delete_topics/【topic name】
  总结  彻底删除topic:
     1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
     2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过 zookeeper-client 删除掉broker下的topic即可。
6.查看toplic 的分区等情况 
bin/kafka-topics --describe --zookeeper hadoop1:2181 --topic wwcc1
Topic:wwcc1    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: wwcc1    Partition: 0    Leader: 127    Replicas: 127,128,129    Isr: 127,128,129
    Topic: wwcc1    Partition: 1    Leader: 128    Replicas: 128,129,127    Isr: 128,129,127
    Topic: wwcc1    Partition: 2    Leader: 129    Replicas: 129,127,128    Isr: 129,127,128

некоторые учебные адреса

Зачем использовать промежуточное программное обеспечение сообщений в архитектуре системы - китайский Шишань:nuggets.capable/post/684490…

Официальный адрес:kafka.apache.org/

Учебник по китайскому языку:orchome.com/kafka/index

Подробное объяснение промежуточного программного обеспечения сообщений MQ:blog.51CTO.com/victim/210…