Распределенное ограничение тока

Redis задняя часть распределенный Lua

предисловие

Эта статья продолжается вышеОграничение тока приложенияпровести обсуждение.

Текущая схема ограничения, упомянутая ранее, действительна только для одной JVM, то есть приложения для одной машины. Для распределенных приложений, которые сейчас распространены, также должна быть распределенная схема ограничения тока.

На основе этого я попытался написать этот компонент:

GitHub.com/crossover J я…

DEMO

Используется следующее

GitHub.com/crossover J я…

для демонстрации.

Текущий лимит принимается в интерфейсе, предоставляемом приложением Order. Первый — это bean-компонент, настроенный с помощью инструмента ограничения тока:

@Configuration
public class RedisLimitConfig {


    @Value("${redis.limit}")
    private int limit;


    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    @Bean
    public RedisLimit build() {
        RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();
        JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
        RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
                .limit(limit)
                .build();

        return redisLimit;
    }
}

Затем используйте компонент в контроллере:

    @Autowired
    private RedisLimit redisLimit ;

    @Override
    @CheckReqNo
    public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
        BaseResponse<OrderNoResVO> res = new BaseResponse();

        //限流
        boolean limit = redisLimit.limit();
        if (!limit){
            res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
            res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
            return res ;
        }

        res.setReqNo(orderNoReq.getReqNo());
        if (null == orderNoReq.getAppId()){
            throw new SBCException(StatusEnum.FAIL);
        }
        OrderNoResVO orderNoRes = new OrderNoResVO() ;
        orderNoRes.setOrderId(DateUtil.getLongTime());
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        res.setDataBody(orderNoRes);
        return res ;
    }
    

Для удобства также предусмотрены аннотации:

    @Override
    @ControllerLimit
    public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
        BaseResponse<OrderNoResVO> res = new BaseResponse();
        // 业务逻辑
        return res ;
    }

Эта аннотация перехватывает HTTP-запрос и возвращается непосредственно, когда запрос достигает порогового значения.

Также можно использовать обычные методы:

@CommonLimit
public void doSomething(){}

Исключение будет выдано, когда вызов достигнет порога.

Для имитации параллелизма вUserПриложение запускает 10 потоков, вызывающих Order(Количество текущего лимита 5) интерфейс (вы также можете использовать профессиональные инструменты параллельного тестирования, такие как JMeter и т. д.).

    @Override
    public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
        //调用远程服务
        OrderNoReqVO vo = new OrderNoReqVO();
        vo.setAppId(1L);
        vo.setReqNo(userReq.getReqNo());

        for (int i = 0; i < 10; i++) {
            executorService.execute(new Worker(vo, orderServiceClient));
        }

        UserRes userRes = new UserRes();
        userRes.setUserId(123);
        userRes.setUserName("张三");

        userRes.setReqNo(userReq.getReqNo());
        userRes.setCode(StatusEnum.SUCCESS.getCode());
        userRes.setMessage("成功");

        return userRes;
    }
    

    private static class Worker implements Runnable {

        private OrderNoReqVO vo;
        private OrderServiceClient orderServiceClient;

        public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {
            this.vo = vo;
            this.orderServiceClient = orderServiceClient;
        }

        @Override
        public void run() {

            BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
            logger.info("远程返回:" + JSON.toJSONString(orderNo));

        }
    }    

Два приложения Order запускаются для проверки распределенного эффекта.

Эффект следующий:

Принцип реализации

Принцип реализации на самом деле очень прост. Так как должен быть достигнут эффект распределенного глобального троттлинга, естественно требуется сторонний компонент для учета количества запросов.

Среди них Redis очень подходит для таких сценариев.

  • Текущее время (с точностью до секунд) записывается в Redis в качестве ключа для каждого запроса, а время ожидания установлено на 2 секунды, и Redis автоматически увеличивает значение ключа.
  • Возвращает ошибку при достижении порога.
  • Операция записи в Redis выполняется с помощью сценариев Lua, а однопоточный механизм Redis может обеспечить атомарность каждого запроса Redis.

Lua-скрипт выглядит следующим образом:

--lua 下标从 1 开始
-- 限流 key
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")

if curentLimit + 1 > limit then
    -- 达到限流大小 返回
    return 0;
else
    -- 没有达到阈值 value + 1
    redis.call("INCRBY", key, 1)
    redis.call("EXPIRE", key, 2)
    return curentLimit + 1
end

