В микросервисной системе кэширование, ограничение тока и слияние — это три инструмента для обеспечения высокой доступности системы.Сегодня мы поговорим об ограничении тока.
Ограничение по току — это один из способов обеспечить высокую доступность системы. Конечно, это также часто задаваемый вопрос для крупных производителей. Если интервьюер Али спросит: «Как добиться текущего ограничения в 1К запросов в секунду? Написал несколько схем ограничения тока, ароматно бы, ха-ха :smirk:! Не мудрствуя лукаво, перечислю несколько часто используемых реализаций ограничения тока.
Guava RateLimiter
Guava – отличный проект с открытым исходным кодом в области Java, включая коллекции, строки и кэши, обычно используемые в повседневной разработке. RateLimiter – часто используемый инструмент ограничения тока.
RateLimiter реализован на основе алгоритма ведра токенов, если есть 10 токенов в секунду, внутренняя реализация будет производить 1 токен каждые 100 мс.
Использование Guava RateLimiter
-
Введите pom-зависимости:
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency>
-
Код:
public class GuavaRateLimiterTest { //比如每秒生产10个令牌,相当于每100ms生产1个令牌 private RateLimiter rateLimiter = RateLimiter.create(10); /** * 模拟执行业务方法 */ public void exeBiz() { if (rateLimiter.tryAcquire(1)) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + ":执行业务逻辑"); } else { System.out.println("线程" + Thread.currentThread().getName() + ":被限流"); } } public static void main(String[] args) throws InterruptedException { GuavaRateLimiterTest limiterTest = new GuavaRateLimiterTest(); Thread.sleep(500);//等待500ms,让limiter生产一些令牌 //模拟瞬间生产100个线程请求 for (int i = 0; i < 100; i++) { new Thread(limiterTest::exeBiz).start(); } } }
Количество скользящих окон
Например, интерфейс позволяет 100 запросов в секунду, установите скользящее окно, в окне 10 сеток, каждая сетка занимает 100 мс и перемещается один раз каждые 100 мс. Чем больше будет разбито сеток скользящего окна, тем плавнее будет прокрутка скользящего окна и тем точнее будет текущая ограничивающая статистика.
Код:
/**
* 滑窗计数器
*/
public class SliderWindowRateLimiter implements Runnable {
//每秒允许的最大访问数
private final long maxVisitPerSecond;
//将每秒时间划分N个块
private final int block;
//每个块存储的数量
private final AtomicLong[] countPerBlock;
//滑动窗口划到了哪个块儿,可以理解为滑动窗口的起始下标位置
private volatile int index;
//目前总的数量
private AtomicLong allCount;
/**
* 构造函数
*
* @param block,每秒钟划分N个窗口
* @param maxVisitPerSecond 每秒最大访问数量
*/
public SliderWindowRateLimiter(int block, long maxVisitPerSecond) {
this.block = block;
this.maxVisitPerSecond = maxVisitPerSecond;
countPerBlock = new AtomicLong[block];
for (int i = 0; i < block; i++) {
countPerBlock[i] = new AtomicLong();
}
allCount = new AtomicLong(0);
}
/**
* 判断是否超过最大允许数量
*
* @return
*/
public boolean isOverLimit() {
return currentQPS() > maxVisitPerSecond;
}
/**
* 获取目前总的访问数
*
* @return
*/
public long currentQPS() {
return allCount.get();
}
/**
* 请求访问进来,判断是否可以执行业务逻辑
*/
public void visit() {
countPerBlock[index].incrementAndGet();
allCount.incrementAndGet();
if (isOverLimit()) {
System.out.println(Thread.currentThread().getName() + "被限流" + ",currentQPS:" + currentQPS() + ",index:" + index);
} else {
System.out.println(Thread.currentThread().getName() + "执行业务逻辑" + ",currentQPS:" + currentQPS() + ",index:" + index);
}
}
/**
* 定时执行器,
* 每N毫秒滑块移动一次,然后再设置下新滑块的初始化数字0,然后新的请求会落到新的滑块上
* 同时总数减掉新滑块上的数字,并且重置新的滑块上的数量
*/
@Override
public void run() {
index = (index + 1) % block;
long val = countPerBlock[index].getAndSet(0);
allCount.addAndGet(-val);
}
public static void main(String[] args) {
SliderWindowRateLimiter sliderWindowRateLimiter = new SliderWindowRateLimiter(10, 100);
//固定的速率移动滑块
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS);
//模拟不同速度的请求
new Thread(() -> {
while (true) {
sliderWindowRateLimiter.visit();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//模拟不同速度的请求
new Thread(() -> {
while (true) {
sliderWindowRateLimiter.visit();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
сигнал
При использовании семафора ресурсы семафора высвобождаются с фиксированной скоростью. Когда поток получает ресурс, он выполняет бизнес-код.
Код:
public class SemaphoreOne {
private static Semaphore semaphore = new Semaphore(10);
public static void bizMethod() throws InterruptedException {
if (!semaphore.tryAcquire()) {
System.out.println(Thread.currentThread().getName() + "被拒绝");
return;
}
System.out.println(Thread.currentThread().getName() + "执行业务逻辑");
Thread.sleep(500);//模拟处理业务逻辑需要1秒
semaphore.release();
}
public static void main(String[] args) {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
semaphore.release(10);
System.out.println("释放所有锁");
}
}, 1000, 1000);
for (int i = 0; i < 10000; i++) {
try {
Thread.sleep(10);//模拟每隔10ms就有1个请求进来
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
SemaphoreOne.bizMethod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
ведро с жетонами
Алгоритм ведра токенов: ведро, в котором хранятся токены с фиксированной емкостью, и токены добавляются в ведро с фиксированной скоростью. Если поступает запрос, вам необходимо сначала получить токен из корзины, а когда в корзине нет доступных токенов, задача отклоняется.
Преимущество сегментов токенов заключается в том, что скорость добавления токенов может быть изменена, а после увеличения скорости можно обрабатывать пакетный трафик.
Код:
public class TokenBucket {
/**
* 定义的桶
*/
public class Bucket {
//容量
int capacity;
//速率,每秒放多少
int rateCount;
//目前token个数
AtomicInteger curCount = new AtomicInteger(0);
public Bucket(int capacity, int rateCount) {
this.capacity = capacity;
this.rateCount = rateCount;
}
public void put() {
if (curCount.get() < capacity) {
System.out.println("目前数量==" + curCount.get() + ", 我还可以继续放");
curCount.addAndGet(rateCount);
}
}
public boolean get() {
if (curCount.get() >= 1) {
curCount.decrementAndGet();
return true;
}
return false;
}
}
@Test
public void testTokenBucket() throws InterruptedException {
Bucket bucket = new Bucket(5, 2);
//固定线程,固定的速率往桶里放数据,比如每秒N个
ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(1);
scheduledCheck.scheduleAtFixedRate(() -> {
bucket.put();
}, 0, 1, TimeUnit.SECONDS);
//先等待一会儿,让桶里放点token
Thread.sleep(6000);
//模拟瞬间10个线程进来拿token
for (int i = 0; i < 10; i++) {
new Thread(() -> {
if (bucket.get()) {
System.out.println(Thread.currentThread() + "获取到了资源");
} else {
System.out.println(Thread.currentThread() + "被拒绝");
}
}).start();
}
//等待,往桶里放token
Thread.sleep(3000);
//继续瞬间10个线程进来拿token
for (int i = 0; i < 10; i++) {
new Thread(() -> {
if (bucket.get()) {
System.out.println(Thread.currentThread() + "获取到了资源");
} else {
System.out.println(Thread.currentThread() + "被拒绝");
}
}).start();
}
}
}
Суммировать
В этой статье в основном представлены несколько методов ограничения тока: Guava RateLimiter, простой подсчет, подсчет скользящего окна, семафор, ведро токенов, конечно же, алгоритм ограничения тока, алгоритм дырявого ведра, ограничение тока nginx и так далее. Эти методы, написанные Half Cigarette, являются только методами, которые люди всегда использовали в реальных проектах, или методами, которые были написаны во время участия в письменном тесте Али.
Если у вас есть лучшие идеи, пожалуйста, поделитесь со мной!