Инфраструктура микросервисов на основе Sanic

задняя часть база данных Микросервисы API

вводить

Одной из самых больших проблем, с которыми сталкивается использование Python для веб-разработки, является производительность, что немного сложно для решения проблемы C10K. Некоторые асинхронные фреймворки Tornado, Twisted, Gevent и др. предназначены для решения проблем с производительностью. Эти фреймворки немного улучшили производительность, но также есть множество странных проблем, которые трудно решить.

В python3.6 официальная библиотека асинхронных сопрограмм asyncio официально стала стандартом. Сохраняя удобство, производительность была значительно улучшена, и многие асинхронные фреймворки использовали asyncio.

Более ранняя асинхронная структура — это aiohttp, которая предоставляет серверную и клиентскую стороны и обеспечивает хорошую инкапсуляцию asyncio. Однако метод разработки отличается от самого популярного микрофреймворка flask: разработка flask проста, легка и эффективна.

В последнее время микросервисы — самая популярная модель разработки, которая решает проблему сложности, повышает эффективность разработки и упрощает развертывание.

Он сочетается с этими преимуществами, основываясь на Sanic, интегрированном с несколькими популярными библиотеками для создания микросервисов. Саначевые рамки представляют собой аналогичную эквивационную структуру, аналогичную колбу, простой и легкой и высокой производительности.

Этот проект представляет собой микросервисную структуру, основанную на Sanic.

Функции

  • Используя асинхронный фреймворк Sanic, он прост, легок и эффективен.
  • Использование uvloop в качестве основного движка во многих случаях делает sanic не хуже, чем Golang, в одномашинном параллелизме.
  • Используйте asyncpg в качестве драйвера базы данных, подключитесь к базе данных и выполните выполнение инструкции sql.
  • Использовать aiohttp для Клиента, доступ к другим микросервисам.
  • Используйте peewee в качестве ORM, но только для проектирования моделей и миграции.
  • Используйте opentracing как распределенную систему трассировки.
  • Используйте unittest для модульного тестирования и используйте mocks, чтобы избежать доступа к другим микросервисам.
  • Использование swagger в качестве стандарта API может автоматически генерировать документацию по API.

использовать

адрес проекта:sanic-ms

Example

Сервер

Использование Sanic Asynchronous Framework имеет высокую производительность, но неправильное использование приведет к блокировке, а асинхронная библиотека должна использоваться для запросов IO.Будьте осторожны при добавлении библиотек. Sanic использует асинхронный привод uvloop, uvloop написан на Cython на основе libuv, и его производительность выше, чем у nodejs.

Описание функции:

перед стартом

@app.listener('before_server_start')
async def before_srver_start(app, loop):
    queue = asyncio.Queue()
    app.queue = queue
    loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
    reporter = AioReporter(queue=queue)
    tracer = BasicTracer(recorder=reporter)
    tracer.register_required_propagators()
    opentracing.tracer = tracer
    app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
  • Создать пул соединений с БД
  • Создать клиентское соединение
  • Создать очередь, использовать диапазон для отслеживания журнала
  • Создайте файл opentracing.tracer для трассировки журналов.

промежуточное ПО

@app.middleware('request')
async def cros(request):
    if request.method == 'POST' or request.method == 'PUT':
        request['data'] = request.json
    span = before_request(request)
    request['span'] = span


@app.middleware('response')
async def cors_res(request, response):
    span = request['span'] if 'span' in request else None
    if response is None:
        return response
    result = {'code': 0}
    if not isinstance(response, HTTPResponse):
        if isinstance(response, tuple) and len(response) == 2:
            result.update({
                'data': response[0],
                'pagination': response[1]
            })
        else:
            result.update({'data': response})
        response = json(result)
        if span:
            span.set_tag('http.status_code', "200")
    if span:
        span.set_tag('component', request.app.name)
        span.finish()
    return response
  • Создание диапазонов для отслеживания журналов
  • Инкапсулируйте ответ в унифицированном формате

Обработка исключений

Обработать выброшенное исключение и вернуть единый формат

Задача

Создайте диапазон в очереди потребления задач для отслеживания журнала

Асинхронная обработка

Поскольку используется асинхронная структура, некоторые запросы ввода-вывода могут обрабатываться параллельно.

Example:

async def async_request(datas):
    # async handler request
    results = await asyncio.gather(*[data[2] for data in datas])
    for index, obj in enumerate(results):
        data = datas[index]
        data[0][data[1]] = results[index]

