Технический блог:GitHub.com/Делайте это с душой/Особые…
В то же время, вы также можете обратить внимание на мой публичный аккаунт WeChat.AlwaysBeta, вас ждет еще больше захватывающего контента.
Введение
Цитируя официальное заявление: ZMQ (ZeroMQ, в дальнейшем именуемый ZMQ) — это простой и удобный в использовании транспортный уровень, библиотека сокетов, подобная фреймворку, который делает программирование для сокетов более простым, лаконичным и более производительным.
— это библиотека очередей обработки сообщений, которая эластично масштабируется для нескольких потоков, ядер и мейнфреймов.
Явная цель ZMQ — «стать частью стандартного стека сетевых протоколов, а затем и ядром Linux». Им еще предстоит увидеть свой успех. Тем не менее, это, безусловно, многообещающая и очень нужная оболочка над «традиционными» сокетами BSD. ZMQ делает написание высокопроизводительных сетевых приложений чрезвычайно простым и увлекательным.
Он существенно отличается от RabbitMQ, ActiveMQ и т. д. ZeroMQ — это вовсе не сервер очереди сообщений, а скорее набор базовых сетевых коммуникационных библиотек, добавляющий уровень инкапсуляции к исходному Socket API, чтобы упростить наши операции.
Три режима работы
Режим запрос-ответ:
Когда дело доходит до модели «запрос-ответ», я должен сказать о ее модели потока сообщений. Модель потока сообщений означает, что в этом режиме необходимо строго соблюдать метод «один вопрос и один ответ».
После отправки сообщения, если ответ не получен, при отправке второго сообщения будет выдано исключение. Точно так же для Rep не разрешено отправлять сообщение, пока сообщение не будет получено.
На основе этого формируется режим ответа «один вопрос и один ответ».
server:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
message = socket.recv()
print("Received: %s" % message)
socket.send("I am OK!")
client:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send('Are you OK?')
response = socket.recv()
print("response: %s" % response)
Режим публикации-подписки:
В режиме «публикация-подписка» «издатель» привязывается к указанному адресу, например «192.168.10.1:5500», а «подписчик» подключается к этому адресу. В этом режиме поток сообщений является однонаправленным, разрешая поток только от «издателя» к «подписчику». А "издатель" просто отправляет сообщение, вне зависимости от того, есть ли "подписчик". У «издателя» может быть несколько подписчиков, а «подписчик» также может подписаться на нескольких издателей.
Хотя мы знаем, что «издателю» все равно, существует ли «подписчик» при отправке сообщения, поэтому запуск сначала «издателя», а затем запуска «подписчика» может легко привести к потере некоторых сообщений. Затем может быть сделано заявление: «Сначала я запускаю «подписчика», затем запускаю «издателя», и я могу решить эту проблему?»
Для ZeroMQ такой подход также не гарантирует 100% надежности. В мире ZeroMQ есть термин «медленный плотник», который означает, что даже если я сначала запущу «подписчика», а затем «издателя», «подписчик» всегда потеряет первый пакет данных. Потому что, когда «абонент» устанавливает TCP-соединение с конечной точкой, оно будет содержать несколько миллисекунд времени рукопожатия.Хотя время короткое, оно существует. Кроме того, фоновый ввод-вывод ZeroMQ выполняется одним способом, поэтому, если между двумя сторонами не наложена стратегия синхронизации, потеря сообщений неизбежна.
Некоторые другие особенности модели «публикация-подписка» в ZeroMQ:
- Справедливая очередь, когда «подписчик» подключен к нескольким издателям, он будет читать сообщения от каждого «издателя» сбалансированным образом, и не будет ситуации, когда один «издатель» подавляет другие «издатели».
- Для версий выше ZMQ3.0 правила фильтрации выполняются на «издателе». Для версий ниже ZMQ3.0 правила фильтрации действуют на стороне «подписчика». Фактически, именно здесь обрабатывается сообщение.
server:
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
for i in range(10):
print('send message...' + str(i))
socket.send('message' + str(i))
time.sleep(1)
client:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE, '')
while True:
response = socket.recv()
print("response: %s" % response)
Параллельный конвейерный режим:
Прежде чем объяснять "конвейерный режим", необходимо прояснить, что в ZeroMQ нет абсолютного различия между сервером и клиентом. Весь прием и передача данных осуществляются в единицах соединений, и различаются только типы, определенные ZeroMQ. Так же, как когда сокет привязан к адресу, вы можете использоватьbind
, вы также можете использоватьconnect
, только серверная часть, как мы обычно понимаемbind
по адресу, понимая клиентаconnec
на этот адрес.
«Конвейерный режим» обычно используется для распределения задач и сбора результатов.Генератор задач генерирует задачи, «справедливо» распределяет их между всеми работниками, находящимися под его юрисдикцией, и после завершения сборщик результатов собирает результаты выполнения задач.
Общий процесс прост для понимания: рабочий подключается к генератору задач, ждет, пока задача будет сгенерирована, и после завершения отправляет результат сборщику результатов. Если вы хотите различать понятия клиента и сервера, генератор задач и сборщик результатов здесь — это сервер, а рабочий — это клиент.
Как упоминалось ранее, распределение задач здесь «справедливое», потому что алгоритм LRU используется внутри для поиска простаивающего работника, который не работал дольше всего. Но справедливость здесь относительна, при запуске генератора задач первый подключенный к нему воркер будет нести задачи, сгенерированные всем генератором задач в одно мгновение.
Таким образом, он состоит из трех частей: push для отправки данных, работа для кэширования данных и получение для обработки сбора данных. В отличие от Publish-Subscribe есть кеш данных и нагрузка по обработке.
При отключении соединения данные не будут потеряны, и данные будут продолжать отправляться на противоположный конец после повторного подключения.
server:
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
for i in range(10):
socket.send('message' + str(i))
# 没启 worker 时不会发消息
print('send message...' + str(i))
time.sleep(1)
work:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')
while True:
data = receive.recv()
print('transform...' + data)
sender.send(data)
client:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")
while True:
response = socket.recv()
print("response: %s" % response)
выше.
Справочная документация: