Говоря о дизайне очереди сообщений Redis

Redis

1. Предпосылки

Для достижения системной развязки, пиковых бритья и функции Asynchrony, разработчики Enterprise System System рассмотрит использование очередей сообщений. Рынок известен своей высокой пропускной способностьюKafka, широко используется в финансовых или ордерных системах с абсолютной достоверностью информацииRabbitMQ, это промежуточное ПО для сообщений с открытым исходным кодом очень популярно среди разработчиков. Сегодня автор расскажет вам об очереди сообщений, реализованной в redis, она в основном используется в некоторых бизнес-сценариях, не требующих особо высокой надежности сообщений внутри системы, ее характеристики — малый вес, высокая эффективность и простота.

2. Redis реализует структуру данных и команды очереди сообщений

Redis предоставляет различные структуры данных и соответствующие команды для реализации очередей сообщений, что упрощает реализацию простой очереди сообщений при разработке системы. пройти черезlistСтруктура данных, которая может реализовать простую очередь сообщений.zsetСтруктура данных может реализовать очередь отложенных сообщений; для реализации многоадресного механизма сообщения Redis также предоставляетpublish,subscribeкоманда для подписки и публикации сообщений; добавлен redis5.0streamСтруктура данных реализует более профессиональную очередь сообщений. Сначала представим их по отдельности.

2.1 listРеализовать простую очередь

предоставлено RedislistСама структура данных представляет собойFIFO(первым пришел, первым ушел) очередь, бизнес прошелlistОчередь сообщений можно реализовать, добавляя сообщения в конец очереди производителем и удаляя сообщения из головы очереди потребителем, как показано на следующем рисунке:

redis的list数据结构

Постановка сообщений в очередь может добавлять тело сообщения в очередь с временной сложностью O(1) с помощью lpush или rpush и может запускать потребителя каждый период времени (например, 1 с) для отправки сообщения.rpop/lpopВыйдите для потребления, потребительский php-код выглядит следующим образом:

<?php
$redis = new Redis();

$redis->connect('127.0.0.1',6379);

while($message = $redis->rpop('queue_list_test')) {
    var_dump($message);
    
    //处理完逻辑后,休眠1s
    sleep(1)
}

Сон в коде потребителя очень нужен.Если сна нет, то при пустой очереди queue_list_test цикл while всегда будет устанавливать соединение с сервером redis, напрямую подтягивая ЦП клиентского сервера, а также делая сервер Redis всплеск трафика.

При таком способе обработки потребителями будет определенная задержка в потреблении сообщений, по этой причине Redis также обеспечиваетbrpop/blpopкоманды, ониrpop/lpopБлокирующая версия, то есть, когда в списке нет данных, команда будет блокироваться и ждать, пока не истечет время ожидания или не появится сообщение в очереди.Измененный потребительский php-код выглядит следующим образом:

<?php
$redis = new Redis();

$redis->connect('127.0.0.1',6379);

while($message = $redis->brpop('queue_list_test',3600)) {
    var_dump($message);
}

В приведенном выше примере необходимо получить данные списка, имя ключа которого — queue_list_test.Если данных нет, он будет заблокирован на 3600 секунд.Однако, поскольку расширение redis для php реализовано на основе метода сокета php, если php сам настраивает время ожидания чтения сокета, тайм-аут сообщит об ошибке и завершится.Конфигурация по умолчанию элемента конфигурации тайм-аута по умолчанию для сокета в php.ini:

default_socket_timeout = 60

Хороший способ практики - через phpini_setФункция сбрасывает время тайм-аута сокета в процессе, а php-код для решения проблемы с потребителем ошибки тайм-аута выглядит следующим образом:

<?php
ini_set('default_socket_timeout', -1); //不超时

$redis = new Redis();

$redis->connect('127.0.0.1',6379);

while($message = $redis->brpop('queue_list_test',0)) {
    var_dump($message);
}

