Celery, артефакт распределенной очереди, что вы знаете?|Python Theme Month

задняя часть Python
Celery, артефакт распределенной очереди, что вы знаете?|Python Theme Month

Эта статья участвует в "Месяце тем Python", подробнее см.Ссылка на мероприятие

Мы часто сталкиваемся с асинхронными задачами в веб-разработке, для некоторых операций, которые потребляют ресурсы и время, если они не извлекаются из приложения отдельно, опыт очень плохой, например: процесс входа в систему с проверочным кодом мобильного телефона, когда пользователь после ввода номер мобильного телефона и нажатие кнопки «Отправить», если он будет напрямую переброшен во внутреннее приложение для выполнения, это вызовет блокировку сетевого ввода-вывода, и все приложение будет очень недружественным Как решить эту проблему элегантно?

Мы можем использовать асинхронную задачу. При получении запроса мы можем запустить асинхронную задачу во время обработки бизнес-логики. Интерфейс сразу возвращает обратный отсчет, чтобы пользователь мог получить проверочный код. В то же время, потому что задача выполняется асинхронно, бэкенд может обрабатывать и другие задачи.Запрос, это идеально.

Есть много инструментов для реализации асинхронных задач, и принцип заключается в реализации очереди сообщений.Здесь мы в основном узнаем о Celery.

Что такое сельдерей?

Знакомство с сельдереем

Celery — это простая, гибкая и надежная распределенная система, написанная на Python, которая может обрабатывать большое количество сообщений, а также предоставляет инструменты, необходимые для работы и обслуживания распределенной системы.

Проще говоря, Celery — это инструмент планирования асинхронных задач, ориентированный на обработку задач в реальном времени и поддерживающий планирование задач. С помощью Celery мы можем быстро создать распределенную очередь задач и легко ею управлять. Хотя Celery написан на питоне, протокол можно реализовать на любом языке. До сих пор Ruby реализовалRCelery, реализованный node.jsnode-celeryиPHP-клиент.

Сельдерей Архитектура

Вот картинка, которая наглядно описывает состав и метод работы Celery.

图片来自网络

Архитектура Celery состоит из следующих трех частей:

Brokers

Имеется в виду middleware/посредник, здесь имеется в виду очередь задач, на что следует обратить вниманиеCelery сам по себе не является очередью задач,этоИнструменты для управления распределенными очередями задачДругими словами, Celery может быстро использовать очереди задач и управлять ими, а Celery может легко интегрироваться с очередями задач, предоставляемыми третьими сторонами, такими как RabbitMQ, Redis и т. д.

Worker

Единица выполнения задачи — это единица выполнения задачи, предоставляемая Celery, короче говоря, это рабочий элемент Celery. Это всего десяток рабочих, и у них только работа хахаха.

Подобно потребителю, он отслеживает очередь задач в режиме реального времени.Когда в очередь ставится новая задача, он извлекает задачу из очереди задач и выполняет ее.

backend/Task result store

Хранилище структуры задач, как следует из названия, — это место, где хранятся результаты задач, выполняемых рабочими. Celery поддерживает хранение результатов задач различными способами, такими как Redis, Memcached и т. д.

Проще говоря, когда пользователь или триггер в нашем приложении помещает задачу в очередь Brokers, рабочий процесс Celery берет задачу и выполняет ее, а затем сохраняет структуру в хранилище результатов задачи.

Использование сельдерея

Простая реализация

Процесс установки Celery и очереди сообщений (redis/RabbitMQ) здесь повторяться не будем, для удобства мы используем здесь redis.кликните сюдаПосетите официальный сайт, чтобы узнать больше о брокерах и бэкэнд-поддержке.

Сначала мы создаем новый файл tasks.py.

import time
from celery import Celery

brokers = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'

app = Celery('tasks', broker=brokers, backend=backend)

@app.task 
def add(x, y):
    time.sleep(2)
    return x + y

