Redis advanced — очередь сортировки и задержки

Redis
Redis advanced — очередь сортировки и задержки

В последнее время я интенсивно изучаю Redis и пишу, изучая

Как перед чтением, выработайте привычку

1. Инструкция по использованию типа SortSet

Zset — это, вероятно, самая отличительная структура данных, предоставляемая Redis, и это также структура данных, которую интервьюеры любят задавать в интервью.

  • С одной стороны это набор, гарантирующий уникальность значения,
  • С одной стороны, он может дать каждому значению оценку, которая представляет собой вес сортировки.

Его внутренняя реализация использует структуру данных, называемую «список переходов».

2. Общие команды SortSet

После удаления последнего значения в zset структура данных автоматически удаляется, а память освобождается.

> zadd books 9.0 "think in java"
> zadd books 8.9 "java concurrency"
> zadd books 8.6 "java cookbook"

> zrange books 0 -1     # 按 score 排序列出,参数区间为排名范围
1) "java cookbook"
2) "java concurrency"
3) "think in java"

> zrevrange books 0 -1  # 按 score 逆序列出,参数区间为排名范围
1) "think in java"
2) "java concurrency"
3) "java cookbook"

> zcard books           # 相当于 count()
(integer) 3

> zscore books "java concurrency"   # 获取指定 value 的 score
"8.9000000000000004"                # 内部 score 使用 double 类型进行存储,所以存在小数点精度问题

> zrank books "java concurrency"    # 排名
(integer) 1

> zrangebyscore books 0 8.91        # 根据分值区间遍历 zset
1) "java cookbook"
2) "java concurrency"

> zrangebyscore books -inf 8.91 withscores  # 根据分值区间 (-∞, 8.91] 遍历 zset,同时返回分值。inf 代表 infinite,无穷大的意思。
1) "java cookbook"
2) "8.5999999999999996"
3) "java concurrency"
4) "8.9000000000000004"

> zrem books "java concurrency"             # 删除 value
(integer) 1
> zrange books 0 -1
1) "java cookbook"
2) "think in java"

3. Сценарии использования

Таблица лидеров

  • Список поклонников, значение — идентификатор пользователя поклонника, а оценка — время внимания.
  • Видеосайты должны ранжировать видео, загружаемые пользователями, и поддержание рейтинга может быть многогранным: по времени, по количеству просмотров, по количеству полученных лайков и т. д.

Я случайно написал это раньше«Список популярных статей по практике Redis»хороший пример

Очередь по весу/Очередь с задержкой

Например: заказать сверхурочную работу без оплаты, отменить заказ, восстановить запасы.

Рекомендуется использовать kafka, профессиональный MQ, если есть жесткие требования к очередям сообщений (нельзя потерять). Эти профессиональные сообщения промежуточного программного обеспечения обеспечивают множество функциональных возможностей, конечно, его развертывание, использование и обслуживание являются более хлопотными. Если у вас нет таких высоких требований к очередям сообщений и вы хотите быть легковесными, правильно использовать Redis.

PHP — пример отложенной очереди

Как следующий код повышает надежность:

  • function run()требует lua для реализации原子性действовать
  • Рассмотрите возможность создания очередей задач и очередей выполнения в постоянных базах данных, таких как MySQL и MongoDB, для повышения отказоустойчивости.
<?php

trait RedisConnectTrait
{
    private $servers = array();
    private $instances = array();

    /**
     * 设置 Redis 配置
     * @param array $serversConfig
     * @return $this
     * @throws Exception
     */
    private function setServers( array $serversConfig = [ [ '127.0.0.1', 6379, 0.01 ] ] )
    {
        if ( !$serversConfig )
            throw new \Exception( 'Redis链接配置不能为空', false );

        $this->servers = $serversConfig;
        return $this;
    }

    private function initInstances()
    {
        if (empty($this->instances)) {
            foreach ($this->servers as $server) {
                list($host, $port, $timeout) = $server;

                $redis = new \Redis();
                $redis->connect($host, $port, $timeout);
                // $redis->select( ['index'] );

                $this->instances[] = $redis;
            }
        }
    }
}

class RedisDelayQueueUtil
{
    use RedisConnectTrait;

    const QUEUE_PREFIX = 'delay_queue:';
    protected $redis = null;
    protected $key = '';

    public function __construct( string $queueName, array $config = [] )
    {
        $instances = $this->setServers( $config )->initInstances();

        $this->key = self::QUEUE_PREFIX . $queueName;
        $this->redis = $instances[ 0 ];
        // $this->redis->auth($config['auth']);
    }

    public function delTask($value)
    {
        return $this->redis->zRem($this->key, $value);
    }

    public function getTask()
    {
        //获取任务,以0和当前时间为区间,返回一条记录
        return $this->redis->zRangeByScore( $this->key, 0, time(), [ 'limit' => [ 0, 1 ] ] );
    }

    public function addTask($name, $time, $data)
    {
        //添加任务,以时间作为score,对任务队列按时间从小到大排序
        return $this->redis->zAdd(
            $this->key,
            $time,
            json_encode([
                'task_name' => $name,
                'task_time' => $time,
                'task_params' => $data,
            ], JSON_UNESCAPED_UNICODE )
        );
    }

    public function run()
    {
        //每次只取一条任务
        $task = $this->getTask();
        if (empty($task)) {
            return false;
        }

        $task = $task[0];
        if ($this->delTask($task)) {
            $task = json_decode($task, true);
            //处理任务
            echo '任务:' . $task['task_name'] . ' 运行时间:' . date('Y-m-d H:i:s') . PHP_EOL;

            return true;
        }

        return false;
    }
}

$dq = new RedisDelayQueueUtil('close_order', [
    'host' => '127.0.0.1',
    'port' => 6379,
    'auth' => '',
    'timeout' => 60,
]);

$dq->addTask('close_order_111', time() + 30, ['order_id' => '111']);
$dq->addTask('close_order_222', time() + 60, ['order_id' => '222']);
$dq->addTask('close_order_333', time() + 90, ['order_id' => '333']);

set_time_limit(0);

$i2Count = 0;
while ( 10 < $i2Count ) {
    $dq->run();
    usleep(100000);
    $i2Count++;
}