Алгоритм ограничения тока — базовая реализация

алгоритм

Существует три мощных инструмента для защиты системы при разработке системы с высокой степенью параллелизма:Кэш, даунгрейд, текущий лимит, сегодня мы поговорим о限流

тайник: Кэш предназначен для повышения скорости доступа к системе и увеличения вычислительной мощности системы.понижение рейтинга: Даунгрейд — это стратегическое понижение некоторых служб и страниц в соответствии с текущей деловой ситуацией и трафиком, когда нагрузка на сервер резко возрастает, чтобы высвободить ресурсы сервера для обеспечения нормальной работы основных задач.Ограничение: Целью ограничения тока является защита системы путем ограничения скорости одновременного доступа/запросов или ограничения скорости запросов в пределах временного окна.По достижении предела скорости он может отказать в обслуживании, поставить в очередь или ждать, понизить версию, и т.п.

Я в основном использую таймеры для реализации следующих алгоритмовTimer,Вообще-то про время,может и не нужен таймер,видноGuavaизRateLimiter, преимущество таймера в том, что мне не нужно иметь дело с временной логикой, но он должен использовать поток для выполнения логики.Когда нагрузка на логические вычисления слишком высока, поток не может справиться с этим, и эффект не хорошо. Вы можете использовать егоScheduledThreadPoolExecutorПул потоков для выполнения, уменьшите давление

Также использовал много队列Структура данных, поскольку для большинства моделей производитель-потребитель требуются очереди, характеристики «первым поступил — первым обслужен».

Первый раздел предназначен для создания среды, написания требований, требований к интерфейсу и тестовых примеров, а следующие четыре раздела — это основные алгоритмы.

1. Строительство окружающей среды

мы моделируемFilter#doFilterИнтерфейс протестирован и все реализованоAbstractLimiter#limitметод

Filterвыполнить

public interface Filter {

    default public void init() {
    }

    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain);

    default public void destroy() {
    }
}

FilterChainвыполнить

public interface FilterChain {

    void doFilter(ServletRequest request, ServletResponse response);
}

ServletRequestвыполнить

public class ServletRequest {

    private String msg;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "ServletRequest{" +
                "msg='" + msg + '\'' +
                '}';
    }

    public ServletRequest(String msg) {
        this.msg = msg;
    }
}

ServletResponseвыполнить

public class ServletResponse {


}

AbstractLimiterвыполнить

public abstract class AbstractLimiter {

    /**
     * 最大流量
     */
    protected final int MAX_FlOW;

    /**
     * 构造器 , 输入每秒最大流量
     * @param MAX_FlOW 最大流量
     */
    public AbstractLimiter(int MAX_FlOW) {
        this.MAX_FlOW = MAX_FlOW;
    }


    /**
     * 具体实现的方法
     * @param request 请求
     * @param response 响应
     * @param chain 执行
     */
    public abstract void limit(ServletRequest request, ServletResponse response, FilterChain chain);

}

Demoтестовый класс

public class Demo {

    @Test
    public void test() {
        
        // 过滤器
        Filter filter = new Filter() {
            AbstractLimiter limit = null;

            @Override
            public void init() {
                // 入口 ,我们都是每秒限制 100个请求
                limit = new LeakyBucketLimiter(100);
            }

            @Override
            public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
                limit.limit(request, response, chain);
            }
        };

        // 过滤器初始化
        filter.init();

        // 计时器
        long start = System.currentTimeMillis();
        
        // 计数器
        AtomicInteger integer = new AtomicInteger(0);

        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 模拟4000次请求
        IntStream.range(0, 4000).forEach(e -> {
            try {
                // 模拟请求延迟
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e1) {
                //
            }

            // 多线程执行
            pool.execute(()->{
                filter.doFilter(new ServletRequest("" + e), new ServletResponse(), new FilterChain() {
                    @Override
                    public void doFilter(ServletRequest request, ServletResponse response) {
                        // 回调接口
                        integer.incrementAndGet();
                        System.out.println("请求 : "+request.getMsg() + " 通过, 执行线程 "+Thread.currentThread().getName());
                    }
                });
            });
        });