Установка тайм-аута brpop на 0 означает, что если в очереди не будет сообщений, сценарий будет ждать бесконечно долго. Это решает проблему отложенного потребления потребителями.

2.2 zsetРеализовать очередь отложенного потребления

в rediszsetа такжеsetТочно так же все они представляют собой наборы строк, и они не позволяют повторяющимся элементам появляться в одномsetсередина. Их отличие состоит в том, что каждый элемент отсортированного множества имеет оценку (score) связан с ним, и Redis сортирует элементы набора от меньшего к большему по количеству баллов. Только представьте, если вы добавите сообщение вzsetструктуру и установите временную метку сообщения, которое будет использовано, в соответствующуюscore, формирует ли он очередь потребления временных рядов? Как показано ниже:

zset数据结构与时间序列的消费队列

Очередь отложенных сообщений реализована следующим образом:

  • продюсер черезzaddКоманда добавляет сообщение в очередь потребления временного ряда,scoreОтметка времени (момент в будущем), когда сообщение должно быть использовано.
  • потребителей черезzrangebyscoreКоманда извлекает сообщения, которые необходимо использовать на данный момент.

延时消费队列实现原理

2.3 PubSubРеализовать многоадресную рассылку сообщений

Использование, о котором мы упоминали ранееlistРеализованы и используются простые очередиzsetРеализованная очередь отложенного потребления не поддерживает многоадресную рассылку сообщений.Если несколько разных групп потребителей хотят использовать сообщения в очереди, они могут последовательно соединить логику группы потребителей только для непрерывного потребления:

消息队列多消费者串联消费

Редис одинPubSubМодуль для поддержки многоадресной рассылки сообщений, то есть PublisherSubscriber (шаблон издатель/подписчик). Многоадресная рассылка сообщений позволяет производителям создавать сообщения только один раз, а промежуточное ПО отвечает за копирование сообщений в несколько очередей сообщений, каждая из которых используется соответствующей группой потребителей:

PubSub实现消息多播

оPubSubСоответствующие команды и методы использования .PubSubУ конструкции есть фатальный недостаток:Сохранение сообщений не поддерживается

PubSubПроизводитель сообщения передает сообщение, и Redis напрямую находит соответствующего получателя для его передачи. Если потребителя также нет, сообщение отбрасывается. Например, есть три потребителя в начале, один потребитель внезапно кладет трубку, производитель будет продолжать отправлять сообщения, а два других потребителя могут продолжать получать сообщения, но при повторном подключении зависшего потребителя соединение разрывается. период сообщение, отправленное производителем, полностью теряется для потребителя. Если Redis аварийно завершает работу или перезагружается, это означает, что потребителя больше нет, и все сообщения будут отбрасываться напрямую.PubSubСообщение не является постоянным.

из-заPubSubС этими недостатками практически невозможно найти сценарии его применения в области очередей сообщений!

Автор Redis также начал отдельный проектDisqueОн специально используется для очередей сообщений, но он еще не разработан и находится в бета-версии. К июню 2018 года Redis5.0 добавилStreamСтруктура данных, эта функция переносит в Redis постоянную очередь сообщений, с тех порPubSubФункция как очередь сообщений исчезла в толпе,DisqueРелизной версии никогда не будет.

2.4 Мощная постоянная очередь сообщений, поддерживающая многоадресную рассылку сообщений ——stream

Самая большая особенность Redis 5.0 — добавление новой структуры данных.streamВ качестве очереди сообщений он в значительной степени опирается на дизайн Kafka, и с тех пор Redis имеет действительно профессиональную очередь сообщений. Структура показана на следующем рисунке:

Stream消息对了架构

