Среда задач синхронизации Python: анализ исходного кода APScheduler (1)

Python
Среда задач синхронизации Python: анализ исходного кода APScheduler (1)

предисловие

APScheduler – это хорошо известная платформа для выполнения задач по расписанию на языке Python. Она может удовлетворить потребности в выполнении задач по расписанию или периодическом выполнении программных задач. Она похожа на crontab в Linux, но более мощная, чем crontab. Эта структура может не только добавлять и удалить запланированные задачи, но также предоставляет функции для различных постоянных задач.

Слабо распределенная структура APScheduler, поскольку каждый объект задачи хранится в текущем узле, может распространяться только в виде человеческой плоти, например, с помощью Redis.

Когда я впервые столкнулся с APScheduler, я бы сказал, что у него много концепций. Когда я впервые связался с ним, это было потому, что было слишком много концепций, и было удобнее использовать crontab напрямую. Но сейчас многие проекты компании основаны на APScheduler, так что давайте взглянем на его исходный код.

предварительная концепция

Упомяните ключевые понятия в APScheduler максимально простым языком.

  • Job: объект задачи, который является задачей, которую вы хотите выполнить.
  • JobStores: метод хранения задач, по умолчанию хранится в памяти, а также может поддерживать redis, mongodb и т. д.
  • Исполнители: Исполнители — это вещи, которые выполняют задачи
  • Триггер: триггер, достижение определенного условия запускает соответствующую логику вызова
  • Планировщик: планировщик, то, что соединяет вышеперечисленные части.

APScheduler предоставляет несколько планировщиков, и разные планировщики подходят для разных сценариев.В настоящее время наиболее распространенным, который я использую, является фоновый планировщик BackgroundScheduler, который подходит для планирования программ, требующих запуска программ в фоновом режиме.

Существуют различные другие планировщики:

BlockingScheduler: хорошо подходит для ситуаций, когда в процессе выполняется только одна задача, обычно используется, когда планировщик — это единственное, что вы хотите запустить.

AsyncIOScheduler: подходит для использования с инфраструктурой asyncio.

GeventScheduler: подходит для использования с gevent framework

TornadoScheduler: подходит для приложений, использующих структуру Tornado.

TwistedScheduler: приложения, использующие платформу Twisted.

QtScheduler: когда использовать QT

В этой статье анализируется только логика, связанная с BackgroundScheduler, сначала кратко рассмотрим официальный пример, а затем проанализируем его слой за слоем.

Анализ BackgroundScheduler

Официальный код примера выглядит следующим образом.

from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # 这是在这里模拟应用程序活动(使主线程保持活动状态)。
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # 关闭调度器
        scheduler.shutdown()

Приведенный выше код очень прост. Сначала создайте экземпляр планировщика с помощью метода BackgroundScheduler, а затем вызовите метод add_job, чтобы добавить задачи для выполнения в JobStores. По умолчанию он хранится в памяти, точнее, он хранится в словаре. , и, наконец, через метод start запускает планировщик, и APScheduler будет запускать триггер с именем interval каждые 3 секунды, чтобы планировщик планировал выполнение исполнителем по умолчанию логики в методе tick.

Когда программа полностью выполнена, вызовите метод shutdown, чтобы закрыть планировщик.

BackgroundScheduler на самом деле основан на потоках, а потоки имеют концепцию потоков демона.Если включен режим потока демона, планировщик не нужно закрывать.

Сначала взгляните на исходный код класса BackgroundScheduler.

# apscheduler/schedulers/background.py

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def _configure(self, config):
        self._daemon = asbool(config.pop('daemon', True))
        super()._configure(config)

    def start(self, *args, **kwargs):
        # 创建事件通知
        # 多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活。
        self._event = Event() 
        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        # 设置为守护线程,Python主线程运行完后,直接结束不会理会守护线程的情况,
        # 如果是非守护线程,Python主线程会在运行完后,等待其他非守护线程运行完后,再结束
        self._thread.daemon = self._daemon # daemon 是否为守护线程
        self._thread.start() # 启动线程

    def shutdown(self, *args, **kwargs):
        super().shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

В приведенном выше коде даны подробные комментарии и дано краткое пояснение.

Метод _configure в основном используется для настройки параметров, здесь определяется параметр self._daemon, а затем через метод super вызывается метод _configure родительского класса.

Метод запуска является его методом запуска, и логика также очень проста.Он создает событие потока Событие, которое представляет собой механизм синхронизации потока.Если вы посмотрите на его исходный код, вы обнаружите, что события потока реализованы на основе условных блокировок События потока предоставляют три основных метода: set(), wait() и clear().

  • Метод set() установит состояние флага события в true.
  • Метод clear() устанавливает состояние флага события в false
  • Метод wait() блокирует поток до тех пор, пока статус флага события не станет истинным.

После создания события потока вызывается метод start() его родительского класса.Этот метод является настоящим методом запуска.Временно откладываем его.После запуска создается поток через метод Thread.Целевая функция нитьself._main_loop,это основная тренировка планировщика.Если планировщик не закрыт,он всегда будет выполнять логику в основном цикле,чтобы реализовать различные функции APScheduler.Это очень важный метод.Точно так же временно освободить его. После завершения создания поток запуска в порядке.

