Как читать и писать в Kafka с помощью Python?

Python Kafka

В третьей статье о Kafka поговорим о том, как использовать Python для чтения и записи Kafka. В этой статье сторонняя библиотека, которую мы собираемся использовать, называетсяkafka-python. каждый может использоватьpipилиpipenvУстановите его. Вы можете выбрать один из следующих двух вариантов установки.

python3 -m pip install kafka-python
pipenv install kafka-python

Как показано ниже:

В этой статье мы воспользуемся кратчайшим кодом для реализации примера чтения и записи Kafka.

Создать файл конфигурации

Поскольку к Kafka должны подключаться и производители, и потребители, я написал отдельный файл конфигурации.config.pyОн используется для сохранения параметров, необходимых для подключения к Kafka, вместо того, чтобы напрямую прописывать эти параметры Hard Code в коде:

# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'

Kafka, использованная в демонстрации в этой статье, была создана коллегами из нашей группы по платформе, для подключения требуется учетная запись и пароль, поэтому я добавил ее в файл конфигурации.USERNAMEиPASSWORDдва. Если используемая вами Kafka не имеет учетной записи и пароля, вам нужно толькоSERVERиTOPICВот и все.

Создать производителя

Код настолько прост, что даже не требует пояснений. первое использованиеKafkaProducerКласс подключается к Kafka, получает объект производителя и записывает в него данные.

import json
import time
import datetime
import config
from kafka import KafkaProducer


producer = KafkaProducer(bootstrap_servers=config.SERVER,
                         value_serializer=lambda m: json.dumps(m).encode())

for i in range(100):
    data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(config.TOPIC, data)
    time.sleep(1)

параметрbootstrap_serversИспользуется для указания адреса подключения к серверу для Kafka.

параметрvalue_serializerИспользуется для указания метода сериализации. Здесь я использую json для сериализации данных, так что я передаю словарь в Kafka, и Kafka автоматически преобразует его в строку JSON.

Как показано ниже:

Обратите внимание, что на картинке выше я прописал еще 4 параметра:

security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD

Эти четыре параметра добавлены, потому что мне нужно подключиться к Kafka через пароль.Если у вашей Kafka нет пароля учетной записи, эти четыре параметра не требуются.

создать потребителя

Потребители Kafka также должны подключиться к Kafka, сначала используйтеKafkaConsumerКласс инициализирует объект-потребитель, а затем выполняет цикл для чтения данных. код показывает, как показано ниже:

import config
from kafka import KafkaConsumer


consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

Первый параметр KafkaConsumer используется для указания темы. Вы можете думать об этой теме как о ключе Redis.

bootstrap_servers используется для указания адреса подключения к серверу Kafka.

Строка, следующая за параметром group_id, может быть заполнена произвольно. Если две программыTopicиgroup_idодинаковы, то данные, которые они считывают, не будут повторяться, две программыTopicто же самое, ноgroup_idЕсли они разные, то каждый из них потребляет все данные, не влияя друг на друга.

auto_offset_rest Этот параметр имеет два значения.earliestиlatest, если этот параметр опущен, то по умолчаниюlatest. Этот параметр будет введен отдельно. Пропустите это здесь.

После подключения к Kafka вы можете напрямую использовать итерацию цикла for на объекте-потребителе, чтобы непрерывно получать в нем данные.

Запустить демо

Запустите две программы-потребителя и одну программу-производителя, эффект показан на следующем рисунке.

Мы видим, что данные, считанные двумя программами-потребителями, не повторяются и не пропускаются.

Когда все данные будут использованы, если вы закроете две потребительские программы и снова запустите одну из них, вы обнаружите, что никакие данные не будут напечатаны.

Но если вы измените group_id, программа может начать нормально потреблять с нуля, как показано на следующем рисунке:

Несколько мест, где многие люди путаются

самый ранний и последний

