Приложение Redis — ограничение тока

Redis

серия статей

В сценариях с высоким параллелизмом есть три мощные системы защиты: кэширование, переход на более раннюю версию и ограничение тока. Целью кэша является повышение скорости доступа к системе и увеличение емкости, с которой может справиться система; переход на более раннюю версию — временное экранирование в случае сбоя службы или влияния на производительность основного процесса. В некоторых сценариях необходимо ограничить количество одновременных запросов, таких как всплески, захваты, публикации, комментарии, вредоносные поисковые роботы и т. д.

Алгоритм ограничения тока Общие алгоритмы ограничения тока включают счетчики, дырявые ведра и ведра с маркерами.

прилавок

Как следует из названия, это запись по одной, а затем оценка того, превышает ли число в ограниченном временном окне лимит.

function isActionAllowed($userId, $action, $period, $maxCount) 
{
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);
    $key = sprintf('hist:%s:%s', $userId, $action);
    $now = msectime();   # 毫秒时间戳

    $pipe=$redis->multi(Redis::PIPELINE); //使用管道提升性能
    $pipe->zadd($key, $now, $now); //value 和 score 都使用毫秒时间戳
    $pipe->zremrangebyscore($key, 0, $now - $period); //移除时间窗口之前的行为记录,剩下的都是时间窗口内的
    $pipe->zcard($key);  //获取窗口内的行为数量
    $pipe->expire($key, $period + 1);  //多加一秒过期时间
    $replies = $pipe->exec();
    return $replies[2] <= $maxCount;
}
for ($i=0; $i<20; $i++){
    var_dump(isActionAllowed("110", "reply", 60*1000, 5)); //执行可以发现只有前5次是通过的
}

//返回当前的毫秒时间戳
function msectime() {
    list($msec, $sec) = explode(' ', microtime());
    $msectime = (float)sprintf('%.0f', (floatval($msec) + floatval($sec)) * 1000);
    return $msectime;
 }

дырявое ведро

Идея алгоритма Leaky Bucket очень проста: вода (запрос) сначала поступает в дырявое ведро, а дырявое ведро выходит из воды с определенной скоростью (у интерфейса есть скорость отклика) rate), а затем отклоняется запрос, видно, что алгоритм дырявого ведра может принудительно ограничивать скорость передачи данных.Схема выглядит следующим образом:

Конкретный код реализован следующим образом

<?php

class Funnel {

    private $capacity;
    private $leakingRate;
    private $leftQuote;
    private $leakingTs;

    public function __construct($capacity, $leakingRate)
    {
        $this->capacity = $capacity;    //漏斗容量
        $this->leakingRate = $leakingRate;//漏斗流水速率
        $this->leftQuote = $capacity; //漏斗剩余空间
        $this->leakingTs = time(); //上一次漏水时间
    }

    public function makeSpace()
    {
        $now = time();
        $deltaTs = $now-$this->leakingTs; //距离上一次漏水过去了多久
        $deltaQuota = $deltaTs * $this->leakingRate; //可腾出的空间
        if($deltaQuota < 1) {  
            return;
        }
        $this->leftQuote += $deltaQuota;   //增加剩余空间
        $this->leakingTs = time();         //记录漏水时间
        if($this->leftQuota > $this->capacaty){
            $this->leftQuote = $this->capacity;
        }
    }

    public function watering($quota)
    {
        $this->makeSpace(); //漏水操作
        if($this->leftQuote >= $quota) {
            $this->leftQuote -= $quota;
            return true;
        }
        return false;
    }
}


$funnels = [];
global $funnel;

function isActionAllowed($userId, $action, $capacity, $leakingRate)
{
    $key = sprintf("%s:%s", $userId, $action);
    $funnel = $GLOBALS['funnel'][$key] ?? '';
    if (!$funnel) {
        $funnel  = new Funnel($capacity, $leakingRate);
        $GLOBALS['funnel'][$key] = $funnel;
    }
    return $funnel->watering(1);
}

for ($i=0; $i<20; $i++){
    var_dump(isActionAllowed("110", "reply", 15, 0.5)); //执行可以发现只有前15次是通过的
}

Основной логикой является makeSpace, который вызывается перед каждым поливом, чтобы вызвать утечку и освободить место для воронки. Для воронок мы можем использовать хеш-структуру в Redis для хранения соответствующих полей.При поливе поля извлекаются для логических операций, а затем сохраняются в хеш-структуре для завершения определения частоты поведения. Но есть проблема в том, что атомарность всего процесса не может быть гарантирована, а значит, его нужно контролировать блокировкой, а если блокировка не сработает, то ее нужно повторить или отказаться от нее, что приведет к деградации производительности и повлияет на пользовательский опыт и сложность кода также повышены, в настоящее время Redis предоставляет подключаемый модуль, и появляется Redis-Cell.

Redis-Cell

Redis 4.0 предоставляет модуль Redis для ограничения тока с именем redis-cell, который предоставляет алгоритм воронки и предоставляет атомарные инструкции по ограничению тока.

Этот модуль имеет только одну инструкцию cl.throttle, их аргументы и возвращаемые значения сложны.

