серия статей
Асинхронная очередь сообщений
Говоря об очередях сообщений, вы обязательно подумаетеKafka
,Rabbitmq
Такие как промежуточное программное обеспечение для сообщений, эти профессиональные промежуточные программы для сообщений предоставляют множество функциональных возможностей, конечно, его развертывание, использование и обслуживание более проблематичны. Если у вас нет таких высоких требований к очередям сообщений и вы хотите быть легковесными, правильно использовать Redis.
Redis черезlist
Структура данных для реализации очереди сообщений.В основном используются следующие команды:
- lpush и rpush в очередь
- lpop и rpop удалены из очереди
- Blpop и Brpop Blocking Dequeue
Не много ерунды, поговорим о коде:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//发送消息
$redis->lPush($list, $value);
//消费消息
while (true) {
try {
$msg = $redis->rPop($list);
if (!$msg) {
sleep(1);
}
//业务处理
} catch (Exception $e) {
echo $e->getMessage();
}
}
В приведенном выше коде есть проблема: если очередь пуста в течение длительного времени, поп не будет непрерывно зацикливаться, что приведет к увеличению QPS redis и повлияет на производительность. Поэтому мы используемsleep
Чтобы решить, заблокируйте на период времени, когда нет сообщения. Но на самом деле это приведет к другой проблеме, котораяsleep
Это может привести к увеличению задержки обработки сообщений. Мы можем решить эту проблему,blpop/brpop
чтобы заблокировать очередь чтения.
blpop/brpop
Когда в очереди нет данных, он немедленно встенет в состояние сна, и как только прибудет данные, он немедленно проснутся. Задержка сообщений почти ноль. Замена предыдущего LPOP / RPOP с BLPOP / BRPOP решает вышеуказанную проблему идеально.
Еще один момент, который следует отметить, заключается в том, что нам нужно использоватьtry/catch
Для захвата исключения, если он заблокирован все время, сервер Redis обычно активно отключает пустую ссылку, чтобы уменьшить занятость простаивающих ресурсов.
очередь задержки
Сталкиваетесь ли вы со следующими сценариями, когда работаете над проектом электронной коммерции:
- Если пользователь не оплатил более одного часа после оформления заказа, заказ необходимо закрыть
- Если комментарий к заказу не комментировался в течение 7 дней, система должна автоматически сгенерировать комментарий
На этот раз нам нужно использовать очередь задержки, название предполагает необходимость задержки выполнения. Redis отzset
реализовать. Мы можем установить значение отсортированного набора для нашей задачи сообщений, установить оценку значения для времени истечения срока действия сообщения, а затем опросить, чтобы получить сообщения с истекшим сроком действия в отсортированном наборе для обработки.
Код реализации выглядит следующим образом:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->zAdd($delayQueue,$tts, $value);
while(true) {
try{
$msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
if($msg){
continue;
}
//删除消息
$ok = $redis->zrem($delayQueue,$msg);
if($ok){
//业务处理
}
} catch(\Exception $e) {
}
}
Здесь возникает еще одна проблема: одна и та же задача может быть выбрана несколькими процессами, а затем зашифрована с помощью zrem, а те процессы, которые не получили все, получают задачу просто так, что является пустой тратой времени. Решение: поставитьzrangebyscore
иzrem
Используйте сценарии lua для выполнения атомарных операций, чтобы не возникало таких потерь, когда несколько процессов конкурируют за задачи.
Эта статья также опубликована в общедоступной учетной записи WeChat [Информация Xiaodao]. Пожалуйста, отсканируйте код, чтобы следовать!