Mogujie сотни миллиардов сообщений Kafka облачная практика

задняя часть очередь сообщений

Введение: Apache Kafka играет важную роль в архитектуре данных в режиме реального времени или потоковой передачи данных из-за его высокой пропускной способности и высокой надежности, и многие корпоративные пользователи предпочитают его. Однако с наступлением эры облачных вычислений поставщики публичных облаков запускали службы очередей сообщений одну за другой, и многие пользователи постепенно перешли от создания собственных кластеров сообщений к использованию служб очередей сообщений в облаке. В этой статье в качестве примера будет взята миграция службы Mogujie Kafka в облако, чтобы объяснить, как очередь сообщений Tencent Cloud CKafka приносит пользу пользователям. (Редактировать: промежуточное ПО, младшая сестра Q)

Введение в Apache Kafka

Официальный сайт Apache Kafka описывает последнюю версию Kafka в следующем предложении: Платформа распределенной потоковой передачи. То есть платформа распределенных потоковых вычислений, и это объясняется следующим образом:

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Перевод: Kafka® для создания конвейеров данных в реальном времени и потоковых приложений. Он горизонтально масштабируемый, отказоустойчивый, сверхбыстрый и может быть запущен в производство в тысячах компаний.

2.0+, чтобы добавить к себе слой, определенный Kafka, то есть платформу потоковых вычислений. Однако в сценариях корпоративного использования Kafka по-прежнему часто используется в качестве конвейера данных для выполнения основных функций очереди сообщений. Типичный сценарий использования выглядит следующим образом:

  • Конвейер данных и разделение системы.
  • Асинхронная обработка и управление событиями.
  • Отсечение трафика.
  • Возможная согласованность для транзакционных сообщений и распределенных транзакций.

Болевые точки самодельных кластеров Kafka

Благодаря простой и удобной конструкции Kafka, а также ее эффективной и стабильной работе многие корпоративные пользователи предпочитают создавать свои собственные кластеры Kafka. Но за этим, казалось бы, идеальным осуществимым решением скрывается неявный риск: когда объем данных сообщений в бизнесе достигает определенного уровня, самостоятельно созданный кластер очереди сообщений будет вызывать различные проблемы, так как же решить проблему?

Мы все знаем, что с Kafka легко начать, но есть определенные пороги для продвинутых. Персонал отдела исследований и разработок, решающий проблему, должен хорошо разбираться в компьютерах (знать компьютерную сеть, ввод-вывод и т. д.) и иметь глубокое понимание основных принципов Kafka и различных параметров конфигурации, чтобы настраивать параметры кластера Kafka и быстро обрабатывать внезапные сбои, восстанавливать дрожание кластера и динамически выполнять расширение и сжатие кластера. Из-за этого возникли некоторые проблемы: с одной стороны, предприятиям необходимо вкладывать больше рабочей силы и материальных затрат, а с другой стороны, им необходимо постоянно следить за работоспособностью кластера и вовремя устранять проблемы для обеспечения стабильная работа бизнеса. Таким образом, несмотря на простоту самостоятельно созданного кластера Kafka, он должен нести растущие затраты на исследования и разработки, эксплуатацию и техническое обслуживание.

Грибная улица на фоне облаков

Бизнес-сценарии и программная архитектура Mogujie определяют его сильную зависимость от Kafka.Являясь лидером в области электронной коммерции, его общее количество сообщений в среднем достигло 100 миллиардов в день, а его пиковая производственная пропускная способность достигла уровня ГБ на второй. Его основные бизнес-сценарии — это распределенные сценарии обработки больших данных, такие как реклама, транзакции, безопасность, автономная обработка и т. д.

Осознав болевые точки самостоятельно созданного кластера Kafka, чтобы обеспечить безопасность данных и стабильность кластера, Могужи решил использовать облачную службу очередей сообщений CKafka. CKafka не только поддерживает аварийное восстановление с несколькими зонами доступности, но также помогает клиентам разделять горячие и холодные данные, устраняет узкое место, связанное с частым чтением с диска, и обеспечивает хорошую гарантию стабильной работы бизнеса. Далее давайте проанализируем и объясним, как CKafka обеспечивает устойчивость зоны доступности к сбоям и высокопроизводительный ввод-вывод кластерного сервера.

