Фреймворк распределенных задач синхронизации Quartz

Java

предисловие

В проекте всегда написано несколько задач на время, чтобы справиться с некоторыми вещами. Некоторые простые задачи синхронизации можно выполнить с помощью собственных задач синхронизации Spring. Но если требуется большое количество запланированных задач, как управлять ими единообразно?

В этой статье представлена ​​структура распределенного планирования Quartz.

вводить

О кварце

Quartz — еще один проект с открытым исходным кодом организации с открытым исходным кодом OpenSymphony в области планирования заданий.Это система управления расписанием задач с открытым исходным кодом, полностью разработанная java. В настоящее время это проект Terracotta. На официальном сайте по адресу http://www.quartz-scheduler.org/ можно скачать релизную версию Quartz и его исходный код.

Функции

  • Простая интеграция (полностью написана на Java)
  • Может быть развернут на одной машине, не полагаясь на кластер
  • Может работать независимо через JVM

Job

Для создания задачи нужно только реализовать интерфейс Job

курок

  • Можно выполнить через Календарь (кроме праздников)
  • Укажите определенное время для выполнения беспроводного цикла, например, каждые пять минут.
  • Выполняется в фиксированное время, например, каждый понедельник в 10:00.

Обычно используются SimpleTrigger и CronTrigger, и эти триггеры реализуют интерфейс Trigger. Или подклассы ScheduleBuilder SimpleScheduleBuilder и CronScheduleBuilder.

Для простых случаев, например выполнения несколько раз в день, используйте SimpleTrigger. Для сложных выражений времени, таких как время 15-го числа каждого месяца, используйте классы CronTrigger и CromExpression.

Уведомление :Задание может быть привязано к нескольким триггерам, но триггер может быть привязан только к одному заданию.

место хранения

Существует два метода хранения RAMJobStore и JDBCJobStore.

RAMJobStore не требует хранения информации о планировании из внешней базы данных в памяти JVM, поэтому, когда приложение перестанет работать, вся информация о планировании будет потеряна, а количество сохраненных заданий и триггеров также будет ограничено.

JDBCJobStore поддерживает кластеризацию.Все триггеры и задания хранятся в базе данных.Независимо от остановки и перезапуска сервера задачи могут быть возобновлены и поддерживается обработка транзакций.

настоящий бой

Подготовить

Вышеупомянутое кратко представило Quartz, а затем начало реального боя.В этой статье используется интеграция SpringBoot.

Адрес проекта: https://gitee.com/lqlm/toolsList_lqcoder

Во-первых, создайте таблицу базы данных, потому что их слишком много, и она не будет включена в статью.Вы можете перейти на официальный сайт, чтобы загрузить ее, или вы можете использовать мой адрес для загрузки, чтобы загрузить ее.

Адрес: https://lqcoder.com/quartz.sql

После завершения создания:

Table Name
Description
QRTZ_CALENDARS
Сохранение информации о кварцевом календаре
QRTZCRONTRIGGERS
Хранит CronTrigger, включая выражения Cron и информацию о часовом поясе.
QRTZFIREDTRIGGERS
Хранит информацию о состоянии, связанную с инициированным триггером, и информацию о выполнении связанного задания.
QRTZPAUSEDTRIGGER_GRPS Хранит информацию о приостановленной группе триггеров.
QRTZSCHEDULERSTATE
Храните небольшой объем информации о состоянии планировщика и других экземпляров планировщика.
QRTZ_LOCKS
Информация о пессимистической блокировке хранимой процедуры
QRTZJOBDETAILS
Хранит подробную информацию о каждом настроенном задании.
QRTZJOBLISTENERS
Хранит информацию о настроенном JobListener
QRTZSIMPLETRIGGERS
Храните простые триггеры, включая повторения, интервалы и количество касаний
QRTZBLOGTRIGGERS
Триггер хранится как тип Blob
QRTZTRIGGERLISTENERS
Хранит информацию о настроенном TriggerListener
QRTZ_TRIGGERS
Хранит информацию о настроенном триггере

В этой статье используется метод Cron для его создания единообразно.

Примечание. 4 таблицы данных, необходимые для метода cron:qrtzтриггеры, квартцcronтриггеры, квартцfiredтриггеры, квартцjob_details.

интеграционный проект

Создайте проект SpringBoot и добавьте зависимости кварца, а также добавьте зависимости c3p0, поскольку база данных, используемая кварцем, отделена от проекта.

  <!--spring boot集成quartz-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.0.2</version>
        </dependency>

