Введение
Celery — это распределенная очередь задач, реализованная Python, которая обычно имеет три функции.
- 1. Чтобы снизить нагрузку на параллелизм, сначала запишите задачу в очередь, а затем запустите ее, когда появятся свободные ресурсы.
- 2. Чтобы выполнить запланированную задачу, сначала запишите задачу в очередь, а затем выполните ее в указанное время.
- 3. Асинхронные задачи, в вебе есть трудоемкие задачи, которые можно сначала записать в очередь, а потом выполнить фоновым процессом задачи
Было много статей, описывающих использование и простые принципы Celery, и в этой статье он также будет кратко упомянут, но это не займет много чернил.
В этой статье основное внимание уделяется использованию Python для реализации простого Celery и использованию Celery, реализованного самостоятельно, для реализации асинхронных задач.Как и в предыдущей статье «Python Web: Flask выполняет задачи асинхронно», создайте простую сеть через Flask, а затем выполнить задачу «Время потребления», я надеюсь, что внешний интерфейс может отображать ход выполнения задачи через индикатор выполнения.
Следует отметить, что исходный код Celery здесь интерпретироваться не будет.Его исходный код имеет много инженерных деталей и относительно сложен.Здесь всего лишь простая реализация игрушечного Celery исходя из его сути.Конечно, эта игрушка Celery не может можно сравнить с Celery с точки зрения стабильности и эффективности.По сравнению, но это может быть хорошим пониманием того, как в целом реализован Celery.
В этой статье основное внимание уделяется «разделению форм и духовному союзу», что отличается от деталей реализации Celery, но основной принцип тот же.
Итак, приступим!
Понятие и принцип сельдерея
Celery 5 ключевых понятий, если вы их поймете, у вас будет общее представление о Celery.
-
1. Задача (задача) Проще говоря, это то, что вы делаете, например, отправляете электронные письма в процессе регистрации пользователя.
-
2. Рабочий Человек, который обрабатывает задачу в фоновом режиме
-
3. Брокер (брокер) Суть в очереди, Задание будет передано Брокеру, а Воркер возьмет Задание у Брокера и обработает его
-
4. Бит Планировщик задач по расписанию, в соответствии с установленным временем, добавить данные в брокер, а затем дождаться обработки работником
-
5. Бэкенд Объект, используемый для сохранения результата выполнения Worker, каждая Task должна иметь возвращаемое значение, и эти возвращаемые значения находятся в Backend
Здесь мы отложим в сторону различные концепции, посмотрим на Celery с более существенной точки зрения и обнаружим, что это процесс хранения сериализации задач и получения десериализации.
Взяв в качестве примера веб-асинхронные задачи, использование обычно выглядит следующим образом:
- 1. Есть функция, которой нужно долго обрабатывать ввод-вывод, если ее не выполнять асинхронно, то она заблокируется, что обычно не допускается.
- 2. Запустите процесс выполнения фоновой задачи
- 3. Когда требуется выполнить трудоемкую функцию, она не будет выполняться синхронно немедленно, а извлечет ключевые данные функции, сериализует их и сохранит в очереди, а очередь может быть реализована с использованием Redis или других методов.
- 4. Процесс выполнения фоновой задачи получит данные из очереди и десериализует их.
- 5. Процесс выполнения фоновой задачи будет использовать исходную функцию и восстановленные данные для завершения выполнения функции, тем самым достигая эффекта асинхронного выполнения.
Процесс не сложный, и разные концепции в Celery отвечают за разные части вышеописанного процесса.
Реализовать простой сельдерей
Далее давайте реализуем Celery, где Celery выбирает Redis в качестве серверной части.
Во-первых, давайте организуем общую структуру.
Во-первых, нам нужно определить класс Task для представления задач, которые должны быть выполнены.Конкретная логика, которая будет выполняться различными задачами, пишется самим пользователем.
Затем определите очередь задач, Брокер в Celery, для хранения задач, которые должны быть выполнены.
Затем необходимо определить рабочий процесс выполнения, а рабочий должен получать задачи от брокера и выполнять их.
Наконец, вам нужно определить класс для хранения данных, возвращаемых задачей, то есть Backend в Celery.
Это выглядит немного сложно, но не паникуйте, на самом деле это очень просто.
реализовать класс задачи
Во-первых, реализуйте task.py, который используется для определения некоторой логики, связанной с задачей.
# task.py
import abc
import json
import uuid
import traceback
import pickle
from broker import Broker
from backend import Backend
class BaseTask(abc.ABC):
"""
Example Usage:
class AdderTask(BaseTask):
task_name = "AdderTask"
def run(self, a, b):
result = a + b
return result
adder = AdderTask()
adder.delay(9, 34)
"""
task_name = None
def __init__(self):
if not self.task_name:
raise ValueError("task_name should be set")
self.broker = Broker()
@abc.abstractmethod # abstractmethod 派生类必须重写实现逻辑
def run(self, *args, **kwargs):
# 写上你具体的逻辑
raise NotImplementedError("Task `run` method must be implemented.")
# 更新任务状态
def update_state(self, task_id, state, meta={}):
_task = {"state": state, "meta": meta}
serialized_task = json.dumps(_task)
backend = Backend()
backend.enqueue(queue_name=task_id, item=serialized_task)
print(f"task info: {task_id} succesfully queued")
# 异步执行
def delay(self, *args, **kwargs):
try:
self.task_id = str(uuid.uuid4())
_task = {"task_id": self.task_id, "args": args, "kwargs": kwargs}
serialized_task = json.dumps(_task)
# 加入redis中
self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
print(f"task: {self.task_id} succesfully queued")
except Exception:
# traceback.print_exc()
raise Exception("Unable to publish task to the broker.")
return self.task_id
# 获取数据
def async_result(task_id):
backend = Backend()
_dequeued_item = backend.dequeue(queue_name=task_id)
dequeued_item = json.loads(_dequeued_item)
state = dequeued_item["state"]
meta = dequeued_item["meta"]
class Info():
def __init__(self, state, meta):
self.state = state
self.meta = meta
info = Info(state, meta)
return info
В приведенном выше коде определен класс BaseTask, который наследуется от abc.ABC Python, чтобы стать абстрактным базовым классом, в котором имя_задачи должно быть определено в начале, потому что позже нам нужно найти соответствующую очередь задач через имя_задачи.
Метод run() класса BaseTask дополнен методом abc.abstractmethod, который требует, чтобы производный класс BaseTask переопределял метод run(), чтобы пользователи могли настраивать собственную логику задачи.
Для обновления состояния задачи используется метод update_state() класса BaseTask, логика его очень проста: сначала параметры сериализуются в JSON, а затем вызывается метод enqueue() Backend для сохранения данных. Backend здесь на самом деле является экземпляром Redis, метод enqueue() запишет данные в список Redis.Следует отметить, что ключом списка здесь является task_id, который является идентификатором текущей задачи.
Метод delay() класса BaseTask используется для асинхронного выполнения задач.Сначала создайте уникальный идентификатор для задачи через uuid, затем сериализуйте параметры метода через JSON, а затем вызовите enqueue() Брокера для сохранения Брокер здесь на самом деле также Для экземпляра Redis метод enqueue() также записывает данные в список Redis, но ключом списка является имя_задачи, которое является именем текущей задачи.
Кроме того, также реализован метод async_result(), который используется для асинхронного получения данных задачи.Через этот метод можно получить результат выполнения задачи или различные данные в процессе выполнения задачи.Структура данные имеют простое соглашение и должны иметь состояние, представляющее состояние задачи, конечно, и должна быть мета для представления некоторой информации о текущей задаче.
Реализовать брокера и серверную часть
Broker и Backend используются в task.py, затем давайте реализуем эти два, сначала реализуем Broker.
# broker.py
import redis # pip install redis
class Broker:
"""
use redis as our broker.
This implements a basic FIFO queue using redis.
"""
def __init__(self):
host = "localhost"
port = 6379
password = None
self.redis_instance = redis.StrictRedis(
host=host, port=port, password=password, db=0, socket_timeout=8.0
)
def enqueue(self, item, queue_name):
self.redis_instance.lpush(queue_name, item)
def dequeue(self, queue_name):
dequed_item = self.redis_instance.brpop(queue_name, timeout=3)
if not dequed_item:
return None
dequed_item = dequed_item[1]
return dequed_item
Говорить не о чем, то есть задается два метода для хранения и чтения данных, для хранения используется метод lpush, который будет вставлять данные в список Redis слева, и метод brpop используется для чтения данные, и он будет сохранен из списка.Возьмите первый элемент справа, верните значение элемента и удалите его из списка и сформируйте очередь с левым входом и правым выходом.
Для простоты код Бэкенда точно такой же, как и у Брокера, просто он используется для хранения информации о задаче, поэтому код публиковаться не будет.
Процесс выполнения фоновой задачи Worker
Затем для реализации фонового процесса выполнения задачи Worker
# worker.py
import json
class Worker:
"""
Example Usage:
task = AdderTask()
worker = Worker(task=task)
worker.start()
"""
def __init__(self, task) -> None:
self.task = task
def start(self,):
while True:
try:
_dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
dequeued_item = json.loads(_dequeued_item)
task_id = dequeued_item["task_id"]
task_args = dequeued_item["args"]
task_kwargs = dequeued_item["kwargs"]
task_kwargs['task_id'] = task_id
self.task.run(*task_args, **task_kwargs)
print("succesful run of task: {0}".format(task_id))
except Exception:
print("Unable to execute task.")
continue
В приведенном выше коде определен класс Worker.Класс Worker должен указать конкретный экземпляр задачи во время инициализации, затем получить данные, относящиеся к задаче, от брокера, а затем вызвать метод run() для завершения выполнения задачи, что относительно просто.
Использование игрушки Сельдерей
Ключевая структура игрушечного Celery была определена, а затем давайте ее использовать.Здесь мы по-прежнему будем использовать часть кода из статьи «Python Web: Flask Asynchronous Task Execution», например, внешний код и внешний код. Код -end здесь обсуждаться не будет.Чтение можно сначала прочитать, чтобы облегчить понимание следующего содержимого.
Сначала определите трудоемкую задачу
# app.py
class LongTask(BaseTask):
task_name = "LongTask"
def run(self, task_id):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(task_id=task_id, state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
self.update_state(task_id=task_id, state='FINISH', meta={'current':100, 'total': 100,'status': 'Task completed!', 'result':32})
return
Каждая трудоемкая задача должна наследоваться от BaseTask и переписать свой метод run().Логика в методе run() — это конкретная логика, которая будет обрабатываться текущей трудоемкой задачей.
Логика здесь на самом деле очень простая, просто случайным образом извлекайте слова из нескольких списков.
В итерации for, если вы хотите, чтобы передняя часть знала конкретную ситуацию текущей задачи для итерации, вам необходимо обновить соответствующие данные в задней части с помощью метода update_state() BaseTask и использовать task_id в качестве ключа список в Redis.
Когда вся логика выполнена, информация со статусом FINISH сохраняется в бэкенде.
Напишите интерфейс для запуска этой трудоемкой задачи
# app.py
@app.route('/longtask', methods=['POST'])
def longtask():
long_task = LongTask()
task_id = long_task.delay()
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task_id)}
Логика очень проста: создайте экземпляр LongTask() и вызовите метод delay(), этот метод сохранит текущую задачу в очереди, а текущий запрос передаст task_id текущей задачи в поле taskstatus в заголовке ответа. внешний интерфейс.
После того, как внешний интерфейс получен, task_id можно использовать для получения такой информации, как текущий статус выполнения задачи, чтобы реализовать визуализацию внешнего интерфейса.
Затем определите соответствующий интерфейс для получения информации о текущей задаче, вызовите метод async_result() и передайте task_id.
# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
info = async_result(task_id)
print(info)
if info.state == 'PENDING':
response = {
'state': info.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif info.state != 'FAILURE':
response = {
'state': info.state,
'current': info.meta.get('current', 0),
'total': info.meta.get('total', 1),
'status': info.meta.get('status', '')
}
if 'result' in info.meta:
response['result'] = info.meta['result']
else:
# something went wrong in the background job
response = {
'state': info.state,
'current': 1,
'total': 1,
'status': str(info.meta), # this is the exception raised
}
return jsonify(response)
Наконец, вам нужно определить логику, которая запускает процесс выполнения фоновой задачи.
# run_worker.py
from worker import Worker
from app import LongTask
if __name__ == "__main__":
long_task = LongTask()
worker = Worker(task=long_task)
worker.start()
На данный момент общая структура построена, используйте ее.
Первый запуск Redis.
redis-server
Затем запустите Flask.
python app.py
Наконец, запустите процесс выполнения фоновой задачи, который эквивалентен процессу Celery.celery -A xxx worker --loglevel=info
Заказ.
python run_worker.py
Выполнение нескольких задач одновременно, эффект следующий
Вот некоторые из соответствующих отпечатков:
python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
python app.py
* Serving Flask app "app" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
хвостик
Следует отметить, что в приведенном выше коде с помощью Worker необходимо создать конкретную задачу. В настоящее время экземпляр задачи отличается от экземпляра задачи, созданного через интерфейс в app.py. Worker использует разные экземпляры и использует один и тот же параметры для достижения той же цели.
Код загружен на Github:GitHub.com/АюЛяо/игрушка…
Если вы считаете, что статья полезна, пожалуйста, нажмите звездочку «Я смотрю» в правом нижнем углу, ее можно нажать, спасибо.