@user_bp.get('/<id:int>')
@doc.summary("get user info")
@doc.description("get user info by id")
@doc.produces(Users)
async def get_users_list(request, id):
    async with request.app.db.acquire(request) as cur:
        record = await cur.fetch(
            """ SELECT * FROM users WHERE id = $1 """, id)
        datas = [
            [record, 'city_id', get_city_by_id(request, record['city_id'])]
            [record, 'role_id', get_role_by_id(request, record['role_id'])]
        ]
        await async_request(datas)
        return record

get_city_by_id, get_role_by_id — параллельная обработка.

Ссылки по теме

sanic

Дизайн модели и ORM

Peewee — это простая и маленькая ORM с несколькими (но выразительными) концепциями, что делает ее простой в освоении и интуитивно понятной в использовании.

ORM использует peewee только для проектирования и миграции моделей, а операции с базами данных используют asyncpg.

Example:

# models.py

class Users(Model):
    id = PrimaryKeyField()
    create_time = DateTimeField(verbose_name='create time',
                                default=datetime.datetime.utcnow)
    name = CharField(max_length=128, verbose_name="user's name")
    age = IntegerField(null=False, verbose_name="user's age")
    sex = CharField(max_length=32, verbose_name="user's sex")
    city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
    role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)

    class Meta:
        db_table = 'users'


# migrations.py

from sanic_ms.migrations import MigrationModel, info, db

class UserMigration(MigrationModel):
    _model = Users

    # @info(version="v1")
    # def migrate_v1(self):
    #     migrate(self.add_column('sex'))

def migrations():
    try:
        um = UserMigration()
        with db.transaction():
            um.auto_migrate()
            print("Success Migration")
    except Exception as e:
        raise e

if __name__ == '__main__':
    migrations()
  • Запустите команду python migrations.py
  • Функция migrate_v1 добавляет поле пола, а поле имени должно быть добавлено первым в BaseModel.
  • Информационный декоратор создаст таблицу migrate_record для записи миграции, версия должна быть уникальной в каждой модели, используйте версию для записи о том, была ли она выполнена, и автор, дата и время также могут быть записаны
  • Функция миграции должна начинаться с **migrate_**

Ссылки по теме

peewee

операции с базой данных

asyncpg is the fastest driver among common Python, NodeJS and Go implementations

Используйте Asyncpg в качестве драйвера базы данных, инкапсулируйте соединение с базой данных и выполняйте операции базы данных.

Одной из причин отказа от использования ORM для операций с базой данных является производительность, ORM будет иметь потери в производительности, а высокопроизводительную библиотеку asyncpg использовать нельзя. Во-вторых, отдельный микросервис очень прост, структура таблицы не очень сложна, и можно обрабатывать простые операторы SQL, и нет необходимости вводить ORM. Используйте peewee только для дизайна модели

Example:

sql = "SELECT * FROM users WHERE name=$1"
name = "test"
async with request.app.db.acquire(request) as cur:
    data = await cur.fetchrow(sql, name)

async with request.app.db.transaction(request) as cur:
    data = await cur.fetchrow(sql, name)
  • Функция Acquire() не является транзакционной. Для нетранзакционных операций, которые включают только запросы, она может повысить эффективность запросов.
  • Функция tansaction() — это транзакционная операция, а транзакционные операции должны использоваться для добавления, удаления и изменения.
  • Параметр запроса передается, чтобы получить диапазон для отслеживания журнала.
  • TODOРазделение чтения и записи базы данных

Ссылки по теме

asyncpg benchmarks

клиент

Используя клиент в aiohttp, клиент просто инкапсулируется для доступа между микросервисами.

Не создавайте сессию на запрос, скорее всего вам нужна сессия на приложение, которое выполняет все запросы вместе. Сеанс содержит внутри пул соединений, повторное использование соединения и поддержка активности (оба включены по умолчанию) могут повысить общую производительность.

Example:

@app.listener('before_server_start')
async def before_srver_start(app, loop):
    app.client =  Client(loop, url='http://host:port')

async def get_role_by_id(request, id):
    cli = request.app.client.cli(request)
    async with cli.get('/cities/{}'.format(id)) as res:
        return await res.json()

@app.listener('before_server_stop')
async def before_server_stop(app, loop):
    app.client.close()

Для доступа к различным микросервисам вы можете создать несколько разных клиентов, чтобы каждый клиент оставался активным.

Ссылки по теме

aiohttp

Журнал и распределенная система отслеживания

Чтобы использовать официальное ведение журнала, файл конфигурации — logging.yml, а версия Sanic должна быть 0.6.0 или выше. JsonFormatter конвертирует логи в формат json для ввода в ES

Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.

регистратор декораторов

@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
async def get_city_by_id(request, id):
    cli = request.app.client.cli(request)
  • тип: тип журнала, такой как метод, маршрут
  • категория: категория журнала, по умолчанию это имя приложения
  • подробно: подробности журнала
  • описание: описание журнала, по умолчанию комментарий к функции
  • трассировка: трассировка журнала, значение по умолчанию — True
  • Уровень: уровень журнала, по умолчанию это информация