После того, как поток создан, определите демон потока.Если демон имеет значение True, это означает, что текущий поток является потоком демона, в противном случае это поток, не являющийся демоном.

Кратко упомянем, что если поток является потоком демона, то после выполнения логики основного потока Python он выйдет напрямую и проигнорирует поток демона.Если это поток, не являющийся демоном, после выполнения основного потока Python он будет ждать, пока запустятся все остальные потоки, не являющиеся демонами, и завершится после выполнения.

Метод выключения сначала вызывает метод выключения родительского класса, затем вызывает метод соединения и, наконец, напрямую удаляет объект потока.

Прочитав код класса BackgroundScheduler, вернитесь к примеру кода в начале.После создания экземпляра планировщика с помощью BackgroundScheduler вызывается метод add_job, и к методу add_job добавляются три параметра, которые являются методом тика, вы хотите выполнять регулярно.Имя триггера триггера называется интервалом, а параметр этого триггера – секунды=3.

Можно ли изменить имя триггера триггера на любой символ? Это невозможно.APScheduler на самом деле использует технику точки входа в Python здесь.Если вы прошли процесс создания пакета Python и загрузки его в PYPI, у вас должно сложиться представление о точке входа. На самом деле точку входа можно не только запаковать навсегда, но и использовать для модульной архитектуры плагинов.Этот контент шире, и о нем мы поговорим позже.

Проще говоря, методу add_job() необходимо передать соответствующее имя триггера, интервал будет соответствовать классу apscheduler.triggers.interval.IntervalTrigger, а параметр секунд является параметром этого класса.

Анатомия метода add_job

Исходный код метода add_job выглядит следующим образом.

# apscheduler/schedulers/base.py/BaseScheduler

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        # 过滤
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
                          value is not undefined)
        # 实例化具体的任务对象
        job = Job(self, **job_kwargs)

        # Don't really add jobs to job stores before the scheduler is up and running
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                self._pending_jobs.append((job, jobstore, replace_existing))
                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                                  'the scheduler starts')
            else:
                self._real_add_job(job, jobstore, replace_existing)

        return job

Кода в методе add_job не так много, в начале создается словарь job_kwargs, который содержит триггеры, исполнители и т.д., что просто и разумно.

  • Триггер триггера создается методом self._create_trigger(), для этого метода требуется два параметра, триггером в коде фактически является строка интервала, а trigger_args — соответствующий параметр.
  • Exectuor executor в настоящее время используется по умолчанию, что будет обсуждаться позже.
  • Метод обратного вызова func — это логика, которую мы действительно хотим выполнить.Триггер запускает планировщик, а планировщик вызывает исполнителя для выполнения определенной логики.
  • misfire_grace_time: Комментарий интерпретируется как "задача будет выполняться в течение нескольких секунд после указанного времени выполнения", что можно понять, только прочитав соответствующие документы. Например, задача изначально выполнялась в 12:00, но не была запланирована почему-то в 12:00.Теперь в 12:30 при планировании будет оцениваться разница между текущим временем и предварительно запланированным временем.Если для параметра misfire_grace_time установлено значение 20, задача, которая не была запланирована ранее, не будет запланировано для выполнения. Если для параметра misfire_grace_time установлено значение 60, оно будет запланировано.
  • объединение: если задача по какой-то причине фактически не выполняется, что приводит к накоплению задач, например, 10 одинаковых людей, если объединение имеет значение True, будет выполнен только последний слой.Если объединение имеет значение False, попробуйте выполнить 10 последовательно. Второсортный.
  • max_instances: одновременно может выполняться до нескольких экземпляров задачи.
  • next_run_time: время следующего выполнения задачи.

Затем выполните фильтр, а затем передайте параметры в класс Job, чтобы завершить создание экземпляра объекта задачи.

Дальнейшая логика относительно проста.Сначала определите, можно ли получить блокировку self._jobstores_lock.Это фактически реентерабельная блокировка.В Python реализация реентерабельных блокировок основана на обычных мьютексных блокировках,но есть дополнительная переменная для подсчета. Каждый раз, когда добавляется блокировка, переменная увеличивается на единицу, а переменная уменьшается на единицу каждый раз, когда блокировка снимается.Только когда переменная равна 0, мьютекс фактически освобождается.

После получения блокировки сначала оцените состояние текущего планировщика, если он STATE_STOPPED (состояние остановки), добавьте задачу в_pending_jobsВ списке ожидания, если он не остановлен, он будет вызван_real_add_jobметод, который затем возвращает объект задания.

фактически_real_add_jobЭтот метод является реальным способом добавления задания объекта задачи в указанный бэкенд хранилища.

Когда объект задачи добавляется в указанное хранилище (по умолчанию хранится непосредственно в памяти), планировщик извлекает его для выполнения.

В примере кода после выполнения метода планировщика add_job немедленно выполняется метод запуска планировщика.

конец

Учитывая количество слов, эта статья будет здесь первой, а анализ APScheduler будет продолжен позже.

Если статья была вам полезна, нажмите «Смотрю», чтобы поддержать Erliang, увидимся в следующей статье.