[Перевод] Использование Redis Queue для реализации асинхронных задач во Flask

Python

Если в вашем приложении есть длительные задачи, вы должны отделить их от обычного процесса и выполнять в фоновом режиме.

Возможно, ваше веб-приложение потребует от пользователя загрузить аватар (изображение может потребоваться обрезать) и подтверждение электронной почты во время регистрации. Если вы обрабатываете изображения и отправляете письма с подтверждением непосредственно в обработчике запросов, конечный пользователь должен дождаться завершения этих операций. Вместо этого вы бы предпочли поместить эти задачи в очередь задач и обработать их рабочим потоком, и в этом случае приложение может немедленно отвечать на запросы клиентов. В результате конечные пользователи могут продолжать другие операции на стороне клиента, а ваше приложение может быть выпущено для ответа на запросы других пользователей.

В этой статье рассказывается о том, как настроить приложение Flask.Redis Queue(RQ) для обработки длительных задач.

Конечно, сельдерей также является хорошим решением. Однако он немного сложнее и вводит больше зависимостей, чем Redis Queue.

содержание

Цель этой статьи

Прочитав эту статью, вы должны узнать:

  1. Интегрируйте Redis Queue в приложение Flask и создайте соответствующие задачи.
  2. Используйте Docker для создания образа приложения, включающего Flask и Redis.
  3. Используйте отдельные рабочие потоки для обработки длительных задач в фоновом режиме.
  4. настроитьRQ DashboardИспользуется для мониторинга очередей задач, заданий и рабочих потоков.
  5. Масштабируйте количество рабочих потоков с помощью Docker.

процесс работы

В этой статье наша цель — использовать возможности Redis Queue для разработки приложения Flask, которое может обрабатывать длительные задачи, где выполнение длительных задач не зависит от выполнения обычных запросов и ответов.

  1. Конечный пользователь запрашивает у сервера создание новой задачи через POST.
  2. Как показано на рисунке, новая задача будет добавлена ​​в очередь задач, а затем сервер вернет идентификатор задачи клиенту.
  3. Созданная задача будет выполняться в фоновом режиме на сервере, а клиенту нужно только использовать AJAX для непрерывного опроса статуса задачи.

Flask 集成 Redis Queue 的调用时序图

В итоге мы реализуем приложение, которое выглядит так:

开发完成

Конфигурация проекта

Хотите продолжить чтение? Клонируйте следующий репозиторий, чтобы увидеть код и структуру внутри:

$ git clone https://github.com/mjhea0/flask-redis-queue --branch base --single-branch
$ cd flask-redis-queue

Поскольку нам нужно управлять в общей сложности тремя процессами (Flask, Redis и worker), чтобы упростить эту серию рабочих процессов, мы решили использовать Docker для развертывания, и, наконец, нам нужно всего лишь запустить все приложение в одном терминале.

Запустите приложение следующим образом:

$ docker-compose up -d --build

Доступ с помощью вашего браузераhttp://localhost:5004, вы должны увидеть следующую страницу:

flask、redis queue 和 docker

триггер задачи

когдаproject/client/static/main.jsПосле того, как слушатель в слушателе прослушает нажатие кнопки, он получит тип задачи, соответствующий кнопке —1,2или3, и отправить полученный тип задачи в качестве параметра на сервер через запрос AJAX POST.

$('.btn').on('click', function() {
  $.ajax({
    url: '/tasks',
    data: { type: $(this).data('type') },
    method: 'POST'
  })
  .done((res) => {
    getStatus(res.data.task_id)
  })
  .fail((err) => {
    console.log(err)
  });
});

На стороне сервераproject/server/main/views.pyОн будет отвечать за обработку запроса, отправленного клиентом:

@main_blueprint.route('/tasks', methods=['POST'])
def run_task():
    task_type = request.form['type']
    return jsonify(task_type), 202

Давайте соберем Redis Queue.

Redis Queue

Сначала нам нужноdocker-compose.ymlДобавьте конфигурацию для запуска двух новых процессов — Redis и worker:

version: '3.7'

services:

  web:
    build: .
    image: web
    container_name: web
    ports:
      - '5004:5000'
    command: python manage.py run -h 0.0.0.0
    volumes:
      - .:/usr/src/app
    environment:
      - FLASK_DEBUG=1
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  worker:
    image: web
    command: python manage.py run_worker
    volumes:
      - .:/usr/src/app
    environment:
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  redis:
    image: redis:4.0.11-alpine

Добавьте новую задачу в каталог "project/server/main"tasks.py:

# project/server/main/tasks.py

import time

def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True

Обновите наш код представления, чтобы он мог подключаться к Redis и ставить задачу в очередь, и, наконец, возвращать идентификатор задачи клиенту:

@main_blueprint.route('/tasks', methods=['POST'])
def run_task():
    task_type = request.form['type']
    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
        q = Queue()
        task = q.enqueue(create_task, task_type)
    response_object = {
        'status': 'success',
        'data': {
            'task_id': task.get_id()
        }
    }
    return jsonify(response_object), 202

Не забудьте правильно импортировать используемые выше библиотеки:

import redis
from rq import Queue, Connection
from flask import render_template, Blueprint, jsonify, \
    request, current_app

from project.server.main.tasks import create_task

возобновитьBaseConfigдокумент:

class BaseConfig(object):
    """基础配置"""
    WTF_CSRF_ENABLED = True
    REDIS_URL = 'redis://redis:6379/0'
    QUEUES = ['default']

