Нет публики:Маленькое кофейное шоу Java,Веб-сайт:javaxks.com
Автор: блог Гейла, ссылка:blog.CSDN.net/QQ330983778…
дизайн
Когда я раньше изучал Redis, я обнаружил, что команда Youzan ранее поделилась статьей о дизайне очереди задержки:Есть такие очереди с задержкойПопробуй это сейчас
Бизнес-процесс
Во-первых, давайте проанализируем процесс
- Пользователь отправляет задачу. Задача сначала помещается в очередь задержки.
- После того, как очередь задержки получает задачу, она сначала помещает задачу в пул заданий, а затем вычисляет время ее выполнения.
- Затем создайте отложенную задачу (содержащую только идентификатор задачи) и поместите ее в корзину.
- Компонент времени постоянно опрашивает каждую корзину и получает метаинформацию задачи из пула заданий, когда приходит время.
- Действительность задачи мониторинга считается пройденной, если она была удалена. Продолжить опрос. Посчитайте время еще раз, если задача легальна
- Если легально, рассчитайте время, если легально: поставьте задачу в соответствующую очередь готовности по теме, а затем удалите ее из корзины. Если время неверно, пересчитайте время и снова поместите его в ведро, а также удалите содержимое в предыдущем ведре.
- Потребитель опрашивает очередь готовности соответствующей темы. Сделайте свою собственную бизнес-логику после получения работы. При этом сервер пересчитывает время выполнения задания, полученного потребителем, в соответствии с его установленным TTR, и кладет его в корзину.
- После завершения потребления отправляется сообщение о завершении, и сервер удаляет соответствующую информацию в соответствии с идентификатором задания.
Пользовательский пул задач отложенная задача цикл времени отложенная задача отправить задачу отправить отложенную задачу задача опроса пришло время пользователь получает задачу установить время ожидания завершения, а затем сохранить ее в отложенной задаче задача время ожидания завершение задачи или удаление задачи обнаружена задача Нет очереди для удалить пользовательский пул задач, задержать задачу, отложить выполнение ожидающих задач
объект
Теперь мы можем понять несколько компонентов, которые существуют посередине.
- Очередь задержки, которая является очередью задержки Redis. реализовать передачу сообщений
- Пул заданий Пул заданий содержит метаинформацию о заданиях. Согласно описанию в статье используется структура данных с использованием K/V, ключ — это идентификатор, а значение — задание.
- Delay Bucket используется для хранения отложенных бизнес-задач. В статье описывается использование метода опроса, чтобы положить в Ведро, чтобы знать, что он не различается по теме.Лично здесь по умолчанию используется вставка заказа.
- Компонент времени таймера, отвечающий за сканирование каждого Bucket. Согласно описанию статьи, существует несколько таймеров, но один и тот же таймер может одновременно сканировать только одно ведро.
- Очередь готовности отвечает за хранение задач, которые необходимо выполнить, но, согласно описанию, существует несколько очередей готовности в зависимости от темы.
Среди них Timer отвечает за опрос, а Job pool, Delay Bucket и Ready Queue представляют собой наборы различных обязанностей.
статус задачи
- готово: исполняемое состояние,
- задержка: неисполняемое состояние, ожидание тактов.
- зарезервировано: было прочитано потребителем, но не завершено потребление.
- удалено: было использовано или удалено.
Внешний интерфейс
| интерфейс | описывать | данные |
|---|---|---|
| add | добавить задачу | Данные о работе |
| pop | Удалить отложенные задачи | тема - это группа задач |
| finish | Миссия выполнена | идентификатор задачи |
| delete | удалить задачу | идентификатор задачи |
дополнительный контент
- Прежде всего, согласно описанию состояния, операции завершения и удаления переводят задачу в удаленное состояние.
- В соответствии с операцией, описанной в статье, задача была удалена из метаданных при выполнении операции завершения или удаления.В это время удаленное состояние может существовать только в течение очень короткого времени, поэтому оно удаляется непосредственно в фактическом реализация.
- В статье не сказано, что делать, когда истекает время ожидания ответа, поэтому человек теперь помещает его обратно в очередь ожидания.
- Поскольку в статье используется кластер, блокировка setnx в Redis используется, чтобы гарантировать отсутствие повторяющихся циклов, когда несколько циклов времени обрабатывают несколько сегментов. Поскольку здесь реализована простая реализация, установить временную очередь для каждого сегмента очень просто. Это также для удобства простой обработки. Про распределенные блокировки вы можете посмотреть описание в моей предыдущей статье.
выполнить
Теперь мы заканчиваем дизайн в соответствии с содержанием дизайна. Мы завершаем этот дизайн в четыре шага
задачи и связанные объекты
В настоящее время требуются два объекта: один объект задачи (задание), а другой — объект, отвечающий за сохранение ссылки на задание (отложенное задание).
объект задачи
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
/**
* 延迟任务的唯一标识,用于检索任务
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* 任务类型(具体业务类型)
*/
private String topic;
/**
* 任务的延迟时间
*/
private long delayTime;
/**
* 任务的执行超时时间
*/
private long ttrTime;
/**
* 任务具体的消息内容,用于处理具体业务逻辑用
*/
private String message;
/**
* 重试次数
*/
private int retryCount;
/**
* 任务状态
*/
private JobStatus status;
}
объект ссылки на задачу
@Data
@AllArgsConstructor
public class DelayJob implements Serializable {
/**
* 延迟任务的唯一标识
*/
private long jodId;
/**
* 任务的执行时间
*/
private long delayDate;
/**
* 任务类型(具体业务类型)
*/
private String topic;
public DelayJob(Job job) {
this.jodId = job.getId();
this.delayDate = System.currentTimeMillis() + job.getDelayTime();
this.topic = job.getTopic();
}
public DelayJob(Object value, Double score) {
this.jodId = Long.parseLong(String.valueOf(value));
this.delayDate = System.currentTimeMillis() + score.longValue();
}
}
контейнер
В настоящее время нам нужно завершить создание трех контейнеров: пула заданий, контейнера отложенных задач и контейнера отложенных задач.
Пул задач заданий обеспечивает базовые операции для общей структуры K/V.
@Component
@Slf4j
public class JobPool {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "job.pool";
private BoundHashOperations getPool () {
BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
return ops;
}
/**
* 添加任务
* @param job
*/
public void addJob (Job job) {
log.info("任务池添加任务:{}", JSON.toJSONString(job));
getPool().put(job.getId(),job);
return ;
}
/**
* 获得任务
* @param jobId
* @return
*/
public Job getJob(Long jobId) {
Object o = getPool().get(jobId);
if (o instanceof Job) {
return (Job) o;
}
return null;
}
/**
* 移除任务
* @param jobId
*/
public void removeDelayJob (Long jobId) {
log.info("任务池移除任务:{}",jobId);
// 移除任务
getPool().delete(jobId);
}
}
Откладывайте задачи, используйте сортируемый ZSet для сохранения данных, предоставляйте такие операции, как получение минимального значения
@Slf4j
@Component
public class DelayBucket {
@Autowired
private RedisTemplate redisTemplate;
private static AtomicInteger index = new AtomicInteger(0);
@Value("${thread.size}")
private int bucketsSize;
private List <String> bucketNames = new ArrayList <>();
@Bean
public List <String> createBuckets() {
for (int i = 0; i < bucketsSize; i++) {
bucketNames.add("bucket" + i);
}
return bucketNames;
}
/**
* 获得桶的名称
* @return
*/
private String getThisBucketName() {
int thisIndex = index.addAndGet(1);
int i1 = thisIndex % bucketsSize;
return bucketNames.get(i1);
}
/**
* 获得桶集合
* @param bucketName
* @return
*/
private BoundZSetOperations getBucket(String bucketName) {
return redisTemplate.boundZSetOps(bucketName);
}
/**
* 放入延时任务
* @param job
*/
public void addDelayJob(DelayJob job) {
log.info("添加延迟任务:{}", JSON.toJSONString(job));
String thisBucketName = getThisBucketName();
BoundZSetOperations bucket = getBucket(thisBucketName);
bucket.add(job,job.getDelayDate());
}
/**
* 获得最新的延期任务
* @return
*/
public DelayJob getFirstDelayTime(Integer index) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
if (CollectionUtils.isEmpty(set)) {
return null;
}
ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
Object value = typedTuple.getValue();
if (value instanceof DelayJob) {
return (DelayJob) value;
}
return null;
}
/**
* 移除延时任务
* @param index
* @param delayJob
*/
public void removeDelayTime(Integer index,DelayJob delayJob) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
bucket.remove(delayJob);
}
}
Задачи, которые необходимо выполнить, внутренне подразделяются по темам, и каждая тема соответствует набору списков.
@Component
@Slf4j
public class ReadyQueue {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "process.queue";
private String getKey(String topic) {
return NAME + topic;
}
/**
* 获得队列
* @param topic
* @return
*/
private BoundListOperations getQueue (String topic) {
BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
return ops;
}
/**
* 设置任务
* @param delayJob
*/
public void pushJob(DelayJob delayJob) {
log.info("执行队列添加任务:{}",delayJob);
BoundListOperations listOperations = getQueue(delayJob.getTopic());
listOperations.leftPush(delayJob);
}
/**
* 移除并获得任务
* @param topic
* @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
return (DelayJob) o;
}
return null;
}
}
обработка опроса
Настройте пул потоков, чтобы настроить операцию опроса для каждого сегмента.
@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
@Autowired
private DelayBucket delayBucket;
@Autowired
private JobPool jobPool;
@Autowired
private ReadyQueue readyQueue;
@Value("${thread.size}")
private int length;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());
for (int i = 0; i < length; i++) {
executorService.execute(
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}
}
}
тестовый запрос
/**
* 测试用请求
* @author daify
* @date 2019-07-29 10:26
**/
@RestController
@RequestMapping("delay")
public class DelayController {
@Autowired
private JobService jobService;
/**
* 添加
* @param request
* @return
*/
@RequestMapping(value = "add",method = RequestMethod.POST)
public String addDefJob(Job request) {
DelayJob delayJob = jobService.addDefJob(request);
return JSON.toJSONString(delayJob);
}
/**
* 获取
* @return
*/
@RequestMapping(value = "pop",method = RequestMethod.GET)
public String getProcessJob(String topic) {
Job process = jobService.getProcessJob(topic);
return JSON.toJSONString(process);
}
/**
* 完成一个执行的任务
* @param jobId
* @return
*/
@RequestMapping(value = "finish",method = RequestMethod.DELETE)
public String finishJob(Long jobId) {
jobService.finishJob(jobId);
return "success";
}
@RequestMapping(value = "delete",method = RequestMethod.DELETE)
public String deleteJob(Long jobId) {
jobService.deleteJob(jobId);
return "success";
}
}
контрольная работа
Добавить отложенные задачи
Запрос через почтальона: localhost:8000/delay/add
На данный момент эта отложенная задача добавляется в пул потоков.
2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
Согласно настройке, задача будет добавлена в ReadyQueue через 10 секунд.
2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
получить задание
В это время мы запрашиваем localhost:8000/delay/pop
В это время задача отвечает, и ее время ожидания устанавливается при изменении состояния, а затем помещается в DelayBucket.
2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
По задумке, через 30 секунд, если задача не израсходована, она будет повторно помещена в ReadyQueue.
2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
Удаление/потребление задач
Теперь мы запрашиваем: localhost:8000/delay/delete
В это время задача будет удалена из пула заданий, и метаданные больше не существуют, но задача все еще зацикливается в DelayBucket. Однако, когда обнаруживается, что метаданных больше нет в цикле, отложенная задача будут удалены. удалить.
2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3
2019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
Адрес загрузки исходного кода, указанный в этой статье:git ee.com/Стихи о теле и ритме доктора Ю/…