Автор: Линь Гуаньхун / Призраки под рукой
Самородки:Талант /user/178526…
Гитхаб:GitHub.com/afan913337456…
Облачная колонка Tencent:cloud.Tencent.com/developer/U…
Колонка блокчейна червоточины:woohoo.impulsecommunity.com/article/153…
содержание
- пролог
- Общий процесс заказа
- Подумайте о узких местах
- очередь заказов
- Первая очередь заказов
- Вторая очередь заказов
- Суммировать
- Реализовать выбор очереди
- отвечать
- Реализовать выбор очереди
- Пример кода для Go-версии второй очереди
пролог
Текущая разработка направлена в основном на
传统电商应用
и区块链技术
В совокупности платформа блокчейна по-прежнему以太坊
, Кроме того, книги, написанные мной в эти дни и изданные издательством Университета Цинхуа, наконец-то были опубликованы и расставлены на полках после августа.Имена:《区块链以太坊DApp开发实战》
, теперь доступен для покупок в Интернете.
Идеи, которыми мы поделимся в этой статье, обычно используются в приложениях электронной коммерции.
订单队列
.
Общий процесс заказа
В приложениях электронной коммерции простые и интуитивно понятные шаги для пользователей по завершению всего процесса от заказа до оплаты могут быть представлены на следующем рисунке:
в,订单信息持久化
, заключается в хранении данных в базе данных. После того, как конечный клиент совершит платеж,更新订单状态
Операция — это ссылка обратного вызова, установленная сторонней платежной платформой для обратного вызова.NotifyUrl
, продолжать.
Процесс обновления заполнения статуса заказа показан на следующем рисунке:
Подумайте о узких местах
серверная прямая瓶颈点
, сначала рассмотримTPS
. Убираем точки подразделения, мы в основном смотрим на订单信息持久化
узкое место.
В бизнес-сценариях с высокой степенью параллелизма, таких как秒杀
,优惠价抢购
Ждать. Будет много запросов на заказ за короткий промежуток времени.订单信息持久化
часть, без оптимизации, а прямо на уровне базы данных频繁的
Для операций чтения и записи база данных не выдержит, и она легко станет первым сервисом, который выйдет из строя, как, например, обычный процесс записи ордеров, показанный на рисунке ниже:
можно увидеть,每
Для сохранения информации о заказе обычно требуется операция сетевого подключения (связь с базой данных) и несколькоI/O
работать.
выгода от连接池
Технология, мы можем напрямую получить дескриптор открытого соединения из пула и использовать его напрямую, без повторной инициации полного HTTP-запроса каждый раз при подключении к базе данных, что аналогично принципу пула потоков.
Кроме того, мы можем добавить дополнительные оптимизации в описанный выше процесс, например, для некоторой информации, которую необходимо прочитать, ее можно заранее сохранить в слое кэша памяти и добавить для обслуживания обновлений, чтобы ее можно было быстро прочитать, когда это используется.
Даже если у нас есть некоторые из вышеперечисленных методов оптимизации, но для写操作
изI/O
время блокировки, в高并发请求
Когда еще легко сделать базу данных невыносимой, легко показаться链接多开异常
,操作超时
И другие вопросы.
В дополнение к операциям, упомянутым выше, существуют следующие средства оптимизации операций на этом уровне:
- Кластер базы данных, использующий разделение чтения и записи для уменьшения нагрузки при записи.
- Подбаза данных, разные бизнес-таблицы размещаются в разных базах данных, что создает проблемы с распределенными транзакциями.
- Пиковое отсечение с использованием модели очереди
У каждого метода есть свои особенности, ведь эта статья о订单队列
Архитектурная идея Итак, давайте посмотрим, как внедрить очереди заказов в систему заказов.
очередь заказов
В Интернете есть много статей о практике организации очередей заказов, и в большинстве из них отсутствует согласованность запросов и ответов.
第一种订单队列
блок-схема:
На картинке выше изображена модель очереди, упоминаемая в большинстве статей, есть две проблемы, которые не решены:
- Если для заказа есть сторонний платеж, как обеспечить согласованность ① и ②, например, один из способов обработки не работает;
- Если для заказа есть сторонняя оплата, ① оплата завершена, и сторонняя платежная платформа перезвонила
notifyUrl
, и в это время ② все еще находится в очереди на обработку, как поступить в этой ситуации.
Прежде всего, убедитесь, что приведенная выше схема потока заказов не является проблемой. Он имеет следующие преимущества и недостатки, и есть решения двух упомянутых проблем.
преимущество:
- Пользователям не нужно ждать постоянной обработки заказа, они могут получить ответ напрямую, что позволяет быстро разместить заказ.
- Персистентная обработка с использованием очереди в порядке очереди не повлияет на уровень базы данных вместе с упомянутыми выше высокими одновременными запросами.
- очень изменчивый,
搭配中间件
сильная комбинация.
недостаток:
- Когда в очереди стоят несколько заказов, скорость обработки шага ② не может соответствовать. Это приводит ко второй проблеме.
- сложнее в реализации
Я дам решения проблем, упомянутых выше. Давайте взглянем на другую блок-схему очереди заказов.
第二种订单队列
блок-схема:
Модель конструкции очереди второго порядка, обратите внимание на ее同步等待
Результат обработки персистентности решает проблему непротиворечивости между персистентностью и ответом, но есть серьезная трудоемкая проблема ожидания.Его преимущества и недостатки заключаются в следующем:
преимущество:
- Сильная согласованность между настойчивостью и реакцией.
- Персистентная обработка с использованием очереди в порядке очереди не повлияет на уровень базы данных вместе с упомянутыми выше высокими одновременными запросами.
- Простота реализации
недостаток:
- Когда несколько заказов поставлены в очередь, скорость обработки блока сохраняемости не может поддерживаться, в результате чего клиент ожидает ответа синхронно.
Это своего рода очередь заказов, я опубликую ее нижеGolang
Реализован код версии.
Суммировать
Сравнивая две приведенные выше модели общего порядка, если из用户体验的角度
Для расстановки приоритетов первый не требует от пользователя ожидания持久化处理
Результат значительно лучше, чем у второго. Если техническая команда идеальна, а технология сильна, следует также рассмотреть первый метод реализации.
Если вы просто хотите добиться宁愿用户等待到超时
Не хочу быть смытым сервисом уровня хранилища, поэтому буду рассматривать второй.
Реализовать выбор очереди
Здесь мы разберемся дальше, какие есть варианты реализации функционала модуля очереди.
Я полагаю, что многие товарищи с более опытным бэкенд-разработкой уже думали об использовании существующего промежуточного ПО, например известногоRedis
,RocketMQ
,а такжеKafka
И т. д., все они являются опцией.
Кроме того, мы также можем напрямую написать код для создания очереди сообщений в текущей системе обслуживания для достижения цели.Ниже я использую диаграмму для классификации типов очередей.
Разные реализации очередей могут напрямую вести к разным функциям, а также иметь разные преимущества и недостатки:
Преимущества кэша L1:
- Кэш первого уровня, самый быстрый. Не нужно связываться, получайте прямо из слоя памяти;
- Его просто реализовать, если не учитывать постоянство и кластеризацию.
Недостатки кэша L1:
- Если учесть постоянство и кластеризацию, реализовать их будет сложнее.
- Независимо от сохраняемости, если сервер отключен или обслуживание прервано по другим причинам, информация о заказе в очереди будет потеряна.
Преимущества промежуточного программного обеспечения:
- Программное обеспечение является зрелым, на практике используется хорошо известное промежуточное ПО для обработки сообщений, а документация обширна;
- Поддержка различных стратегий сохранения, таких как Redis.
增量
Постоянство, которое может свести к минимуму потерю информации о заказе из-за неожиданных сбоев; - Поддержка кластеризации, синхронизации master-slave, что является важным требованием для распределенных систем.
Недостатки промежуточного ПО:
- Во время распределенного развертывания необходимо установить связь по каналу, что приводит к сетевой связи для операций чтения и записи.
отвечать
Вернемся к модели первого порядка:
Вопрос 1:
Если за заказ взимается сторонняя оплата, как можно гарантировать согласованность ① и ②?
Во-первых, давайте посмотрим, что происходит, когда возникает несоответствие:
- ① Сбой, пользователь не может получить результат из-за проблем с сетью или возврата на другие страницы. Если ② успешно, то окончательный статус заказа должен быть оплачен. Пользователь может войти в персональный центр заказов для завершения оплаты заказа;
- ① и ② оба терпят неудачу, тогда заказ терпит неудачу;
- ① успех, ② неудача, в это время пользователь находится в
响应页面
После завершения платежного действия пользователь видит, что информация о заказе пуста.
В приведенной выше ситуации, очевидно, только 3 нужно восстановить информацию о заказе, и решения таковы:
- Когда к интерфейсу обратного вызова на стороне сервера обращается сторонняя платежная платформа, соответствующая информация о заказе не может быть найдена. Затем сохраните такие данные, которые были оплачены, но сначала не имеют информации о заказе, например, хранятся в
表A
. одновременно запустить定时任务B
В частности, просмотрите таблицу A, а затем перейдите к списку заказов, чтобы узнать, существует ли уже соответствующая информация о заказах, обновите, если есть, продолжите, если нет, или следуйте установленной стратегии обнаружения. - Когда ② связано с сервером
非崩溃性原因
И когда это не удается:- Если это не удается, повторно вставьте исходные данные заказа в
队列头部
, ожидая следующей обработки повторного сохранения.
- Если это не удается, повторно вставьте исходные данные заказа в
- Когда ② из-за сервера
崩溃性
Когда причина не работает:-
定时任务B
После нескольких тестов безрезультатно, то по данным сторонней платежной платформы прошел обратный вызов订单附属信息
Восстановить порядок.
-
- В течение всего процесса восстановления заказа пользователь просматривает информацию о заказе пустую.
-
定时任务B
где сервис最好
и ссылка обратного звонкаnotifyUrl
Служба интерфейса такая же, что гарантирует, что когда B повесит трубку, служба обратного вызова также повесит трубку, а затем сторонняя платежная платформа не сможет вызвать обратный вызов, у них будет重试逻辑
, опираясь на это, при перезапуске службы обратного вызова можно завершить восстановление информации о заказе.
Вопрос 2:
Если для заказа есть оплата третьей стороной, ① платеж был завершен, и сторонняя платежная платформа перезвонила на notifyUrl, и в это время ② все еще находится в очереди на обработку, как поступить в этой ситуации?
Решения для справки问题1
из定时任务B
Обнаружение механизма модификации.
Пример кода для Go-версии второй очереди
определить некоторые константы
const (
QueueOrderKey = "order_queue"
QueueBufferSize = 1024 // 请求队列大小
QueueHandleTime = time.Second * 7 // 单个 mission 超时时间
)
Определите интерфейсы входа и выхода для облегчения нескольких реализаций
// 定义出入队接口,方便多种实现
type IQueue interface {
Push(key string,data []byte) error
Pop(key string) ([]byte,error)
}
Определение объектов запросов и ответов
// 定义请求与响应实体
type QueueTimeoutResp struct {
Timeout bool // 超时标志位
Response chan interface{}
}
type QueueRequest struct {
ReqId string `json:"req_id"` // 单次请求 id
Order *model.OrderCombine `json:"order"` // 订单信息 bean
AccessTime int64 `json:"access_time"` // 请求时间
ResponseChan *QueueTimeoutResp `json:"-"`
}
определить сущность очереди
// 定义队列实体
type Queue struct {
mapLock sync.Mutex
RequestChan chan *QueueRequest // 缓存管道,装载请求
RequestMap map[string]*QueueTimeoutResp
Queue IQueue
}
Создайте очередь и получите параметры интерфейса
// 实例化队列,接收接口参数
func NewQueue(queue IQueue) *Queue {
return &Queue{
mapLock: sync.Mutex{},
RequestChan: make(chan *QueueRequest, QueueBufferSize),
RequestMap: make(map[string]*QueueTimeoutResp, QueueBufferSize),
Queue: queue,
}
}
получить запрос
// 接收请求
func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {
if req.ResponseChan == nil {
req.ResponseChan = &QueueTimeoutResp{
Timeout: false,
Response: make(chan interface{},1),
}
}
userKey := key(req) // 唯一 key 生成函数
req.ReqId = userKey
q.mapLock.Lock()
q.RequestMap[userKey] = req.ResponseChan // 内存层存储对应的 req 的 resp 管道指针
q.mapLock.Unlock()
q.RequestChan <- req // 接收请求
log("userKey : ", userKey)
ticker := time.NewTicker(QueueHandleTime) // 以超时时间 QueueHandleTime 启动一个定时器
defer func() {
ticker.Stop() // 释放定时器
q.mapLock.Lock()
delete(q.RequestMap,userKey) // 当处理完一个 req,从 map 中移出
q.mapLock.Unlock()
}()
select {
case <-ticker.C: // 超时
req.ResponseChan.Timeout = true
Queue_TimeoutCounter++ // 辅助计数,int 类型
log("timeout: ",userKey)
return lghError.HandleTimeOut // 返回超时错误的信息
case result := <-req.ResponseChan.Response: // req 被完整处理
return result
}
}
Возьмите req из конвейера запросов и поместите его в контейнер очереди, функция находится вgorutine
вбегает
// 从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行
func (q *Queue) addToQueue() {
for {
req := <-q.RequestChan // 取出一个 req
data, err := json.Marshal(req)
if err != nil {
log("redis queue parse req failed : ", err.Error())
continue
}
if err = q.Queue.Push(QueueOrderKey, data);err != nil { // push 入队,这里有时间消耗
log("lpush req failed. Error : ", err.Error())
continue
}
log("lpush req success. req time: ", req.AccessTime)
}
}
Выньте процесс req, функция находится вgorutine
вбегает
// 取出 req 处理,该函数在 gorutine 中运行
func (q *Queue) readFromQueue() {
for {
data, err := q.Queue.Pop(QueueOrderKey) // pop 出队,这里也有时间消耗
if err != nil {
log("lpop failed. Error : ", err.Error())
continue
}
if data == nil || len(data) == 0 {
time.Sleep(time.Millisecond * 100) // 空数据的 req,停顿下再取
continue
}
req := &QueueRequest{}
if err = json.Unmarshal(data, req);err != nil {
log("Lpop: json.Unmarshal failed. Error : ", err.Error())
continue
}
userKey := req.ReqId
q.mapLock.Lock()
resultChan, ok := q.RequestMap[userKey] // 取出对应的 resp 管道指针
q.mapLock.Unlock()
if !ok {
// 中间件重启时,比如 redis 重启而读取旧 key,会进入这里
Queue_KeyNotFound ++ // 计数 int 类型
log("key not found, rollback: ", userKey)
continue
}
simulationTimeOutReq4(req) // 模拟出来任务的函数,入参为 req
if resultChan.Timeout {
// 处理期间,已经超时,这里做可以拓展回滚操作
Queue_MissionTimeout ++
log("handle mission timeout: ", userKey)
continue
}
log("request result send to chan succeee, userKey : ", userKey)
ret := util.GetCommonSuccess(req.AccessTime)
resultChan.Response <- &ret // 输入处理成功
}
}
запускать
func (q *Queue) Start() {
go q.addToQueue()
go q.readFromQueue()
}
запустить пример
func test(){
...
runtime.GOMAXPROCS(4)
redisQueue := NewQueue(NewFastCacheQueue())
redisQueue.Start()
reqNumber := testReqNumber
wg := sync.WaitGroup{}
wg.Add(reqNumber)
for i :=0;i<reqNumber;i++ {
go func(index int) {
combine := model.OrderCombine{}
ret := AcceptRequest(&QueueRequest{
UserId: int64(index),
Order: &combine,
AccessTime: time.Now().Unix(),
ResponseChan: nil,
})
fmt.Println("ret: ------------- ",ret.String())
wg.Done()
}(i)
}
wg.Wait()
time.Sleep(3*time.Second)
fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}