Знакомство с сельдереем

Python

1. Что такое сельдерей

  • 1.1 Celery — простая, гибкая и надежная распределенная система, написанная на Python для обработки больших объемов информации, а также предоставляет инструменты, необходимые для работы и обслуживания распределенных систем (это не сама очередь задач, это система управления очередью задач). ), который предоставляет интерфейс, который может помочь нам реализовать распределенные очереди задач).

  • 1.2 Celery фокусируется на обработке задач в реальном времени и поддерживает планирование задач (с помощью rabbitMQ можно реализовать различные обмены).

Проще говоря, это инструмент управления распределенной очередью.Мы можем быстро реализовать и управлять распределенной очередью задач, используя интерфейс, предоставляемый Celery.

  • 1.3 Сельдерей Архитектура

    注释

    • брокер сообщений (почтовый ящик, почтовое отделение): он не предоставляет службу сообщений сам по себе, но может быть интегрирован со сторонним промежуточным программным обеспечением для сообщений, обычно используется redis mongodb rabbitMQ.

    • Блок выполнения задачи (работник) (отправитель): единица выполнения задач, предоставляемая Celery, и рабочие процессы, выполняемые одновременно в узлах распределенной системы.

    • хранилище результатов задачи (получатель): используется для хранения результатов задач, выполняемых Worker, Celery поддерживает сохранение результатов задач различными способами, включая Redis, MongoDB, Django ORM, AMQP и т. д.

  • 1.4 Очереди задач и очереди сообщений

    • Очередь задач — это механизм распределения задач онлайн или по машинам.

    • Ввод очереди сообщений — это единица работы, которую можно рассматривать как задачу, а независимый рабочий процесс постоянно отслеживает очередь на наличие новых задач, которые необходимо обработать.

    • диаграмма

      注释


2. Простой пример

2.1 Создайте экземпляр Celery Создайте файл tasks.py

import time
from celery import Celery

app = Celery('tasks', broker='redis:////127.0.0.1:6379/6', backend='redis:////127.0.0.1:6379/7')


@app.task
def add(x, y):
    time.sleep(10)
    return x + y

ps: tasks - это имя задачи, установите reids как промежуточное ПО

2.2 Создайте файл index.py для вызова и обнаружения задач и просмотра статуса выполнения задач

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tasks import add, app
from celery.result import AsyncResult
import time

# 立即告知celery去执行add任务,并传入两个参数
result = add.delay(4, 4)
print(result.id)
async = AsyncResult(id=result.id, app=app)

time.sleep(3)
if async.successful():
    result = async.get()
    print(result, "执行成功")
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

  • ps Если в качестве посредника очереди задач используется Redis, в Redis есть два ключа celery и _kombu.binding.celery, _kombu.binding.celery указывает, что существует очередь задач с именем celery (по умолчанию Celery), а celery является очередью по умолчанию. , Список задач, используя тип списка, вы можете увидеть добавленные данные задачи.

2.3 Подробное объяснение команды выполнения

  • celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
    • Параметр A указывает расположение объекта celery. app.celery_tasks.celery ссылается на экземпляр celery модуля celery_tasks.py в пакете приложения. Обратите внимание, что это должен быть инициализированный экземпляр.

    • Параметр Q относится к воркеру, получающему задачи указанной очереди, который должен быть независимым, когда несколько очередей имеют разные задачи; если он не установлен, он получит все задачи очереди;

    • Параметр l указывает уровень журнала рабочего процесса;

После завершения выполнения результат сохраняется в redis, проверяем данные в redis и обнаруживаем, что есть пара ключ-значение строкового типа сельдерей-задача-мета-064e4262-e1ba-4e87-b4a1-52dd1418188f:данные Срок действия этой пары ключ-значение составляет 24 часа.