В приведенном выше коде мы импортировали библиотеку celery, создали новый экземпляр celery, передали его брокеру и серверной части, а затем создали функцию задачи add.Мы использовали time.sleep(2) для имитации трудоемких операций.

Далее нам нужно запустить службу Celery и запустить ее в текущем терминале командной строки:

celery -A tasks worker  --loglevel=info

Примечание. Если вы хотите запустить следующую команду в Windows:

celery -A celery_app worker --loglevel=info -P eventlet

В противном случае будет сообщено об ошибке. . . . . .

2021-05-16 191231.gif

Мы увидим следующий вывод:

 D:\use_Celery>celery -A tasks worker --loglevel=info -P eventlet

 -------------- celery@DESKTOP-8E96VUV v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-07-13 15:12:19
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x35a2270
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2021-07-13 15:12:19,097: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-07-13 15:12:19,134: INFO/MainProcess] mingle: searching for neighbors
[2021-07-13 15:12:20,161: INFO/MainProcess] mingle: all alone
[2021-07-13 15:12:20,271: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2021-07-13 15:12:20,273: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.

Эти выходные данные включают некоторую информацию об указанном запускаемом приложении Celer, а также о зарегистрированных задачах и т. д. 

В это время воркер уже находится в режиме ожидания, а задачи в брокере нет, нам нужно вызвать задачу для входа в брокер, чтобы воркер мог взять задачу и выполнить ее.

Создаем новый файл add_task.py:

from tasks import add

result = add.delay(5, 6)  # 使用celery提供的接口delay进行调用任务函数

while not result.ready():
    pass
print("完成:", result.get())

Мы можем увидеть журнал выполнения celery в выводе командного окна:

[2021-07-13 15:14:55,615: INFO/MainProcess] Received task: tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e]
[2021-07-13 15:14:57,615: INFO/MainProcess] Task tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e] succeeded in 2.0s: 11

Конечно, мы также можем увидеть соответствующую информацию о задаче выполнения в Redis серверной части.

На данный момент простое приложение с сельдереем завершено.

Периодическая/запланированная задача

Celery также может реализовывать временные или периодические задачи.Реализация также очень проста.Вам нужно только настроить периодическую задачу, а затем начать запускать ее.beatсервис может быть.

Создайте новый файл конфигурации Celery celery_conf.py:

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16)
    }
}

Затем в tasks.py пройтиapp.config_from_object('celery_config')Прочитайте конфигурацию сельдерея:

# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')

Затем перезапустите рабочего, а затем beat:

celery -A tasks beat

Мы можем увидеть следующую информацию:

celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2021-07-15 09:05:32
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

Затем мы видим, что командная строка, которая запускает worker, периодически выполняет задачи:

Видно, что каждые 3 секунды задача добавляется в очередь на выполнение.

Как реализовать запланированное задание?

Это также очень просто, нам нужно только изменить файл конфигурации:

# crontab任务
# 每周四7:30调用task.add
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add-crontab-func': {
        'task': 'tasks.add',
        'schedule': crontab(hour=9, minute=20, day_of_week=4),
        'args': (30, 20),
    },
}
CELERY_TIMEZONE = 'Asia/Shanghai'  # 配置时区信息

Среди них crontab(hour=9, minute=20, day_of_week=4) представляет собой выполнение один раз каждый четверг в 9:20. Пока наша служба Celery всегда открыта, запланированная задача будет выполняться вовремя, вот я также настройка Добавлена ​​информация о часовом поясе.

Вот служба Celery запущена в 9:17 и работает так, как видно из вывода ниже, наша запланированная задача была выполнена в 20.

Суммировать

Из этого мы видим, что использование Celery для управления распределенными очередями значительно повысит эффективность нашей разработки.Здесь я приведу лишь краткое введение и использование Celery.Если вам интересно, вы можете перейти кофициальная документацияУзнайте о более продвинутом и систематическом использовании.

Напоследок хочу поблагодарить свою подругу за толерантность, понимание и поддержку в работе и жизни!