        System.out.println("总耗时" + (System.currentTimeMillis() - start));
        System.out.println("一共通过 : " + integer.get());
    }
}

2. Алгоритм счетчика

计数器算法(Counter)Гу Минси это счетчик.Например я могу пропускать 100 запросов в секунду.Каждый раз когда мне приходит запрос,я буду увеличивать счетчик на 1.Когда счетчик дойдет до 100,я не пропущу запрос,а у него Вопрос: Например, когда я получил 100 запросов на 999мс, он был инициализирован, когда прошло 1000мс, но пришло еще 100 запросов.На данный момент, 200 запросов фактически были обработаны в этом 0.1S.Серьезно перегружен, в это время сервер не может обработать это, и время ожидания всех запросов истекает....

public class CounterLimiter extends AbstractLimiter {

    private static final Integer initFlow = 0;

    private final AtomicInteger flow;

    public CounterLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
		
        // 初始化计数器
        flow = new AtomicInteger(initFlow);

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                // 每1000ms初始化一次
                flow.set(initFlow);
            }
        }, 0, 1000);
    }

    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        // 比较是否超载
        if (flow.get() < MAX_FlOW) {
		// 通过 : 计数器+1
            flow.incrementAndGet();
            chain.doFilter(request, response);
        }
    }
}

3. Алгоритм скользящего окна

滑动窗口算法(Rolling - Window)Можно сказать, что это улучшение алгоритма счетчика.Он подразделяет калькулятор.Например, я подразделяю 1000 мс 1S на 10 100 мс, и у нас есть 10 счетчиков, таких как приведенная выше задача, задача 999 мс и 1000 мс, Поскольку мы непрерывны, я также считаю 1000 мс, приходящие в это время, и эта ситуация не произойдет в это время,

Чем выше степень детализации, тем точнее подсчитываются ресурсы.Hystrixа такжеsentinelЭто все такое мышление Алгоритм скользящего окна в основном рассматривает проблему меньшего количества вычислительных ресурсов.

Мой алгоритм не оптимален, на самом деле не нужно использоватьArrayBlockingQueueДля поддержки ползунка, так как мы выполняем с одним потоком и не будет проблем с многопоточностью, мы можем фактически использоватьLinkedListДля имитации очереди есть и другие моменты, на которые вы также можете обратить внимание.

public class RollingWindowFilter extends AbstractLimiter {

    /**
     * 我们的滑动窗口对象,包含多个窗口
     */
    private final Slider slider;

    /**
     * 程序中暴露的唯一一个计数器,可以称之为当前窗口
     */
    private AtomicInteger counter;

    /**
     * 计数器初始化大小
     */
    private static final int INIT_SIZE = 0;

    /**
     * 比如窗口分为10块,这个代表先进入9块窗口的计算值 , 为什么要引入是因为不浪费计算资源, 好多都是重复计算
     */
    private final AtomicInteger preCount;


