Java реализует системное ограничение тока

Java

В микросервисной системе кэширование, ограничение тока и слияние — это три инструмента для обеспечения высокой доступности системы.Сегодня мы поговорим об ограничении тока.

Ограничение по току — это один из способов обеспечить высокую доступность системы. Конечно, это также часто задаваемый вопрос для крупных производителей. Если интервьюер Али спросит: «Как добиться текущего ограничения в 1К запросов в секунду? Написал несколько схем ограничения тока, ароматно бы, ха-ха :smirk:! Не мудрствуя лукаво, перечислю несколько часто используемых реализаций ограничения тока.

Guava RateLimiter

Guava – отличный проект с открытым исходным кодом в области Java, включая коллекции, строки и кэши, обычно используемые в повседневной разработке. RateLimiter – часто используемый инструмент ограничения тока.

RateLimiter реализован на основе алгоритма ведра токенов, если есть 10 токенов в секунду, внутренняя реализация будет производить 1 токен каждые 100 мс.

Использование Guava RateLimiter
  1. Введите pom-зависимости:

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>23.0</version>
    </dependency>
    
  2. Код:

    public class GuavaRateLimiterTest {
        //比如每秒生产10个令牌,相当于每100ms生产1个令牌
        private RateLimiter rateLimiter = RateLimiter.create(10);
    
        /**
         * 模拟执行业务方法
         */
        public void exeBiz() {
            if (rateLimiter.tryAcquire(1)) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程" + Thread.currentThread().getName() + ":执行业务逻辑");
            } else {
                System.out.println("线程" + Thread.currentThread().getName() + ":被限流");
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            GuavaRateLimiterTest limiterTest = new GuavaRateLimiterTest();
            Thread.sleep(500);//等待500ms,让limiter生产一些令牌
    
            //模拟瞬间生产100个线程请求
            for (int i = 0; i < 100; i++) {
                new Thread(limiterTest::exeBiz).start();
            }
        }
    }
    

Количество скользящих окон

Например, интерфейс позволяет 100 запросов в секунду, установите скользящее окно, в окне 10 сеток, каждая сетка занимает 100 мс и перемещается один раз каждые 100 мс. Чем больше будет разбито сеток скользящего окна, тем плавнее будет прокрутка скользящего окна и тем точнее будет текущая ограничивающая статистика.

滑窗计数器

Код:

/**
 * 滑窗计数器
 */
public class SliderWindowRateLimiter implements Runnable {
    //每秒允许的最大访问数
    private final long maxVisitPerSecond;
    //将每秒时间划分N个块
    private final int block;
    //每个块存储的数量
    private final AtomicLong[] countPerBlock;
    //滑动窗口划到了哪个块儿,可以理解为滑动窗口的起始下标位置
    private volatile int index;
    //目前总的数量
    private AtomicLong allCount;

    /**
     * 构造函数
     *
     * @param block,每秒钟划分N个窗口
     * @param maxVisitPerSecond 每秒最大访问数量
     */
    public SliderWindowRateLimiter(int block, long maxVisitPerSecond) {
        this.block = block;
        this.maxVisitPerSecond = maxVisitPerSecond;
        countPerBlock = new AtomicLong[block];
        for (int i = 0; i < block; i++) {
            countPerBlock[i] = new AtomicLong();
        }
        allCount = new AtomicLong(0);
    }

    /**
     * 判断是否超过最大允许数量
     *
     * @return
     */
    public boolean isOverLimit() {
        return currentQPS() > maxVisitPerSecond;
    }

    /**
     * 获取目前总的访问数
     *
     * @return
     */
    public long currentQPS() {
        return allCount.get();
    }

    /**
     * 请求访问进来,判断是否可以执行业务逻辑
     */
    public void visit() {
        countPerBlock[index].incrementAndGet();
        allCount.incrementAndGet();

        if (isOverLimit()) {
            System.out.println(Thread.currentThread().getName() + "被限流" + ",currentQPS:" + currentQPS() + ",index:" + index);
        } else {
            System.out.println(Thread.currentThread().getName() + "执行业务逻辑" + ",currentQPS:" + currentQPS() + ",index:" + index);
        }
    }

    /**
     * 定时执行器,
     * 每N毫秒滑块移动一次,然后再设置下新滑块的初始化数字0,然后新的请求会落到新的滑块上
     * 同时总数减掉新滑块上的数字,并且重置新的滑块上的数量
     */
    @Override
    public void run() {
        index = (index + 1) % block;
        long val = countPerBlock[index].getAndSet(0);
        allCount.addAndGet(-val);
    }

