Приложение Redis — асинхронная очередь сообщений и очередь задержки

Redis

серия статей

Асинхронная очередь сообщений

Говоря об очередях сообщений, вы обязательно подумаетеKafka,RabbitmqТакие как промежуточное программное обеспечение для сообщений, эти профессиональные промежуточные программы для сообщений предоставляют множество функциональных возможностей, конечно, его развертывание, использование и обслуживание более проблематичны. Если у вас нет таких высоких требований к очередям сообщений и вы хотите быть легковесными, правильно использовать Redis.

Redis черезlistСтруктура данных для реализации очереди сообщений.В основном используются следующие команды:

  • lpush и rpush в очередь
  • lpop и rpop удалены из очереди
  • Blpop и Brpop Blocking Dequeue

Не много ерунды, поговорим о коде:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//发送消息
$redis->lPush($list, $value);

//消费消息
while (true) {
    try {
        $msg = $redis->rPop($list);
        if (!$msg) {
            sleep(1);
        }
        //业务处理
     
    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

В приведенном выше коде есть проблема: если очередь пуста в течение длительного времени, поп не будет непрерывно зацикливаться, что приведет к увеличению QPS redis и повлияет на производительность. Поэтому мы используемsleepЧтобы решить, заблокируйте на период времени, когда нет сообщения. Но на самом деле это приведет к другой проблеме, котораяsleepЭто может привести к увеличению задержки обработки сообщений. Мы можем решить эту проблему,blpop/brpopчтобы заблокировать очередь чтения.

blpop/brpopКогда в очереди нет данных, он немедленно встенет в состояние сна, и как только прибудет данные, он немедленно проснутся. Задержка сообщений почти ноль. Замена предыдущего LPOP / RPOP с BLPOP / BRPOP решает вышеуказанную проблему идеально.

Еще один момент, который следует отметить, заключается в том, что нам нужно использоватьtry/catchДля захвата исключения, если он заблокирован все время, сервер Redis обычно активно отключает пустую ссылку, чтобы уменьшить занятость простаивающих ресурсов.

очередь задержки

Сталкиваетесь ли вы со следующими сценариями, когда работаете над проектом электронной коммерции:

  • Если пользователь не оплатил более одного часа после оформления заказа, заказ необходимо закрыть
  • Если комментарий к заказу не комментировался в течение 7 дней, система должна автоматически сгенерировать комментарий

На этот раз нам нужно использовать очередь задержки, название предполагает необходимость задержки выполнения. Redis отzsetреализовать. Мы можем установить значение отсортированного набора для нашей задачи сообщений, установить оценку значения для времени истечения срока действия сообщения, а затем опросить, чтобы получить сообщения с истекшим сроком действия в отсортированном наборе для обработки.

Код реализации выглядит следующим образом:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$redis->zAdd($delayQueue,$tts, $value);


while(true) {
    try{
        $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
        if($msg){
            continue;
        }
        //删除消息
        $ok = $redis->zrem($delayQueue,$msg);
        if($ok){
            //业务处理
        }
    } catch(\Exception $e) {

    }
}

Здесь возникает еще одна проблема: одна и та же задача может быть выбрана несколькими процессами, а затем зашифрована с помощью zrem, а те процессы, которые не получили все, получают задачу просто так, что является пустой тратой времени. Решение: поставитьzrangebyscoreиzremИспользуйте сценарии lua для выполнения атомарных операций, чтобы не возникало таких потерь, когда несколько процессов конкурируют за задачи.

Эта статья также опубликована в общедоступной учетной записи WeChat [Информация Xiaodao]. Пожалуйста, отсканируйте код, чтобы следовать!