Внимательный читатель мог заметить, что мы цитируемredisобслуживание (вdocker-compose.ymlадрес введен в ), использованиеREDIS_URLвместоlocalhostили конкретный IP. Как подключиться к другим сервисам через имя хоста в Docker, вы можете найти в Docker Compose.официальная документацияНайдите ответ в .

Наконец, мы можем использовать Redis QueueworkerДля решения поставленной задачи возглавил бригаду.

@cli.command('run_worker')
def run_worker():
    redis_url = app.config['REDIS_URL']
    redis_connection = redis.from_url(redis_url)
    with Connection(redis_connection):
        worker = Worker(app.config['QUEUES'])
        worker.work()

Здесь мы запускаем работника с помощью пользовательской команды CLI.

Следует отметить, что при оформлении@cli.command()Код запуска имеет доступ к контексту приложения, а также кproject/server/config.pyПеременные конфигурации, определенные в .

Также необходимо импортировать правильную библиотеку:

import redis
from rq import Connection, Worker

Добавьте информацию о зависимостях приложения в файл требований:

redis==2.10.6
rq==0.12.0

Соберите и запустите новый контейнер Docker:

$ docker-compose up -d --build

Попробуем запустить задачу:

$ curl -F type=0 http://localhost:5004/tasks

Вы должны получить что-то вроде этого:

{
  "data": {
    "task_id": "bdad64d0-3865-430e-9cc3-ec1410ddb0fd"
  },
  "status": "success"
}

статус задачи

Вернемся к слушателю нажатия клавиш клиента:

$('.btn').on('click', function() {
  $.ajax({
    url: '/tasks',
    data: { type: $(this).data('type') },
    method: 'POST'
  })
  .done((res) => {
    getStatus(res.data.task_id)
  })
  .fail((err) => {
    console.log(err)
  });
});

Всякий раз, когда возвращается запрос AJAX на создание задачи, мы убираем идентификатор задачи и продолжаем вызыватьgetStatus(). какgetStatus()Он также успешно возвращается, после чего мы добавляем новую строку в таблицу DOM.

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.data.task_id}</td>
        <td>${res.data.task_status}</td>
        <td>${res.data.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);
    const taskStatus = res.data.task_status;
    if (taskStatus === 'finished' || taskStatus === 'failed') return false;
    setTimeout(function() {
      getStatus(res.data.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err);
  });
}

Обновить код слоя вида:

@main_blueprint.route('/tasks/<task_id>', methods=['GET'])
def get_status(task_id):
    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
        q = Queue()
        task = q.fetch_job(task_id)
    if task:
        response_object = {
            'status': 'success',
            'data': {
                'task_id': task.get_id(),
                'task_status': task.get_status(),
                'task_result': task.result,
            }
        }
    else:
        response_object = {'status': 'error'}
    return jsonify(response_object)

Вызовите следующую команду, чтобы добавить задачу в очередь:

$ curl -F type=1 http://localhost:5004/tasks

Затем используйте приведенный выше возврат в телеtask_idЧтобы запросить новый интерфейс сведений о задаче:

$ curl http://localhost:5004/tasks/5819789f-ebd7-4e67-afc3-5621c28acf02

{
  "data": {
    "task_id": "5819789f-ebd7-4e67-afc3-5621c28acf02",
    "task_result": true,
    "task_status": "finished"
  },
  "status": "success"
}

Также давайте попробуем эффект в браузере:

flask, redis queue, docker

консоль задач

RQ DashboardЭто легкая веб-система мониторинга для Redis Queue.

Чтобы интегрировать RQ Dashboard, сначала вам нужно создать папку «dashboard» в разделе «project», а затем создать в ней новую папку.Dockerfile:

FROM python:3.7.0-alpine

RUN pip install rq-dashboard

EXPOSE 9181

CMD ["rq-dashboard"]

Затем добавьте указанный выше модуль в качестве службы вdocker-compose.ymlсередина:

version: '3.7'

services:

  web:
    build: .
    image: web
    container_name: web
    ports:
      - '5004:5000'
    command: python manage.py run -h 0.0.0.0
    volumes:
      - .:/usr/src/app
    environment:
      - FLASK_DEBUG=1
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  worker:
    image: web
    command: python manage.py run_worker
    volumes:
      - .:/usr/src/app
    environment:
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  redis:
    image: redis:4.0.11-alpine

  dashboard:
    build: ./project/dashboard
    image: dashboard
    container_name: dashboard
    ports:
      - '9181:9181'
    command: rq-dashboard -H redis

Создайте и запустите новый контейнер:

$ docker-compose up -d --build

Открытымhttp://localhost:9181Взгляните на всю консоль:

rq dashboard

Вы можете попробовать запустить некоторые задачи, чтобы опробовать функциональность консоли:

rq dashboard

Вы также можете наблюдать за изменением приложения, увеличивая количество воркеров:

$ docker-compose up -d --build --scale worker=3

Эпилог

Это базовое руководство по настройке очереди Redis в Flask для обработки длительных задач. Вы можете использовать эту очередь для выполнения любого процесса, который может заблокировать или замедлить работу пользователя.

Хотите продолжать бросать себе вызов?

  1. регистрDigital OceanИ используйте Docker Swarm для развертывания этого приложения на нескольких узлах.
  2. Добавьте модульные тесты для интерфейсов. (можно использоватьfakeredisдля имитации экземпляра Redis)
  3. использоватьFlask-SocketIOИзмените опрос клиента на подключение через веб-сокет.

допустимыйэтот репозиторийНайдите код этой статьи.

Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.


Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из ИнтернетаНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллектЕсли вы хотите видеть более качественные переводы, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.