Практическая реализация простого сельдерея

Python

Введение

Celery — это распределенная очередь задач, реализованная Python, которая обычно имеет три функции.

  • 1. Чтобы снизить нагрузку на параллелизм, сначала запишите задачу в очередь, а затем запустите ее, когда появятся свободные ресурсы.
  • 2. Чтобы выполнить запланированную задачу, сначала запишите задачу в очередь, а затем выполните ее в указанное время.
  • 3. Асинхронные задачи, в вебе есть трудоемкие задачи, которые можно сначала записать в очередь, а потом выполнить фоновым процессом задачи

Было много статей, описывающих использование и простые принципы Celery, и в этой статье он также будет кратко упомянут, но это не займет много чернил.

В этой статье основное внимание уделяется использованию Python для реализации простого Celery и использованию Celery, реализованного самостоятельно, для реализации асинхронных задач.Как и в предыдущей статье «Python Web: Flask выполняет задачи асинхронно», создайте простую сеть через Flask, а затем выполнить задачу «Время потребления», я надеюсь, что внешний интерфейс может отображать ход выполнения задачи через индикатор выполнения.

Следует отметить, что исходный код Celery здесь интерпретироваться не будет.Его исходный код имеет много инженерных деталей и относительно сложен.Здесь всего лишь простая реализация игрушечного Celery исходя из его сути.Конечно, эта игрушка Celery не может можно сравнить с Celery с точки зрения стабильности и эффективности.По сравнению, но это может быть хорошим пониманием того, как в целом реализован Celery.

В этой статье основное внимание уделяется «разделению форм и духовному союзу», что отличается от деталей реализации Celery, но основной принцип тот же.

Итак, приступим!

Понятие и принцип сельдерея

Celery 5 ключевых понятий, если вы их поймете, у вас будет общее представление о Celery.

  • 1. Задача (задача) Проще говоря, это то, что вы делаете, например, отправляете электронные письма в процессе регистрации пользователя.

  • 2. Рабочий Человек, который обрабатывает задачу в фоновом режиме

  • 3. Брокер (брокер) Суть в очереди, Задание будет передано Брокеру, а Воркер возьмет Задание у Брокера и обработает его

  • 4. Бит Планировщик задач по расписанию, в соответствии с установленным временем, добавить данные в брокер, а затем дождаться обработки работником

  • 5. Бэкенд Объект, используемый для сохранения результата выполнения Worker, каждая Task должна иметь возвращаемое значение, и эти возвращаемые значения находятся в Backend

Здесь мы отложим в сторону различные концепции, посмотрим на Celery с более существенной точки зрения и обнаружим, что это процесс хранения сериализации задач и получения десериализации.

Взяв в качестве примера веб-асинхронные задачи, использование обычно выглядит следующим образом:

  • 1. Есть функция, которой нужно долго обрабатывать ввод-вывод, если ее не выполнять асинхронно, то она заблокируется, что обычно не допускается.
  • 2. Запустите процесс выполнения фоновой задачи
  • 3. Когда требуется выполнить трудоемкую функцию, она не будет выполняться синхронно немедленно, а извлечет ключевые данные функции, сериализует их и сохранит в очереди, а очередь может быть реализована с использованием Redis или других методов.
  • 4. Процесс выполнения фоновой задачи получит данные из очереди и десериализует их.
  • 5. Процесс выполнения фоновой задачи будет использовать исходную функцию и восстановленные данные для завершения выполнения функции, тем самым достигая эффекта асинхронного выполнения.

Процесс не сложный, и разные концепции в Celery отвечают за разные части вышеописанного процесса.

Реализовать простой сельдерей

Далее давайте реализуем Celery, где Celery выбирает Redis в качестве серверной части.

Во-первых, давайте организуем общую структуру.

Во-первых, нам нужно определить класс Task для представления задач, которые должны быть выполнены.Конкретная логика, которая будет выполняться различными задачами, пишется самим пользователем.

Затем определите очередь задач, Брокер в Celery, для хранения задач, которые должны быть выполнены.

Затем необходимо определить рабочий процесс выполнения, а рабочий должен получать задачи от брокера и выполнять их.

Наконец, вам нужно определить класс для хранения данных, возвращаемых задачей, то есть Backend в Celery.

Это выглядит немного сложно, но не паникуйте, на самом деле это очень просто.

реализовать класс задачи

Во-первых, реализуйте task.py, который используется для определения некоторой логики, связанной с задачей.

# task.py
import abc
import json
import uuid
import traceback
import pickle

from broker import Broker
from backend import Backend