2.4 Анализ тела сообщения

  • body: это информация, закодированная в base64 после сериализации, включая конкретные параметры задачи, включая методы, которые должны быть выполнены, параметры и некоторую базовую информацию о задаче.
  • content-encoding: метод кодирования сериализованных данных
  • content-type: Метод сериализации данных задачи. По умолчанию в python используется встроенный модуль сериализации pickle (ps: Типы, поддерживаемые модулем pickle Все нативные типы, поддерживаемые python: логические, целые, с плавающей точкой, сложные, строковые, байт, нет. Списки, кортежи, словари и наборы, состоящие из любого примитивного типа. Функции, классы, экземпляры классов, часто используемые методы: дампы, дамп, загрузки, загрузка)
{
    "body": "gAJ9cQAoWAQAAAB0YXNrcQFYCQAAAHRhc2tzLmFkZHECWAIAAABpZHEDWCQAAABjNDMwMzZkMi03Yzc3LTQ0MDUtOTYwNC1iZDc3ZTcyNzNlN2FxBFgEAAAAYXJnc3EFSwRLBIZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==",
    "content-encoding": "binary",
    "content-type": "application/x-python-serialize",
    "headers": {},
    "properties": {
        "reply_to": "caa78c3a-618a-31f0-84a9-b79db708af02",
        "correlation_id": "c43036d2-7c77-4405-9604-bd77e7273e7a",
        "delivery_mode": 2,
        "delivery_info": {
            "priority": 0,
            "exchange": "celery",
            "routing_key": "celery"
        },
        "body_encoding": "base64",
        "delivery_tag": "e7e288b5-ecbb-4ec6-912c-f42eb92dbd72"
    }
}

2.5 Конфигурация сельдерея

CELERY_DEFAULT_QUEUE:默认队列
BROKER_URL  : 代理人的网址
CELERY_RESULT_BACKEND:结果存储地址
CELERY_TASK_SERIALIZER:任务序列化方式
CELERY_RESULT_SERIALIZER:任务执行结果序列化方式
CELERY_TASK_RESULT_EXPIRES:任务过期时间
CELERY_ACCEPT_CONTENT:指定任务接受的内容序列化类型(序列化),一个列表;

2.6 Как получить результат выполнения задачи

r = func.delay(...)
r.ready()     			# 查看任务状态,返回布尔值,  任务执行完成, 返回 True, 否则返回 False.
r.wait()      			# 等待任务完成, 返回任务执行结果,很少使用;
r.get(timeout=1)       # 获取任务执行结果,可以设置等待时间
r.result      			# 任务执行结果.
r.state       			# PENDING, START, SUCCESS,任务当前的状态
r.status     				# PENDING, START, SUCCESS,任务当前的状态
r.successful  			# 任务成功返回true
r.traceback 				# 如果任务抛出了一个异常,你也可以获取原始的回溯信息

2.7 Метод декоратора Celery celery.task

  • task() украшает задачу (функцию) как асинхронную

@celery.task()
def func():
	# do something
    pass
    
  • Базовый класс, который может переопределять задачи

class MyTask(celery.Task):
    # 任务失败时执行
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))
    # 任务成功时执行
    def on_success(self, retval, task_id, args, kwargs):
        pass
    # 任务重试时执行
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass


параметр

  • task_id : идентификатор задачи
  • einfo: сведения о задаче при сбое выполнения
  • ex: тип ошибки при сбое
  • retval: результат выполнения, возвращаемый при успешном выполнении задачи.

2.8 Полный файл конфигурации

