В третьей статье о Kafka поговорим о том, как использовать Python для чтения и записи Kafka. В этой статье сторонняя библиотека, которую мы собираемся использовать, называетсяkafka-python
. каждый может использоватьpip
илиpipenv
Установите его. Вы можете выбрать один из следующих двух вариантов установки.
python3 -m pip install kafka-python
pipenv install kafka-python
Как показано ниже:
В этой статье мы воспользуемся кратчайшим кодом для реализации примера чтения и записи Kafka.
Создать файл конфигурации
Поскольку к Kafka должны подключаться и производители, и потребители, я написал отдельный файл конфигурации.config.py
Он используется для сохранения параметров, необходимых для подключения к Kafka, вместо того, чтобы напрямую прописывать эти параметры Hard Code в коде:
# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'
Kafka, использованная в демонстрации в этой статье, была создана коллегами из нашей группы по платформе, для подключения требуется учетная запись и пароль, поэтому я добавил ее в файл конфигурации.USERNAME
иPASSWORD
два. Если используемая вами Kafka не имеет учетной записи и пароля, вам нужно толькоSERVER
иTOPIC
Вот и все.
Создать производителя
Код настолько прост, что даже не требует пояснений. первое использованиеKafkaProducer
Класс подключается к Kafka, получает объект производителя и записывает в него данные.
import json
import time
import datetime
import config
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=config.SERVER,
value_serializer=lambda m: json.dumps(m).encode())
for i in range(100):
data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
producer.send(config.TOPIC, data)
time.sleep(1)
параметрbootstrap_servers
Используется для указания адреса подключения к серверу для Kafka.
параметрvalue_serializer
Используется для указания метода сериализации. Здесь я использую json для сериализации данных, так что я передаю словарь в Kafka, и Kafka автоматически преобразует его в строку JSON.
Как показано ниже:
Обратите внимание, что на картинке выше я прописал еще 4 параметра:
security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD
Эти четыре параметра добавлены, потому что мне нужно подключиться к Kafka через пароль.Если у вашей Kafka нет пароля учетной записи, эти четыре параметра не требуются.
создать потребителя
Потребители Kafka также должны подключиться к Kafka, сначала используйтеKafkaConsumer
Класс инициализирует объект-потребитель, а затем выполняет цикл для чтения данных. код показывает, как показано ниже:
import config
from kafka import KafkaConsumer
consumer = KafkaConsumer(config.TOPIC,
bootstrap_servers=config.SERVER,
group_id='test',
auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)
Первый параметр KafkaConsumer используется для указания темы. Вы можете думать об этой теме как о ключе Redis.
bootstrap_servers используется для указания адреса подключения к серверу Kafka.
Строка, следующая за параметром group_id, может быть заполнена произвольно. Если две программыTopic
иgroup_id
одинаковы, то данные, которые они считывают, не будут повторяться, две программыTopic
то же самое, ноgroup_id
Если они разные, то каждый из них потребляет все данные, не влияя друг на друга.
auto_offset_rest Этот параметр имеет два значения.earliest
иlatest
, если этот параметр опущен, то по умолчаниюlatest
. Этот параметр будет введен отдельно. Пропустите это здесь.
После подключения к Kafka вы можете напрямую использовать итерацию цикла for на объекте-потребителе, чтобы непрерывно получать в нем данные.
Запустить демо
Запустите две программы-потребителя и одну программу-производителя, эффект показан на следующем рисунке.
Мы видим, что данные, считанные двумя программами-потребителями, не повторяются и не пропускаются.
Когда все данные будут использованы, если вы закроете две потребительские программы и снова запустите одну из них, вы обнаружите, что никакие данные не будут напечатаны.
Но если вы измените group_id, программа может начать нормально потреблять с нуля, как показано на следующем рисунке:
Несколько мест, где многие люди путаются
самый ранний и последний
Когда мы создаем объект-потребитель, есть параметр с именемauto_offset_reset='earliest'
. кто-то виделearliest
иlatest
, считать само собой разумеющимся, что настройкаearliest
, то есть читать в обратном направлении из заголовка Темы, установленной наlatest
Это игнорирование предыдущих данных, и после запуска программы начинают считываться новые данные.
Эта точка зрения неверна.
auto_offset_reset
Этот параметр только вgroup
Работает только при первом запуске, со второго запуска этот параметр недействителен.
Предположим, у вас сейчас есть 100 данных в вашей теме, и вы установили новый group_id какtest2
.auto_offset_reset
Установить какearliest
. Затем, когда ваш потребитель работает, Kafka сначала установит для вашего смещения значение 0, а затем позволит вам начать потребление с нуля.
Предположим, у вас сейчас есть 100 данных в вашей теме, и вы установили новый group_id какtest3
.auto_offset_reset
Установить какlatest
. Затем, когда ваш потребитель работает, Kafka не будет возвращать вам никаких данных, потребитель выглядит так, как будто он застрял, но Kafka напрямую заставит состояние первых 100 фрагментов данных быть установленным в состояние, которое было использовано вами. . Итак, в настоящее время ваше смещение равно 99. Потребитель не может прочитать его, пока производитель не вставит новый фрагмент данных. Смещение, соответствующее этим новым данным, становится равным 100.
Предположим, у вас сейчас есть 100 данных в вашей теме, и вы установили новый group_id какtest4
.auto_offset_reset
Установить какearliest
. Затем, когда ваш потребитель работает, Kafka сначала установит для вашего смещения значение 0, а затем позволит вам начать потребление с нуля. Когда 50-й фрагмент данных будет использован, вы закроете программу-потребитель и поместитеauto_offset_reset
Установить какlatest
, а затем перезапустить. В это время потребитель продолжит считывать данные с 51-го. Остальные 50 фрагментов данных не будут пропущены.
Таким образом, функция auto_offset_reset состоит в том, чтобы установить для вас начальное смещение, когда ваша группа запускается в первый раз и смещения нет. И как только ваша группа уже имеет смещение, параметр auto_offset_reset больше не будет работать.
Как распределяются разделы?
Для той же группы по той же теме:
Предположим, что ваша тема имеет 10 разделов, и вы запускаете только 1 потребителя в начале. Затем потребитель будет вращаться, чтобы читать данные из этих 10 разделов.
Когда вы запустите второго потребителя, Kafka возьмет 5 разделов у первого потребителя и передаст их второму потребителю. Таким образом, каждый из двух потребителей читает по 5 разделов. не влияют друг на друга.
Когда снова появляется третий потребитель, Kafka берет еще 1 раздел от первого потребителя и 2 раздела от второго потребителя к третьему потребителю. Следовательно, Потребитель 1 имеет 4 раздела, Потребитель 2 имеет 3 раздела, а Потребитель 3 имеет 3 раздела, которые не влияют друг на друга.
Когда у вас есть 10 потребителей, потребляющих вместе, каждый потребитель читает раздел, не влияя друг на друга.
Когда появляется 11-й потребитель, он ничего не может прочитать, потому что не может выделить раздел.
Итак, в прошлой статье я сказал, что в одной и той же теме, в одной и той же группе, сколько у вас партитонов, вы можете запускать столько процессов, чтобы потреблять их одновременно.
Кафка совершенно без повторений и пропусков?
В крайних случаях Кафка повторит и промахнется, но такие крайние случаи не распространены. Если ваша Kafka часто пропускает данные или всегда имеет дублирующиеся данные, значит, ваша среда настроена неправильно или проблема связана с кодом.
совет
Напоминаем еще раз: профессиональные люди делают профессиональные вещи, не создавайте кластеры Kafka самостоятельно. Позвольте преданному коллеге копировать, создавать и поддерживать, вы просто используете его. Это самый эффективный способ сделать это.