Простой распределенный планировщик задач на основе Redis — реализация языка Java

Java распределенный

После недели планирования задач кластера Java Quartz, к сожалению, я не смог это сделать, а в Интернете очень мало статей на эту тему. динамически увеличивать или уменьшать задачи, что раздражает. В отчаянии я сам создал простой планировщик задач, но это заняло менее 2 дней, и он казался очень простым и удобным в использовании, с небольшим объемом кода и хорошей масштабируемостью.

Реализация распределенного планировщика задач имеет несколько ключевых соображений.

  1. Одиночные задачи и циклические задачи выполнять легко, но сложность заключается в том, как разобрать выражения cron и рассчитать время?
  2. Как несколько процессов могут обеспечить взаимное исключение задачи одновременно?
  3. Как динамически изменять задачи сложения и вычитания?

пример кода

Перед реализацией подробного объяснения давайте посмотрим, как это используется планировщиком.

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // sample 为任务分组名称
        var store = new RedisTaskStore(redis, "sample");
        // 5s 为任务锁寿命
        var scheduler = new DistributedScheduler(store, 5);
        // 注册一个单次任务
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // 注册一个循环任务
        scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {
            System.out.println("period2");
        }));
        // 注册一个 CRON 任务
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // 设置全局版本号
        scheduler.version(1);
        // 注册监听器
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // 启动调度器
        scheduler.start();
    }
}

Когда задачу обновления кода необходимо увеличить или уменьшить (или изменить время планирования), необходимо увеличить только глобальный номер версии, задачи в существующем процессе будут автоматически перепланированы, а те задачи, которые не были зарегистрированы ( сокращение задачи) будет автоматически очищен. Вновь добавленная задача (новая задача) не будет планироваться в процессе старого кода (код без новой задачи не может быть запланирована), а очищенная задача (старая задача) будет внепланирована в процессе старого кода.

Например, мы хотим отменить задачу периода2 и добавить задачу периода4.

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // sample 为任务分组名称
        var store = new RedisTaskStore(redis, "sample");
        // 5s 为任务锁寿命
        var scheduler = new DistributedScheduler(store, 5);
        // 注册一个单次任务
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // 注册一个 CRON 任务
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // 注册一个循环任务
        scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {
            System.out.println("period4");
        }));
        // 递增全局版本号
        scheduler.version(2);
        // 注册监听器
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // 启动调度器
        scheduler.start();
    }
}

cron4j

<dependency>
	<groupId>it.sauronsoftware.cron4j</groupId>
	<artifactId>cron4j</artifactId>
	<version>2.2.5</version>
</dependency>

Эта библиотека с открытым исходным кодом включает в себя базовые функции разбора выражений cron, а также предоставляет функции планирования задач, но здесь нет необходимости использовать планировщик. Я буду использовать только его возможности синтаксического анализа выражений и простой способ определить, соответствует ли текущее время выражению (не пора ли запустить задачу).

У нас очень низкие требования к точности времени cron, и этого достаточно, чтобы судить о том, является ли текущее время временем запуска задачи раз в 1 минуту.

class SchedulingPattern {
    // 表达式是否有效
    boolean validate(String cronExpr);
    // 是否应该运行任务了(一分钟判断一次)
    boolean match(long nowTs);
}

Взаимность задач

Поскольку это распределенный планировщик задач, в среде с несколькими процессами для управления одной и той же задачей во время планирования может выполняться только один процесс. Это легко сделать с распределенными блокировками Redis. Блокировку необходимо удерживать в течение определенного периода времени (например, 5 секунд по умолчанию).

Все процессы планируют эту задачу одновременно, но только один процесс может захватить блокировку. Из-за несоответствия времени в распределенной среде, процессы на разных машинах будут иметь небольшое окно разницы во времени, и блокировка должна поддерживать время окна.Здесь я установил по умолчанию 5 с (можно настроить), что требует, чтобы разница во времени между разными машинами не может быть. Если она превышает 5 с, при превышении этого значения произойдет повторное планирование.

public boolean grabTask(String name) {
    var holder = new Holder<Boolean>();
    redis.execute(jedis -> {
        var lockKey = keyFor("task_lock", name);
        var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));
        holder.value(ok != null);
    });
    return holder.value();
}

глобальный номер версии

Мы прикрепляем глобальный номер версии к списку задач. Когда бизнесу необходимо увеличить или уменьшить задачи планирования, мы можем вызвать перезагрузку задачи процесса, изменив номер версии. Этот процесс перезагрузки включает в себя опрос глобального номера версии (ключ Redis), и, если номер версии изменяется, немедленно перезагружает конфигурацию списка задач и перепланирует все задачи.