    /**
     * 我们默认队列大小是 20 ,其实颗粒度很高了50ms计算一次, 可以重载构造参数调整
     *
     * @param MAX_FlOW 最大流量
     */
    public RollingWindowFilter(int MAX_FlOW) {

        super(MAX_FlOW);

        // 初始化窗口,感觉改名字叫做Windows比较好 ....
        slider = new Slider(20);


        // 初始化对象
        preCount = new AtomicInteger(INIT_SIZE);

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {

                ArrayBlockingQueue<AtomicInteger> queue = slider.blocks;

                // 当前窗口大小
                int size = queue.size();

                /**
                 * 初始化窗口长度
                 */
                if (size < slider.capacity) {
                    try {

                        /**
                         * 计算前面窗口的计数器总和
                         * 
                         * 这里其实由多线程的并发问题 ,其实可以设置一个标识符来表示完成与否 .. 我懒得改了 ,或者你就大量实例化对象,不用我这个单一对象
                         */
                        preCount.set(INIT_SIZE);
                        if (size > 0) {
                            queue.forEach(e -> preCount.addAndGet(e.get()));
                        }

                        // 新建一个计数器, 放入对应的滑块 ,其实就是队尾
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }

                }

                /**
                 * 当窗口长度初始化完成
                 */
                if (size == slider.capacity) {

                    try {
                        // 出局最先进来的那个
                        queue.take();

                        // 计算前面窗口的计数器总和 , 有多线程并发问题
                        preCount.set(INIT_SIZE);
                        queue.forEach(e -> preCount.addAndGet(e.get()));

                        
                        // 新建一个计数器, 放入对应的滑块 ,其实就是队尾
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }
                }
            }
        }, 0, 1000 / slider.capacity);
    }


    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {

        int cur = counter.get();
        int pre = preCount.get();
        int sum = cur + pre;

        if (sum < MAX_FlOW) {
            counter.incrementAndGet();
            chain.doFilter(request, response);
        }
    }


    /**
     * 滑块组成 , 一个队列维护一个块 , 其实可以用LinkedList来维护 , 我是懒得改
     * <p>
     * 一般内部类来说看JDK源码你会发现都会用private static修饰 ,因为反射不是静态内部类,无法实例化 , 和构造器不加修饰
     */
    private static class Slider {
        // 多少个计数器
        private final int capacity;
        // 放置计数器
        private final ArrayBlockingQueue<AtomicInteger> blocks;

        Slider(int capacity) {
            this.blocks = new ArrayBlockingQueue<>(capacity);
            this.capacity = capacity;
        }
    }
}

4. Алгоритм дырявого ведра

На самом деле так называемая漏桶算法(Leaky Bucket), давайте подумаем, есть вход воды и выход воды, кто управляет нами двумя, вход воды не что иное, как большое количество запросов, а выход воды это запрос, который мы отпускаем, так что он а生产者 - 消费者模型, производитель — это запрос, потребитель — это запрос, который мы потребляем с определенной скоростью,

Алгоритм дырявого ведра может сделать请求流出的速率是均匀的, независимо от того, сколько запросов вы делаете, скорость моего оттока одинакова, когда ведро заполнено, оно переполняется, а если оно не заполнено, оно ждет, когда его выльют.

Когда вы поймете два абзаца выше, вы поймете код ниже, и мои комментарии очень ясны.

public class LeakyBucketLimiter extends AbstractLimiter {

    /**
     * 我们的漏斗
     */
    private final LeakyBucket leakyBucket;

    /**
     * 构造器 , 输入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public LeakyBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.leakyBucket = new LeakyBucket(MAX_FlOW);
    }

    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        try {
            // 1. 获取桶当前水的大小
            int size = leakyBucket.bucket.size();

            // 2. 比较桶里的水是否满了
            if (size < leakyBucket.waterSize) {

                // 没有满我们就将水放进去,其实这里put也行 , offer也行 , 看需求
                leakyBucket.bucket.put(new Water(request, response, chain));
            }
        } catch (InterruptedException e) {
            //
        }
    }

    static class LeakyBucket {

        /**
         * 能放多少水,其实就是队列大小
         */
        final int waterSize;

        /**
         * 我们的放水的桶
         */
        final ArrayBlockingQueue<Water> bucket;