    public static void main(String[] args) {
        SliderWindowRateLimiter sliderWindowRateLimiter = new SliderWindowRateLimiter(10, 100);

        //固定的速率移动滑块
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS);

        //模拟不同速度的请求
        new Thread(() -> {
            while (true) {
                sliderWindowRateLimiter.visit();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //模拟不同速度的请求
        new Thread(() -> {
            while (true) {
                sliderWindowRateLimiter.visit();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

сигнал

При использовании семафора ресурсы семафора высвобождаются с фиксированной скоростью. Когда поток получает ресурс, он выполняет бизнес-код.

Код:

public class SemaphoreOne {
    private static Semaphore semaphore = new Semaphore(10);

    public static void bizMethod() throws InterruptedException {
        if (!semaphore.tryAcquire()) {
            System.out.println(Thread.currentThread().getName() + "被拒绝");
            return;
        }

        System.out.println(Thread.currentThread().getName() + "执行业务逻辑");
        Thread.sleep(500);//模拟处理业务逻辑需要1秒
        semaphore.release();
    }

    public static void main(String[] args) {

        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                semaphore.release(10);
                System.out.println("释放所有锁");
            }
        }, 1000, 1000);

        for (int i = 0; i < 10000; i++) {
            try {
                Thread.sleep(10);//模拟每隔10ms就有1个请求进来
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                try {
                    SemaphoreOne.bizMethod();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

ведро с жетонами

Алгоритм ведра токенов: ведро, в котором хранятся токены с фиксированной емкостью, и токены добавляются в ведро с фиксированной скоростью. Если поступает запрос, вам необходимо сначала получить токен из корзины, а когда в корзине нет доступных токенов, задача отклоняется.

Преимущество сегментов токенов заключается в том, что скорость добавления токенов может быть изменена, а после увеличения скорости можно обрабатывать пакетный трафик.

20161109175913747

Код:

public class TokenBucket {
    /**
     * 定义的桶
     */
    public class Bucket {
        //容量
        int capacity;
        //速率,每秒放多少
        int rateCount;
        //目前token个数
        AtomicInteger curCount = new AtomicInteger(0);

        public Bucket(int capacity, int rateCount) {
            this.capacity = capacity;
            this.rateCount = rateCount;
        }

        public void put() {
            if (curCount.get() < capacity) {
                System.out.println("目前数量==" + curCount.get() + ", 我还可以继续放");
                curCount.addAndGet(rateCount);
            }
        }

        public boolean get() {
            if (curCount.get() >= 1) {
                curCount.decrementAndGet();
                return true;
            }
            return false;
        }
    }

    @Test
    public void testTokenBucket() throws InterruptedException {

        Bucket bucket = new Bucket(5, 2);

        //固定线程,固定的速率往桶里放数据,比如每秒N个
        ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(1);
        scheduledCheck.scheduleAtFixedRate(() -> {
            bucket.put();
        }, 0, 1, TimeUnit.SECONDS);

        //先等待一会儿,让桶里放点token
        Thread.sleep(6000);

        //模拟瞬间10个线程进来拿token
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                if (bucket.get()) {
                    System.out.println(Thread.currentThread() + "获取到了资源");
                } else {
                    System.out.println(Thread.currentThread() + "被拒绝");
                }
            }).start();
        }

        //等待,往桶里放token
        Thread.sleep(3000);

        //继续瞬间10个线程进来拿token
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                if (bucket.get()) {
                    System.out.println(Thread.currentThread() + "获取到了资源");
                } else {
                    System.out.println(Thread.currentThread() + "被拒绝");
                }
            }).start();
        }
    }
}

Суммировать

В этой статье в основном представлены несколько методов ограничения тока: Guava RateLimiter, простой подсчет, подсчет скользящего окна, семафор, ведро токенов, конечно же, алгоритм ограничения тока, алгоритм дырявого ведра, ограничение тока nginx и так далее. Эти методы, написанные Half Cigarette, являются только методами, которые люди всегда использовали в реальных проектах, или методами, которые были написаны во время участия в письменном тесте Али.

Если у вас есть лучшие идеи, пожалуйста, поделитесь со мной!

Оригинальная ссылка