streamСама структура представляет собой массив очереди уникальной по времени последовательности, аналогичной очереди сообщений временных рядов, о которой мы упоминали выше с zset. Каждое сообщение имеет идентификатор сообщения в видеtimestampInMillis-sequence, например, 1607226700982-5, что указывает на то, что текущее сообщение генерируется с отметкой времени 1607226700982 в миллисекундах и является 5-м сообщением в течение этой миллисекунды.Идентификатор сообщения может быть автоматически сгенерирован сервером или указан самим клиентом (тем самым вы Можно заметить, что Stream поддерживает функцию отсрочки задач), но форма должна быть «целое-целое», а идентификатор добавленного позже сообщения должен быть больше идентификатора предыдущего сообщения.

Каждый поток имеет уникальное имя, которое является ключом Redis.При первом использовании командыxaddАвтоматически создается по команде. Как видно из рисунка, каждый поток может быть связан с несколькими группами потребителей, и каждая группа потребителей будет иметь курсорlast_deliverred_idПереместиться вперед в очереди массива Stream, указав, какое сообщение использовала текущая группа потребителей.streamВнутреннее имя также уникально, и группа потребителей должна пройтиxgroup_createСоздается отдельно и указывается для запуска потребления с идентификатора сообщения, который соответствует группе внутреннего потребления.last_deliverred_idПеременная.

Состояния групп потребителей независимы и не зависят друг от друга, каждое сообщение Stream будет доставлено каждой группе потребителей, и каждая группа потребителей может монтировать несколько потребителей (Consumer), между этими потребителями существуют конкурентные отношения, любой потребитель, прочитавший сообщение, сделает курсорlast_deliverred_idДвигаясь вперед, каждый потребитель также имеет уникальное имя в группе.

Переменные состояния внутри потребителяpending_idsИспользуется для записи того, какие сообщения в данный момент прочитаны клиентом, но не были подтверждены. этоpending_idsЭта переменная используется для того, чтобы клиент использовал сообщение хотя бы один раз и не осталось необработанным из-за потери при передаче по сети.

оstreamДизайнstream, это лучшее решение для Redis для создания очереди сообщений в будущем!

3. Традиционный дизайн очереди сообщений Redis

Поскольку Redis 5.0 был только что запущен, по историческим причинам большинство очередей Redis в корпоративной системе используют традиционную структуру очереди сообщений, то есть используют традиционную структуру очереди сообщений.list + zsetРеализация структурной инкапсуляции.В традиционной схеме проектирования очередей Redis стоит изучить много идей для решения проблем.В этом разделе мы поговорим о проблемах, с которыми сталкиваются традиционные очереди сообщений Redis, и их решениях.

3.1 Интеграция «отложенных» очередей и «немедленных» очередей

Как мы уже говорили, традиционная очередь сообщений Redis используетlistДелайте очередь мгновенного потребления, проходитеlpop/rpopилиblpop/brpopкоманда для отмены данных о потреблении; используйтеzsetДелайте отложенную очередь потребления, проходитеzrangebyscoreКоманда для получения данных о потреблении. Но потребителям все равно, каким способом получать данные о потреблении из очереди. Хорошей практикой является инкапсуляция метода и предоставление его потребителю, чтобы каждый раз ему нужно было только извлекать данные из очереди для потребления.Обычная реализация заключается в том, что каждый раз, когда потребитель извлекает данные из очереди с помощью этого метода, система Первый изzsetПеренести просроченные задачи из очереди задержки вlistСразу очередь, потом проходlpop/rpopБерем данные из очереди и возвращаем их потребителю, как показано на схеме:

封装后的消费者

Следует отметить, что процесс инкапсуляции системного метода выборки сообщений из очереди и переноса просроченных сообщений из очереди с отложенной в немедленную очередь не похож наlpop/rpopТо же самое атомарно, чтобы обеспечить параллельную безопасность системы, требуется сотрудничество.luascript для выполнения задания, пример кода выглядит следующим образом:

