Практика переноса данных Kafka

задняя часть Kafka контрольная работа модульный тест

Приветствую всех вОблако + сообщество, получить больше крупной технической практики Tencent по галантерее~

автор:mikealzhou

В этой статье основное внимание уделяется двум распространенным методам переноса данных в Kafka: 1. Раздельный перенос данных между разными дисками данных внутри брокера 2. Раздельный перенос данных между разными брокерами.

1. Разделите миграцию данных между разными дисками данных в брокере.

1.1 Введение

Недавно важный клиент Tencent Cloud обнаружил, что хранилище данных тематических разделов внутри брокера kafka распределено неравномерно, что приводит к 100%-му исчерпанию некоторых дисков и только 40%-му использованию некоторых дисков.

После анализа причины обнаруживается, что данные разделов некоторых тем слишком сконцентрированы на некоторых дисках, например, диск данных /data5, показанный на скриншоте ниже.

В соответствии с характеристиками распределенной системы легко придумать метод переноса данных для переноса данных раздела различных дисков данных в брокере. Перед выполнением онлайн-миграции данных кластера, чтобы обеспечить целостность данных и безопасность рабочего кластера, необходимо выполнить тест на тестовом кластере.

1.2 Протестируйте различные диски данных в брокере на предмет переноса данных раздела

1.2.1 Установите тестовую тему и убедитесь, что производство и потребление в норме.

В созданном нами тестовом кластере у Kafka есть три брокера с именами хостов: tbds-172-16-16-11, tbds-172-16-16-12, tbds-172-16-16-16. Каждый брокер настроен с двумя дисками данных, а кэшированные данные хранятся в /data/kafka-logs/ и /data1/kafka-logs/ соответственно.

Сначала создайте тестовую тему:

./kafka-topics.sh --create --zookeeper tbds-172-16-16-11:2181 --replication-factor 2 --partitions 3 --topic test_topic

Затем отправьте 500 фрагментов данных в производство темы и одновременно потребляйте данные. Затем проверьте раздел данных темы:

GROUP    TOPIC      PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   OWNER
groupid1 test_topic 0       172      172      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 1       156      156      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 2       172      172      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3

Обнаружено, что данные о производстве и потреблении test_topic в норме.

1.2.2 Перенос данных разделов между дисками

Теперь войдите в узел брокера tbds-172-16-16-12 и переместите каталог данных раздела test_topic /data1/kafka-logs/test_topic-0/ в /data/kafka-logs/ :

mv /data1/kafka-logs/test_topic-0/ /data/kafka-logs/

Просмотрите данные раздела test_topic-0 в каталоге /data/kafka-logs/:

1.2.3 Снова создавать и использовать данные по теме теста

Снова отправьте 500 фрагментов данных и одновременно потребляйте их. Затем посмотрите на данные:

GROUP    TOPIC      PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   OWNER
groupid1 test_topic 0       337      337      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 1       304      304      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 2       359      359      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3

Еще раз проверьте данные в каталоге раздела /data/kafka-logs/test_topic-0/ узла брокера tbds-172-16-16-12:

Было обнаружено, что данные кеша не увеличились с /data1/kafka-logs/ в каталог данных раздела test_topic-0/ (то есть раздел с номером 0) в каталоге /data/kafka-logs/.

Поскольку каждый раздел test_topic имеет 2 реплики, я нашел еще один раздел с номером 0 для хранения данных реплики на узле брокера tbds-172-16-16-16. Войдите в узел брокера tbds-172-16-16-16, откройте каталог данных кэша раздела с номером 0 и получите следующую информацию:

Обнаружено, что объем кэшированных данных в каталоге данных раздела test_topic-0/ брокерского узла tbds-172-16-16-16 увеличился, то есть данные сообщения, отправленные повторно для производства, кэшируются.

Видно, что после перемещения нет новых данных кэша в каталоге кэша данных раздела с номером 0 узла брокера tbds-172-16-16-12. Соответственно, tbds-172-16-16-16, узел брокера без операции перемещения данных раздела, недавно добавил повторно отправленные данные в каталог данных кэша раздела с номером 0.

Означает ли это, что данные раздела нельзя перемещать между дисками брокера?

1.2.4 Вызвать перезапуск Дафа: перезапустить кафку

Перезапустите кластер kafka.После завершения перезапуска обнаружено, что данные в каталоге данных кэша раздела с номером 0 узла брокера tbds-172-16-16-12 также увеличились до нормального уровня.

Указывает, что после перезапуска данные переноса между разными дисками посредника вступили в силу.

1.2.5 Убедитесь, что данные раздела, переносимые между дисками, действительны.

Снова отправьте 500 фрагментов данных в test_topic, одновременно используйте данные, а затем проверьте данные:

GROUP    TOPIC      PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   OWNER
groupid1 test_topic 0       521      521      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 1       468      468      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 2       511      511      0      kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3

Проверьте каталог кэша данных раздела test_topic-0 двух узлов брокера tbds-172-16-16-12 и tbds-172-16-16-16:

Выяснил, что две реплики абсолютно одинаковые.

1.3 Заключение

