Как использовать Redis для реализации отложенной обработки

Redis

задний план

В последнее время необходимо выполнить функцию обработки задержки, в основном для выполнения обработки задержки в соответствии с полем задержки в сообщении после использования сообщений от kafka.Есть некоторые моменты, требующие внимания в фактическом процессе реализации, которые записываются следующим образом.

Процесс реализации

Когда речь заходит о функции тайминга в java, первое, что приходит на ум, это Timer и ScheduledThreadPoolExecutor, но, напротив, Timer можно исключить по следующим основным причинам:

  • Timer использует абсолютное время, и изменения системного времени окажут определенное влияние на Timer, а ScheduledThreadPoolExecutor использует относительное время, так что такой проблемы нет.
  • Timer использует один поток для обработки задач, длительные задачи вызывают задержку обработки других задач, а ScheduledThreadPoolExecutor может настраивать количество потоков.
  • Timer не обрабатывает исключения во время выполнения.Как только задача вызывает исключение во время выполнения, весь Timer аварийно завершает работу, в то время как ScheduledThreadPoolExecutor перехватывает исключения во время выполнения (которые можно обработать в методе обратного вызова afterExecute()), поэтому это безопаснее.
  1. Запланированный поток пула исполнителей Решил использовать ScheduledThreadPoolExecutor для реализации, следующим шагом будет написание кода (общий код процесса). Основная реализация задержки выглядит следующим образом:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new 
ThreadPoolExecutor.AbortPolicy());
//从消息中取出延迟时间及相关信息的代码略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            //具体操作逻辑
        }},0,delayTime, TimeUnit.SECONDS);

Среди них NamedThreadFactory — это настроенная мной фабрика потоков. Она в основном определяет имя пула потоков и соответствующую печать журнала для последующего анализа проблем. Я не буду ее здесь представлять. Политика отказа также является политикой отказа по умолчанию. Потом я его протестировал, и функции, отвечающие целевым требованиям, могут выполняться через заданную задержку, пока вроде бы функция выполнена. Вы можете удивиться, это слишком просто, что сказать, но этот метод прост в реализации, но есть потенциальная проблема, в чем проблема, давайте посмотрим на исходный код ScheduledThreadPoolExecutor:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, 
    TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

Запланированный поток пула исполнителей Из-за собственных характеристик задержки и цикла DelayWorkQueue используется по умолчанию, и в отличие от обычно используемого нами SingleThreadExecutor, мы можем использовать собственную LinkedBlockingQueue и задавать размер очереди.Проблема вот в чем. DelayWrokQueue — это неограниченная очередь, а наш целевой источник данных — kafka, которая представляет собой очередь сообщений с высокой степенью параллелизма и высокой пропускной способностью.Вполне вероятно, что за определенный период времени придет большое количество сообщений, что приведет к OOM. При использовании многопоточности необходимо учитывать возможность ООМ, потому что последствия ООМ часто бывают серьезными, а временное решение системного ООМ можно только перезапустить, что может привести к необратимым проблемам, таким как потеря пользовательских данных, поэтому от этап проектирования кодирования Используйте наилучшие средства, чтобы избежать этих проблем.

  1. Используя комбинацию Redis и потоков, на этот раз мы изменили свое мышление и использовали Redis, чтобы помочь нам выполнить буферизацию, чтобы избежать проблемы слишком большого количества сообщений OOM. Связанный redis zset API:
//添加元素
ZADD key score member [[score member] [score member] …]
//根据分值及限制数量查询
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//从zset中删除指定成员
ZREM key member [member …]

Мы используем структуру zset базовой структуры данных Redis и используем оценку для хранения значения нашего целевого времени отправки.Общий поток обработки выглядит следующим образом:

  • Первый шаг хранения данных: получено сообщение о заказе от kafka в 9:10, запрос уведомления о доставке через 30 минут, затем мы добавим 30 минут к текущему времени и преобразуем его в метку времени в качестве счета и ключа a Номер заказа a хранится в Redis. код показывает, как показано ниже:
public void onMessage(String topic, String message) {
        String orderId;
		int delayTime = 0;
        try {
            Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
            }.getType());
            if (msgMap.isEmpty()) {
                return;
            }
            LOGGER.info("onMessage kafka content:{}", msgMap.toString());
	    orderId = msgMap.get("orderId");
            if(StringUtils.isNotEmpty(orderId)){
                delayTime = Integer.parseInt(msgMap.get("delayTime"));
                Calendar calendar = Calendar.getInstance();
                //计算出预计发送时间
                calendar.add(Calendar.MINUTE, delayTime);
                long sendTime = calendar.getTimeInMillis();
                RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
                LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime);
            }
        } catch (Exception e) {
            LOGGER.info("onMessage 延时发送异常:{}", e);
        }
    }

  • Второй шаг обработки данных: Конкретное время планирования другого потока определяется в соответствии с потребностями бизнеса.Я выполняю его здесь каждые 3 минуты.Внутренняя логика: взять определенное количество данных zset из redis, как его взять, использовать zrangeByScore метод zset, в соответствии с Оценка данных отсортирована, конечно, вы можете привести период времени.Здесь, от 0 до настоящего времени, для потребления, следует отметить, что после извлечения данных нам нужно использовать метод zrem для удаления извлеченных данных из zset, чтобы предотвратить другие потоки.Повторное потребление данных. После этого выполняется соответствующая логика, такая как уведомление о следующей доставке. код показывает, как показано ниже:
public void run(){
        //获取批量大小
        int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
        try {
            //批量获取离发送时间最近的orderNum条数据
	    Calendar calendar = Calendar.getInstance();
	    long now = calendar.getTimeInMillis();
	    //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况)
	    Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
	    LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
            if (CollectionUtils.isNotEmpty(orders)){
                //删除key 防止重复发送
                for (String orderId : orderIds) {
                    RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
                }
	        //接下来执行发送等业务逻辑                 
            }
        } catch (Exception e) {
            LOGGER.warn("task.run exception:{}", e);
        }
    }

На данный момент функция использования Redis и потоков для завершения отложенной отправки завершена.

Эпилог

Затем сравните преимущества и недостатки двух вышеперечисленных реализаций:

  • Первый метод прост в реализации, не зависит от внешних компонентов и позволяет быстро достичь целевой функции, но недостатки также очевидны, и его нужно использовать в конкретных сценариях, конечно, это хороший выбор, когда сообщений источника данных не так много.

  • Второй способ немного сложнее в реализации, но он может адаптироваться к сценариям с большим количеством сообщений, zset Redis используется как эффект «мидлвара», а реализация функции, которая помогает нам задерживать, может лучше адаптироваться к сценариям с высоким параллелизмом.НедостаткиВ процессе написания необходимо учитывать множество практических факторов, таких как время цикла выполнения потока, отправка может быть отложена на определенное время, установка размера пакетных данных и скоро.

Подводя итог, это краткое изложение двух методов реализации моей реализации функции задержки на этот раз Какой метод использовать, необходимо выбрать в соответствии с реальной ситуацией, и я надеюсь, что он может вам помочь. ps: В связи с моими ограниченными техническими возможностями в статье могут быть неточные или неправильные технические описания.Пожалуйста, обратите внимание, что я немедленно исправлю это, чтобы не вводить всех в заблуждение, спасибо!