Кластерное решение для аварийного восстановления в разных зонах доступности

Схематическая диаграмма аварийного восстановления кластера между зонами доступности

В системе сообщений Kafka основными операциями клиентоориентированного сервера являются производство и потребление. Цель аварийного восстановления в зонах перекрестной доступности: когда в зоне доступности происходит сбой (например, пожар, отключение электроэнергии и т. д.), клиент может производить и потреблять, не осознавая этого, и обеспечивать стабильную работу бизнеса. Для обеспечения аварийоустойчивости зоны доступности на техническом уровне необходимо решить следующие проблемы (в качестве примера возьмем приведенную выше схему):

  • Распределение реплик разделов между зонами доступности гарантирует, что реплики каждого раздела распределяются по разным зонам доступности. Например, если кластер охватывает две зоны доступности в шанхайской зоне 2 и зоне 4, а раздел имеет четыре реплики, необходимо обеспечить распределение двух реплик в каждой зоне доступности;
  • Kafka сильно зависит от Apache Zookeeper. Когда Zookeeper не может нормально предоставлять услуги, кластер Kafka также будет затронут. Поэтому для реализации межрегионального аварийного восстановления Kafka также необходимо реализовать межрегиональное аварийное восстановление zookeeper. Как и Kafka, Apache Zookeeper поддерживает межрегиональное аварийное восстановление.
  • IP узла брокера должен быть прозрачным для клиента. То есть клиент не может воспринимать адрес Брокера. Таким образом, при сбое внутреннего Borker и изменении IP-адреса коммутационной машины клиент этого не воспринимает и все еще может нормально работать.

Решение вышеуказанных проблем требует следующих четырех технических решений.

Прозрачный и гибкий IP-адрес узла брокера

Почему IP-адрес и порт узла брокера должны быть прозрачными для клиента? Давайте сначала посмотрим на следующий код:

Properties props = new Properties();
 props.put("bootstrap.servers", "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),
 Integer.toString(i)));
 producer.close();

Это самый простой фрагмент кода Kafka Produce. 192.168.10.10:9092, 192.168.10.11:9092, 192.168.10.12:9092 — это IP-адреса и порты трех реальных брокеров Kafka для получения информации о метаданных на стороне сервера и запуска операции с производственными сообщениями. Представим следующую ситуацию:

Когда одну из машин 192.168.10.10 не удалось восстановить, мы перезапустили другую Borker, например 192.168.10.13:9092, для предоставления услуг. На этом этапе все клиенты должны быть уведомлены об изменении адреса Kafka с: «192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092» заменено на "192.168.10.13:9092,192.168.10.11:9092,192.168.10.12:9092". Если конфигурация IP жестко закодирована в клиентском коде, этот код необходимо изменить, упаковать и опубликовать. **Из-за настройки сервера клиент должен изменить конфигурацию и перезапуститься, что является катастрофой! **Тогда как решить эту проблему?

Идея решения проблемы такова:Virtual IP Address. Как показано на рисунке ниже, мы установим четырехуровневый виртуальный IP-адрес (VIP) и виртуальный порт (VPORT) перед каждым предоставленным брокером, и пользователи смогут получить доступ к фактической службе брокера, получив доступ к VIP и VPORT. Например, 10.0.0.1:9092 соответствует реальному брокерскому сервису 192.169.10.10.9092. Таким образом, фактический IP-адрес брокера прозрачен для пользователя.

Рабочая схема виртуального IP-адреса

