Недавно я исследовал артефакт асинхронной задачи-Celery и обнаружил, что он очень прост в использовании.Можно сказать, что он высокодоступен.Если вы отправляете команду выполнения задачи в Celery, пока работает исполнительный блок (рабочий) Celery , он обязательно выполнится; в случае выполнения Если устройство (рабочий процесс) выйдет из строя, например, сбой питания или отключение сети, до тех пор, пока исполнительный модуль (рабочий процесс) возобновит работу, он будет продолжать выполнять введенные вами команды. Это имеет важное практическое значение: если система транзакций получает большое количество запросов на транзакцию, но хост зависает, внешние пользователи все еще могут продолжать отправлять запросы на транзакцию, и пользователям не нужно ждать после отправки запросов на транзакцию. После восстановления хоста отправленный запрос на транзакцию может продолжать выполняться, но время, в течение которого пользователь должен получить подтверждение транзакции, продлевается, но это не влияет на работу пользователя.
Знакомство с сельдереем
Это асинхронный инструмент планирования задач.Пользователи используют Celery для генерации задач, заимствуют посредников для передачи задач, а блоки выполнения задач потребляют задачи от посредников. Блок выполнения задач может быть развернут на одной машине или распределен, поэтому Celery представляет собой асинхронную очередь задач с высокодоступной моделью производитель-потребитель. Вы можете передать свои задачи в Celery для обработки, или вы можете позволить Celery автоматически планировать задачи, такие как crontab, а затем делать другие вещи.Вы можете проверить статус выполнения задачи в любое время, или вы можете позволить Celery автоматически отправлять результаты выполнения после завершения исполнения.
Сценарии применения:
-
Задачи с большим количеством одновременных запросов. Интернет стал популярен, и транзакции людей с едой, одеждой, жильем и транспортом могут осуществляться онлайн, что не может избежать чрезвычайно высоких одновременных запросов задач в определенное время.Некоторое время назад объем транзакций резко увеличился, а подтверждение транзакций время было долгим.В это время задача запроса транзакции может быть передана в Celery для асинхронного выполнения, а результат возвращается пользователю после выполнения. Пользователям не нужно ждать после отправки, и пользователи будут уведомлены после завершения задачи (успешной покупки или оплаты), что повышает общую пропускную способность и время отклика веб-сайта, а также может обеспечить высокий уровень параллелизма практически без увеличения затрат на оборудование. .
-
Запланированные задачи.在云计算,大数据,集群等技术越来越普及,生产环境的机器也越来越多,定时任务是避免不了的,如果每台机器上运行着自己的 crontab 任务,管理起来相当麻烦,例如当进行灾备切换时,某些 crontab 任务可能需要单独手工调起,给运维人员造成极大的麻烦,有了 Celery ,你可以集中管理所有机器的定时任务,而且灾备无论何时切换,crontab 任务总能正确的执行。
-
Асинхронные задачи. Некоторые трудоемкие операции, такие как операции ввода-вывода, сетевые запросы, вы можете перейти к асинхронному выполнению Celery, пользователь может выполнять другие действия, когда задача будет завершена, вернет результаты пользователю, может улучшить пользователя опыт.
Преимущества сельдерея
-
Написан на чистом Python, с открытым исходным кодом. Это уже стоит на плечах гигантов, и пока Celery написан на чистом Python, протокол можно реализовать на любом языке. На данный момент есть RCelery, реализованный на Ruby, node-celery, реализованный на node.js, и клиент PHP, а совместимость языков также может быть достигнута с помощью веб-перехватчиков.
-
Гибкая конфигурация. Конфигурация по умолчанию удовлетворила большинство потребностей, поэтому вы можете в основном использовать ее без написания файла конфигурации.Конечно, если у вас есть персонализированная настройка, вы можете использовать файл конфигурации или вы можете записать конфигурацию в исходный код. файл.
-
Легко контролировать. Все статусы миссии находятся под вашим контролем.
-
Сложная обработка ошибок.
-
Гибкая очередь задач и маршрутизация задач. Вы можете легко запустить задачу в указанном вами очереди, которая называется маршрутизацией задач.
Архитектура сельдерея
Чтобы изучить инструмент, лучше всего сначала понять его архитектуру, дополненную краткими кодами для практики, а наиболее углубленно прочитать его исходный код.На следующем рисунке показана схема архитектуры Celery.
производитель задач: вызов API, функций и декораторов, предоставляемых Celery, для создания задач и передачи их в очередь задач — все это производители задач.
Планирование задач: Процесс Celery Beat будет читать содержимое файла конфигурации и периодически отправлять задачи, которые необходимо выполнить в конфигурации, в очередь задач.
Маклер: Celery взаимодействует с сообщениями, обычно используя посредника (брокера) для прохождения перед клиентом и работником.Процесс начинается с того, что клиент добавляет сообщение в очередь, а затем посредник отправляет сообщение работнику. Официальными инструментами для реализации Брокера являются:
| название | государство | монитор | дистанционное управление |
|---|---|---|---|
| RabbitMQ | стабильность | да | да |
| Redis | стабильность | да | да |
| Mongo DB | экспериментальный | да | да |
| Beanstalk | экспериментальный | нет | нет |
| Amazon SQS | экспериментальный | нет | нет |
| Couch DB | экспериментальный | нет | нет |
| Zookeeper | экспериментальный | нет | нет |
| Django DB | экспериментальный | нет | нет |
| SQLAlchemy | экспериментальный | нет | нет |
| Iron MQ | третья сторона | нет | нет |
В реальном использовании мы можем выбрать RabbitMQ или Redis в качестве посредника.
работник исполнительного отдела: Worker – это единица выполнения задачи и потребитель, принадлежащий очереди задач.Он постоянно следит за очередью задач.Когда в очереди появляется новая задача, она будет извлечена и выполнена. Работник может работать на разных машинах, если он указывает на одного и того же посредника.Рабочий процесс также может отслеживать одну или несколько очередей задач.Важная причина, по которой Celery является распределенной очередью задач, заключается в том, что рабочие процессы могут быть распределены и запущены на нескольких хостах. Вам не нужно перезапускать воркер после изменения файла конфигурации, оно вступит в силу автоматически.
сервер хранения результатов задач: используется для постоянного хранения результатов выполнения задач Worker.Celery поддерживает различные способы хранения результатов задач, включая AMQP, Redis, memcached, MongoDb, SQLAlchemy и т. д.
Пример использования сельдерея:
Возьмите версию Python 3.6.5 в качестве примера.
1. Установите библиотеки Python: celery, redis.
pip install celery #安装celery
pip install celery[librabbitmq,redis,auth,msgpack] #安装celery对应的依赖
Другие зависимости Celery следующие:
Сериализация:
celery [auth]: сериализовать с использованием auth.
celery[msgpack]: сериализовать с помощью msgpack.
сельдерей [yaml]: сериализовать с помощью yaml.
Параллелизм:
celery [eventlet]: используйте пул событий.
celery[gevent]: используйте пул gevent.
сельдерей [потоки]: использовать пул потоков.
Транспорт и серверная часть:
celery[librabbitmq]: библиотека C, использующая librabbitmq.
celery[redis]: используйте Redis в качестве транспорта сообщений или результатов.
celery[mongodb]: используйте MongoDB в качестве транспорта для обмена сообщениями (экспериментально) или в качестве серверной части результатов (поддерживается).
celery[sqs]: используйте Amazon SQS в качестве транспорта сообщений (экспериментальный вариант).
celery [memcache]: в результате использования бэкенда memcache.
celery[cassandra]: использует Apache Cassandra в качестве серверной части результата.
celery[couchdb]: использовать CouchDB в качестве транспорта сообщений (экспериментально).
celery[couchbase]: используйте CouchBase в качестве серверной части результата.
celery[beanstalk]: используйте Beanstalk в качестве транспорта для обмена сообщениями (экспериментально).
celery[zookeeper]: использовать Zookeeper в качестве метода передачи сообщений.
Сельдерей [Zeromq]: Zeromq используется в качестве режима передачи сообщений (экспериментальный).
celery [sqlalchemy]: SQLAlchemy используется как режим передачи сообщений (экспериментальный) или как результат задней части (уже поддерживается).
celery[pyro]: используйте метод обмена сообщениями Pyro4 (экспериментальный).
celery[slmq]: используйте транспорт SoftLayerMessageQueue (экспериментальный).
2. Установите Redis, в качестве примера возьмем операционную систему ubuntu (если вы используете RabbitMQ, то можете установить его самостоятельно).
Установить из исходников:
$ wget http://download.redis.io/releases/redis-4.0.11.tar.gz
$ tar xzf redis-4.0.11.tar.gz
$ cd redis-4.0.11
$ make
Измените файл конфигурации redis redis.conf, измените bind=127.0.0.0.1 на bind=0.0.0.0, что означает разрешить удаленный доступ к базе данных redis.
запустить redis-сервер
$ cd src
$ ./redis-server ../redis.conf
3. Первое приложение сельдерея.
Функция: смоделируйте трудоемкую операцию и напечатайте IP-адрес машины, на которой находится работник.И посредник, и хранилище результатов используют базу данных Redis.
#encoding=utf-8
#filename my_first_celery.py
from celery import Celery
import time
import socket
app = Celery(''tasks'', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' )
def get_host_ip():
"""
查询本机ip地址
:return: ip
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
@app.task
def add(x, y):
time.sleep(3) # 模拟耗时操作
s = x + y
print("主机IP {}: x + y = {}".format(get_host_ip(),s))
return s
Запустите этого рабочего:
celery -A my_first_celery worker -l info
Здесь -A означает имя модуля нашей программы, worker означает запуск исполняющей единицы, а -l - уровень пакета, что означает уровень журнала для печати. Вы можете использовать команду celery --help для просмотра справочной документации по командам celery. После выполнения команды рабочий интерфейс отображает следующую информацию:
aaron@ubuntu:~/project$ celery -A my_first_celery worker -l info
-------------- celery@ubuntu v4.2.1 (windowlicker)
---- **** -----
--- * *** * -- Linux-4.10.0-37-generic-x86_64-with-Ubuntu-16.04-xenial 2018-08-27 22:46:00
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f1ce0747080
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. my_first_celery.add
[2018-08-27 22:46:00,726: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2018-08-27 22:46:00,780: INFO/MainProcess] mingle: searching for neighbors
[2018-08-27 22:46:02,075: INFO/MainProcess] mingle: all alone
[2018-08-27 22:46:02,125: INFO/MainProcess] celery@ubuntu ready.
Это довольно ясно. Если вы не хотите использовать команду celery для запуска рабочего, вы можете напрямую использовать файл для управления, изменить my_first_celery.py (добавить функцию входа main)
if __name__ == '__main__':
app.start()
повторно выполнить
python my_first_celery.py worker
Вот и все.
4. Вызовите задачу
Напишите следующий скрипт start_task.py в том же каталоге my_first_celery.py, как показано ниже.
from my_first_celery import add #导入我们的任务函数add
import time
result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行
while not result.ready():# 循环检查任务是否执行完毕
print(time.strftime("%H:%M:%S"))
time.sleep(1)
print(result.get()) #获取任务的返回结果
print(result.successful()) #判断任务是否成功执行
воплощать в жизнь
python start_task.py
Результат выглядит следующим образом:
22:50:59
22:51:00
22:51:01
24
True
Было обнаружено, что после ожидания около 3 секунд задача вернула результат 24 и была успешно завершена.В это время информация, добавленная в рабочий интерфейс, выглядит следующим образом:
[2018-08-27 22:50:58,840: INFO/MainProcess] Received task: my_first_celery.add[a0c4bb6b-17af-474c-9eab-407d593a7807]
[2018-08-27 22:51:01,898: WARNING/ForkPoolWorker-1] 主机IP 192.168.195.128: x + y = 24
[2018-08-27 22:51:01,915: INFO/ForkPoolWorker-1] Task my_first_celery.add[a0c4bb6b-17af-474c-9eab-407d593a7807] succeeded in 3.067237992000173s: 24
Информация здесь очень подробная, среди которых a0c4bb6b-17af-474c-9eab-407d593a7807 - это taskid.Пока указан бэкэнд, вы можете перейти к бэкенду, чтобы найти текущий результат в любое время по id задачи. использование выглядит следующим образом:
>>> from my_first_celery import add
>>> taskid= 'a0c4bb6b-17af-474c-9eab-407d593a7807'
>>> add.AsyncResult(taskid).get()
24
>>>#或者
>>> from celery.result import AsyncResult
>>> AsyncResult(taskid).get()
24
Важное примечание. Если вы хотите выполнить задание на рабочем компьютере удаленно, скопируйте my_first_celery.py и start_tasks.py на удаленный хост (необходимо установить сельдерей), измените my_first_celery.py, чтобы он указывал на того же посредника и хранилище результатов. , а затем выполните start_tasks.py. Вы можете удаленно выполнять задания на рабочих машинах. Код для функции my_first_celery.add необязателен, также следует вызывать задачу так:
from my_first_celery import app
app.send_task("my_first_celery.add",args=(1,3))
5. Первый проект Celery
В производственной среде часто бывает большое количество задач, которые нужно планировать.Неудобно иметь один файл.Celery безусловно поддерживает модульную структуру.Я написал небольшой проект Celery для обучения,включая операции с очередями,планирование задач и т. д. Практическая работа, дерево каталогов выглядит так:
вinit.py — это пустой файл, цель которого — сообщить Python, что myCeleryProj является импортируемым пакетом.
app.py
from celery import Celery
app = Celery("myCeleryProj", include=["myCeleryProj.tasks"])
app.config_from_object("myCeleryProj.settings")
if __name__ == "__main__":
app.start()
settings.py
from kombu import Queue
import re
from datetime import timedelta
from celery.schedules import crontab
CELERY_QUEUES = ( # 定义任务队列
Queue("default", routing_key="task.#"), # 路由键以“task.”开头的消息都进default队列
Queue("tasks_A", routing_key="A.#"), # 路由键以“A.”开头的消息都进tasks_A队列
Queue("tasks_B", routing_key="B.#"), # 路由键以“B.”开头的消息都进tasks_B队列
)
CELERY_TASK_DEFAULT_QUEUE = "default" # 设置默认队列名为 default
CELERY_TASK_DEFAULT_EXCHANGE = "tasks"
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"
CELERY_ROUTES = (
[
(
re.compile(r"myCeleryProj\.tasks\.(taskA|taskB)"),
{"queue": "tasks_A", "routing_key": "A.import"},
), # 将tasks模块中的taskA,taskB分配至队列 tasks_A ,支持正则表达式
(
"myCeleryProj.tasks.add",
{"queue": "default", "routing_key": "task.default"},
), # 将tasks模块中的add任务分配至队列 default
],
)
# CELERY_ROUTES = (
# [
# ("myCeleryProj.tasks.*", {"queue": "default"}), # 将tasks模块中的所有任务分配至队列 default
# ],
# )
# CELERY_ROUTES = (
# [
# ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
# ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
# ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
# ],
# )
BROKER_URL = "redis://127.0.0.1:6379/0" # 使用redis 作为消息代理
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" # 任务结果存在Redis
CELERY_RESULT_SERIALIZER = "json" # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERYBEAT_SCHEDULE = {
"add": {
"task": "myCeleryProj.tasks.add",
"schedule": timedelta(seconds=10),
"args": (10, 16),
},
"taskA": {
"task": "myCeleryProj.tasks.taskA",
"schedule": crontab(hour=21, minute=10),
},
"taskB": {
"task": "myCeleryProj.tasks.taskB",
"schedule": crontab(hour=21, minute=12),
},
}
tasks.py
import os
from myCeleryProj.app import app
import time
import socket
def get_host_ip():
"""
查询本机ip地址
:return: ip
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
@app.task
def add(x, y):
s = x + y
time.sleep(3) # 模拟耗时操作
print("主机IP {}: x + y = {}".format(get_host_ip(), s))
return s
@app.task
def taskA():
print("taskA begin...")
print(f"主机IP {get_host_ip()}")
time.sleep(3)
print("taskA done.")
@app.task
def taskB():
print("taskB begin...")
print(f"主机IP {get_host_ip()}")
time.sleep(3)
print("taskB done.")
readme.txt
#启动 worker
#分别在三个终端窗口启动三个队列的worker,执行命令如下所示:
celery -A myCeleryProj.app worker -Q default -l info
celery -A myCeleryProj.app worker -Q tasks_A -l info
celery -A myCeleryProj.app worker -Q tasks_B -l info
#当然也可以一次启动多个队列,如下则表示一次启动两个队列tasks_A,tasks_B。
celery -A myCeleryProj.app worker -Q tasks_A,tasks_B -l info
#则表示一次启动两个队列tasks_A,tasks_B。
#最后我们再开启一个窗口来调用task: 注意观察worker界面的输出
>>> from myCeleryProj.tasks import *
>>> add.delay(4,5);taskA.delay();taskB.delay() #同时发起三个任务
<AsyncResult: 21408d7b-750d-4c88-9929-fee36b2f4474>
<AsyncResult: 737b9502-77b7-47a6-8182-8e91defb46e6>
<AsyncResult: 69b07d94-be8b-453d-9200-12b37a1ca5ab>
#也可以使用下面的方法调用task
>>> from myCeleryProj.app import app
>>> app.send_task(myCeleryProj.tasks.add,args=(4,5)
>>> app.send_task(myCeleryProj.tasks.taskA)
>>> app.send_task(myCeleryProj.tasks.taskB)
(Заканчивать)
При необходимости подпишитесь на общедоступную учетную запись WeChat somenzz и ответьте celery, чтобы загрузить исходный код.