Публичный аккаунт WeChat: стек червоточины bugstack | Блог:bugstack.cn
Осаждайте, делитесь, расширяйте, фокусируйтесь на оригинальных частных случаях, делитесь знаниями самым простым способом изучения программирования, чтобы вы и другие могли извлечь выгоду. Темы, которые были завершены на данный момент, включают: практический пример Netty4.x, реализацию JVM с Java, полноканальный мониторинг на основе JavaAgent, рукописную структуру RPC, пример проектирования архитектуры [Ing] и т. д.
Вы используете меч 🗡, я использую нож 🔪, хороший код сильно сгорел 😏, надеюсь, вы сделаете все возможное 💨!
1. Введение
Запланированные задачи часто используются в ежедневной разработке для: сканирования таблицы базы данных для отправки MQ, оплаты счетов T+n, обновления данных кэша, изменения состояния активности seckill и так далее. Потому что Spring Schedule значительно облегчает нам использование таких сценариев. Итак, что вы знаете о нем помимо приложения?
- Сколько потоков задач инициализировано по умолчанию
- Существует несколько реализаций JobStore, какую из них вы обычно используете?
- Ниже кратко описан процесс выполнения запланированной задачи.
Это круг, я не чувствую, что я просто использую его нормально, и я вообще не обращал внимания на эти. Почувствуйте желание искать ответ! Но просто знать ответ не имеет особого смысла. Так что, если вы действительно хотите улучшить свои навыки, вы должны делать это с нуля.
2. Кейс-проект
Чтобы лучше проводить анализ источников, мы передаем от нас услугу временных задач. Скачать проект, обратите внимание на публичный номер: Bugstack насекомых, ответ: Анализ исходного кода
itstack-demo-code-schedule
└── src
├── main
│ ├── java
│ │ └── org.itstack.demo
│ │ ├── DemoTask.java
│ │ └── JobImpl.java
│ └── resources
│ ├── props
│ │ └── config.properties
│ ├── spring
│ │ └── spring-config-schedule-task.xml
│ ├── logback.xml
│ └── spring-config.xml
└── test
└── java
└── org.itstack.demo.test
├── ApiTest.java
├── MyQuartz.java
└── MyTask.java
В-третьих, конфигурация окружающей среды
- JDK 1.8
- IDEA 2019.3.1
- Spring 4.3.24.RELEASE
- кварц 2.3.2 {разные версии имеют небольшие отличия в коде}
Четыре, анализ исходного кода
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
В зависимости от кварца обновления версии Spring выберите 2.3.2, также если вы используете задачу конфигурации xml, как в случае с этой статьей. Затем произойдут следующие изменения;
Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean
<bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
<property name="jobDetail" ref="taskHandler"/>
<property name="cronExpression" value="0/5 * * * * ?"/>
</bean>
Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean
<bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="taskHandler"/>
<property name="cronExpression" value="0/5 * * * * ?"/>
</bean>
Перед формальным разбором можно посмотреть дефолтную конфигурацию кварца.Многие действия по инициализации нужно получить отсюда параметры.Так же можно настроить свой файл конфигурации. Например, когда у вас много задач, 10 групп потоков, инициализированных по умолчанию, не соответствуют потребностям вашего бизнеса, и вы можете настроить их по мере необходимости.
quart.properties
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
1. Начните с простого случая
Обычно мы используем Schedule, чтобы в основном использовать аннотации или файлы конфигурации xml, но для более простого анализа кода мы начинаем с простой демонстрации и помещаем ее в основную функцию.
DemoTask.java и определяет задачу, ожидающую выполнения
public class DemoTask {
private Logger logger = LoggerFactory.getLogger(DemoTask.class);
public void execute() throws Exception{
logger.info("定时处理用户信息任务:0/5 * * * * ?");
}
}
MyTask.java и тестовый класс, извлеките код, настроенный в xml
public class MyTask {
public static void main(String[] args) throws Exception {
DemoTask demoTask = new DemoTask();
// 定义了;执行的内容
MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
methodInvokingJobDetailFactoryBean.setConcurrent(true);
methodInvokingJobDetailFactoryBean.setName("demoTask");
methodInvokingJobDetailFactoryBean.afterPropertiesSet();
// 定义了;执行的计划
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
cronTriggerFactoryBean.setName("demoTask");
cronTriggerFactoryBean.afterPropertiesSet();
// 实现了;执行的功能
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.afterPropertiesSet();
schedulerFactoryBean.start();
// 暂停住
System.in.read();
}
}
Если все пойдет хорошо, результат будет следующим:
2020-01-04 10:47:16.369 [main] INFO org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor
2020-01-04 10:47:16.421 [main] INFO org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2020-01-04 10:47:16.422 [main] INFO org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.
2020-01-04 10:47:16.423 [main] INFO org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.
2020-01-04 10:47:16.424 [main] INFO org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
2020-01-04 10:47:16.424 [main] INFO org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.
2020-01-04 10:47:16.424 [main] INFO org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.2
2020-01-04 10:47:16.426 [main] INFO org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010
2020-01-04 10:47:16.651 [main] INFO org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.
一月 04, 2020 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler
信息: Starting Quartz Scheduler now
2020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
Process finished with exit code -1
2. Определите содержимое выполнения (MethodInvokingJobDetailFactoryBean)
// 定义了;执行的内容
MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
methodInvokingJobDetailFactoryBean.setConcurrent(true);
methodInvokingJobDetailFactoryBean.setName("demoTask");
methodInvokingJobDetailFactoryBean.afterPropertiesSet();
Этот контент в основном передает тело нашей задачи (то есть задачу для выполнения DemoTask) в MethodInvokingJobDetailFactoryBean для управления, и сначала задает необходимую информацию;
- targetObject: bean-компонент целевого объекта, который является demoTask.
- targetMethod: имя целевого метода, который выполняется
- concurrent: выполнять ли задачи параллельно, непараллельное выполнение задач, если предыдущая задача не выполнена, то в следующий момент она выполняться не будет
- name: конфигурация xml не требуется, beanName можно получить из исходного кода
Наконец, мы моделируем инициализацию, вручную вызывая afterPropertiesSet(). Если наш класс управляется Spring, то класс, реализующий интерфейс InitializingBean, автоматически выполнит afterPropertiesSet() после загрузки информации о конфигурации класса. Как правило, класс, который реализует интерфейс InitializingBean, будет также реализовывать интерфейс FactoryBean, потому что после реализации этого интерфейса вы можете получить свой собственный пользовательский класс инициализации через T getObject(). Это также часто используется при разработке некоторых фреймворков.
MethodInvokingJobDetailFactoryBean.afterPropertiesSet()
public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
prepare();
// Use specific name if given, else fall back to bean name.
String name = (this.name != null ? this.name : this.beanName);
// Consider the concurrent flag to choose between stateful and stateless job.
Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
// Build JobDetail instance.
JobDetailImpl jdi = new JobDetailImpl();
jdi.setName(name);
jdi.setGroup(this.group);
jdi.setJobClass((Class) jobClass);
jdi.setDurability(true);
jdi.getJobDataMap().put("methodInvoker", this);
this.jobDetail = jdi;
postProcessJobDetail(this.jobDetail);
}
-
Линия 168 исходного кода:Выберите классы задач в зависимости от того, выполняются ли они параллельно. Эти два класса являются внутренними классами MethodInvokingJobDetailFactoryBean. StatefulMethodInvokingJob, который не выполняется параллельно, просто наследует MethodInvokingJob и добавляет аннотации разметки.
-
Строка 171 исходного кода:Создайте JobDetailIMPL, добавьте сведения о задаче и обратите внимание на этот вид JDI.setJobClass (Class) JobClass) на самом деле MethodInvokingJob. MethodInvokingJob также является содержимым, которое мы в конечном итоге отразили для выполнения вызова.
-
Строка 177 исходного кода:После инициализации задачи назначьте ее this.jobDetail = jdi, которая является конечным объектом класса
MethodInvokingJobDetailFactoryBean.getObject()
@Override public JobDetail getObject() { return this.jobDetail; }
-
Исходный код: Строка 220:This.jobDetail возвращается при получении объекта, что объясняет, почему MethodInvokingJobDetailFactoryBean напрямую назначается JobDetail после инициализации;
3. Определите план выполнения (CronTriggerFactoryBeann)
// 定义了;执行的计划
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
cronTriggerFactoryBean.setName("demoTask");
cronTriggerFactoryBean.afterPropertiesSet();
Этот блок в основном определяет план выполнения задачи и передает содержимое выполнения задачи CronTriggerFactoryBean для управления и одновременно устанавливает необходимую информацию;
- jobDetail: установите тело задачи, объект может быть напрямую назначен в xml, а информация об объекте JobDetail, которая должна быть выполнена, задается в жестком кодировании. То есть JobDetailImpl, который мы установили выше, получен через getObject().
- cronExpression: выражение плана; секунда, минута, час, день, месяц, неделя, год
CronTriggerFactoryBean.afterPropertiesSet()
@Override
public void afterPropertiesSet() throws ParseException {
// ... 校验属性信息
CronTriggerImpl cti = new CronTriggerImpl();
cti.setName(this.name);
cti.setGroup(this.group);
if (this.jobDetail != null) {
cti.setJobKey(this.jobDetail.getKey());
}
cti.setJobDataMap(this.jobDataMap);
cti.setStartTime(this.startTime);
cti.setCronExpression(this.cronExpression);
cti.setTimeZone(this.timeZone);
cti.setCalendarName(this.calendarName);
cti.setPriority(this.priority);
cti.setMisfireInstruction(this.misfireInstruction);
cti.setDescription(this.description);
this.cronTrigger = cti;
}
-
Строка исходного кода 237:Создайте триггер CronTriggerImpl и установите соответствующую информацию о свойствах.
-
Строка 245 исходного кода:Создать класс плана выполнения cti.setCronExpression(this.cronExpression);
public void setCronExpression(String cronExpression) throws ParseException { TimeZone origTz = getTimeZone(); this.cronEx = new CronExpression(cronExpression); this.cronEx.setTimeZone(origTz); }
CronExpression.java и разбор выражений Cron
protected void buildExpression(String expression) throws ParseException { expressionParsed = true; try { // ... 初始化 TreeSet xxx = new TreeSet<Integer>(); int exprOn = SECOND; StringTokenizer exprsTok = new StringTokenizer(expression, " \t", false); while (exprsTok.hasMoreTokens() && exprOn <= YEAR) { String expr = exprsTok.nextToken().trim(); // ... 校验DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符 StringTokenizer vTok = new StringTokenizer(expr, ","); while (vTok.hasMoreTokens()) { String v = vTok.nextToken(); storeExpressionVals(0, v, exprOn); } exprOn++; } // ... 校验DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符 } catch (ParseException pe) { throw pe; } catch (Exception e) { throw new ParseException("Illegal cron expression format (" + e.toString() + ")", 0); } }
- Выражения CRON имеют 7 полей, а крошечно-технические характеристики 7 полей на 7 объектов TreeST.
- При заполнении значения объекта TreeEset выражение преобразуется в начальное значение, конечное значение и режим инкрементных вычислений, а затем вычисляется соответствующее значение для размещения объекта набора деревьев.
CronTriggerFactoryBean.getObject()
@Override
public CronTrigger getObject() {
return this.cronTrigger;
}
- Строка исходного кода 257:Возвращайте this.cronTrigger при получении объекта, который является объектом CronTriggerImpl.
4. План выполнения расписания (SchedulerFactoryBean)
// 调度了;执行的计划(scheduler)
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.afterPropertiesSet();
schedulerFactoryBean.start();
Эта часть планирует фабрику, как имя, которое эквивалентно командиру.Его можно планировать глобально, например, отслеживать, какие триггеры готовы, назначать потоки и т. д., а также необходимо установить необходимую атрибутивную информацию;
- Триггеры: при необходимости можно установить несколько триггеров.В этой статье устанавливается cronTriggerFactoryBean.getObject(), который является объектом CronTriggerImpl.
- autoStartup: следует ли автоматически запускать задачу по умолчанию, значение по умолчанию — true
Этот длительный процесс включает в себя: планирование фабрик, пулов потоков, регистрацию задач и т. д. Общий процесс загрузки ядра выглядит следующим образом;
- Весь процесс загрузки занимает много времени, и для анализа извлекаются некоторые основные блоки кода, включая классы;
- StdScheduler
- StdSchedulerFactory
- SimpleThreadPool
- QuartzScheduler
- QuartzSchedulerThread
- RAMJobStore
- CronTriggerImpl
- CronExpression
SchedulerFactoryBean.afterPropertiesSet()
public void afterPropertiesSet() throws Exception {
if (this.dataSource == null && this.nonTransactionalDataSource != null) {
this.dataSource = this.nonTransactionalDataSource;
}
if (this.applicationContext != null && this.resourceLoader == null) {
this.resourceLoader = this.applicationContext;
}
// Initialize the Scheduler instance...
this.scheduler = prepareScheduler(prepareSchedulerFactory());
try {
registerListeners();
registerJobsAndTriggers();
}
catch (Exception ex) {
try {
this.scheduler.shutdown(true);
}
catch (Exception ex2) {
logger.debug("Scheduler shutdown exception after registration failure", ex2);
}
throw ex;
}
}
-
Строка 474 исходного кода:Подготовьтесь к планировщику prepareScheduler(prepareSchedulerFactory()) , выполните следующее;
- SchedulerFactoryBean.prepareScheduler(SchedulerFactory schedulerFactory)
- SchedulerFactoryBean.createScheduler(schedulerFactory, this.schedulerName);
- SchedulerFactoryBean.createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
- Scheduler newScheduler = schedulerFactory.getScheduler();
- StdSchedulerFactory.getScheduler();
- sched = instantiate(); Включает ряд основных операций;
1)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。 2)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。 3)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。 4)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等; 5)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor; 6)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中; 7)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程; 8)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
-
Строка исходного кода 477:Вызовите родительский класс SchedulerAccessor.registerJobsAndTriggers() для регистрации задач и триггеров.
for (Trigger trigger : this.triggers) { addTriggerToScheduler(trigger); }
SchedulerAccessor.addTriggerToScheduler() & SchedulerAccessor является родительским классом SchedulerFactoryBean
private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null);
if (triggerExists && !this.overwriteExistingJobs) {
return false;
}
// Check if the Trigger is aware of an associated JobDetail.
JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail");
if (triggerExists) {
if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {
this.jobDetails.add(jobDetail);
}
try {
getScheduler().rescheduleJob(trigger.getKey(), trigger);
}
catch (ObjectAlreadyExistsException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +
"cluster race condition: " + ex.getMessage() + " - can safely be ignored");
}
}
}
else {
try {
if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&
(this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {
getScheduler().scheduleJob(jobDetail, trigger);
this.jobDetails.add(jobDetail);
}
else {
getScheduler().scheduleJob(trigger);
}
}
catch (ObjectAlreadyExistsException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +
"cluster race condition: " + ex.getMessage() + " - can safely be ignored");
}
if (this.overwriteExistingJobs) {
getScheduler().rescheduleJob(trigger.getKey(), trigger);
}
}
}
return true;
}
-
Строка исходного кода 299:addJobToScheduler(jobDetail) всегда будет вызывать RAMJobStore для сохранения информации о задании в HashMap
(100) public void storeJob(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException { JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); boolean repl = false; synchronized (lock) { if (jobsByKey.get(jw.key) != null) { if (!replaceExisting) { throw new ObjectAlreadyExistsException(newJob); } repl = true; } if (!repl) { // get job group HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup()); if (grpMap == null) { grpMap = new HashMap<JobKey, JobWrapper>(100); jobsByGroup.put(newJob.getKey().getGroup(), grpMap); } // add to jobs by group grpMap.put(newJob.getKey(), jw); // add to jobs by FQN map jobsByKey.put(jw.key, jw); } else { // update job detail JobWrapper orig = jobsByKey.get(jw.key); orig.jobDetail = jw.jobDetail; // already cloned } } }
-
Инициализировать группу потоков;
- prepareScheduler
- createScheduler
- schedulerFactory
- StdSchedulerFactory.getScheduler()
- getScheduler()->instantiate()
- Строка 1323 исходного кода: tp.initialize();
SimpleThreadPool.initialize() & здесь count - это число в конфигурации по умолчанию, которое можно изменить.
// create the worker threads and start them Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator(); while(workerThreads.hasNext()) { WorkerThread wt = workerThreads.next(); wt.start(); availWorkers.add(wt); }
5. Запустите запланированное задание
В этом случае используется жестко закодированный метод для вызова schedulerFactoryBean.start() для запуска службы потока. Взаимодействие потоков реализовано через Object sigLock.Метод sigLock.wait() находится в методе запуска QuartzSchedulerThread, поэтому sigLock пробуждает только поток QuartzSchedulerThread. Основной процесс заключается в следующем;
Во время этого процесса запуска классы основного кода следующие:
- StdScheduler
- QuartzScheduler
- QuartzSchedulerThread
- ThreadPool
- RAMJobStore
- CronTriggerImpl
- JobRunShellFactory
QuartzScheduler.start() и запуск
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
// 唤醒线程
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
QuartzSchedulerThread.run() и процесс выполнения
@Override
public void run() {
int acquiresFailed = 0;
// 只有调用了halt()方法,才会退出这个死循环
while (!halted.get()) {
try {
// 一、如果是暂停状态,则循环超时等待1000毫秒
// wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
// 阻塞直到有空闲的线程可用并返回可用的数量
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) {
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 二、获取acquire状态的Trigger列表,也就是即将执行的任务
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers
} catch(){//...}
if (triggers != null && !triggers.isEmpty()) {
// 三:获取List第一个Trigger的下次触发时刻
long triggerTime = triggers.get(0).getNextFireTime().getTime();
// 四:获取任务触发集合
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
// 五:设置Triggers为'executing'状态
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
// 六:创建JobRunShell
qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
// 七:执行Job
qsRsrcs.getThreadPool().runInThread(shell)
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows con
continue; // while (!halted)
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
}
qs = null;
qsRsrcs = null;
}
-
Строка исходного кода 391:Создайте JobRunShell.В методе initialize() экземпляр JobRunShell установит JobDetailImpl, содержащий класс бизнес-логики, в качестве свойства-члена для подготовки к последующему выполнению кода бизнес-логики. Выполните код бизнес-логики в методе runInThread(shell).
QuartzSchedulerThread.run() и часть кода
JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; }
-
Строка 398 исходного кода: qsRsrcs.getThreadPool().runInThread(shell)
SimpleThreadPool.runInThread
// 保存所有WorkerThread的集合 private List<WorkerThread> workers; // 空闲的WorkerThread集合 private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); // 任务的WorkerThread集合 private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); /** * 维护workers、availWorkers和busyWorkers三个列表数据 * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add() * 然后调用WorkThread.run(runnable)方法 */ public boolean runInThread(Runnable runnable) { if (runnable == null) { return false; } synchronized (nextRunnableLock) { handoffPending = true; // Wait until a worker thread is available while ((availWorkers.size() < 1) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } if (!isShutdown) { WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { // If the thread pool is going down, execute the Runnable // within a new additional worker thread (no thread from the pool). WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false; } return true; }
-
Строка 428 исходного кода:WorkerThread , внутренний класс, в основном назначает и пробуждает очередь ожидающих потоков объекта блокировки.
WorkerThread.run(Runnable newRunnable)
public void run(Runnable newRunnable) { synchronized(lock) { if(runnable != null) { throw new IllegalStateException("Already running a Runnable!"); } runnable = newRunnable; lock.notifyAll(); } }
-
Строка 561 исходного кода:Метод запуска WorkerThread, после того как метод выполнит lock.notifyAll(), соответствующий WorkerThread придет к методу run(). иди сюда! Близится рассвет! Наконец дошли до предпоследнего шага метода execute() выполнения дела.Выполняемый объект — это объект JobRunShell.Давайте посмотрим на метод JobRunShell.run() ниже.
WorkerThread.run()
@Override public void run() { boolean ran = false; while (run.get()) { try { synchronized(lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; // 启动真正执行的内容,runnable就是JobRunShell runnable.run(); } } } cache(){//...} } //if (log.isDebugEnabled()) try { getLog().debug("WorkerThread is shut down."); } catch(Exception e) { // ignore to help with a tomcat glitch } }
JobRunShell.run() и сверху WorkerThread.run(), вызов для выполнения здесь
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
// ...
long startTime = System.currentTimeMillis();
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
// 执行业务代码,也就是我们的task
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
// 其他代码
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}
QuartzJobBean.execte() и продолжить вниз
public final void execute(JobExecutionContext context) throws JobExecutionException {
try {
BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
MutablePropertyValues pvs = new MutablePropertyValues();
pvs.addPropertyValues(context.getScheduler().getContext());
pvs.addPropertyValues(context.getMergedJobDataMap());
bw.setPropertyValues(pvs, true);
}
catch (SchedulerException ex) {
throw new JobExecutionException(ex);
}
executeInternal(context);
}
MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
try {
// 反射执行业务代码
context.setResult(this.methodInvoker.invoke());
}
catch (InvocationTargetException ex) {
if (ex.getTargetException() instanceof JobExecutionException) {
// -> JobExecutionException, to be logged at info level by Quartz
throw (JobExecutionException) ex.getTargetException();
}
else {
// -> "unhandled exception", to be logged at error level by Quartz
throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
}
}
catch (Exception ex) {
// -> "unhandled exception", to be logged at error level by Quartz
throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
}
}
V. Резюме
- Кварц, значение кварца, является метафорой для точного понимания времени, как кварцевые часы.
- Анализ исходного кода — очень счастливый процесс, и это счастье — то счастье, которое можно получить после анализа. За вертикальным и горизонтальным взаимодействием стоит высокая степень объектно-ориентированной развязки, прекрасное использование потоков, а выполнение задачи оформляется в расписание, что является просто великолепной работой.
- Для кварца.properties в простых сценариях разработчикам не нужно настраивать конфигурацию, достаточно использовать конфигурацию кварца по умолчанию, но в сценариях с высоким спросом вам все равно нужно настраивать конфигурацию, например достаточно установить через org.quartz. threadPool.threadCount Количество потоков может улучшить производительность в сценариях с несколькими заданиями.
- Quartz сильно отделяет обработку задач, работу и триггер, а также саму задачу от стратегии выполнения задачи, что может облегчить свободное сочетание N задач и M стратегий выполнения.
- Планировщик выделен отдельно, что эквивалентно командиру, который можно планировать глобально, например, отслеживать, какие триггеры готовы, назначать потоки и т. д.
- внешняя ссылка: