Что такое отложенная задача
Отложенные задачи, как следует из названия, — это задачи, которые выполняются после задержки. Например, пусть у нас есть функция публикации информации.Операция должна публиковать информацию вовремя в 7:00 каждое утро, но еще не все ушли на работу в 7:00 утра.В это время мы можем использовать отсроченная задача реализовать отсроченное высвобождение информации. Если вы назначите время для отправки информации на следующий день, прежде чем уйти с работы накануне, информация будет отправлена вовремя в назначенное время на следующий день. Если у вас есть официальная учетная запись, вы знаете, что фон официальной учетной записи также имеет функцию регулярной отправки статей. В целом использование отложенных задач все еще очень широко. По поводу реализации отложенных задач я знаю не менее 3-х способов, которые будут представлены один за другим позже.Сегодня я расскажу о том, как использовать Redis для реализации отложенных задач.
Особенности отложенных задач
Прежде чем внедрять конкретное решение, мы могли бы подумать о том, какой контент необходимо хранить, чтобы реализовать систему задержки (под хранением здесь не обязательно подразумевают персистентность, его также можно размещать в памяти, в зависимости от важности задачи задержки). Первое, что нужно хранить, это описание задачи. Если отложенная задача, с которой вы хотите справиться, заключается в задержке выпуска информации, то вам следует, по крайней мере, сохранить идентификатор информации. Кроме того, если у вас есть несколько типов задач, таких как: задержка push-сообщений, задержка очистки данных и т. д., вам также необходимо хранить типы задач. Все вышеперечисленное относится к описанию задачи. В дополнение к этому вы также должны сохранить момент времени, когда задача была выполнена, обычно метку времени. Кроме того, нам также необходимо отсортировать по времени выполнения задач, потому что в очереди отложенных задач может быть много задач, и должны выполняться только те задачи, которые достигли момента времени, поэтому время выполнения задачи должно быть поддерживается.
Использование Redis для реализации отложенных задач
Выше перечислены элементы, которые должна иметь система отложенных задач. Возвращаясь к Redis, какая структура данных может хранить не только описание задачи, но и время ее выполнения, и может быть отсортирована по времени выполнения задачи? Подумав об этом, кажется, что есть только Sorted Set. Мы можем сериализовать описание задачи в строку, поместить ее в значение Sorted Set, а затем использовать метку времени выполнения задачи в качестве оценки.Используя естественную функцию сортировки Sorted Set, чем раньше время выполнения , тем выше рейтинг. Таким образом, нам нужно только открыть один или несколько потоков синхронизации, проверить элементы в отсортированном наборе, оценка которых меньше или равна текущей метке времени через равные промежутки времени (это можно сделать с помощью команды zrangebyscore), а затем выполнить задачи, соответствующие элементам. Разумеется, после выполнения задачи элемент необходимо удалить из Sorted Set, чтобы избежать повторного выполнения задачи. Если несколько потоков опрашивают отсортированный набор, также учитываются проблемы параллелизма.Если срок действия задачи истекает и ее получают несколько потоков, необходимо убедиться, что только один поток может выполнить задачу.Это можно сделать с помощью zrem. Чтобы реализовать команду, задача может быть выполнена только после успешного удаления, чтобы гарантировать, что задача не будет повторно выполняться несколькими задачами.
Далее посмотрите на код. Сначала посмотрите на структуру проекта:
Всего 4 класса: Класс Constants определяет константы, связанные с ключом Redis. DelayTaskConsumer — потребитель отложенных задач, этот класс отвечает за извлечение задач с истекшим сроком действия из Redis и инкапсулирует логику потребления задач. DelayTaskProducer — это производитель отложенных задач, который в основном используется для размещения отложенных задач в Redis. RedisClient — это инструментальный класс клиента Redis.
Наиболее важными классами являются DelayTaskConsumer и DelayTaskProducer.
Давайте сначала посмотрим на производителя DelayTaskProducer:
public class DelayTaskProducer {
public void produce(String newsId,long timeStamp){
Jedis client = RedisClient.getClient();
try {
client.zadd(Constants.DELAY_TASK_QUEUE,timeStamp,newsId);
}finally {
client.close();
}
}
}
Код очень простой: описание задачи (для удобства здесь хранится только id информации) и временная метка выполнения задачи в Sorted Set Redis.
Далее идет потребитель DelayTaskConsumer отложенной задачи:
public class DelayTaskConsumer {
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public void start(){
scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(),1,1, TimeUnit.SECONDS);
}
public static class DelayTaskHandler implements Runnable{
@Override
public void run() {
Jedis client = RedisClient.getClient();
try {
Set<String> ids = client.zrangeByScore(Constants.DELAY_TASK_QUEUE, 0, System.currentTimeMillis(),
0, 1);
if(ids==null||ids.isEmpty()){
return;
}
for(String id:ids){
Long count = client.zrem(Constants.DELAY_TASK_QUEUE, id);
if(count!=null&&count==1){
System.out.println(MessageFormat.format("发布资讯。id - {0} , timeStamp - {1} , " +
"threadName - {2}",id,System.currentTimeMillis(),Thread.currentThread().getName()));
}
}
}finally {
client.close();
}
}
}
}
Сначала посмотрите на метод запуска. В этом методе мы используем ScheduledExecutorService Java, чтобы открыть пул потоков планирования, который планирует метод запуска в DelayTaskHandler каждую 1 секунду.
Класс DelayTaskHandler представляет собой конкретную логику планирования. Существует два основных шага: один — извлечь отложенную задачу с истекшим сроком действия из отсортированного набора Redis, а другой — выполнить отложенную задачу с истекшим сроком действия. Отложенная задача получения срока выполнения реализуется командой zrangeByScore, а проблема многопоточного параллелизма обрабатывается командой zrem. Код не сложный, поэтому я не буду его здесь объяснять.
Следующий тест:
public class DelayTaskTest {
public static void main(String[] args) {
DelayTaskProducer producer=new DelayTaskProducer();
long now=new Date().getTime();
System.out.println(MessageFormat.format("start time - {0}",now));
producer.produce("1",now+ TimeUnit.SECONDS.toMillis(5));
producer.produce("2",now+TimeUnit.SECONDS.toMillis(10));
producer.produce("3",now+ TimeUnit.SECONDS.toMillis(15));
producer.produce("4",now+TimeUnit.SECONDS.toMillis(20));
for(int i=0;i<10;i++){
new DelayTaskConsumer().start();
}
}
}
Сначала мы создали 4 отложенные задачи, а время выполнения составило 5 секунд, 10 секунд, 15 секунд и 20 секунд после запуска программы, а затем запустили 10 потребителей для использования отложенных задач. Эффект операции следующий:
Видно, что задача действительно может быть выполнена примерно в соответствующий момент времени, но есть небольшая ошибка времени, потому что мы вытягиваем задачи через временные задачи, а не в режиме реального времени, и некоторые задачи вытягиваются. Сетевые накладные расходы, кроме того, наша логика обработки задач обрабатывается синхронно, и следующий пакет задач может быть подтянут только после того, как будет обработана предыдущая задача, эти факторы будут вызывать отклонения во времени выполнения отложенных задач.
Суммировать
Выше приведена идея реализации отложенных задач через Redis. Здесь представлена только самая простая версия, и на самом деле есть много мест, которые можно оптимизировать. Например, мы можем поместить логику обработки задачи в отдельный пул потоков для выполнения, чтобы потребитель задачи отвечал только за планирование задачи, а преимущество в том, что это может уменьшить отклонение задачи. время исполнения. Так же для удобства описание задачи здесь это только id задачи.Если есть много разных типов задач, типа задачи отправки информации и задачи проталкивания сообщений упомянутых выше, то это необходимо дополнительно хранить тип задачи.Дифференцируйте или используйте разные наборы сортировки для хранения отложенных задач.
Кроме того, приведенный выше пример каждый раз вытягивает только одну отложенную задачу.Если количество задач, которые нужно обработать в определенный момент, очень велико, будут некоторые задачи с серьезной задержкой, которые можно оптимизировать здесь. задача за раз, например 10 задач, а затем обработка их одна за другой может значительно повысить эффективность планирования, потому что, если используется вышеуказанный метод, для извлечения 10 задач требуется время планирования 10. Интервал составляет 1 секунду, и это занимает 10 всего секунд, чтобы вытащить задачи 10. Если его изменить, чтобы вытащить задачи 10 за раз, это займет всего 1 раз, и повышение эффективности будет довольно большим.
Конечно, его можно оптимизировать и под другим углом. Глядя на приведенный выше код, когда задача для выполнения извлекается, задача выполняется напрямую, а поток завершается после выполнения задачи, но в это время в очереди может быть много задач для выполнения (потому что мы вытягивать задачу Когда есть задача, количество вытягиваний ограничено), так что фактически здесь можно использовать цикл. Когда задача, которую нужно выполнить, не может быть вытянута, планирование завершается. Когда есть задача, после выполнение завершено, вы также можете проверить, есть ли накопленные задачи.Поток не завершается до тех пор, пока больше не накопится задач.
Последнее, что нужно учитывать, это то, что приведенный выше код не обрабатывает случай сбоя выполнения задачи, то есть, если выполнение задачи завершается сбоем, то нет возможности повторить попытку. Поэтому при его использовании в производственной среде также необходимо учитывать сбой обработки задачи. Простой способ — перехватить исключение во время обработки задачи, а когда во время обработки возникает исключение, поместить задачу обратно в Redis Sorted или повторить попытку обработки текущим потоком. Однако таким образом своевременность задачи не может быть гарантирована.Возможно, что задача, первоначально запланированная для выполнения на 7:00 утра, будет отложена до 7:10 из-за сбоя и повторной попытки.Это должно быть взвешивается в зависимости от бизнеса.Например, вы можете добавить количество повторных попыток или разрешить ли повторные попытки в описании задачи, чтобы в случае сбоя выполнения задачи можно было принять разные меры компенсации в соответствии с разными задачами.
Итак, каковы преимущества и недостатки использования Redis для реализации отложенных задач? Преимущество в том, что он может удовлетворить пропускную способность. Минус в том, что есть риск потери задачи (при зависании инстанса redis). Поэтому, если требования к производительности относительно высоки, а потеря задач в некоторых случаях допустима, то можно использовать этот метод.
Подпишитесь на официальный аккаунт, чтобы увидеть больше качественных оригинальных статей.