/**
     * Get the Lua script to migrate expired jobs back onto the queue.
     *
     * KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
     * KEYS[2] - The queue we are moving jobs to, for example: queues:foo
     * ARGV[1] - The current UNIX timestamp
     *
     * @return string
     */
    public static function migrateExpiredJobs()
    {
        return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val
LUA;
    }

вышесказанноеluaсценарий будетzrangebyscore,zremrangebyrank,rpushТри инструкции redis упакованы вместе, чтобы обеспечить атомарность переноса сообщений, чтобы наша система могла безопасно включать несколько потребителей.

Производителям также необходимо инкапсулировать системный подход, благодаряzadd,lpush/rpushИнструкции являются атомарными.Нам нужно только определить, нужно ли отложить сообщение для выполнения, а затем вызвать команду, чтобы поместить задержанное сообщение в очередь с задержкой и поместить мгновенное сообщение в немедленную очередь:

redis延时队列生产者

3.2 Реализовать «механизм подтверждения» и «повторную попытку тайм-аута» сообщений

Мы упоминали ранееKafkaа такжеRabbitMQЭти профессиональные очереди сообщенийackмеханизм подтверждения сообщения.ackМеханизм гарантирует, что сообщения в очереди потребляются хотя бы один раз, обычно очередь обеспечиваетРучное подтверждение,АвтоматическиДва механизма, которые должны использовать потребители, предъявляющие высокие требования к надежности сообщений.Ручной механизм подтверждения. Если традиционная очередь сообщений Redis должна быть реализованаackмеханизм, первое, что приходит на ум, это использоватьsetСтруктура данных, которая добавляет сообщения, полученные потребителями, вsetВ коллекции, когда потребитель закончит обработку сообщения, он отправит сообщение изsetУдалить из коллекции, готовоack.

Ранее мы использовалиzsetсделать очередь задержки,zsetСлишкомsetи предоставляет значение атрибута ранжированияscore, если мы используемzsetчтобы завершить нашу систему очередиack, вы также можете использоватьscoreВыполните еще одну полезную функцию:тайм-аут повторной попытки. Как показано ниже:

带有ack机制的消息队列

Система сообщений Redis с добавлением «повторной попытки тайм-аута» и «механизма подтверждения» намного сложнее, чем раньше. Когда потребитель берет задачу из системы сообщений с помощью системного метода, система сообщений сначала переносит задержанную очередь, сообщения с истекшим сроком действия и сообщения без подтверждения из очереди подтверждения в немедленную очередь (чтобы обеспечить безопасность параллелизма в системе, эти два процесса сообщения о миграцииluaСценарий завершен); перед тем, как сообщение будет передано потребителю, система сообщений должна добавить сообщение в очередь подтверждения и установить время повторной попытки тайм-аута.retry_after_time, то есть если сообщение переданоretry_after_timeВремя все еще не подтверждено, затем сообщение будет повторно перенесено в мгновенную очередь потребителя снова; нормальная обработка после того, как потребитель должен быть передан сообщению обработчиком сообщений, полные данные потребления, подтверждение (то есть из очереди подтверждения удаляются сообщения ).

3.3 «Изящный перезапуск» для потребителей

несмотря на то чтоackЭто может гарантировать, что сообщения в очереди могут быть использованы хотя бы один раз, но обработка больших задач в бизнес-сценариях часто занимает много времени.После завершения потребления перезапустите потребителя, который мы называем потребителем.изящный перезапуск.

Читатели, знакомые с nginx, должны знать, чтоnginx -s reloadNginx для обеспечения того, чтобы в то же время предлагать услуги по перезагрузке последнего файла конфигурации и перезапуску дочернего процесса. Его принцип работы заключается в том, что через сигналы монитора главного процесса, передаваемые дочернему процессу, дочерний процесс не завершает работу с текущей службой, а затем перезапускает новый главный дочерний процесс. Так что nginx до и после перезапуска через команду номер основного процесса не изменился.