Логика вызова в Java:

    public boolean limit() {
        String key = String.valueOf(System.currentTimeMillis() / 1000);
        Object result = null;
        if (jedis instanceof Jedis) {
            result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else if (jedis instanceof JedisCluster) {
            result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else {
            //throw new RuntimeException("instance is error") ;
            return false;
        }

        if (FAIL_CODE != (Long) result) {
            return true;
        } else {
            return false;
        }
    }

Поэтому необходимо вызывать этот метод только в том месте, где требуется ограничение тока, чтобы оценить возвращаемое значение для достижения цели ограничения тока.

Конечно, это всего лишь грубый счетчик с использованием Redis.Если вы хотите реализовать алгоритм ведра токенов, подобный приведенному выше, вы можете реализовать его самостоятельно на основе Lua.

Строитель Строитель

При разработке этого компонента мы хотим предоставить пользователям понятный, удобочитаемый и подверженный ошибкам API, насколько это возможно.

Например, на первом шаге, как построить объект ограничения тока.

Наиболее распространенным способом, естественно, является конструктор.Если полей несколько, можно использовать перекрывающийся конструктор:

public A(){}
public A(int a){}
public A(int a,int b){}

Недостатки тоже очевидны: если слишком много параметров, будет сложно читать, и даже если параметры одного типа, клиент меняет порядок, но предупреждения и непредсказуемых результатов нет.

Второе решение может использовать шаблон JavaBean, используяsetterметод построения:

A a = new A();
a.setA(a);
a.setB(b);

Этот подход ясен и легко читается, но легко оставить объект в несогласованном состоянии, оставив объект в небезопасном для потоков состоянии.

Итак, вот третий способ создания объектов, конструктор:

public class RedisLimit {

    private JedisCommands jedis;
    private int limit = 200;

    private static final int FAIL_CODE = 0;

    /**
     * lua script
     */
    private String script;

    private RedisLimit(Builder builder) {
        this.limit = builder.limit ;
        this.jedis = builder.jedis ;
        buildScript();
    }


    /**
     * limit traffic
     * @return if true
     */
    public boolean limit() {
        String key = String.valueOf(System.currentTimeMillis() / 1000);
        Object result = null;
        if (jedis instanceof Jedis) {
            result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else if (jedis instanceof JedisCluster) {
            result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
        } else {
            //throw new RuntimeException("instance is error") ;
            return false;
        }

        if (FAIL_CODE != (Long) result) {
            return true;
        } else {
            return false;
        }
    }


    /**
     * read lua script
     */
    private void buildScript() {
        script = ScriptUtil.getScript("limit.lua");
    }


    /**
     *  the builder
     * @param <T>
     */
    public static class Builder<T extends JedisCommands>{
        private T jedis = null ;

        private int limit = 200;


        public Builder(T jedis){
            this.jedis = jedis ;
        }

        public Builder limit(int limit){
            this.limit = limit ;
            return this;
        }

        public RedisLimit build(){
            return new RedisLimit(this) ;
        }

    }
}

Таким образом, когда клиент использует:

RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
                .limit(limit)
                .build();

Это более просто и позволяет избежать разделения процесса создания на несколько подэтапов.

Это полезно, когда есть несколько параметров конструктора, но нет обязательных полей.

Так кстати метод билдера распределенных замков тоже обновился:

GitHub.com/crossover J я…

Дополнительные сведения см. в разделе «Эффективное использование Java».

API

Как видно из вышеизложенного, процесс использования заключается в вызовеlimitметод.

   //限流
    boolean limit = redisLimit.limit();
    if (!limit){
       //具体限流逻辑
    }

Чтобы уменьшить навязчивость и упростить клиент, предусмотрены два метода аннотации.

@ControllerLimit

Эту аннотацию можно использовать для@RequestMappingВ декорированном интерфейсе он предоставит ответ ограничения тока после ограничения тока.

Реализация выглядит следующим образом:

@Component
public class WebIntercept extends WebMvcConfigurerAdapter {

    private static Logger logger = LoggerFactory.getLogger(WebIntercept.class);


    @Autowired
    private RedisLimit redisLimit;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new CustomInterceptor())
                .addPathPatterns("/**");
    }


    private class CustomInterceptor extends HandlerInterceptorAdapter {
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                                 Object handler) throws Exception {


            if (redisLimit == null) {
                throw new NullPointerException("redisLimit is null");
            }

            if (handler instanceof HandlerMethod) {
                HandlerMethod method = (HandlerMethod) handler;

                ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class);
                if (annotation == null) {
                    //skip
                    return true;
                }

                boolean limit = redisLimit.limit();
                if (!limit) {
                    logger.warn("request has bean limit");
                    response.sendError(500, "request limit");
                    return false;
                }

            }

            return true;

        }
    }
}

Фактически, он реализует перехватчик в SpringMVC и оценивает, используются ли аннотации в процессе перехвата, чтобы вызвать текущую ограничивающую логику.

Предпосылка состоит в том, что приложение должно сканировать этот класс и позволить Spring управлять им.

@ComponentScan(value = "com.crossoverjie.distributed.intercept")

@CommonLimit

Конечно, его можно использовать и в обычных методах. Принцип реализации — Spring AOP (перехватчик SpringMVC тоже АОП по своей природе).

@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class CommonAspect {

    private static Logger logger = LoggerFactory.getLogger(CommonAspect.class);

    @Autowired
    private RedisLimit redisLimit ;

    @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)")
    private void check(){}

    @Before("check()")
    public void before(JoinPoint joinPoint) throws Exception {

        if (redisLimit == null) {
            throw new NullPointerException("redisLimit is null");
        }

        boolean limit = redisLimit.limit();
        if (!limit) {
            logger.warn("request has bean limit");
            throw new RuntimeException("request has bean limit") ;
        }

    }
}

Это очень просто, и в процессе перехвата также вызывается текущий лимит.

Разумеется, пакет также необходимо сканировать при использовании:

@ComponentScan(value = "com.crossoverjie.distributed.intercept")

Суммировать

ОграничениеВ системе с высоким параллелизмом и большим трафиком это мощный инструмент для защиты приложений, и есть много зрелых решений.Я надеюсь предоставить некоторые идеи для друзей, которые только что узнали об этом.

Весь приведенный выше исходный код:

Заинтересованные друзья могут нажать «Звезда» или отправить PR.

Дополнительный

Недавно я обобщил некоторые знания, связанные с Java, и заинтересованные друзья могут поддерживать их вместе.

адрес:GitHub.com/crossover J я…