Разделенные каталоги данных можно свободно перемещать между различными дисками данных в брокере Kafka. После завершения миграции перезапустите kafka, чтобы изменения вступили в силу.

2. Передача данных раздела между разными брокерами

После расширения кластера Kafka, поскольку недавно расширенный брокер не имеет кэш-данных, легко привести к неравномерному распределению данных в системе. Следовательно, необходимо перенести данные раздела исходного брокера кластера на новый узел брокера расширения.

Для передачи данных раздела между разными брокерами вы можете использовать инструмент-скрипт kafka-reassign-partitions.sh, который поставляется вместе с kafka.

На основе исходных 3 брокеров в тестовом кластере kafka мы расширили 1 брокера.

2.1 Получите дистрибутив раздела test_topic

Выполнение заказа:

./kafka-topics.sh --zookeeper 172.16.16.11:2181 --topic test_topic --describe

Вы можете получить распределение 3 разделов test_topic (каждый раздел имеет 2 реплики) на трех узлах брокера:

Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: test_topic Partition: 1 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: test_topic Partition: 2 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003

2.2 Получите файл квоты для перераспределения темы

Поручил написать скрипт: move_kafka_topic.json выглядит следующим образом:

{"topics": [{"topic":"test_topic"}], "version": 1}

Выполните скрипт генерации плана размещения:

./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --topics-to-move-json-file /tmp/move_kafka_topic.json --broker-list "1001,1002,1003,1004" --generate

Список брокеров в команде заполняет идентификаторы четырех брокеров в кластере kafka. Разные кластеры kafka имеют разные идентификаторы брокера из-за разных методов развертывания. Наши идентификаторы брокера тестового кластера: 1001, 1002, 1003, 1004. Читатели должны указать идентификатор брокера в соответствии со своими настройками кластера kafka.

После выполнения команды я получаю следующий результат:

Current partition replica assignment #当前分区的副本分配
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1002,1001]},{"topic":"test_topic","partition":2,"replicas":[1001,1003]},{"topic":"test_topic","partition":1,"replicas":[1003,1002]}]}
Proposed partition reassignment configuration #建议的分区配置
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}

После конфигурации «Предлагаемое переназначение раздела» следует план распределения раздела в формате json, сгенерированный в соответствии с указанным списком посредников в командной строке. Скопируйте и сохраните конфигурацию Proposed partition reassignment configuration в файл move_kafka_topic_result.json:

{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}

2.3 Перераспределить данные раздела темы

Выполните команду переназначения:

./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --reassignment-json-file /tmp/move_kafka_topic_result.json --execute

Получил следующий результат:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1002,1001]},{"topic":"test_topic","partition":2,"replicas":[1001,1003]},{"topic":"test_topic","partition":1,"replicas":[1003,1002]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}

Судя по возвращенным результатам, задача перераспределения данных раздела успешно запущена.

2.4 Просмотр хода перераспределения данных раздела

Чтобы проверить статус выделения, выполните команду:

./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --reassignment-json-file /tmp/move_kafka_topic_result.json --verify

получил ответ:

Status of partition reassignment:
Reassignment of partition [test_topic,0] completed successfully
Reassignment of partition [test_topic,2] completed successfully
Reassignment of partition [test_topic,1] completed successfully

Указывает, что задача повторного шага данных раздела завершена.

2.5 Снова получить дистрибутив раздела test_topic

Чтобы снова просмотреть распределение каждого раздела, выполните команду:

./kafka-topics.sh --zookeeper 172.16.16.11:2181 --topic test_topic --describe

Получите возвращаемый результат:

Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 1002 Replicas: 1001,1002 Isr: 1002,1001
Topic: test_topic Partition: 1 Leader: 1003 Replicas: 1002,1003 Isr: 1003,1002
Topic: test_topic Partition: 2 Leader: 1003 Replicas: 1003,1004 Isr: 1003,1004

Из результатов видно, что данные раздела test_topic были перераспределены с 3 исходных брокеров на 4 брокера.

3. Заключение теста

Ø Разделенные каталоги данных можно свободно перемещать между различными дисками данных в брокере Kafka. После завершения миграции перезапустите kafka, чтобы изменения вступили в силу;

Ø Kafka может переносить данные между разными брокерами с помощью инструмента-скрипта kafka-reassign-partitions.sh, который поставляется вместе с Kafka.

В-четвертых, устраните сбой кластера kafka клиента.

Мы используем проверенный в этой статье метод для переноса данных между разными дисками в узле брокера для кластера Kafka клиента, выполнения переноса данных для нескольких тем и, наконец, достижения равномерного распределения кэша данных между дисками.

В то же время мы расширили кластер kafka клиента.После расширения мы приняли метод миграции секционированных данных между различными брокерами, описанный в этой статье, и выполнили миграцию данных для нескольких тем, чтобы гарантировать, что новый узел расширения также имеет кэшированные данные. , а исходный узел брокера сохраняет данные. Давление уменьшается.

Связанное Чтение

Принципы дизайна Кафки

Практика развертывания сервисов кластеров с отслеживанием состояния Kafka и zookeeper в kubernetes (1)

Эта статья была разрешена автором для публикации в сообществе Yunjia, пожалуйста, укажите при перепечаткепервоисточник


Категории