Использование Celery в проекте Flask (с фабричным шаблоном или без)

Flask

Эта статья принадлежитДополнительный рассказ «Практика веб-разработки на Flask»ряд. Эта статья покажет вам, как интегрировать Celery в проект Flask.

Создайте программу сельдерея

Первым шагом является создание экземпляра программы Celery. Поскольку экземпляр программы Flask обычно называется app, во избежание конфликтов мы обычно называем экземпляр Celery celery или celery_app:

from celery import Celery

celery = Celery(__name__, broker='pyamqp://guest@localhost//')

@celery.task
def add(x, y):
    return x + y

Организация и загрузка конфигураций

Большинство руководств, включая текущую документацию Flask, описывают загрузку конфигурации следующим образом:

celery.conf.update(app.config)  # 这里的 app 是 Flask 程序实例

То есть запишите конфигурацию Celery и конфигурацию Flask вместе, а затем обновите конфигурацию из словаря конфигурации экземпляра программы Flask.

Но проблема в том, что Celery включил новые имена конфигураций в нижнем регистре, начиная с версии 4.0, и некоторые конфигурации заменены новыми именами. Хотя старая конфигурация в верхнем регистре по-прежнему поддерживается, если вы планируете использовать имена конфигураций в нижнем регистре или планируете миграцию в будущем, метод загрузки конфигурации здесь не будет работать, поскольку Flask будет импортировать только прописные буквы при импорте конфигурации из файла или объекта. Переменные конфигурации.

Поэтому я рекомендую писать конфигурацию Celery в отдельный файл, не смешивая его с конфигурацией Flask. Следуя примеру из документации Celery, вы можете создатьceleryconfig.pyФайл (или другое имя) для сохранения конфигурации:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

Затем используйте следующий метод для загрузки конфигурации (используйте другое имя модуля или не забудьте изменить входящую строку при использовании другого пути):

celery.config_from_object('celeryconfig')

Если вам нужно передать конфигурацию брокера и серверной части при создании экземпляра Celery, вы можете написать ее напрямую или импортировать из модуля конфигурации:

from celeryconfig import broker_url

celery = Celery(__name__, broker=broker_url)

Инициализировать сельдерей в программе Flask

Вы можете создать программу Celery отдельно, но обычно нам нужно добавить к ней поддержку контекста программы Flask, потому что иногда ваша функция задачи Celery использует некоторые переменные, которые зависят от контекста программы Flask.

Ниже мы создаем фабричную функцию для Celery, создаем экземпляр Celery в фабричной функции, загружаем конфигурацию и реализуем поддержку контекста программы Flask:

from flask import Flask
from celery import Celery

from celeryconfig import broker_url


def make_celery(app):
    celery = Celery(__name__, broker=broker_url)
    celery.config_from_object('celeryconfig')

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

app = Flask(__name__)

celery = make_celery(app)

В модуле, который определяет задачи Celery, например tasks.py, вы можете импортировать экземпляр программы Celery:

from app import celery

@celery.task
def add(x, y):
    return x + y

Инициализируйте Celery в программе Flask, используя фабричную функцию

Когда программа Flask также создается с использованием фабричной функции, мы можем создать экземпляр программы Celery глобально, а затем обновить конфигурацию программы Celery и установить контекст в фабричной функции, которая создает экземпляр программы Flask:

from flask import Flask
from celery import Celery

from celeryconfig import broker_url
 
celery = Celery(__name__, broker=broker_url)
 

def create_app():
    app = Flask(__name__)
 
    register_celery(app)
    return app
 

def register_celery(app):
    celery.config_from_object('celeryconfig')
 
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
 
    celery.Task = ContextTask

Также импортируйте экземпляр Celery напрямую и создайте задачу:

from app import celery

@celery.task
def add(x, y):
    return x + y

Первоначально это было полное вводное руководство по Celery, но из-за повреждения жесткого диска в прошлом году соответствующий пример программы был утерян, и у меня не хватило настойчивости переписать ее снова, поэтому в этой статье было извлечено только содержимое, относящееся к Flask.

Поскольку с момента написания первого черновика прошло более полугода, последняя версия Celery также 4.3.0.Если в тексте есть пропуски, или есть лучший способ добиться этого, пожалуйста, прокомментируйте.