Когда мы создаем объект-потребитель, есть параметр с именемauto_offset_reset='earliest'. кто-то виделearliestиlatest, считать само собой разумеющимся, что настройкаearliest, то есть читать в обратном направлении из заголовка Темы, установленной наlatestЭто игнорирование предыдущих данных, и после запуска программы начинают считываться новые данные.

Эта точка зрения неверна.

auto_offset_resetЭтот параметр только вgroupРаботает только при первом запуске, со второго запуска этот параметр недействителен.

Предположим, у вас сейчас есть 100 данных в вашей теме, и вы установили новый group_id какtest2.auto_offset_resetУстановить какearliest. Затем, когда ваш потребитель работает, Kafka сначала установит для вашего смещения значение 0, а затем позволит вам начать потребление с нуля.

Предположим, у вас сейчас есть 100 данных в вашей теме, и вы установили новый group_id какtest3.auto_offset_resetУстановить какlatest. Затем, когда ваш потребитель работает, Kafka не будет возвращать вам никаких данных, потребитель выглядит так, как будто он застрял, но Kafka напрямую заставит состояние первых 100 фрагментов данных быть установленным в состояние, которое было использовано вами. . Итак, в настоящее время ваше смещение равно 99. Потребитель не может прочитать его, пока производитель не вставит новый фрагмент данных. Смещение, соответствующее этим новым данным, становится равным 100.

Предположим, у вас сейчас есть 100 данных в вашей теме, и вы установили новый group_id какtest4.auto_offset_resetУстановить какearliest. Затем, когда ваш потребитель работает, Kafka сначала установит для вашего смещения значение 0, а затем позволит вам начать потребление с нуля. Когда 50-й фрагмент данных будет использован, вы закроете программу-потребитель и поместитеauto_offset_resetУстановить какlatest, а затем перезапустить. В это время потребитель продолжит считывать данные с 51-го. Остальные 50 фрагментов данных не будут пропущены.

Таким образом, функция auto_offset_reset состоит в том, чтобы установить для вас начальное смещение, когда ваша группа запускается в первый раз и смещения нет. И как только ваша группа уже имеет смещение, параметр auto_offset_reset больше не будет работать.

Как распределяются разделы?

Для той же группы по той же теме:

Предположим, что ваша тема имеет 10 разделов, и вы запускаете только 1 потребителя в начале. Затем потребитель будет вращаться, чтобы читать данные из этих 10 разделов.

Когда вы запустите второго потребителя, Kafka возьмет 5 разделов у первого потребителя и передаст их второму потребителю. Таким образом, каждый из двух потребителей читает по 5 разделов. не влияют друг на друга.

Когда снова появляется третий потребитель, Kafka берет еще 1 раздел от первого потребителя и 2 раздела от второго потребителя к третьему потребителю. Следовательно, Потребитель 1 имеет 4 раздела, Потребитель 2 имеет 3 раздела, а Потребитель 3 имеет 3 раздела, которые не влияют друг на друга.

Когда у вас есть 10 потребителей, потребляющих вместе, каждый потребитель читает раздел, не влияя друг на друга.

Когда появляется 11-й потребитель, он ничего не может прочитать, потому что не может выделить раздел.

Итак, в прошлой статье я сказал, что в одной и той же теме, в одной и той же группе, сколько у вас партитонов, вы можете запускать столько процессов, чтобы потреблять их одновременно.

Кафка совершенно без повторений и пропусков?

В крайних случаях Кафка повторит и промахнется, но такие крайние случаи не распространены. Если ваша Kafka часто пропускает данные или всегда имеет дублирующиеся данные, значит, ваша среда настроена неправильно или проблема связана с кодом.

совет

Напоминаем еще раз: профессиональные люди делают профессиональные вещи, не создавайте кластеры Kafka самостоятельно. Позвольте преданному коллеге копировать, создавать и поддерживать, вы просто используете его. Это самый эффективный способ сделать это.