То же самое относится и к потребителям очередей. «Элегантный перезапуск» может быть достигнут путем мониторинга сигналов. Различные языки программирования предоставляют разные функции обработки сигналов. Ниже приведен пример php.pcntlРасширение для реализации обработки сигналов, следующая логика реализации основного кода:

class Worker
{
    public $shouldQuit = false;
    
    public function daemon(string $queueName = '')
    {
        $this->listenForSignals();

        while (!$this->shouldQuit) {
            //这里是业务的处理逻辑
        }
    }
    
    //监听信号
    protected function listenForSignals()
    {
        if ($this->supportsAsyncSignals()) {
            pcntl_async_signals(true);

            pcntl_signal(SIGTERM, function () {
                $this->shouldQuit = true;
            });
            pcntl_signal(SIGUSR2, function () {
                $this->shouldQuit = true;
            });
            pcntl_signal(SIGCONT, function () {
                $this->shouldQuit = true;
            });
        }
    }
}

В приведенном выше коде, когда мы используем команду kill для уничтожения номера процесса-потребителя, программа-потребитель получит сигнал, установит для следующего условия цикла значение false и завершит работу после обработки текущей задачи, чтобы добиться «мягкого перезапуска». Однако в развитии бизнеса часто используетсяsupervisorуправлять потребителями,supervisorВы также можете взаимодействовать с потребителями через сигналы и рекомендовать статью читателям.Заинтересованные могут учиться друг у друга:Использование супервизора в проектах PHP

3.4 Обработка сообщений об ошибках

пройти черезackМеханизм гарантирует, что сообщение будет использовано хотя бы один раз, или его можно будет корректно перезапустить, прослушивая сигнал потребителя, всегда будет сообщение об ошибке потребления, что неизбежно в бизнесе. Для очереди сообщений, реализованной Redis, мы можем установить функцию обработки сбоев для потребителя.Когда сообщение терпит неудачу, программа-потребитель решает, как с ним обращаться (обычно сообщение об ошибке добавляется в базу данных для последующего ручного исследования). Ниже приведена общая логика реализации php-кода:

class Worker
{
    public function daemon(string $queueName = '')
    {
        while (!$this->shouldQuit) {
            try {
                //这里是业务的处理逻辑
            } catch (\Exception $e) {
                //消息失败了,判断是否需要重试,若是不需要重试,若有失败处理函数,记录下来失败信息
                $this->failedHandler($e, $jobInfo);
                die($e->getMessage()) . PHP_EOL;
            } catch (\Throwable $e) {
                die($e->getMessage()) . PHP_EOL;       //系统错误, 退出
            }
        }
    }
    
    public function failedHandler(\Exception $e, array $jobInfo)
    {
        if (class_exists($jobInfo['commandName'])) {
            $consumerInstance = new $jobInfo['commandName']($jobInfo['data']);
            if (method_exists($consumerInstance, $this->failedHandlerMethod)) {
                //调用失败处理函数
                $failedMethod = $this->failedHandlerMethod;
                $consumerInstance->$failedMethod($e);
            }
        }
    }
}

4. Резюме

Реализация полнофункциональной, надежной и высокодоступной очереди сообщений Redis не так проста, и необходимо учитывать множество факторов. Из-за потребностей бизнеса сам redis претерпел период исторического развития, и в настоящее времяstreamЭто относительно надежное решение для организации очереди сообщений. традиционное использованиеlist + zsetОценка сообщений Redis реализована в Enterprise Service в течение многих лет, и еще не полностью заменена - это зерно правды. Этот документ анализирует проблемы, с которыми сталкиваются традиционные реализации очереди сообщений, что касается этих проблем, принять решение Да. В то время как традиционная очередь сообщения RedisstreamЗаменить, но, изучая этот раздел, мы понимаем реализацию идеи по очереди сообщений, и понять некоторые из комплексной методологии реализации системы, этот процесс все еще очень доволен.