Так что же такое дрифт? Службы должны быть устойчивыми к стихийным бедствиям во всех зонах доступности. То есть предоставляемый нами виртуальный IP-адрес можно переключать между зонами доступности.При выходе из строя зоны доступности VIP может быстро переключиться на другую зону доступности и продолжить предоставление услуг. Тогда VIP должен иметь доступ ко всем зонам доступности. Как показано на рисунке ниже, при сбое Шанхайской зоны доступности 2 служба Virtual Ip Service быстро и автоматически переключается на экземпляр брокера, доступный в Шанхайской зоне доступности 1, чтобы обеспечить нормальное использование клиентов.

Схема переключения виртуальных IP-адресов между зонами доступности

Межрегиональное распределение реплик разделов

Нативная Kafka распределяет копии случайным образом по принципу, что копии одной зоны доступности не могут располагаться на одной машине. Логика распространения реплик не зависит от AZ. То есть, когда у брокера в кластере есть свободное место, реплики распределяются по брокерам. Можно распределить разделы одного и того же раздела в одном разделе.

如上面的跨可用区Virual IP切换示意图所示,当创建一个3个Replication(副本)的Partition时,很有可能该Partition的Replication都落在了上海可用区2。如果此时上海可用区2发生故障,那么该Partition就不能正常提供服务,直接影响业务。 Как решить эту проблему?

Ckafka добавляет доступные теги зоны на брокере. Когда субъект, созданный заказчиком, находится в разделе «Доступные темы области», копии одного и того же раздела выделяются в нескольких доступных областях, гарантируют, что раздел все еще имеет выживущую копию, когда неисправность района по-прежнему доступны. Доступная логика тегов области добавляется путем изменения логики распределения раздела исходного кода KAFKA, и другой реплитрон назначен для различных брокеров в зависимости от требований. Эти брокеры относятся к разным имеющимся областям. Принцип реализации следующий:

Во-первых, давайте взглянем на содержимое узла /broker/topics/test-topic в Zookeeper.Содержимое выглядит следующим образом:

{"version":1,"partitions":{"0":[10840,10839],"1":[10838,10840],"2":[10839,10838]}}

Это содержание означает: тема test-topic имеет три раздела: 0, 1 и 2. Часть 0 распространяется на брокере [10840, 10839], раздел 1 распространяется на брокере [10838, 10840] и так далее. Следовательно, необходимо только изменить логику генерации контента для управления распространением Partiton, и логика может быть реализована.

Межрегиональное развертывание Zookeeper

Компонент Zookeeper, который сильно зависит от Kafka, также необходимо развернуть в разных регионах, чтобы обеспечить его доступность. Во-первых, давайте посмотрим на избирательную стратегию Zookeeper: более половины узлов могут быть избраны только лидером, если это четное количество узлов, это может привести к одинаковому количеству голосов, что приведет к отказ от выбора лидера и в конечном итоге привести к отказу кластера. Кроме того, когда количество неисправных узлов в кластере Zookeeper превышает половину, кластер Zookeeper не будет работать должным образом.

Из характеристик алгоритма распределенного консенсуса Zookeeper можно сделать вывод: если в каждой зоне разворачивается zk-нода, zk нужно поддерживать n-зонное аварийное восстановление (и вешать zk-ноды в n зонах одновременно), и нужно разворачивать 2n+1 узлов zk Разделы необходимы для обеспечения доступности разделов Zookeeper. То есть в случае n=1 необходимо развернуть 3 зоны доступности, чтобы обеспечить доступность одной зоны доступности кластера zookeeper.

Оптимизация конфигурации брокера

Согласно дизайну, некоторые параметры необходимо корректировать при развертывании Брокеров в разных зонах доступности. Эти параметры обеспечивают максимальную доступность сервисов для межрегионального аварийного восстановления. Необходимо изменить следующие три конфигурации:

unclean.leader.election.enable=true
min.insync.replicas=1
offsets.topic.replication.factor=3

Что означают эти три конфигурации? Давайте рассмотрим их по очереди:

unclean.leader.election.enable

Официальное описание: Указывает, следует ли разрешить выбор реплик, не входящих в набор ISR, в качестве ведущей в качестве крайней меры, даже если это может привести к потере данных.