private void scheduleReload() {
    // 1s 对比一次
    this.scheduler.scheduleWithFixedDelay(() -> {
        try {
            if (this.reloadIfChanged()) {
                this.rescheduleTasks();
            }
        } catch (Exception e) {
            LOG.error("reloading tasks error", e);
        }
    }, 0, 1, TimeUnit.SECONDS);
}

Чтобы изменить расписание задачи, сначала отмените все текущие запланированные задачи, а затем запланируйте все задачи, которые только что были загружены.

private void rescheduleTasks() {
    this.cancelAllTasks();
    this.scheduleTasks();
}

private void cancelAllTasks() {
    this.futures.forEach((name, future) -> {
        LOG.warn("cancelling task {}", name);
        future.cancel(false);
    });
    this.futures.clear();
}

Поскольку задача должна сохраняться, разработан формат сериализации задачи. Это также очень просто. Просто используйте текстовый символ, чтобы разделить свойства конфигурации задачи.

// 一次性任务(startTime)
ONCE@2019-04-29T15:26:29.946+0800
// 循环任务,(startTime,endTime,period),这里任务的结束时间是天荒地老
PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5
// cron 任务,一分钟一次
CRON@*/1 * * * *

$ redis-cli
127.0.0.1:6379> hgetall sample_triggers
1) "task3"
2) "CRON@*/1 * * * *"
3) "task2"
4) "PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"
5) "task1"
6) "ONCE@2019-04-29T15:26:29.946+0800"
7) "task4"
8) "PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"

Пул потоков

Для планирования времени будет создан отдельный поток (пул потоков с одним потоком), а выполнение задачи будет выполняться другим пулом потоков (количество можно настроить).

class DistributedScheduler {
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ExecutorService executor = Executors.newFixedThreadPool(threads);
}

Причина, по которой вы хотите отделиться от пула потоков, заключается в том, чтобы выполнение задач (IO) не влияло на точное планирование времени.

FixedDelay vs FixedRate

Встроенный планировщик Java предоставляет две стратегии планирования: FixedDelay и FixedRate. FixedDelay гарантирует, что два последовательных запуска одной и той же задачи имеют одинаковую задержку (nextRun.startTime - lastRun.endTime), а FixedRate гарантирует наличие определенного интервала (nextRun.startTime - lastRun.startTime) между последовательными запусками одной и той же задачи.

FixedDelay — это как если вы работаете сверхурочно до 12:00 ночи, вы можете вернуться на работу в 12:00 следующего дня (гарантия фиксированного времени отдыха), в то время как FixedRate не так внимателен, вы продолжаете приходить на работу в 9:00 следующего дня. Если вам не повезло, и вы все еще работаете сверхурочно в 9 часов следующего дня, то сегодня у вас нет выходных, возвращайтесь к работе.

class ScheduledExecutorService {
    void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    void scheduleAtFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

Распределенному планировщику требуется точное время планирования, поэтому необходимо использовать режим FixedRate, чтобы гарантировать, что одна и та же задача нескольких узлов выполняется одновременно. Если принят режим FixedDelay, время планирования различных процессов будет распределено по времени, а временное окно распределенных блокировок в 5 секунд по умолчанию не будет играть роли во взаимном исключении.

Поддержка задач без мьютекса

Для задач взаимного исключения требуется, чтобы выполнялся один процесс задачи, а задачи без взаимного исключения — это задачи без распределенных блокировок, и несколько процессов могут выполняться одновременно. Взаимное исключение требуется по умолчанию.

class Task {
    /**
     * 是否需要考虑多进程互斥(true表示不互斥,多进程能同时跑)
     */
    private boolean concurrent;
    private String name;
    private Runnable runner;
    ...
    public static Task of(String name, Runnable runner) {
        return new Task(name, false, runner);
    }

    public static Task concurrent(String name, Runnable runner) {
        return new Task(name, true, runner);
    }
}

Добавить интерфейс обратного вызова

Учитывая, что пользователям планировщика может потребоваться отслеживать состояние задачи для запуска, где добавлен простой интерфейс обратного вызова, текущая функция относительно проста. Возможность сообщать о результатах работы (успешных или нештатных) и трудоемкой операции

class TaskContext {
    private Task task;
    private long cost;  // 运行时间
    private boolean ok;
    private Throwable e;
}

interface ISchedulerListener {
    public void onComplete(TaskContext ctx);
}

Поддержка расширения хранилища

В настоящее время реализовано только хранилище задач в виде Redis и Memory.Также возможно расширение до zk, etcd и реляционных баз данных.Просто реализуйте следующие интерфейсы.

interface ITaskStore {
  public long getRemoteVersion();
  public Map<String, String> getAllTriggers();
  public void saveAllTriggers(long version, Map<String, String> triggers);
  public boolean grabTask(String name);
}

кодовый адрес

GitHub.com/friend kucho/он сказал…