> cl.throttle tom:reply 14 30 60 1
1) (integer) 0    # 0表示允许,1表示拒绝
2) (integer) 15    # 漏斗容量capacity
3) (integer) 14    # 漏斗剩余空间left_quota
4) (integer) -1    # 如果拒绝了,需要多长时间后再重试,单位秒
5) (integer) 2    # 多长时间后,漏斗完全空出来,单位秒

Эта инструкция означает, что частота поведения ответа пользователя tom может быть до 30 раз каждые 60 секунд, начальная емкость воронки равна 15 (поскольку она начинается с 0, а это от 15 до 14), а пространство по умолчанию, занимаемое каждым поведением, равно 1 (необязательный параметр). Если он отклонен, возьмите четвертое значение возвращаемого массива и выполните спящий режим, который можно использовать в качестве времени повтора или асинхронно запланированных задач для повторения.

корзина с жетонами

Алгоритм Token Bucket имеет тот же эффект, что и алгоритм Leaky Bucket, но в противоположном направлении, что легче понять: со временем система будет переходить в корзину с постоянным интервалом времени 1/QPS (если QPS=100). , интервал 10мс).Добавить в него Токен (Представьте, что есть кран, который постоянно подливает воду), если ведро полное, то оно не будет добавлено. При поступлении нового запроса каждый возьмет по Токену, а если Токена нет, он будет заблокирован или отказ в обслуживании.

Еще одним преимуществом ведра токенов является то, что скорость может быть легко изменена.Как только скорость необходимо увеличить, скорость токенов, помещаемых в ведро, может быть увеличена по мере необходимости.Как правило, в ведро добавляется определенное количество токенов. через равные промежутки времени (например, 100 миллисекунд). Некоторые варианты алгоритма вычисляют количество токенов, которое должно быть увеличено в режиме реального времени.

Конкретная реализация может относиться кphp реализовать алгоритм управления потоком на основе Redis Token Bucket

<?php
class TrafficShaper
{ 
    private $_config; // redis设定
    private $_redis;  // redis对象
    private $_queue;  // 令牌桶
    private $_max;    // 最大令牌数

    /**
     * 初始化
     * @param Array $config redis连接设定
     */
    public function __construct($config, $queue, $max)
    {
        $this->_config = $config;
        $this->_queue = $queue;
        $this->_max = $max;
        $this->_redis = $this->connect();
    }

    /**
     * 加入令牌
     * @param Int $num 加入的令牌数量
     * @return Int 加入的数量
     */
    public function add($num = 0)
    {
        // 当前剩余令牌数
        $curnum = intval($this->_redis->lSize($this->_queue));
        // 最大令牌数
        $maxnum = intval($this->_max);
        // 计算最大可加入的令牌数量,不能超过最大令牌数
        $num = $maxnum >= $curnum + $num ? $num : $maxnum - $curnum;
        // 加入令牌
        if ($num > 0) {
            $token = array_fill(0, $num, 1);
            $this->_redis->lPush($this->_queue, ...$token);
            return $num;
        }
        return 0;
    }

    /**
     * 获取令牌
     * @return Boolean
     */
    public function get()
    {
        return $this->_redis->rPop($this->_queue) ? true : false;
    }

    /**
     * 重设令牌桶,填满令牌
     */
    public function reset()
    {
        $this->_redis->delete($this->_queue);
        $this->add($this->_max);
    }

    private function connect()
    {
        try {
            $redis = new Redis();
            $redis->connect($this->_config['host'], $this->_config['port'], $this->_config['timeout'], $this->_config['reserved'], $this->_config['retry_interval']);
            if (empty($this->_config['auth'])) {
                $redis->auth($this->_config['auth']);
            }
            $redis->select($this->_config['index']);
        } catch (\RedisException $e) {
            throw new Exception($e->getMessage());
            return false;
        }
        return $redis;
    }
} 

$config = array(
    'host' => 'localhost',
    'port' => 6379,
    'index' => 0,
    'auth' => '',
    'timeout' => 1,
    'reserved' => NULL,
    'retry_interval' => 100,
);
// 令牌桶容器
$queue = 'mycontainer';
 // 最大令牌数
$max = 5;
// 创建TrafficShaper对象
$oTrafficShaper = new TrafficShaper($config, $queue, $max);
// 重设令牌桶,填满令牌
$oTrafficShaper->reset();
// 循环获取令牌,令牌桶内只有5个令牌,因此最后3次获取失败
for ($i = 0; $i < 8; $i++) {
    var_dump($oTrafficShaper->get());
}
// 加入10个令牌,最大令牌为5,因此只能加入5个
$add_num = $oTrafficShaper->add(10);
var_dump($add_num);
// 循环获取令牌,令牌桶内只有5个令牌,因此最后1次获取失败
for ($i = 0; $i < 6; $i++) {
    var_dump($oTrafficShaper->get());
}
?>

Эта статья также опубликована в общедоступной учетной записи WeChat [Информация Xiaodao]. Пожалуйста, отсканируйте код, чтобы следовать!