Объяснение: Значение по умолчанию для этого поля — False. По умолчанию лидер не может быть выбран из списка реплик, отличных от ISR, поскольку выбор лидера из списка реплик, отличных от ISR, может привести к некоторой потере данных. Если да, то зачем открывать это поле? Потому что в очень нестандартных обстоятельствах, например, когда реплики в ISR недоступны, если в поле установлено значение False, служба напрямую зависнет; если поле установлено в True, разрешено выбрать лидера из не-ISR list, то сервис можно продолжать использовать, несмотря на возможность потери данных. Поэтому этот параметр должен ссылаться на бизнес-характеристики, чтобы решить, открывать ли его.

min.insync.replicas

Официальное описание: Когда производитель устанавливает acks на «все» (или «-1»), min.insync.replicas указывает минимальное количество реплик, которые должны подтвердить запись, чтобы запись считалась успешной.

Объяснение: Значение по умолчанию для этого поля равно 1. Приведенный выше английский перевод: Когда acks=-1, по крайней мере одна реплика подтверждает получение, прежде чем подтвердить, что данные успешно записаны. Этот параметр часто меняют на 2, чтобы обеспечить целостность данных при построении кластера. Причина изменения здесь значения 1 состоит в том, чтобы гарантировать, что клиент может нормально предоставлять услуги в крайнем случае, когда работает только одна копия, а остальные отключены. Если установлено значение 2, когда работает только одна копия, производственная сторона всегда не будет производить, что повлияет на бизнес.

offsets.topic.replication.factor

Описание официального сайта: ................................

Объяснение: Значение по умолчанию равно 1. Представляет количество реплик внутренней темы Kafka Consumer_offsets. Когда брокер, в котором находится копия, не работает, существует только одна копия Consumer_offsets, и раздел не работает. Потребители, которые используют этот раздел для хранения позиции смещения группы потребления, будут затронуты, и смещение не может быть отправлено, в результате чего производитель сможет отправлять сообщения, но потребитель будет недоступен. Поэтому вам нужно установить значение этого поля больше 1.

Кластерное решение для оптимизации давления ввода-вывода

Пользователи, создающие собственные кластеры сообщений, часто сталкиваются с проблемой: во время пикового трафика нагрузка на ввод-вывод кластера очень высока, и пользователи могут лишь временно решить проблему, расширив емкость. Но это все-таки целесообразная мера, и чтобы помочь пользователям действительно решить эту проблему, команда Tencent Cloud CKafka провела глубокий анализ различных показателей и бизнес-сценариев на стороне клиента и сервера. Мы обнаружили, что наибольшая доля давления ввода-вывода в кластере приходится на давление чтения с диска. Но почему давление чтения с диска высокое? Давайте сначала рассмотрим основные принципы проектирования дисковых хранилищ Kafka.

Принцип проектирования дискового хранилища Kafka

Дизайн дискового хранилища Kafka можно охарактеризовать тремя словами: последовательное чтение и запись на диск, кэширование страниц и нулевое копирование.

  • Последовательное чтение и запись диска: то есть запись и чтение данных Kafka выполняются последовательно. По принципу локальности в реальных тестах соотношение производительности последовательной записи на диск и случайной записи отличается максимум в 6000 раз.
  • Кэш страницы: это ключевая технология Kafka для обеспечения последовательного чтения и записи. Кроме того, это также дисковый кеш, в основном реализуемый операционной системой для сокращения операций дискового ввода-вывода. Конкретный метод заключается в кэшировании данных на диске в память и изменении доступа к диску на доступ к памяти. Данные в Page Cache будут обновляться на диск в соответствии с определенной стратегией.
  • Zero Copy: копирование файлов данных напрямую с диска на устройство сетевой карты без необходимости ручного применения переходного отверстия. Это значительно повышает производительность приложения, уменьшая переключение контекста между режимом ядра и пользовательским режимом. В операционной системе Linux метод нулевого копирования основан на базовом методе sendfile(). Для языка Java базовым методом реализации FileChannal.transferTo() также является метод sendfile()

