помещение
Предыдущая статьяRedis
упорядоченный наборSorted Set
и структура расписанияQuartz
Первая версия примера представляет собой простую отложенную задачу, но есть две относительно важные проблемы, которые не решены:
- Фрагментация.
- монитор.
Содержание этой статьи заключается в улучшении функций этих двух аспектов. Предварительная статья:Использование Redis для реализации отложенных задач (1).
Зачем нужен шардинг
Разместите здесь скрипт запросаdequeue.lua
Содержание:
-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= nil and #list > 0 then
-- unpack函数能把table转化为可变参数
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
return nil
Этот скрипт использует всего четыре командыZREVRANGEBYSCORE
,ZREM
,HMGET
иHDEL
(TYPE
Сложность времени командования незначительна):
Заказ | временная сложность | Параметр Описание |
---|---|---|
ZREVRANGEBYSCORE |
O(log(N)+M) |
N общее количество элементов в отсортированном наборе,M количество возвращаемых элементов |
ZREM |
O(M*log(N)) |
N общее количество элементов в отсортированном наборе,M количество успешно удаленных элементов |
HMGET |
O(L) |
L количество успешно возвращенных доменов |
HDEL |
O(L) |
L количество доменов для удаления |
Далее нам нужно проанализировать сцену и конкретные параметры.Если в производственной среде общее количество элементов в заказанном наборе поддерживается на уровне 10 000 в час (то есть бизнес-объем составляет 10 000 заказов в час).Sorted Set
иHash
В то же время данные удаляются одновременно, то есть 5000 фрагментов данных, которые находятся в этих двух наборах в течение 30 минут, что является данными в приведенной выше таблице.N = 5000
.假设我们初步定义查询的LIMIT
Значение равно 100, что указано выше.M
Значение 100, при условии, чтоRedis
Время, затрачиваемое на каждую операционную единицу, просто считаетсяT
, затем проанализируйте время обработки 5000 данных:
серийный номер | установить кардинальность | ZREVRANGEBYSCORE |
ZREM |
HMGET |
HDEL |
---|---|---|---|---|---|
1 | 5000 |
log(5000T) + 100T |
log(5000T) * 100 |
100T |
100T |
2 | 4900 |
log(4900T) + 100T |
log(4900T) * 100 |
100T |
100T |
3 | 4800 |
log(4800T) + 100T |
log(4800T) * 100 |
100T |
100T |
... | ... | ... | ... | ... | ... |
Теоретически среди четырех команд, используемых скриптом,ZREM
Затраты времени команды самые большие, иZREVRANGEBYSCORE
иZREM
Функция временной сложностиM * log(N)
, тем самым контролируя кардинальность элемента множестваN
для уменьшенияLua
Время, необходимое для запуска сценария, полезно.
Фрагментация
проанализировано вышеdequeue.lua
Временная сложность , есть две готовые схемы шардинга:
- Вариант 1: одиночный
Redis
например, даSorted Set
иHash
Данные двух коллекций сегментированы. - Решение 2. На основе нескольких
Redis
Экземпляр (который может быть дозорным или кластером) реализует операцию сегментирования первого варианта осуществления.
Для простоты количество осколков в последнем примере (shardingCount
) рассчитан на 2, количество осколков в производстве должно быть настроено в соответствии с реальной ситуацией. По умолчанию используется длинное целое поле идентификатора пользователя.userId
Взять по модулю для шардинга, предполагая, что в тестовых данныхuserId
распределяется равномерно.
Общая сущность:
@Data
public class OrderMessage {
private String orderId;
private BigDecimal amount;
private Long userId;
private String timestamp;
}
Интерфейс очереди задержки:
public interface OrderDelayQueue {
void enqueue(OrderMessage message);
List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);
List<OrderMessage> dequeue(int index);
String enqueueSha();
String dequeueSha();
}
Разделение одного экземпляра Redis
одинRedis
Разделение экземпляра относительно просто, схема выглядит следующим образом:
Напишите код реализации очереди следующим образом (Некоторые параметры жестко закодированы, только для справки, не копируйте их в продакшн):
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
/**
* 分片数量
*/
private static final long SHARDING_COUNT = 2L;
private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
private final JedisProvider jedisProvider;
@Override
public void enqueue(OrderMessage message) {
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
args.add(String.valueOf(System.currentTimeMillis()));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
List<String> keys = Lists.newArrayList();
long index = message.getUserId() % SHARDING_COUNT;
keys.add(ORDER_QUEUE_PREFIX + index);
keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
try (Jedis jedis = jedisProvider.provide()) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
}
}
@Override
public List<OrderMessage> dequeue(int index) {
// 30分钟之前
String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
}
@SuppressWarnings("unchecked")
@Override
public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
List<String> args = new ArrayList<>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
List<String> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE_PREFIX + index);
keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
try (Jedis jedis = jedisProvider.provide()) {
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e, OrderMessage.class));
}
}
}
return result;
}
@Override
public String enqueueSha() {
return ENQUEUE_LUA_SHA.get();
}
@Override
public String dequeueSha() {
return DEQUEUE_LUA_SHA.get();
}
@Override
public void afterPropertiesSet() throws Exception {
// 加载Lua脚本
loadLuaScript();
}
private void loadLuaScript() throws Exception {
try (Jedis jedis = jedisProvider.provide()) {
ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
String sha = jedis.scriptLoad(luaContent);
ENQUEUE_LUA_SHA.compareAndSet(null, sha);
resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
sha = jedis.scriptLoad(luaContent);
DEQUEUE_LUA_SHA.compareAndSet(null, sha);
}
}
}
Потребители реализуют регулярные задачи следующим образом:
DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
private static final AtomicInteger COUNTER = new AtomicInteger();
/**
* 初始化业务线程池
*/
private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
return thread;
});
@Autowired
private OrderDelayQueue orderDelayQueue;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 这里为了简单起见,分片的下标暂时使用Quartz的任务执行上下文存放
int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
if (null != dequeue) {
final CountDownLatch latch = new CountDownLatch(1);
BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
try {
latch.await();
} catch (InterruptedException ignore) {
//ignore
}
}
LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final CountDownLatch latch;
private final List<OrderMessage> messages;
private final int shardingIndex;
@Override
public void run() {
try {
for (OrderMessage message : messages) {
LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
// 模拟耗时
TimeUnit.MILLISECONDS.sleep(50);
}
} catch (Exception ignore) {
} finally {
latch.countDown();
}
}
}
}
Запускать задачи по времени и записывать тестовые данныеCommandLineRunner
Реализация выглядит следующим образом:
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {
@Autowired
private Scheduler scheduler;
@Autowired
private JedisProvider jedisProvider;
@Override
public void run(String... args) throws Exception {
int shardingCount = 2;
// 准备测试数据
prepareOrderMessageData(shardingCount);
for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
}
}
private void prepareOrderMessageData(int shardingCount) throws Exception {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
try (Jedis jedis = jedisProvider.provide()) {
List<OrderMessage> messages = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(i));
message.setOrderId("ORDER_ID_" + i);
message.setUserId((long) i);
message.setTimestamp(LocalDateTime.now().format(f));
messages.add(message);
}
for (OrderMessage message : messages) {
// 30分钟前
Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
long index = message.getUserId() % shardingCount;
jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
}
}
}
private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
List<ConsumerTask> tasks = Lists.newArrayList();
for (int i = 0; i < shardingCount; i++) {
JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
.withIdentity("OrderMessageConsumer-" + i, "DelayTask")
.usingJobData("shardingIndex", i)
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
tasks.add(new ConsumerTask(jobDetail, trigger));
}
return tasks;
}
@Getter
@RequiredArgsConstructor
private static class ConsumerTask {
private final JobDetail jobDetail;
private final Trigger trigger;
}
}
Запустите приложение, и вывод будет следующим:
2019-08-28 00:13:20.648 INFO 50248 --- [ main] c.t.s.s.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-08-28 00:13:20.781 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... 省略大量输出
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
// ... 省略大量输出
Разделение нескольких экземпляров Redis
одинRedis
На самом деле существует проблема с сегментированием экземпляров, т.е.Redis
Экземпляр всегда обрабатывает команды клиента в одном потоке, даже если клиент выполняется несколькими потоками.Redis
Команда, схема выглядит следующим образом:
В этом случае, хотя сегментирование снижаетLua
сложность скриптовых команд, ноRedis
Модель обработки команд для (один поток) Также может стать еще одним узкими местами проблем. Поэтому вы можете рассмотреть, основываясь на несколькихRedis
Экземпляры карены.
Здесь для простоты два одноточечныхRedis
Примеры — это примеры кодирования. код показывает, как показано ниже:
// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {
private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
private JedisPool defaultPool;
@Override
public void afterPropertiesSet() throws Exception {
JedisPool pool = new JedisPool("localhost");
defaultPool = pool;
pools.put(0L, pool);
// 这个是虚拟机上的redis实例
pool = new JedisPool("192.168.56.200");
pools.put(1L, pool);
}
public Jedis provide(Long index) {
return pools.getOrDefault(index, defaultPool).getResource();
}
}
// 订单消息
@Data
public class OrderMessage {
private String orderId;
private BigDecimal amount;
private Long userId;
}
// 订单延时队列接口
public interface OrderDelayQueue {
void enqueue(OrderMessage message);
List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);
List<OrderMessage> dequeue(long index);
String enqueueSha(long index);
String dequeueSha(long index);
}
// 延时队列实现
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
private static final long SHARDING_COUNT = 2L;
private static final String ORDER_QUEUE = "ORDER_QUEUE";
private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();
private final JedisProvider jedisProvider;
@Override
public void enqueue(OrderMessage message) {
List<String> args = Lists.newArrayList();
args.add(message.getOrderId());
args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
args.add(message.getOrderId());
args.add(JSON.toJSONString(message));
List<String> keys = Lists.newArrayList();
long index = message.getUserId() % SHARDING_COUNT;
keys.add(ORDER_QUEUE);
keys.add(ORDER_DETAIL_QUEUE);
try (Jedis jedis = jedisProvider.provide(index)) {
jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
}
}
@Override
public List<OrderMessage> dequeue(long index) {
// 30分钟之前
String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
}
@SuppressWarnings("unchecked")
@Override
public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
List<String> args = new ArrayList<>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
List<String> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE);
keys.add(ORDER_DETAIL_QUEUE);
try (Jedis jedis = jedisProvider.provide(index)) {
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e, OrderMessage.class));
}
}
}
return result;
}
@Override
public String enqueueSha(long index) {
return ENQUEUE_LUA_SHA.get(index);
}
@Override
public String dequeueSha(long index) {
return DEQUEUE_LUA_SHA.get(index);
}
@Override
public void afterPropertiesSet() throws Exception {
// 加载Lua脚本
loadLuaScript();
}
private void loadLuaScript() throws Exception {
for (long i = 0; i < SHARDING_COUNT; i++) {
try (Jedis jedis = jedisProvider.provide(i)) {
ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
String sha = jedis.scriptLoad(luaContent);
ENQUEUE_LUA_SHA.put(i, sha);
resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
sha = jedis.scriptLoad(luaContent);
DEQUEUE_LUA_SHA.put(i, sha);
}
}
}
}
// 消费者
public class OrderMessageConsumer implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
private static final AtomicInteger COUNTER = new AtomicInteger();
// 初始化业务线程池
private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
return thread;
});
@Autowired
private OrderDelayQueue orderDelayQueue;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
if (null != dequeue) {
// 这里的倒数栅栏,在线程池资源充足的前提下可以去掉
final CountDownLatch latch = new CountDownLatch(1);
businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
try {
latch.await();
} catch (InterruptedException ignore) {
//ignore
}
}
LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final CountDownLatch latch;
private final List<OrderMessage> messages;
private final long shardingIndex;
@Override
public void run() {
try {
for (OrderMessage message : messages) {
LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
// 模拟处理耗时50毫秒
TimeUnit.MILLISECONDS.sleep(50);
}
} catch (Exception ignore) {
} finally {
latch.countDown();
}
}
}
}
// 配置
@Configuration
public class QuartzConfiguration {
@Bean
public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
return new AutowiredSupportQuartzJobFactory();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setSchedulerName("RamScheduler");
factory.setAutoStartup(true);
factory.setJobFactory(autowiredSupportQuartzJobFactory);
return factory;
}
public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
}
@Override
protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
autowireCapableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
}
// CommandLineRunner
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {
@Autowired
private Scheduler scheduler;
@Autowired
private JedisProvider jedisProvider;
@Override
public void run(String... args) throws Exception {
long shardingCount = 2;
prepareData(shardingCount);
for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
}
}
private void prepareData(long shardingCount) {
for (long i = 0L; i < shardingCount; i++) {
Map<String, Double> z = Maps.newHashMap();
Map<String, String> h = Maps.newHashMap();
for (int k = 0; k < 100; k++) {
OrderMessage message = new OrderMessage();
message.setAmount(BigDecimal.valueOf(k));
message.setUserId((long) k);
message.setOrderId("ORDER_ID_" + k);
// 30 min ago
z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
h.put(message.getOrderId(), JSON.toJSONString(message));
}
Jedis jedis = jedisProvider.provide(i);
jedis.hmset("ORDER_DETAIL_QUEUE", h);
jedis.zadd("ORDER_QUEUE", z);
}
}
private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
List<ConsumerTask> tasks = Lists.newArrayList();
for (long i = 0; i < shardingCount; i++) {
JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
.withIdentity("OrderMessageConsumer-" + i, "DelayTask")
.usingJobData("shardingIndex", i)
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
tasks.add(new ConsumerTask(jobDetail, trigger));
}
return tasks;
}
@Getter
@RequiredArgsConstructor
private static class ConsumerTask {
private final JobDetail jobDetail;
private final Trigger trigger;
}
}
Добавьте функцию запуска и запустите ее, вывод консоли будет следующим:
// ...省略大量输出
2019-09-01 14:08:27.664 INFO 13056 --- [ main] c.t.multi.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ...省略大量输出
2019-09-01 14:08:28.239 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
2019-09-01 14:08:28.240 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
// ...省略大量输出
следует избегать в производствеRedis
Единая точка обслуживания, обычно используемаяSentinel с древовидным методом развертывания master-slave (см. «Разработка, эксплуатация и обслуживание Redis»),2 комплектаRedis
Схема развертывания дозорного выглядит следующим образом:
Какие элементы мониторинга необходимы
Нам нужно знать в относительном реальном времениRedis
Сколько данных о невыполненной работе находится в очереди задержки, сколько времени требуется, чтобы выйти из очереди, и другие параметры элемента мониторинга, чтобы мы могли лучше знать, нормально ли работает модуль очереди задержки, есть ли узкое место в производительности и так далее. Конкретные элементы мониторинга необходимо настраивать по мере необходимости.Здесь для удобства примера отслеживаются только два элемента мониторинга:
- отсортированный набор
Sorted Set
Количество элементов в бэклоге. - каждый звонок
dequeue.lua
кропотливый.
Он принимает метод представления данных приложением в реальном времени, который зависит отspring-boot-starter-actuator
,Prometheus
,Grafana
Система мониторинга построена, если вы не знакомы с этой системой, вы можете прочитать две предварительные статьи:
- Фреймворк измерения приложений JVM Боевой микрометр
- Мониторинг различных показателей пула потоков в режиме реального времени с помощью микрометра
монитор
Импорт зависимостей:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.2.0</version>
</dependency>
выбрать здесьGauge
изMeter
Собирайте данные мониторинга и добавляйте классы мониторингаOrderDelayQueueMonitor
:.
// OrderDelayQueueMonitor
@Component
public class OrderDelayQueueMonitor implements InitializingBean {
private static final long SHARDING_COUNT = 2L;
private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
private ScheduledExecutorService executor;
@Autowired
private JedisProvider jedisProvider;
@Override
public void afterPropertiesSet() throws Exception {
executor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "OrderDelayQueueMonitor");
thread.setDaemon(true);
return thread;
});
for (long i = 0L; i < SHARDING_COUNT; i++) {
AtomicLong l = new AtomicLong();
Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))),
l, AtomicLong::get);
lua.put(i, l);
AtomicLong r = new AtomicLong();
Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))),
r, AtomicLong::get);
remain.put(i, r);
}
// 每五秒上报一次集合中的剩余数据
executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
}
public void recordRemain(Long index, long count) {
remain.get(index).set(count);
}
public void recordLuaCost(Long index, long count) {
lua.get(index).set(count);
}
@RequiredArgsConstructor
private class MonitorTask implements Runnable {
private final JedisProvider jedisProvider;
@Override
public void run() {
for (long i = 0L; i < SHARDING_COUNT; i++) {
try (Jedis jedis = jedisProvider.provide(i)) {
recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
}
}
}
}
}
оригинальныйRedisOrderDelayQueue#dequeue()
Сделайте доработку:
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {
// ... 省略没有改动的代码
private final OrderDelayQueueMonitor orderDelayQueueMonitor;
// ... 省略没有改动的代码
@Override
public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
List<String> args = new ArrayList<>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<OrderMessage> result = Lists.newArrayList();
List<String> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE);
keys.add(ORDER_DETAIL_QUEUE);
try (Jedis jedis = jedisProvider.provide(index)) {
long start = System.nanoTime();
List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
long end = System.nanoTime();
// 添加dequeue的耗时监控-单位微秒
orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
if (null != eval) {
for (String e : eval) {
result.add(JSON.parseObject(e, OrderMessage.class));
}
}
}
return result;
}
// ... 省略没有改动的代码
}
Другие конфигурации кратко упомянуты здесь.
application.yaml
открытьprometheus
Доступ к конечным точкам:
server:
port: 9091
management:
endpoints:
web:
exposure:
include: 'prometheus'
Prometheus
Конфигурация службы минимизирует интервал запроса, ориентировочно установленный на 5 секунд:
# my global config
global:
scrape_interval: 5s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets:
# - alertmanager:9093
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first_rules.yml"
# - "second_rules.yml"
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
metrics_path: '/actuator/prometheus'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:9091']
Grafana
Основные элементы конфигурации следующие:
出队耗时 order_delay_queue_lua_cost 分片编号-{{index}}
订单延时队列积压量 order_delay_queue_remain 分片编号-{{index}}
наконец может бытьGrafana
Конфигурация обновляется каждые 5 секунд, и эффект следующий:
Элементы мониторинга здесь нужно чаще настраивать по запросу, честно говоря, работа по мониторингу зачастую самая сложная и громоздкая.
резюме
Полный текст представляет собой относительно подробное введение, основанное наRedis
Реализован конкретный процесс реализации фрагментации и мониторинга отложенных задач.Основной код приведен только для справки, и есть некоторые специфические детали, такие какPrometheus
,Grafana
Некоторые приложения , здесь не будут подробно раскрыты из-за нехватки места. Честно говоря, сделать выбор промежуточного ПО и архитектуры исходя из реальных сценариев — дело непростое, и зачастую первоначальная реализация не является самой большой сложностью, а большей проблемой является оптимизация и мониторинг потом.
Приложение
-
Markdown
Оригинал:GitHub.com/Zhaojun из розетки жирным шрифтом/Нет... - Страница на гитхабе:woohoo.throwable.club/2019/09/01/…
- Кодовая страница:кинутьable.coding.what/2019/09/01/…
(Конец этой статьи c-3-d 20190901 Я плохо себя чувствую и некоторое время волочусь)
Технический публичный аккаунт ("Throwable Digest"), который время от времени выкладывает оригинальные технические статьи автора (никогда не занимайтесь плагиатом и не перепечатывайте):