Использование Redis для реализации отложенных задач (2)

Java

помещение

Предыдущая статьяRedisупорядоченный наборSorted Setи структура расписанияQuartzПервая версия примера представляет собой простую отложенную задачу, но есть две относительно важные проблемы, которые не решены:

  1. Фрагментация.
  2. монитор.

Содержание этой статьи заключается в улучшении функций этих двух аспектов. Предварительная статья:Использование Redis для реализации отложенных задач (1).

Зачем нужен шардинг

Разместите здесь скрипт запросаdequeue.luaСодержание:

-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack函数能把table转化为可变参数
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

Этот скрипт использует всего четыре командыZREVRANGEBYSCORE,ZREM,HMGETиHDEL(TYPEСложность времени командования незначительна):

Заказ временная сложность Параметр Описание
ZREVRANGEBYSCORE O(log(N)+M) Nобщее количество элементов в отсортированном наборе,Mколичество возвращаемых элементов
ZREM O(M*log(N)) Nобщее количество элементов в отсортированном наборе,Mколичество успешно удаленных элементов
HMGET O(L) Lколичество успешно возвращенных доменов
HDEL O(L) Lколичество доменов для удаления

Далее нам нужно проанализировать сцену и конкретные параметры.Если в производственной среде общее количество элементов в заказанном наборе поддерживается на уровне 10 000 в час (то есть бизнес-объем составляет 10 000 заказов в час).Sorted SetиHashВ то же время данные удаляются одновременно, то есть 5000 фрагментов данных, которые находятся в этих двух наборах в течение 30 минут, что является данными в приведенной выше таблице.N = 5000.假设我们初步定义查询的LIMITЗначение равно 100, что указано выше.MЗначение 100, при условии, чтоRedisВремя, затрачиваемое на каждую операционную единицу, просто считаетсяT, затем проанализируйте время обработки 5000 данных:

серийный номер установить кардинальность ZREVRANGEBYSCORE ZREM HMGET HDEL
1 5000 log(5000T) + 100T log(5000T) * 100 100T 100T
2 4900 log(4900T) + 100T log(4900T) * 100 100T 100T
3 4800 log(4800T) + 100T log(4800T) * 100 100T 100T
... ... ... ... ... ...

Теоретически среди четырех команд, используемых скриптом,ZREMЗатраты времени команды самые большие, иZREVRANGEBYSCOREиZREMФункция временной сложностиM * log(N), тем самым контролируя кардинальность элемента множестваNдля уменьшенияLuaВремя, необходимое для запуска сценария, полезно.

Фрагментация

проанализировано вышеdequeue.luaВременная сложность , есть две готовые схемы шардинга:

  • Вариант 1: одиночныйRedisнапример, даSorted SetиHashДанные двух коллекций сегментированы.
  • Решение 2. На основе несколькихRedisЭкземпляр (который может быть дозорным или кластером) реализует операцию сегментирования первого варианта осуществления.

Для простоты количество осколков в последнем примере (shardingCount) рассчитан на 2, количество осколков в производстве должно быть настроено в соответствии с реальной ситуацией. По умолчанию используется длинное целое поле идентификатора пользователя.userIdВзять по модулю для шардинга, предполагая, что в тестовых данныхuserIdраспределяется равномерно.

Общая сущность:

@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}

Интерфейс очереди задержки:

public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}

Разделение одного экземпляра Redis

одинRedisРазделение экземпляра относительно просто, схема выглядит следующим образом:

Напишите код реализации очереди следующим образом (Некоторые параметры жестко закодированы, только для справки, не копируйте их в продакшн):

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * 分片数量
     */
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}

Потребители реализуют регулярные задачи следующим образом:

DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    /**
     * 初始化业务线程池
     */
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // 这里为了简单起见,分片的下标暂时使用Quartz的任务执行上下文存放
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
                    // 模拟耗时
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

Запускать задачи по времени и записывать тестовые данныеCommandLineRunnerРеализация выглядит следующим образом:

@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        // 准备测试数据
        prepareOrderMessageData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // 30分钟前
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Запустите приложение, и вывод будет следующим:

2019-08-28 00:13:20.648  INFO 50248 --- [           main] c.t.s.s.NoneJdbcSpringApplication        : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-08-28 00:13:20.781  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[1],处理订单消息,内容:{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840  INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer      : shardingIndex:[0],处理订单消息,内容:{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... 省略大量输出
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
2019-08-28 00:13:21.298  INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer      : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
// ... 省略大量输出

Разделение нескольких экземпляров Redis

одинRedisНа самом деле существует проблема с сегментированием экземпляров, т.е.RedisЭкземпляр всегда обрабатывает команды клиента в одном потоке, даже если клиент выполняется несколькими потоками.RedisКоманда, схема выглядит следующим образом:

В этом случае, хотя сегментирование снижаетLuaсложность скриптовых команд, ноRedisМодель обработки команд для (один поток) Также может стать еще одним узкими местами проблем. Поэтому вы можете рассмотреть, основываясь на несколькихRedisЭкземпляры карены.

Здесь для простоты два одноточечныхRedisПримеры — это примеры кодирования. код показывает, как показано ниже:

// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        // 这个是虚拟机上的redis实例
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}

// 订单消息
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

// 订单延时队列接口
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}