class BaseTask(abc.ABC):
    """
    Example Usage:
        class AdderTask(BaseTask):
            task_name = "AdderTask"
            def run(self, a, b):
                result = a + b
                return result
        adder = AdderTask()
        adder.delay(9, 34)
    """

    task_name = None

    def __init__(self):
        if not self.task_name:
            raise ValueError("task_name should be set")
        self.broker = Broker()
 
    @abc.abstractmethod # abstractmethod 派生类必须重写实现逻辑
    def run(self, *args, **kwargs):
        # 写上你具体的逻辑
        raise NotImplementedError("Task `run` method must be implemented.")

    # 更新任务状态
    def update_state(self, task_id, state, meta={}):
        _task = {"state": state, "meta": meta}
        serialized_task = json.dumps(_task)
        backend = Backend()
        backend.enqueue(queue_name=task_id, item=serialized_task)
        print(f"task info: {task_id} succesfully queued")

    # 异步执行
    def delay(self, *args, **kwargs):
        try:
            self.task_id = str(uuid.uuid4())
            _task = {"task_id": self.task_id, "args": args, "kwargs": kwargs}
            serialized_task = json.dumps(_task)
            # 加入redis中
            self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
            print(f"task: {self.task_id} succesfully queued")
        except Exception:
            # traceback.print_exc()
            raise Exception("Unable to publish task to the broker.")
        return self.task_id

# 获取数据
def async_result(task_id):
    backend = Backend()
    _dequeued_item = backend.dequeue(queue_name=task_id)
    dequeued_item = json.loads(_dequeued_item)
    state = dequeued_item["state"]
    meta = dequeued_item["meta"]
    class Info():
        def __init__(self, state, meta):
            self.state = state
            self.meta = meta
    info = Info(state, meta)
    return info

В приведенном выше коде определен класс BaseTask, который наследуется от abc.ABC Python, чтобы стать абстрактным базовым классом, в котором имя_задачи должно быть определено в начале, потому что позже нам нужно найти соответствующую очередь задач через имя_задачи.

Метод run() класса BaseTask дополнен методом abc.abstractmethod, который требует, чтобы производный класс BaseTask переопределял метод run(), чтобы пользователи могли настраивать собственную логику задачи.

Для обновления состояния задачи используется метод update_state() класса BaseTask, логика его очень проста: сначала параметры сериализуются в JSON, а затем вызывается метод enqueue() Backend для сохранения данных. Backend здесь на самом деле является экземпляром Redis, метод enqueue() запишет данные в список Redis.Следует отметить, что ключом списка здесь является task_id, который является идентификатором текущей задачи.

Метод delay() класса BaseTask используется для асинхронного выполнения задач.Сначала создайте уникальный идентификатор для задачи через uuid, затем сериализуйте параметры метода через JSON, а затем вызовите enqueue() Брокера для сохранения Брокер здесь на самом деле также Для экземпляра Redis метод enqueue() также записывает данные в список Redis, но ключом списка является имя_задачи, которое является именем текущей задачи.

Кроме того, также реализован метод async_result(), который используется для асинхронного получения данных задачи.Через этот метод можно получить результат выполнения задачи или различные данные в процессе выполнения задачи.Структура данные имеют простое соглашение и должны иметь состояние, представляющее состояние задачи, конечно, и должна быть мета для представления некоторой информации о текущей задаче.

Реализовать брокера и серверную часть

Broker и Backend используются в task.py, затем давайте реализуем эти два, сначала реализуем Broker.

# broker.py
import redis # pip install redis

class Broker:
    """
    use redis as our broker.
    This implements a basic FIFO queue using redis.
    """
    def __init__(self):
        host = "localhost"
        port = 6379
        password = None
        self.redis_instance = redis.StrictRedis(
            host=host, port=port, password=password, db=0, socket_timeout=8.0
        )

    def enqueue(self, item, queue_name):
        self.redis_instance.lpush(queue_name, item)

    def dequeue(self, queue_name):
        dequed_item = self.redis_instance.brpop(queue_name, timeout=3)
        if not dequed_item:
            return None
        dequed_item = dequed_item[1]
        return dequed_item

Говорить не о чем, то есть задается два метода для хранения и чтения данных, для хранения используется метод lpush, который будет вставлять данные в список Redis слева, и метод brpop используется для чтения данные, и он будет сохранен из списка.Возьмите первый элемент справа, верните значение элемента и удалите его из списка и сформируйте очередь с левым входом и правым выходом.

Для простоты код Бэкенда точно такой же, как и у Брокера, просто он используется для хранения информации о задаче, поэтому код публиковаться не будет.

Процесс выполнения фоновой задачи Worker

Затем для реализации фонового процесса выполнения задачи Worker

# worker.py
import json

class Worker:
    """
    Example Usage:
        task = AdderTask()
        worker = Worker(task=task)
        worker.start()
    """
    def __init__(self, task) -> None:
        self.task = task

    def start(self,):
        while True:
            try:
                _dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
                dequeued_item = json.loads(_dequeued_item)
                task_id = dequeued_item["task_id"]
                task_args = dequeued_item["args"]
                task_kwargs = dequeued_item["kwargs"]
                task_kwargs['task_id'] = task_id
                self.task.run(*task_args, **task_kwargs)
                print("succesful run of task: {0}".format(task_id))
            except Exception:
                print("Unable to execute task.")
                continue   

