1. Предпосылки
Для достижения системной развязки, пиковых бритья и функции Asynchrony, разработчики Enterprise System System рассмотрит использование очередей сообщений. Рынок известен своей высокой пропускной способностьюKafka
, широко используется в финансовых или ордерных системах с абсолютной достоверностью информацииRabbitMQ
, это промежуточное ПО для сообщений с открытым исходным кодом очень популярно среди разработчиков. Сегодня автор расскажет вам об очереди сообщений, реализованной в redis, она в основном используется в некоторых бизнес-сценариях, не требующих особо высокой надежности сообщений внутри системы, ее характеристики — малый вес, высокая эффективность и простота.
2. Redis реализует структуру данных и команды очереди сообщений
Redis предоставляет различные структуры данных и соответствующие команды для реализации очередей сообщений, что упрощает реализацию простой очереди сообщений при разработке системы. пройти черезlist
Структура данных, которая может реализовать простую очередь сообщений.zset
Структура данных может реализовать очередь отложенных сообщений; для реализации многоадресного механизма сообщения Redis также предоставляетpublish
,subscribe
команда для подписки и публикации сообщений; добавлен redis5.0stream
Структура данных реализует более профессиональную очередь сообщений. Сначала представим их по отдельности.
2.1 list
Реализовать простую очередь
предоставлено Redislist
Сама структура данных представляет собойFIFO
(первым пришел, первым ушел) очередь, бизнес прошелlist
Очередь сообщений можно реализовать, добавляя сообщения в конец очереди производителем и удаляя сообщения из головы очереди потребителем, как показано на следующем рисунке:
Постановка сообщений в очередь может добавлять тело сообщения в очередь с временной сложностью O(1) с помощью lpush или rpush и может запускать потребителя каждый период времени (например, 1 с) для отправки сообщения.rpop/lpop
Выйдите для потребления, потребительский php-код выглядит следующим образом:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
while($message = $redis->rpop('queue_list_test')) {
var_dump($message);
//处理完逻辑后,休眠1s
sleep(1)
}
Сон в коде потребителя очень нужен.Если сна нет, то при пустой очереди queue_list_test цикл while всегда будет устанавливать соединение с сервером redis, напрямую подтягивая ЦП клиентского сервера, а также делая сервер Redis всплеск трафика.
При таком способе обработки потребителями будет определенная задержка в потреблении сообщений, по этой причине Redis также обеспечиваетbrpop/blpop
команды, ониrpop/lpop
Блокирующая версия, то есть, когда в списке нет данных, команда будет блокироваться и ждать, пока не истечет время ожидания или не появится сообщение в очереди.Измененный потребительский php-код выглядит следующим образом:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
while($message = $redis->brpop('queue_list_test',3600)) {
var_dump($message);
}
В приведенном выше примере необходимо получить данные списка, имя ключа которого — queue_list_test.Если данных нет, он будет заблокирован на 3600 секунд.Однако, поскольку расширение redis для php реализовано на основе метода сокета php, если php сам настраивает время ожидания чтения сокета, тайм-аут сообщит об ошибке и завершится.Конфигурация по умолчанию элемента конфигурации тайм-аута по умолчанию для сокета в php.ini:
default_socket_timeout = 60
Хороший способ практики - через phpini_set
Функция сбрасывает время тайм-аута сокета в процессе, а php-код для решения проблемы с потребителем ошибки тайм-аута выглядит следующим образом:
<?php
ini_set('default_socket_timeout', -1); //不超时
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
while($message = $redis->brpop('queue_list_test',0)) {
var_dump($message);
}
Установка тайм-аута brpop на 0 означает, что если в очереди не будет сообщений, сценарий будет ждать бесконечно долго. Это решает проблему отложенного потребления потребителями.
2.2 zset
Реализовать очередь отложенного потребления
в rediszset
а такжеset
Точно так же все они представляют собой наборы строк, и они не позволяют повторяющимся элементам появляться в одномset
середина. Их отличие состоит в том, что каждый элемент отсортированного множества имеет оценку (score
) связан с ним, и Redis сортирует элементы набора от меньшего к большему по количеству баллов. Только представьте, если вы добавите сообщение вzset
структуру и установите временную метку сообщения, которое будет использовано, в соответствующуюscore
, формирует ли он очередь потребления временных рядов? Как показано ниже:
Очередь отложенных сообщений реализована следующим образом:
- продюсер через
zadd
Команда добавляет сообщение в очередь потребления временного ряда,score
Отметка времени (момент в будущем), когда сообщение должно быть использовано. - потребителей через
zrangebyscore
Команда извлекает сообщения, которые необходимо использовать на данный момент.
2.3 PubSub
Реализовать многоадресную рассылку сообщений
Использование, о котором мы упоминали ранееlist
Реализованы и используются простые очередиzset
Реализованная очередь отложенного потребления не поддерживает многоадресную рассылку сообщений.Если несколько разных групп потребителей хотят использовать сообщения в очереди, они могут последовательно соединить логику группы потребителей только для непрерывного потребления:
Редис одинPubSub
Модуль для поддержки многоадресной рассылки сообщений, то есть PublisherSubscriber (шаблон издатель/подписчик). Многоадресная рассылка сообщений позволяет производителям создавать сообщения только один раз, а промежуточное ПО отвечает за копирование сообщений в несколько очередей сообщений, каждая из которых используется соответствующей группой потребителей:
оPubSub
Соответствующие команды и методы использования .PubSub
У конструкции есть фатальный недостаток:Сохранение сообщений не поддерживается
PubSub
Производитель сообщения передает сообщение, и Redis напрямую находит соответствующего получателя для его передачи. Если потребителя также нет, сообщение отбрасывается. Например, есть три потребителя в начале, один потребитель внезапно кладет трубку, производитель будет продолжать отправлять сообщения, а два других потребителя могут продолжать получать сообщения, но при повторном подключении зависшего потребителя соединение разрывается. период сообщение, отправленное производителем, полностью теряется для потребителя. Если Redis аварийно завершает работу или перезагружается, это означает, что потребителя больше нет, и все сообщения будут отбрасываться напрямую.PubSub
Сообщение не является постоянным.
из-заPubSub
С этими недостатками практически невозможно найти сценарии его применения в области очередей сообщений!
Автор Redis также начал отдельный проект
Disque
Он специально используется для очередей сообщений, но он еще не разработан и находится в бета-версии. К июню 2018 года Redis5.0 добавилStream
Структура данных, эта функция переносит в Redis постоянную очередь сообщений, с тех порPubSub
Функция как очередь сообщений исчезла в толпе,Disque
Релизной версии никогда не будет.
2.4 Мощная постоянная очередь сообщений, поддерживающая многоадресную рассылку сообщений ——stream
Самая большая особенность Redis 5.0 — добавление новой структуры данных.stream
В качестве очереди сообщений он в значительной степени опирается на дизайн Kafka, и с тех пор Redis имеет действительно профессиональную очередь сообщений. Структура показана на следующем рисунке:
stream
Сама структура представляет собой массив очереди уникальной по времени последовательности, аналогичной очереди сообщений временных рядов, о которой мы упоминали выше с zset. Каждое сообщение имеет идентификатор сообщения в видеtimestampInMillis-sequence
, например, 1607226700982-5, что указывает на то, что текущее сообщение генерируется с отметкой времени 1607226700982 в миллисекундах и является 5-м сообщением в течение этой миллисекунды.Идентификатор сообщения может быть автоматически сгенерирован сервером или указан самим клиентом (тем самым вы Можно заметить, что Stream поддерживает функцию отсрочки задач), но форма должна быть «целое-целое», а идентификатор добавленного позже сообщения должен быть больше идентификатора предыдущего сообщения.
Каждый поток имеет уникальное имя, которое является ключом Redis.При первом использовании командыxadd
Автоматически создается по команде. Как видно из рисунка, каждый поток может быть связан с несколькими группами потребителей, и каждая группа потребителей будет иметь курсорlast_deliverred_id
Переместиться вперед в очереди массива Stream, указав, какое сообщение использовала текущая группа потребителей.stream
Внутреннее имя также уникально, и группа потребителей должна пройтиxgroup_create
Создается отдельно и указывается для запуска потребления с идентификатора сообщения, который соответствует группе внутреннего потребления.last_deliverred_id
Переменная.
Состояния групп потребителей независимы и не зависят друг от друга, каждое сообщение Stream будет доставлено каждой группе потребителей, и каждая группа потребителей может монтировать несколько потребителей (Consumer
), между этими потребителями существуют конкурентные отношения, любой потребитель, прочитавший сообщение, сделает курсорlast_deliverred_id
Двигаясь вперед, каждый потребитель также имеет уникальное имя в группе.
Переменные состояния внутри потребителяpending_ids
Используется для записи того, какие сообщения в данный момент прочитаны клиентом, но не были подтверждены. этоpending_ids
Эта переменная используется для того, чтобы клиент использовал сообщение хотя бы один раз и не осталось необработанным из-за потери при передаче по сети.
оstream
Дизайнstream
, это лучшее решение для Redis для создания очереди сообщений в будущем!
3. Традиционный дизайн очереди сообщений Redis
Поскольку Redis 5.0 был только что запущен, по историческим причинам большинство очередей Redis в корпоративной системе используют традиционную структуру очереди сообщений, то есть используют традиционную структуру очереди сообщений.list
+ zset
Реализация структурной инкапсуляции.В традиционной схеме проектирования очередей Redis стоит изучить много идей для решения проблем.В этом разделе мы поговорим о проблемах, с которыми сталкиваются традиционные очереди сообщений Redis, и их решениях.
3.1 Интеграция «отложенных» очередей и «немедленных» очередей
Как мы уже говорили, традиционная очередь сообщений Redis используетlist
Делайте очередь мгновенного потребления, проходитеlpop/rpop
илиblpop/brpop
команда для отмены данных о потреблении; используйтеzset
Делайте отложенную очередь потребления, проходитеzrangebyscore
Команда для получения данных о потреблении. Но потребителям все равно, каким способом получать данные о потреблении из очереди. Хорошей практикой является инкапсуляция метода и предоставление его потребителю, чтобы каждый раз ему нужно было только извлекать данные из очереди для потребления.Обычная реализация заключается в том, что каждый раз, когда потребитель извлекает данные из очереди с помощью этого метода, система Первый изzset
Перенести просроченные задачи из очереди задержки вlist
Сразу очередь, потом проходlpop/rpop
Берем данные из очереди и возвращаем их потребителю, как показано на схеме:
Следует отметить, что процесс инкапсуляции системного метода выборки сообщений из очереди и переноса просроченных сообщений из очереди с отложенной в немедленную очередь не похож наlpop/rpop
То же самое атомарно, чтобы обеспечить параллельную безопасность системы, требуется сотрудничество.lua
script для выполнения задания, пример кода выглядит следующим образом:
/**
* Get the Lua script to migrate expired jobs back onto the queue.
*
* KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving jobs to, for example: queues:foo
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return val
LUA;
}
вышесказанноеlua
сценарий будетzrangebyscore
,zremrangebyrank
,rpush
Три инструкции redis упакованы вместе, чтобы обеспечить атомарность переноса сообщений, чтобы наша система могла безопасно включать несколько потребителей.
Производителям также необходимо инкапсулировать системный подход, благодаряzadd
,lpush/rpush
Инструкции являются атомарными.Нам нужно только определить, нужно ли отложить сообщение для выполнения, а затем вызвать команду, чтобы поместить задержанное сообщение в очередь с задержкой и поместить мгновенное сообщение в немедленную очередь:
3.2 Реализовать «механизм подтверждения» и «повторную попытку тайм-аута» сообщений
Мы упоминали ранееKafka
а такжеRabbitMQ
Эти профессиональные очереди сообщенийack
механизм подтверждения сообщения.ack
Механизм гарантирует, что сообщения в очереди потребляются хотя бы один раз, обычно очередь обеспечиваетРучное подтверждение,АвтоматическиДва механизма, которые должны использовать потребители, предъявляющие высокие требования к надежности сообщений.Ручной механизм подтверждения. Если традиционная очередь сообщений Redis должна быть реализованаack
механизм, первое, что приходит на ум, это использоватьset
Структура данных, которая добавляет сообщения, полученные потребителями, вset
В коллекции, когда потребитель закончит обработку сообщения, он отправит сообщение изset
Удалить из коллекции, готовоack
.
Ранее мы использовалиzset
сделать очередь задержки,zset
Слишкомset
и предоставляет значение атрибута ранжированияscore
, если мы используемzset
чтобы завершить нашу систему очередиack
, вы также можете использоватьscore
Выполните еще одну полезную функцию:тайм-аут повторной попытки. Как показано ниже:
Система сообщений Redis с добавлением «повторной попытки тайм-аута» и «механизма подтверждения» намного сложнее, чем раньше. Когда потребитель берет задачу из системы сообщений с помощью системного метода, система сообщений сначала переносит задержанную очередь, сообщения с истекшим сроком действия и сообщения без подтверждения из очереди подтверждения в немедленную очередь (чтобы обеспечить безопасность параллелизма в системе, эти два процесса сообщения о миграцииlua
Сценарий завершен); перед тем, как сообщение будет передано потребителю, система сообщений должна добавить сообщение в очередь подтверждения и установить время повторной попытки тайм-аута.retry_after_time
, то есть если сообщение переданоretry_after_time
Время все еще не подтверждено, затем сообщение будет повторно перенесено в мгновенную очередь потребителя снова; нормальная обработка после того, как потребитель должен быть передан сообщению обработчиком сообщений, полные данные потребления, подтверждение (то есть из очереди подтверждения удаляются сообщения ).
3.3 «Изящный перезапуск» для потребителей
несмотря на то чтоack
Это может гарантировать, что сообщения в очереди могут быть использованы хотя бы один раз, но обработка больших задач в бизнес-сценариях часто занимает много времени.После завершения потребления перезапустите потребителя, который мы называем потребителем.изящный перезапуск.
Читатели, знакомые с nginx, должны знать, чтоnginx -s reload
Nginx для обеспечения того, чтобы в то же время предлагать услуги по перезагрузке последнего файла конфигурации и перезапуску дочернего процесса. Его принцип работы заключается в том, что через сигналы монитора главного процесса, передаваемые дочернему процессу, дочерний процесс не завершает работу с текущей службой, а затем перезапускает новый главный дочерний процесс. Так что nginx до и после перезапуска через команду номер основного процесса не изменился.
То же самое относится и к потребителям очередей. «Элегантный перезапуск» может быть достигнут путем мониторинга сигналов. Различные языки программирования предоставляют разные функции обработки сигналов. Ниже приведен пример php.pcntl
Расширение для реализации обработки сигналов, следующая логика реализации основного кода:
class Worker
{
public $shouldQuit = false;
public function daemon(string $queueName = '')
{
$this->listenForSignals();
while (!$this->shouldQuit) {
//这里是业务的处理逻辑
}
}
//监听信号
protected function listenForSignals()
{
if ($this->supportsAsyncSignals()) {
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
});
pcntl_signal(SIGUSR2, function () {
$this->shouldQuit = true;
});
pcntl_signal(SIGCONT, function () {
$this->shouldQuit = true;
});
}
}
}
В приведенном выше коде, когда мы используем команду kill для уничтожения номера процесса-потребителя, программа-потребитель получит сигнал, установит для следующего условия цикла значение false и завершит работу после обработки текущей задачи, чтобы добиться «мягкого перезапуска». Однако в развитии бизнеса часто используетсяsupervisor
управлять потребителями,supervisor
Вы также можете взаимодействовать с потребителями через сигналы и рекомендовать статью читателям.Заинтересованные могут учиться друг у друга:Использование супервизора в проектах PHP
3.4 Обработка сообщений об ошибках
пройти черезack
Механизм гарантирует, что сообщение будет использовано хотя бы один раз, или его можно будет корректно перезапустить, прослушивая сигнал потребителя, всегда будет сообщение об ошибке потребления, что неизбежно в бизнесе. Для очереди сообщений, реализованной Redis, мы можем установить функцию обработки сбоев для потребителя.Когда сообщение терпит неудачу, программа-потребитель решает, как с ним обращаться (обычно сообщение об ошибке добавляется в базу данных для последующего ручного исследования). Ниже приведена общая логика реализации php-кода:
class Worker
{
public function daemon(string $queueName = '')
{
while (!$this->shouldQuit) {
try {
//这里是业务的处理逻辑
} catch (\Exception $e) {
//消息失败了,判断是否需要重试,若是不需要重试,若有失败处理函数,记录下来失败信息
$this->failedHandler($e, $jobInfo);
die($e->getMessage()) . PHP_EOL;
} catch (\Throwable $e) {
die($e->getMessage()) . PHP_EOL; //系统错误, 退出
}
}
}
public function failedHandler(\Exception $e, array $jobInfo)
{
if (class_exists($jobInfo['commandName'])) {
$consumerInstance = new $jobInfo['commandName']($jobInfo['data']);
if (method_exists($consumerInstance, $this->failedHandlerMethod)) {
//调用失败处理函数
$failedMethod = $this->failedHandlerMethod;
$consumerInstance->$failedMethod($e);
}
}
}
}
4. Резюме
Реализация полнофункциональной, надежной и высокодоступной очереди сообщений Redis не так проста, и необходимо учитывать множество факторов. Из-за потребностей бизнеса сам redis претерпел период исторического развития, и в настоящее времяstream
Это относительно надежное решение для организации очереди сообщений. традиционное использованиеlist
+ zset
Оценка сообщений Redis реализована в Enterprise Service в течение многих лет, и еще не полностью заменена - это зерно правды. Этот документ анализирует проблемы, с которыми сталкиваются традиционные реализации очереди сообщений, что касается этих проблем, принять решение Да. В то время как традиционная очередь сообщения Redisstream
Заменить, но, изучая этот раздел, мы понимаем реализацию идеи по очереди сообщений, и понять некоторые из комплексной методологии реализации системы, этот процесс все еще очень доволен.