// 延时队列实现
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        // 30分钟之前
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 加载Lua脚本
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}

// 消费者
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    // 初始化业务线程池
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // 这里的倒数栅栏,在线程池资源充足的前提下可以去掉
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
                    // 模拟处理耗时50毫秒
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

// 配置
@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                // 30 min ago
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            Jedis jedis = jedisProvider.provide(i);
            jedis.hmset("ORDER_DETAIL_QUEUE", h);
            jedis.zadd("ORDER_QUEUE", z);
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}

Добавьте функцию запуска и запустите ее, вывод консоли будет следующим:

// ...省略大量输出
2019-09-01 14:08:27.664  INFO 13056 --- [           main] c.t.multi.NoneJdbcSpringApplication      : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-09-01 14:08:27.724  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782  INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer   : shardingIndex:[1],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ...省略大量输出
2019-09-01 14:08:28.239  INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
2019-09-01 14:08:28.240  INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer   : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
// ...省略大量输出

следует избегать в производствеRedisЕдиная точка обслуживания, обычно используемаяSentinel с древовидным методом развертывания master-slave (см. «Разработка, эксплуатация и обслуживание Redis»),2 комплектаRedisСхема развертывания дозорного выглядит следующим образом:

Какие элементы мониторинга необходимы

Нам нужно знать в относительном реальном времениRedisСколько данных о невыполненной работе находится в очереди задержки, сколько времени требуется, чтобы выйти из очереди, и другие параметры элемента мониторинга, чтобы мы могли лучше знать, нормально ли работает модуль очереди задержки, есть ли узкое место в производительности и так далее. Конкретные элементы мониторинга необходимо настраивать по мере необходимости.Здесь для удобства примера отслеживаются только два элемента мониторинга:

  • отсортированный наборSorted SetКоличество элементов в бэклоге.
  • каждый звонокdequeue.luaкропотливый.

Он принимает метод представления данных приложением в реальном времени, который зависит отspring-boot-starter-actuator,Prometheus,GrafanaСистема мониторинга построена, если вы не знакомы с этой системой, вы можете прочитать две предварительные статьи:

монитор

Импорт зависимостей:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>

выбрать здесьGaugeизMeterСобирайте данные мониторинга и добавляйте классы мониторингаOrderDelayQueueMonitor:.

// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))),
                    r, AtomicLong::get);
            remain.put(i, r);
        }
        // 每五秒上报一次集合中的剩余数据
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}

оригинальныйRedisOrderDelayQueue#dequeue()Сделайте доработку:

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
    
    // ... 省略没有改动的代码
    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    // ... 省略没有改动的代码

    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            // 添加dequeue的耗时监控-单位微秒
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    } 

    // ... 省略没有改动的代码

}      

Другие конфигурации кратко упомянуты здесь.

application.yamlоткрытьprometheusДоступ к конечным точкам:

server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'

PrometheusКонфигурация службы минимизирует интервал запроса, ориентировочно установленный на 5 секунд:

# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['localhost:9091']

GrafanaОсновные элементы конфигурации следующие:

出队耗时 order_delay_queue_lua_cost 分片编号-{{index}}
订单延时队列积压量 order_delay_queue_remain 分片编号-{{index}}

наконец может бытьGrafanaКонфигурация обновляется каждые 5 секунд, и эффект следующий:

Элементы мониторинга здесь нужно чаще настраивать по запросу, честно говоря, работа по мониторингу зачастую самая сложная и громоздкая.

резюме

Полный текст представляет собой относительно подробное введение, основанное наRedisРеализован конкретный процесс реализации фрагментации и мониторинга отложенных задач.Основной код приведен только для справки, и есть некоторые специфические детали, такие какPrometheus,GrafanaНекоторые приложения , здесь не будут подробно раскрыты из-за нехватки места. Честно говоря, сделать выбор промежуточного ПО и архитектуры исходя из реальных сценариев — дело непростое, и зачастую первоначальная реализация не является самой большой сложностью, а большей проблемой является оптимизация и мониторинг потом.

Приложение

(Конец этой статьи c-3-d 20190901 Я плохо себя чувствую и некоторое время волочусь)

Технический публичный аккаунт ("Throwable Digest"), который время от времени выкладывает оригинальные технические статьи автора (никогда не занимайтесь плагиатом и не перепечатывайте):