Распределенная система трассировки

  • OpenTracing основан на распределенных системах трассировки, таких как Dapper и Zipkin, и установил единый стандарт.
  • Opentracing отслеживает каждый запрос, записывает каждый микросервис, через который проходит запрос, и связывает их в цепочку, что имеет решающее значение для анализа узких мест производительности микросервисов.
  • Использует инфраструктуру opentracing, но на выходе конвертирует в формат zipkin. Поскольку большинство распределенных систем трассировки используют экономию для связи с учетом проблем с производительностью, в духе простоты и стиля Restful связь RPC не используется. Вывод в виде журналов, вы можете использовать fluentd, logstash и другие журналы для сбора и последующего ввода в Zipkin. Zipkin поддерживает ввод HTTP.
  • Сгенерированный диапазон сначала помещается в очередь без блокировки, а диапазон очереди используется в задаче. Частота апсемплинга может быть добавлена ​​позже.
  • Для БД клиент добавил трассировку

Ссылки по теме

opentracing zipkin jaeger

API-интерфейс

В документации API используется стандарт swagger.

Example:

from sanic_ms import doc

@user_bp.post('/')
@doc.summary('create user')
@doc.description('create user info')
@doc.consumes(Users)
@doc.produces({'id': int})
async def create_user(request):
    data = request['data']
    async with request.app.db.transaction(request) as cur:
        record = await cur.fetchrow(
            """ INSERT INTO users(name, age, city_id, role_id)
                VALUES($1, $2, $3, $4, $5)
                RETURNING id
            """, data['name'], data['age'], data['city_id'], data['role_id']
        )
        return {'id': record['id']}
  • сводка: сводка API
  • описание: подробное описание
  • потребляет: данные тела запроса
  • производит: возвращаемые данные ответа
  • тег: тег API
  • Параметр, переданный в Consumers и Products, может быть моделью peewee, которая будет анализировать модель для создания данных API, и параметром help_text в поле field для представления эталонного объекта.
  • http://host:ip/openapi/spec.json, чтобы получить сгенерированные данные json.

Ссылки по теме

swagger

Данные ответа

При возврате не возвращайте ответ sanic, а возвращайте исходные данные напрямую.Возвращенные данные будут обработаны в промежуточном программном обеспечении и возвращены в унифицированный формат.Конкретный формат может быть [View]

модульный тест

Модульные тесты используют unittest. Mock создал MockClient сам по себе, потому что у unittest нет asyncio mock, а тестовый интерфейс sanic также отправляет запросы запросов, поэтому это более хлопотно.Вы можете использовать pytest позже.

Example:

from sanic_ms.tests import APITestCase
from server import app

class TestCase(APITestCase):
    _app = app
    _blueprint = 'visit'

    def setUp(self):
        super(TestCase, self).setUp()
        self._mock.get('/cities/1',
                       payload={'id': 1, 'name': 'shanghai'})
        self._mock.get('/roles/1',
                       payload={'id': 1, 'name': 'shanghai'})

    def test_create_user(self):
        data = {
            'name': 'test',
            'age': 2,
            'city_id': 1,
            'role_id': 1,
        }
        res = self.client.create_user(data=data)
        body = ujson.loads(res.text)
        self.assertEqual(res.status, 200)
  • где _blueprint — имя чертежа
  • В функции setUp используйте _mock для регистрации фиктивной информации, чтобы не было доступа к реальному серверу, а полезной нагрузкой была возвращенная информация о теле.
  • Используйте клиентскую переменную для вызова каждой функции, данные — это информация о теле, params — это информация о параметрах пути, а другие параметры — это параметры маршрута.

покрытие кода

coverage erase
coverage run --source . -m sanic_ms tests
coverage xml -o reports/coverage.xml
coverage2clover -i reports/coverage.xml -o reports/clover.xml
coverage html -d reports
  • Покрытие2colver предназначено для преобразования покрытия.xml в clover.xml, формат, требуемый bamboo, — это clover.

Ссылки по теме

unittest coverage

Обработка исключений

Используйте app.error_handler = CustomHander() для обработки выброшенного исключения.

Example:

from sanic_ms.exception import ServerError

@visit_bp.delete('/users/<id:int>')
async def del_user(request, id):
    raise ServerError(error='内部错误',code=10500, message="msg")
  • code: код ошибки, 0 если нет исключения, остальные значения являются исключениями
  • сообщение: информация о коде состояния
  • ошибка: пользовательское сообщение об ошибке
  • status_code: код состояния http, используйте стандартный код состояния http