# 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名'
# 指定结果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'msgpack' 
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任务过期时间,celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 60 * 20   
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["msgpack"]   
# 任务发送完成是否需要确认,这一项对性能有一点影响     
CELERY_ACKS_LATE = True  
# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5  # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去rabbitmq预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default" 
# 设置详细的队列
CELERY_QUEUES = {
    "default": { # 这是上面指定的默认队列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 设置扇形交换机
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
    
}

2.8 Задачи синхронизации Celery

  • Укажите запланированную задачу и добавьте ее в конфигурацию Перезапустите воркер
# config.py
from datetime import timedelta
from celery.schedules import crontab
 
CELERYBEAT_SCHEDULE = {
    'ptask': {
        'task': 'tasks.period_task',
        'schedule': timedelta(seconds=5),
    },
}

# 添加定时任务
@app.task(bind=True)
def period_task(self):
    print 'period task done: {0}'.format(self.request.id)
 

PS: если время включает в себя время данных, лучше установить его на время UTC.

  • Запустите запланированный процесс задачи
celery -A task beat

2.9 Цепочки задач

Связанная задача — это асинхронная или запланированная задача, выполняемая несколькими подзадачами.

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()
 
@app.task()
def fetch_page(url):
    return myhttplib.get(url)
 
@app.task()
def parse_page(page):
    return myparser.parse_document(page)
 
@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

 fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])

3 Celery, rabbitmq реализует три режима обмена

  • рабочий процесс сельдерея

celery流程

  • borker: промежуточное ПО сообщений
  • worker: Блок выполнения задач
  • storgae: хранилище результатов выполнения задачи

3.1 Введение в распространенные режимы обмена (прямой, разветвленный, тема, заголовок)

Когда задачи, которые мы выполняем, должны быть классифицированы в соответствии с конкретными потребностями, мы можем создать несколько очередей для задач, и каждый метод обмена очередями может быть указан.Следует отметить, что Redis может предоставить только метод прямого обмена, который также указан по умолчанию. , поэтому мы заменили посредника на rabbitmq.

Во-первых, давайте посмотрим, что такое режимы обмена?

  • Режим прямого обмена

    direct

    Этот режим поставляется с rabbitmq (redis), поэтому нам нужно только указать routing_key в процессе фактического использования. Или укажите имя очереди.

    ps: Если указанное нами имя очереди отсутствует в конфигурации, созданная нами задача сообщения будет автоматически отменена, поэтому нам нужно проверить правильность очереди в конфигурации, потому что rabbitmq имеет возможность хранить только очереди и не может хранить информация сообщения.

  • Режим обмена ответвлениями

    fanout

    • Нет необходимости указывать routing_key в режиме fanout, будут выполняться все Очереди, для которых exchange_type имеет значение fanout.
    • Таким образом, очередь и разветвление могут быть связаны «многие ко многим».
  • Режим обмена темами

    topic

    Любое сообщение, отправленное в Topic Exchange, будет перенаправлено во все очереди, которым важна тема, указанная в RouteKey.

    • Этот режим более сложен. Проще говоря, каждая очередь имеет свою собственную тему, и все сообщения имеют «заголовок» (RouteKey). очередь.

    • Этот режим требует RouteKey и, возможно, заранее привязывает Exchange и Queue.

    • При привязке укажите тему, о которой заботится очередь. Например, "#.log.#" означает, что очередь заботится обо всех сообщениях, связанных с журналом (сообщение, RouteKey которого равно "MQ.log.error", будет переадресовано в это очередь. очередь).

    • "#" указывает на 0 или несколько ключевых слов, а "" указывает на ключевое слово. Например, «log.» может совпадать с «log.warn», но не может совпадать с «log.warn.timeout», а «log.#» может совпадать с обоими указанными выше.

    • Аналогичным образом, если Exchange не найдет очередь, соответствующую RouteKey, сообщение будет отброшено.

3.2 Как использовать эти три режима на практических примерах

  • Сначала установите rabbitmq и запустите rabbitmq-сервер.

  • Создайте файл rabbitmq_config.py и измените конфигурацию, ранее указанную в tasks.py, на rabbitmq_config, код выглядит следующим образом.

#coding:utf-8
from celery.schedules import crontab
import sys
import os


sys.path.insert(0, os.getcwd())
CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"

  • Создайте желаемую биржу
default_exchange = Exchange('dedfault', type='direct')

# 定义一个媒体交换机,类型是直连交换机
media_exchange = Exchange('media', type='direct')

# 定义一个image交换机,类型是fanout交换机
image_exchange = Exchange('media', type='direct')