В то же время создайте содержимое файлаquart.properties в разделе ресурсов.

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql:数据库地址
org.quartz.dataSource.myDS.user = 数据库账号
org.quartz.dataSource.myDS.password = 数据库密码
org.quartz.dataSource.myDS.maxConnections = 连接数

Затем создайте класс Job

/**
 * @author snluomeng
 * @date 2019/12/19 16:27
 */
@Slf4j
public class MyJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("==================开始执行任务==================");
    }
}

Создайте класс инструментов, а затем добавляйте, удаляйте и изменяйте временные задачи.

Сначала создайте фабрику отправки

private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();

Добавить запланированные задачи

 public static void addJob(String jobName, String jobGroupName,
                              String triggerName, String triggerGroupName, Class jobClass, String cron) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 任务名,任务组,任务执行类
//            Trigger.TriggerState state = sched.getTriggerState();

            JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
            // 触发器
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
            // 触发器名,触发器组
            triggerBuilder.withIdentity(triggerName, triggerGroupName);
            triggerBuilder.startNow();
            // 触发器时间设定
            triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
            // 创建Trigger对象
            CronTrigger trigger = (CronTrigger) triggerBuilder.build();

            // 调度容器设置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);

            // 启动
            if (!sched.isShutdown()) {
                sched.start();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Создать процесс

Получено с заводаОбъект планировщика

Scheduler sched = schedulerFactory.getScheduler();

Установите класс реализации задания и некоторую статическую информацию.

  //jobClass 设置Job的实现类
  //jobName Job名称
  //jobGroupName Job组名称
JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();

триггер сборки

// 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
 // 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 创建Trigger对象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();

Затем установите как задание, так и триггер на объект планировщика.

// 调度容器设置JobDetail和Trigger
sched.scheduleJob(jobDetail, trigger);

запускать

// 启动
sched.start();

бегать

Поскольку используется проект SpringBoot, временная задача добавляется непосредственно в класс запуска.

Параметры: JobName JsobgropName опущен в среднем классе реализации, время выполнения задачи

QuartUtil.addJob("测试定时任务","test","测试定时任务","testTrigger",MyJob.class,"0/5 * * * * ?");

Затем посмотрите выходной журнал:

Вы видите, что он уже выполняется. Теперь давайте посмотрим на данные в базе данных. Таблицы для просмотра:qrtzтриггеры, квартцcronтриггеры, квартцfiredтриггеры, квартцjob_details.

qrtzjobДетали:

уже существует

Что произойдет, если я сейчас остановлю проект, а затем перезапущу его?

Было обнаружено, что было выброшено исключение, потому что мы уже добавили эту временную задачу, поэтому повторные добавления не будут работать.

В это время мы можем начать прямо.

Тот же метод запуска пакета

   public static void startJobs() {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            sched.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Очень просто запустить все запланированные задачи напрямую, получить объект планировщика, а затем запустить.

Изменить запланированные задачи

Изменение запланированной задачи также требует получения объекта Scheduler, который в основном аналогичен процессу добавления, за исключением того, что в конце вместо вызова scheduleJob() вызывается метод rescheduleJob().Есть два способа указать таймер название.

  • Во-первых, вызвать rescheduleJob() для прямого изменения
  • Второй - сначала удалить, а потом добавить
  /**
     * @Description: 修改一个任务的触发时间
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName 触发器名
     * @param triggerGroupName 触发器组名
     * @param cron   时间设置,参考quartz说明文档
     */
    public static void modifyJobTime(String jobName,
                                     String jobGroupName, String triggerName, String triggerGroupName, String cron) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            CronTrigger trigger = (CronTrigger) sched.getTrigger(triggerKey);
            if (trigger == null) {
                return;
            }

            String oldTime = trigger.getCronExpression();
            if (!oldTime.equalsIgnoreCase(cron)) {
                /** 方式一 :调用 rescheduleJob 开始 */
                // 触发器
                TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
                // 触发器名,触发器组
                triggerBuilder.withIdentity(triggerName, triggerGroupName);
                triggerBuilder.startNow();
                // 触发器时间设定
                triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
                // 创建Trigger对象
                trigger = (CronTrigger) triggerBuilder.build();
                // 方式一 :修改一个任务的触发时间
                sched.rescheduleJob(triggerKey, trigger);
                /** 方式一 :调用 rescheduleJob 结束 */

                /** 方式二:先删除,然后在创建一个新的Job  */
                //JobDetail jobDetail = sched.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
                //Class<? extends Job> jobClass = jobDetail.getJobClass();
                //removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
                //addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, cron);
                /** 方式二 :先删除,然后在创建一个新的Job */
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

удалить задачу

Удаление запланированной задачи уже имеет экземпляр при ее изменении.Обратите внимание, что вам необходимо указать имя задачи, группу задач и группу триггеров с именем триггера.

    /**
     * @Description: 移除一个任务
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     */
    public static void removeJob(String jobName, String jobGroupName,
                                 String triggerName, String triggerGroupName) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            sched.pauseTrigger(triggerKey);// 停止触发器
            sched.unscheduleJob(triggerKey);// 移除触发器
            sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

передать параметры

Теперь другой вопрос, что я хочу передать параметры классу реализации Job, что мне делать?

При добавлении запланированной задачи при создании JobDetail есть параметр метода setJobData() как JobDataMap, посмотрите исходный код JobBuilder

Вы можете видеть, что JobBuilder предоставляет параметр, переданный методом setJobData, как JobDataMap, который имеет тип Map.

При создании запланированной задачи вы можете:

JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("testKey","测试传递参数");
JobDetail jobDetail=  JobBuilder.newJob(jobClass).setJobData(jobDataMap).withIdentity(jobName, jobGroupName).build();

Затем возьмите его непосредственно в методе класса реализации Job.

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("==================开始执行任务==================");
        log.info("执行任务线程ID{}",Thread.currentThread().getId());
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        log.info("参数为{}",jobDataMap.get("testKey"));
    }

JobDataMap можно использовать непосредственно как Map.

Один параметр можно добавить с помощью использованияJobData(), параметр — KV, метод значения тот же, и параметр также сохраняется в базе данных.

Если вам нужно управление запросами, вы можете напрямую запросить базу данных

Принципиальный анализ

Выше кратко описано, как его использовать, затем вам должно быть интересно, как он работает.Далее давайте проанализируем, как реализован Quartz.

Обратите внимание, что указанные выше добавления, удаления и изменения должны сначала получить экземпляр Планировщика через фабрику schedulerFactory (фабричный режим), а теперь начать анализ с первого шага

В этой статье кратко анализируются два шага фабрики планировщика и добавление задач по времени.

Фабрика планировщиков

    public Scheduler getScheduler() throws SchedulerException {
        //读取quartz配置文件,未指定则顺序遍历各个path下的quartz.properties文件
        if (this.cfg == null) {
            //如果为空就初始化
            this.initialize();
        }
        // 获取调度器池,采用了单例模式
        // 为了避免并发getInstance是synchronized加锁的
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        // 从调度器池中取出当前配置所用的调度器
        Scheduler sched = schedRep.lookup(this.getSchedulerName());
        if (sched != null) {
            if (!sched.isShutdown()) {
                return sched;
            }

            schedRep.remove(this.getSchedulerName());
        }
        // 如果调度器池中没有当前配置的调度器,则实例化一个调度器,主要动作包括:
        // 1)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。先class load线程池类,接着动态生成线程池实例bean,然后通过反射,使用setXXX()方法将以org.quartz.threadPool开头的配置内容赋值给bean成员变量;
        // 2)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。先class load任务存储类,接着动态生成实例bean,然后通过反射,使用setXXX()方法将以org.quartz.jobStore开头的配置内容赋值给bean成员变量;
        // 3)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。jobStore要指定为JDBCJobStore,dataSource才会有效;
        // 4)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
        // 5)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;
        // 6)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;
        // 7)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程;
        // 8)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
        sched = instantiate();
        return sched;
    }

Добавить запланированные задачи

public Date scheduleJob(JobDetail jobDetail,
                            Trigger trigger) throws SchedulerException {
        // 检查调度器是否开启
        validateState();
        //参数校验省略
        if (jobDetail == null) {
            throw new SchedulerException("JobDetail cannot be null");
        }.....
        OperableTrigger trig = (OperableTrigger)trigger;
        //校验触发器参数
        if (trigger.getJobKey() == null) {
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
            throw new SchedulerException(
                    "Trigger does not reference given job!");
        }
        trig.validate();
        Calendar cal = null;
        if (trigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        }
        //获取时间
        Date ft = trig.computeFirstFireTime(cal);
        // 把job和trigger注册进调度器的jobStore
        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
        // 通知job监听者
        notifySchedulerListenersJobAdded(jobDetail);
        // 通知调度器线程
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        // 通知trigger监听者
        notifySchedulerListenersSchduled(trigger);
        return ft;
    }
    public void validateState() throws SchedulerException {
        //如果关闭则抛出异常
        if (isShutdown()) {
            throw new SchedulerException("The Scheduler has been shutdown.");
        }
        // other conditions to check (?)
    }