В приведенном выше коде определен класс Worker.Класс Worker должен указать конкретный экземпляр задачи во время инициализации, затем получить данные, относящиеся к задаче, от брокера, а затем вызвать метод run() для завершения выполнения задачи, что относительно просто.

Использование игрушки Сельдерей

Ключевая структура игрушечного Celery была определена, а затем давайте ее использовать.Здесь мы по-прежнему будем использовать часть кода из статьи «Python Web: Flask Asynchronous Task Execution», например, внешний код и внешний код. Код -end здесь обсуждаться не будет.Чтение можно сначала прочитать, чтобы облегчить понимание следующего содержимого.

Сначала определите трудоемкую задачу

# app.py
class LongTask(BaseTask):

    task_name = "LongTask"

    def run(self, task_id):
        """Background task that runs a long function with progress reports."""
        verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
        adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
        noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']

        message = ''
        total = random.randint(10, 50)

        for i in range(total):
            if not message or random.random() < 0.25:
                message = '{0} {1} {2}...'.format(random.choice(verb),
                                                  random.choice(adjective),
                                                  random.choice(noun))
            self.update_state(task_id=task_id, state='PROGRESS',
                              meta={'current': i, 'total': total,
                                    'status': message})
            time.sleep(1)
        
        self.update_state(task_id=task_id, state='FINISH', meta={'current':100, 'total': 100,'status': 'Task completed!', 'result':32})
        return

Каждая трудоемкая задача должна наследоваться от BaseTask и переписать свой метод run().Логика в методе run() — это конкретная логика, которая будет обрабатываться текущей трудоемкой задачей.

Логика здесь на самом деле очень простая, просто случайным образом извлекайте слова из нескольких списков.

В итерации for, если вы хотите, чтобы передняя часть знала конкретную ситуацию текущей задачи для итерации, вам необходимо обновить соответствующие данные в задней части с помощью метода update_state() BaseTask и использовать task_id в качестве ключа список в Redis.

Когда вся логика выполнена, информация со статусом FINISH сохраняется в бэкенде.

Напишите интерфейс для запуска этой трудоемкой задачи

# app.py
@app.route('/longtask', methods=['POST'])
def longtask():
    long_task = LongTask()
    task_id = long_task.delay()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task_id)}

Логика очень проста: создайте экземпляр LongTask() и вызовите метод delay(), этот метод сохранит текущую задачу в очереди, а текущий запрос передаст task_id текущей задачи в поле taskstatus в заголовке ответа. внешний интерфейс.

После того, как внешний интерфейс получен, task_id можно использовать для получения такой информации, как текущий статус выполнения задачи, чтобы реализовать визуализацию внешнего интерфейса.

Затем определите соответствующий интерфейс для получения информации о текущей задаче, вызовите метод async_result() и передайте task_id.

# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
    info = async_result(task_id)
    print(info)
    if info.state == 'PENDING':
        response = {
            'state': info.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif info.state != 'FAILURE':
        response = {
            'state': info.state,
            'current': info.meta.get('current', 0),
            'total': info.meta.get('total', 1),
            'status': info.meta.get('status', '')
        }
        if 'result' in info.meta:
            response['result'] = info.meta['result']
    else:
        # something went wrong in the background job
        response = {
            'state': info.state,
            'current': 1,
            'total': 1,
            'status': str(info.meta),  # this is the exception raised
        }
    return jsonify(response)

Наконец, вам нужно определить логику, которая запускает процесс выполнения фоновой задачи.

# run_worker.py
from worker import Worker
from app import LongTask

if __name__ == "__main__":
    long_task = LongTask()
    worker = Worker(task=long_task)
    worker.start()

На данный момент общая структура построена, используйте ее.

Первый запуск Redis.

redis-server

Затем запустите Flask.

python app.py

Наконец, запустите процесс выполнения фоновой задачи, который эквивалентен процессу Celery.celery -A xxx worker --loglevel=infoЗаказ.

python run_worker.py

Выполнение нескольких задач одновременно, эффект следующий

Вот некоторые из соответствующих отпечатков:

python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
python app.py
 * Serving Flask app "app" (lazy loading)
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat

 * Debugger is active!
 * Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -

хвостик

Следует отметить, что в приведенном выше коде с помощью Worker необходимо создать конкретную задачу. В настоящее время экземпляр задачи отличается от экземпляра задачи, созданного через интерфейс в app.py. Worker использует разные экземпляры и использует один и тот же параметры для достижения той же цели.

Код загружен на Github:GitHub.com/АюЛяо/игрушка…

Если вы считаете, что статья полезна, пожалуйста, нажмите звездочку «Я смотрю» в правом нижнем углу, ее можно нажать, спасибо.