# 创建三个队列,一个是默认队列,一个是video、一个image
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
# 定义默认队列和默认的交换机routing_key
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

  • Укажите задачи в tasks.py

# 视频压缩
@app.task
def video_compress(video_name):
    time.sleep(10)
    print('Compressing the:', video_name)
    return 'success'


@app.task
def video_upload(video_name):
    time.sleep(5)
    print( u'正在上传视频')
    return 'success'


# 压缩照片
@app.task
def image_compress(image_name):
    time.sleep(10)
    print('Compressing the:', image_name)
    return 'success'


# 其他任务
@app.task
def other(str):
    time.sleep(10)
    print ('Do other things')
    return 'success'


  • указанный маршрут

CELERY_ROUTES = ({'tasks.image_compress': {
                        'queue': 'images',
                        'routing_key': 'media.image'
                 }},{'tasks.video_upload': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }},{'tasks.video_compress': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }}, )


Теперь выполните созданную задачу

Есть два способа запустить воркер

Первый: указать очередь

Второй: не указывать (все исполнение)

ps Для того, чтобы лучше видеть добавленные нами очереди, а также соответствующий режим обмена, запустите все очереди

начать рабочий

[queue] содержит созданную очередь, остальные параметры можно сравнить ранее в этой статье [tasks] показывает все наши задачи

---- **** ----- 
--- * ***  * -- Darwin-18.2.0-x86_64-i386-64bit 2018-12-28 15:38:00
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x104e78d68
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=dedfault(direct) key=default
                .> images           exchange=media(direct) key=media.image
                .> others           exchange=other(fanout) key=other.others
                .> videos           exchange=media(direct) key=media.video

[tasks]
  . tasks.add
  . tasks.dr
  . tasks.image_compress
  . tasks.other
  . tasks.period_task
  . tasks.task
  . tasks.video_compress
  . tasks.video_upload

[2018-12-28 15:38:00,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

Результаты выполнения (вы можете увидеть соответствующие результаты выполнения в фоновом управлении rabbitimq):

[2018-12-28 15:38:00,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-12-28 15:38:00,933: INFO/MainProcess] mingle: searching for neighbors
[2018-12-28 15:38:02,013: INFO/MainProcess] mingle: all alone
[2018-12-28 15:38:02,091: INFO/MainProcess] celery@zhanlingjiedeMacBook-Pro.local ready.
[2018-12-28 15:38:42,386: INFO/MainProcess] Received task: tasks.add[1fdfbc23-e106-49ab-ac25-d46c2b5e8960]  
[2018-12-28 15:38:42,429: INFO/ForkPoolWorker-3] Task tasks.add[1fdfbc23-e106-49ab-ac25-d46c2b5e8960] succeeded in 0.040455893002217636s: 5
[2018-12-28 15:38:46,397: INFO/MainProcess] Received task: tasks.image_compress[cab797c5-eaae-4f11-b55c-041f4256ead9]  
[2018-12-28 15:38:46,410: INFO/MainProcess] Received task: tasks.other[0b00fd52-2251-42ef-9743-49df3f2906ed]  
[2018-12-28 15:38:56,401: WARNING/ForkPoolWorker-4] Compressing the:
[2018-12-28 15:38:56,402: WARNING/ForkPoolWorker-4] 这是我上传的图片
[2018-12-28 15:38:56,412: WARNING/ForkPoolWorker-3] Do other things
[2018-12-28 15:38:56,447: INFO/ForkPoolWorker-3] Task tasks.other[0b00fd52-2251-42ef-9743-49df3f2906ed] succeeded in 10.036200570997607s: 'success'
[2018-12-28 15:38:56,461: INFO/ForkPoolWorker-4] Task tasks.image_compress[cab797c5-eaae-4f11-b55c-041f4256ead9] succeeded in 10.061314186998061s: 'success'

Пополнить

Как правило, чтобы использовать сельдерей для объединения с реальной сценой, фреймворк будет использоваться для использования django + celery + redis (rabbitmq)