- Оригинальный адрес:Asynchronous Tasks with Flask and Redis Queue
- Оригинальный автор:Michael Herman
- Перевод с:Программа перевода самородков
- Постоянная ссылка на эту статью:GitHub.com/rare earth/gold-no…
- Переводчик:Лю Цзяи
- Корректор:kasheemlew
Если в вашем приложении есть длительные задачи, вы должны отделить их от обычного процесса и выполнять в фоновом режиме.
Возможно, ваше веб-приложение потребует от пользователя загрузить аватар (изображение может потребоваться обрезать) и подтверждение электронной почты во время регистрации. Если вы обрабатываете изображения и отправляете письма с подтверждением непосредственно в обработчике запросов, конечный пользователь должен дождаться завершения этих операций. Вместо этого вы бы предпочли поместить эти задачи в очередь задач и обработать их рабочим потоком, и в этом случае приложение может немедленно отвечать на запросы клиентов. В результате конечные пользователи могут продолжать другие операции на стороне клиента, а ваше приложение может быть выпущено для ответа на запросы других пользователей.
В этой статье рассказывается о том, как настроить приложение Flask.Redis Queue(RQ) для обработки длительных задач.
Конечно, сельдерей также является хорошим решением. Однако он немного сложнее и вводит больше зависимостей, чем Redis Queue.
содержание
Цель этой статьи
Прочитав эту статью, вы должны узнать:
- Интегрируйте Redis Queue в приложение Flask и создайте соответствующие задачи.
- Используйте Docker для создания образа приложения, включающего Flask и Redis.
- Используйте отдельные рабочие потоки для обработки длительных задач в фоновом режиме.
- настроитьRQ DashboardИспользуется для мониторинга очередей задач, заданий и рабочих потоков.
- Масштабируйте количество рабочих потоков с помощью Docker.
процесс работы
В этой статье наша цель — использовать возможности Redis Queue для разработки приложения Flask, которое может обрабатывать длительные задачи, где выполнение длительных задач не зависит от выполнения обычных запросов и ответов.
- Конечный пользователь запрашивает у сервера создание новой задачи через POST.
- Как показано на рисунке, новая задача будет добавлена в очередь задач, а затем сервер вернет идентификатор задачи клиенту.
- Созданная задача будет выполняться в фоновом режиме на сервере, а клиенту нужно только использовать AJAX для непрерывного опроса статуса задачи.
В итоге мы реализуем приложение, которое выглядит так:
Конфигурация проекта
Хотите продолжить чтение? Клонируйте следующий репозиторий, чтобы увидеть код и структуру внутри:
$ git clone https://github.com/mjhea0/flask-redis-queue --branch base --single-branch
$ cd flask-redis-queue
Поскольку нам нужно управлять в общей сложности тремя процессами (Flask, Redis и worker), чтобы упростить эту серию рабочих процессов, мы решили использовать Docker для развертывания, и, наконец, нам нужно всего лишь запустить все приложение в одном терминале.
Запустите приложение следующим образом:
$ docker-compose up -d --build
Доступ с помощью вашего браузераhttp://localhost:5004, вы должны увидеть следующую страницу:
триггер задачи
когдаproject/client/static/main.jsПосле того, как слушатель в слушателе прослушает нажатие кнопки, он получит тип задачи, соответствующий кнопке —1
,2
или3
, и отправить полученный тип задачи в качестве параметра на сервер через запрос AJAX POST.
$('.btn').on('click', function() {
$.ajax({
url: '/tasks',
data: { type: $(this).data('type') },
method: 'POST'
})
.done((res) => {
getStatus(res.data.task_id)
})
.fail((err) => {
console.log(err)
});
});
На стороне сервераproject/server/main/views.pyОн будет отвечать за обработку запроса, отправленного клиентом:
@main_blueprint.route('/tasks', methods=['POST'])
def run_task():
task_type = request.form['type']
return jsonify(task_type), 202
Давайте соберем Redis Queue.
Redis Queue
Сначала нам нужноdocker-compose.ymlДобавьте конфигурацию для запуска двух новых процессов — Redis и worker:
version: '3.7'
services:
web:
build: .
image: web
container_name: web
ports:
- '5004:5000'
command: python manage.py run -h 0.0.0.0
volumes:
- .:/usr/src/app
environment:
- FLASK_DEBUG=1
- APP_SETTINGS=project.server.config.DevelopmentConfig
depends_on:
- redis
worker:
image: web
command: python manage.py run_worker
volumes:
- .:/usr/src/app
environment:
- APP_SETTINGS=project.server.config.DevelopmentConfig
depends_on:
- redis
redis:
image: redis:4.0.11-alpine
Добавьте новую задачу в каталог "project/server/main"tasks.py:
# project/server/main/tasks.py
import time
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
Обновите наш код представления, чтобы он мог подключаться к Redis и ставить задачу в очередь, и, наконец, возвращать идентификатор задачи клиенту:
@main_blueprint.route('/tasks', methods=['POST'])
def run_task():
task_type = request.form['type']
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
task = q.enqueue(create_task, task_type)
response_object = {
'status': 'success',
'data': {
'task_id': task.get_id()
}
}
return jsonify(response_object), 202
Не забудьте правильно импортировать используемые выше библиотеки:
import redis
from rq import Queue, Connection
from flask import render_template, Blueprint, jsonify, \
request, current_app
from project.server.main.tasks import create_task
возобновитьBaseConfig
документ:
class BaseConfig(object):
"""基础配置"""
WTF_CSRF_ENABLED = True
REDIS_URL = 'redis://redis:6379/0'
QUEUES = ['default']
Внимательный читатель мог заметить, что мы цитируемredis
обслуживание (вdocker-compose.ymlадрес введен в ), использованиеREDIS_URL
вместоlocalhost
или конкретный IP. Как подключиться к другим сервисам через имя хоста в Docker, вы можете найти в Docker Compose.официальная документацияНайдите ответ в .
Наконец, мы можем использовать Redis QueueworkerДля решения поставленной задачи возглавил бригаду.
@cli.command('run_worker')
def run_worker():
redis_url = app.config['REDIS_URL']
redis_connection = redis.from_url(redis_url)
with Connection(redis_connection):
worker = Worker(app.config['QUEUES'])
worker.work()
Здесь мы запускаем работника с помощью пользовательской команды CLI.
Следует отметить, что при оформлении@cli.command()
Код запуска имеет доступ к контексту приложения, а также кproject/server/config.pyПеременные конфигурации, определенные в .
Также необходимо импортировать правильную библиотеку:
import redis
from rq import Connection, Worker
Добавьте информацию о зависимостях приложения в файл требований:
redis==2.10.6
rq==0.12.0
Соберите и запустите новый контейнер Docker:
$ docker-compose up -d --build
Попробуем запустить задачу:
$ curl -F type=0 http://localhost:5004/tasks
Вы должны получить что-то вроде этого:
{
"data": {
"task_id": "bdad64d0-3865-430e-9cc3-ec1410ddb0fd"
},
"status": "success"
}
статус задачи
Вернемся к слушателю нажатия клавиш клиента:
$('.btn').on('click', function() {
$.ajax({
url: '/tasks',
data: { type: $(this).data('type') },
method: 'POST'
})
.done((res) => {
getStatus(res.data.task_id)
})
.fail((err) => {
console.log(err)
});
});
Всякий раз, когда возвращается запрос AJAX на создание задачи, мы убираем идентификатор задачи и продолжаем вызыватьgetStatus()
. какgetStatus()
Он также успешно возвращается, после чего мы добавляем новую строку в таблицу DOM.
function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.data.task_id}</td>
<td>${res.data.task_status}</td>
<td>${res.data.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.data.task_status;
if (taskStatus === 'finished' || taskStatus === 'failed') return false;
setTimeout(function() {
getStatus(res.data.task_id);
}, 1000);
})
.fail((err) => {
console.log(err);
});
}
Обновить код слоя вида:
@main_blueprint.route('/tasks/<task_id>', methods=['GET'])
def get_status(task_id):
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
task = q.fetch_job(task_id)
if task:
response_object = {
'status': 'success',
'data': {
'task_id': task.get_id(),
'task_status': task.get_status(),
'task_result': task.result,
}
}
else:
response_object = {'status': 'error'}
return jsonify(response_object)
Вызовите следующую команду, чтобы добавить задачу в очередь:
$ curl -F type=1 http://localhost:5004/tasks
Затем используйте приведенный выше возврат в телеtask_id
Чтобы запросить новый интерфейс сведений о задаче:
$ curl http://localhost:5004/tasks/5819789f-ebd7-4e67-afc3-5621c28acf02
{
"data": {
"task_id": "5819789f-ebd7-4e67-afc3-5621c28acf02",
"task_result": true,
"task_status": "finished"
},
"status": "success"
}
Также давайте попробуем эффект в браузере:
консоль задач
RQ DashboardЭто легкая веб-система мониторинга для Redis Queue.
Чтобы интегрировать RQ Dashboard, сначала вам нужно создать папку «dashboard» в разделе «project», а затем создать в ней новую папку.Dockerfile:
FROM python:3.7.0-alpine
RUN pip install rq-dashboard
EXPOSE 9181
CMD ["rq-dashboard"]
Затем добавьте указанный выше модуль в качестве службы вdocker-compose.ymlсередина:
version: '3.7'
services:
web:
build: .
image: web
container_name: web
ports:
- '5004:5000'
command: python manage.py run -h 0.0.0.0
volumes:
- .:/usr/src/app
environment:
- FLASK_DEBUG=1
- APP_SETTINGS=project.server.config.DevelopmentConfig
depends_on:
- redis
worker:
image: web
command: python manage.py run_worker
volumes:
- .:/usr/src/app
environment:
- APP_SETTINGS=project.server.config.DevelopmentConfig
depends_on:
- redis
redis:
image: redis:4.0.11-alpine
dashboard:
build: ./project/dashboard
image: dashboard
container_name: dashboard
ports:
- '9181:9181'
command: rq-dashboard -H redis
Создайте и запустите новый контейнер:
$ docker-compose up -d --build
Открытымhttp://localhost:9181Взгляните на всю консоль:
Вы можете попробовать запустить некоторые задачи, чтобы опробовать функциональность консоли:
Вы также можете наблюдать за изменением приложения, увеличивая количество воркеров:
$ docker-compose up -d --build --scale worker=3
Эпилог
Это базовое руководство по настройке очереди Redis в Flask для обработки длительных задач. Вы можете использовать эту очередь для выполнения любого процесса, который может заблокировать или замедлить работу пользователя.
Хотите продолжать бросать себе вызов?
- регистрDigital OceanИ используйте Docker Swarm для развертывания этого приложения на нескольких узлах.
- Добавьте модульные тесты для интерфейсов. (можно использоватьfakeredisдля имитации экземпляра Redis)
- использоватьFlask-SocketIOИзмените опрос клиента на подключение через веб-сокет.
допустимыйэтот репозиторийНайдите код этой статьи.
Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.
Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из ИнтернетаНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллектЕсли вы хотите видеть более качественные переводы, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.