Эта статья участвует в "Месяце тем Python", подробнее см.Ссылка на мероприятие
Мы часто сталкиваемся с асинхронными задачами в веб-разработке, для некоторых операций, которые потребляют ресурсы и время, если они не извлекаются из приложения отдельно, опыт очень плохой, например: процесс входа в систему с проверочным кодом мобильного телефона, когда пользователь после ввода номер мобильного телефона и нажатие кнопки «Отправить», если он будет напрямую переброшен во внутреннее приложение для выполнения, это вызовет блокировку сетевого ввода-вывода, и все приложение будет очень недружественным Как решить эту проблему элегантно?
Мы можем использовать асинхронную задачу. При получении запроса мы можем запустить асинхронную задачу во время обработки бизнес-логики. Интерфейс сразу возвращает обратный отсчет, чтобы пользователь мог получить проверочный код. В то же время, потому что задача выполняется асинхронно, бэкенд может обрабатывать и другие задачи.Запрос, это идеально.
Есть много инструментов для реализации асинхронных задач, и принцип заключается в реализации очереди сообщений.Здесь мы в основном узнаем о Celery.
Что такое сельдерей?
Знакомство с сельдереем
Celery — это простая, гибкая и надежная распределенная система, написанная на Python, которая может обрабатывать большое количество сообщений, а также предоставляет инструменты, необходимые для работы и обслуживания распределенной системы.
Проще говоря, Celery — это инструмент планирования асинхронных задач, ориентированный на обработку задач в реальном времени и поддерживающий планирование задач. С помощью Celery мы можем быстро создать распределенную очередь задач и легко ею управлять. Хотя Celery написан на питоне, протокол можно реализовать на любом языке. До сих пор Ruby реализовалRCelery, реализованный node.jsnode-celeryиPHP-клиент.
Сельдерей Архитектура
Вот картинка, которая наглядно описывает состав и метод работы Celery.
Архитектура Celery состоит из следующих трех частей:
Brokers
Имеется в виду middleware/посредник, здесь имеется в виду очередь задач, на что следует обратить вниманиеCelery сам по себе не является очередью задач,этоИнструменты для управления распределенными очередями задачДругими словами, Celery может быстро использовать очереди задач и управлять ими, а Celery может легко интегрироваться с очередями задач, предоставляемыми третьими сторонами, такими как RabbitMQ, Redis и т. д.
Worker
Единица выполнения задачи — это единица выполнения задачи, предоставляемая Celery, короче говоря, это рабочий элемент Celery. Это всего десяток рабочих, и у них только работа хахаха.
Подобно потребителю, он отслеживает очередь задач в режиме реального времени.Когда в очередь ставится новая задача, он извлекает задачу из очереди задач и выполняет ее.
backend/Task result store
Хранилище структуры задач, как следует из названия, — это место, где хранятся результаты задач, выполняемых рабочими. Celery поддерживает хранение результатов задач различными способами, такими как Redis, Memcached и т. д.
Проще говоря, когда пользователь или триггер в нашем приложении помещает задачу в очередь Brokers, рабочий процесс Celery берет задачу и выполняет ее, а затем сохраняет структуру в хранилище результатов задачи.
Использование сельдерея
Простая реализация
Процесс установки Celery и очереди сообщений (redis/RabbitMQ) здесь повторяться не будем, для удобства мы используем здесь redis.кликните сюдаПосетите официальный сайт, чтобы узнать больше о брокерах и бэкэнд-поддержке.
Сначала мы создаем новый файл tasks.py.
import time
from celery import Celery
brokers = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('tasks', broker=brokers, backend=backend)
@app.task
def add(x, y):
time.sleep(2)
return x + y
В приведенном выше коде мы импортировали библиотеку celery, создали новый экземпляр celery, передали его брокеру и серверной части, а затем создали функцию задачи add.Мы использовали time.sleep(2) для имитации трудоемких операций.
Далее нам нужно запустить службу Celery и запустить ее в текущем терминале командной строки:
celery -A tasks worker --loglevel=info
Примечание. Если вы хотите запустить следующую команду в Windows:
celery -A celery_app worker --loglevel=info -P eventlet
В противном случае будет сообщено об ошибке. . . . . .
Мы увидим следующий вывод:
D:\use_Celery>celery -A tasks worker --loglevel=info -P eventlet
-------------- celery@DESKTOP-8E96VUV v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-07-13 15:12:19
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x35a2270
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2021-07-13 15:12:19,097: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-07-13 15:12:19,134: INFO/MainProcess] mingle: searching for neighbors
[2021-07-13 15:12:20,161: INFO/MainProcess] mingle: all alone
[2021-07-13 15:12:20,271: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2021-07-13 15:12:20,273: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.
Эти выходные данные включают некоторую информацию об указанном запускаемом приложении Celer, а также о зарегистрированных задачах и т. д.
В это время воркер уже находится в режиме ожидания, а задачи в брокере нет, нам нужно вызвать задачу для входа в брокер, чтобы воркер мог взять задачу и выполнить ее.
Создаем новый файл add_task.py:
from tasks import add
result = add.delay(5, 6) # 使用celery提供的接口delay进行调用任务函数
while not result.ready():
pass
print("完成:", result.get())
Мы можем увидеть журнал выполнения celery в выводе командного окна:
[2021-07-13 15:14:55,615: INFO/MainProcess] Received task: tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e]
[2021-07-13 15:14:57,615: INFO/MainProcess] Task tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e] succeeded in 2.0s: 11
Конечно, мы также можем увидеть соответствующую информацию о задаче выполнения в Redis серверной части.
На данный момент простое приложение с сельдереем завершено.
Периодическая/запланированная задача
Celery также может реализовывать временные или периодические задачи.Реализация также очень проста.Вам нужно только настроить периодическую задачу, а затем начать запускать ее.beatсервис может быть.
Создайте новый файл конфигурации Celery celery_conf.py:
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'add': {
'task': 'tasks.add',
'schedule': timedelta(seconds=3),
'args': (16, 16)
}
}
Затем в tasks.py пройтиapp.config_from_object('celery_config')
Прочитайте конфигурацию сельдерея:
# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')
Затем перезапустите рабочего, а затем beat:
celery -A tasks beat
Мы можем увидеть следующую информацию:
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2021-07-15 09:05:32
Configuration ->
. broker -> redis://127.0.0.1:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
Затем мы видим, что командная строка, которая запускает worker, периодически выполняет задачи:
Видно, что каждые 3 секунды задача добавляется в очередь на выполнение.
Как реализовать запланированное задание?
Это также очень просто, нам нужно только изменить файл конфигурации:
# crontab任务
# 每周四7:30调用task.add
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'add-crontab-func': {
'task': 'tasks.add',
'schedule': crontab(hour=9, minute=20, day_of_week=4),
'args': (30, 20),
},
}
CELERY_TIMEZONE = 'Asia/Shanghai' # 配置时区信息
Среди них crontab(hour=9, minute=20, day_of_week=4) представляет собой выполнение один раз каждый четверг в 9:20. Пока наша служба Celery всегда открыта, запланированная задача будет выполняться вовремя, вот я также настройка Добавлена информация о часовом поясе.
Вот служба Celery запущена в 9:17 и работает так, как видно из вывода ниже, наша запланированная задача была выполнена в 20.
Суммировать
Из этого мы видим, что использование Celery для управления распределенными очередями значительно повысит эффективность нашей разработки.Здесь я приведу лишь краткое введение и использование Celery.Если вам интересно, вы можете перейти кофициальная документацияУзнайте о более продвинутом и систематическом использовании.
Напоследок хочу поблагодарить свою подругу за толерантность, понимание и поддержку в работе и жизни!