        public LeakyBucket(int MAX_FlOW) {
            this.waterSize = MAX_FlOW;
            bucket = new ArrayBlockingQueue<>(this.waterSize);

            /**
             * 模拟消费 , 1S只能过去100个 ,说明 100ms 可以消耗10个, 看你的颗粒度
             */
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    // 100ms 流出去10个
                    for (int i = 0; i < (waterSize / 10); i++) {
                        try {
                            // 流出的水
                            Water water = bucket.take();

                            // 执行掉
                            water.chain.doFilter(water.request, water.response);
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }


    /**
     * 我们的节点对象, 其实可以称之为 成功注入的水 , 等着被漏桶流出去
     */
    static class Water {

        private ServletRequest request;

        private ServletResponse response;

        private FilterChain chain;

        public Water(ServletRequest request, ServletResponse response, FilterChain chain) {
            this.request = request;
            this.response = response;
            this.chain = chain;
        }
    }
}

5. Алгоритм Token Bucket

令牌桶算法(Token Bucket)является противоположностью алгоритма дырявого ведра, и он также生产者消费者模型, это просто обмен ролями, он же мы для управления генерацией, запрос на выполнение потребления, например: Например, если мы ограничим ток до 100, то мы будем генерировать 10 токенов каждые 100мс, а когда количество токенов достигает 100, будем No production, при поступлении запроса удалит токен, если будет получен, то будет передан, а если не может быть получен, то будет отклонен

По этому мы можем сравнить его с алгоритмом дырявого ведра.Предполагая, что мы только начали, и в это время есть 100 запросов, ведро токенов может отклонить 90 токенов, потому что я произвел только 10 токенов, а как быть с дырявым ведром он не будет, он положит в него все 100 запросов и будет медленно потреблять его, потому что мой объем ведра 100, столько запросов можно положить, в этом разница между ними.... На самом деле, это почти стабильно.

生产者消费者模型Преобразование мышления может сделать мышление более ясным, а выбор модели иногда является подходящим способом решения проблемы.

Алгоритм Token Bucket в основном используется в Интернете.GuavaизRateLimiterРеализовано, здесь я реализую два типа, один реализовать самому, другой использоватьRateLimiter,

1. Самостоятельное ведро токенов

public class TokenBucketLimiter extends AbstractLimiter {

    /**
     * 令牌桶
     */
    private final TokenBucket tokenBucket;

    /**
     * 构造器 , 输入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public TokenBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.tokenBucket = new TokenBucket(MAX_FlOW);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        /**
         * 这里我们就不使用 take的阻塞思想了 ,直接poll去拉去 ,然后等待5mS ,  如果拉去不到直接返回失败 , 其实等待的长了点
         */
        try {
            // 尝试去获取一个令牌
            Token token = tokenBucket.bucket.poll(5, TimeUnit.MILLISECONDS);
            
            // 拿到通过
            if (null != token) {
                chain.doFilter(request, response);
            }

        } catch (InterruptedException e) {
            //
        }

    }


    /**
     * 令牌桶
     */
    private static class TokenBucket {
        /**
         * 令牌存放的位置 , 用一个队列维护
         */
        private final ArrayBlockingQueue<Token> bucket;

        /**
         * 桶最多存放多少个令牌
         */
        private final int tokenSize;

        public TokenBucket(int MAX_FlOW) {
            this.tokenSize = MAX_FlOW;
            this.bucket = new ArrayBlockingQueue<>(this.tokenSize);

            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    for (int x = 0; x < (tokenSize / 10); x++) {
                        try {
                            if (bucket.size() < tokenSize) {
                                // 定时放入令牌
                                bucket.put(new Token());
                            }
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }

    /**
     * 令牌
     */
    private static class Token {

    }
}

2. Реализовать ведро токенов на основе RateLimiter от Guava.

public class GuavaRateLimiter extends AbstractLimiter {

    /**
     * 令牌桶
     */
    private final RateLimiter limiter;

    /**
     * 每次需要的令牌个数
     */
    private static final int ACQUIRE_NUM = 1;
    /**
     * 最长等待时间
     */
    private static final int WAIT_TIME_PER_MILLISECONDS = 5;

    /**
     * 构造器 , 输入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public GuavaRateLimiter(final int MAX_FlOW) {
        super(MAX_FlOW);
        limiter = RateLimiter.create(MAX_FlOW);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        /**
         * 意思就是 我尝试去获取1个令牌 ,最大等待时间是 5 ms , 其实太长了, 真是开发也就1ms不到
         */
        boolean flag = limiter.tryAcquire(ACQUIRE_NUM, WAIT_TIME_PER_MILLISECONDS, TimeUnit.MILLISECONDS);
        if (flag) {
            chain.doFilter(request, response);
        }
    }
}