Почему сервер находится под высоким давлением чтения?

схема хранения

Из приведенной выше схемы хранилища: теоретически давление чтения кластера не должно быть таким большим, потому что большая часть давления чтения должна приходиться на кэш страниц и не должна считываться с диска. Тем не менее, на практике часто происходит чтение с диска. После анализа существует несколько бизнес-сценариев, в которых клиенты потребляют одно и то же сообщение.В соответствии с характером потребления в реальном времени поведение потребителей сообщений можно разделить на две категории: потребители в реальном времени и офлайн-потребители.

  • Потребители в реальном времени: требования к данным в реальном времени высоки, и требуется потребление сообщений в реальном времени. В сценарии потребления в реальном времени Kafka будет использовать кэш страниц системы для создания сообщений брокеру, а затем пересылать их непосредственно из памяти потребителям в реальном времени без нагрузки на диск. Вышеуказанные операции обычно называются горячим чтением, а общие бизнес-сценарии включают рекламу и рекомендации.
  • Автономные потребители: также известные как периодические потребители по времени, потребляемые сообщения обычно минут или часов назад. Такие сообщения обычно хранятся на диске, и при использовании они запускают операции ввода-вывода на диске. Это обычно называется холодным чтением и подходит для периодически выполняемых бизнес-сценариев, таких как вычисление отчета и пакетное вычисление.

В случае очень большого объема сообщений потребители в режиме реального времени и в автономном режиме потребляют кластер одновременно, что вызовет две проблемы:

  • На потребителей в режиме реального времени влияют потребители в автономном режиме: из-за потребления потребителями в автономном режиме размещенные данные и данные в реальном времени будут часто подкачиваться в памяти и из нее, что напрямую влияет на производительность служб в реальном времени и увеличивает задержка отклика сервисов реального времени;

  • Автономные данные приведут к интенсивным дисковым операциям ввода-вывода: когда объем данных, считываемых автономными задачами, очень велик, будет активирован высокий дисковый ввод-вывод, и использование дискового ввода-вывода может даже достичь 100%, что повлияет на стабильность кластера.

Способ оптимизации: схема разделения холодных и горячих данных

Ввиду сосуществования холодного чтения данных и горячего чтения в пользовательском кластере мы считаем, что разделение горячих и холодных данных в данных кластера является лучшим решением в настоящее время. Как разделить горячие и холодные данные, не меняя поведение производственной стороны? Tencent Cloud CKafka запустила службу синхронизации данных на основе Kafka Connector с открытым исходным кодом для решения вышеуказанных проблем. Схема архитектуры показана на следующем рисунке:

Схема архитектуры службы синхронизации данных на основе Kafka Connector

Кластер брокера разделен на активный кластер и автономный кластер. Два кластера несут ответственность за то, чтобы оффлайновые компании одновременно использовали офлайновые кластеры. CKafka добавляет кластер коннекторов между двумя кластерами. Кластер соединителей синхронизирует сообщения, подписанные офлайн-бизнесом (синхронизированные в соответствии с измерением темы), из кластера реального времени в офлайн-кластер. Кластер соединителей выполняет синхронизацию данных в реальном времени, что соответствует потребителям в реальном времени. Эта операция не только не влияет на дисковый ввод-вывод, но и не влияет на других потребителей в реальном времени.

Ценность CKafka для бизнеса

CKafka обеспечивает высокую пропускную способность, высокую масштабируемость службы очередей сообщений. Обладая превосходными преимуществами с точки зрения производительности, масштабируемости, безопасности бизнеса, эксплуатации и обслуживания, пользователи могут пользоваться недорогими мощными функциями, избавляясь от утомительной работы по эксплуатации и техническому обслуживанию.

Добро пожаловать, чтобы отсканировать код, чтобы подписаться на нашу общедоступную учетную запись WeChat, и с нетерпением ждем встречи с вами ~