При разработке микросервисов API-шлюз играет роль предоставления API-интерфейсов для внешнего мира, а данные API-интерфейсов часто зависят от других сервисов, а сложные API-интерфейсы зависят от нескольких или даже десятков сервисов. Хотя время, затрачиваемое одной зависимой службой, как правило, невелико, если несколько служб последовательно зависят, время, затрачиваемое на весь API, будет значительно увеличено.
Так за счет чего его оптимизировать? Первое, о чем мы думаем, — это параллельная работа с зависимостями, что может сократить время, затрачиваемое на всю зависимость.Базовая библиотека Go предоставляет намWaitGroupИнструмент используется для управления параллелизмом, но в реальном бизнес-сценарии, если есть ошибка в нескольких зависимостях, мы ожидаем немедленного возврата вместо ожидания выполнения всех зависимостей перед возвратом результата и присвоения переменных в WaitGroup часто необходимо заблокировать, каждая функция зависимости должна добавлять добавление и выполнение, что более подвержено ошибкам для новичков.
Основываясь на вышеизложенном, фреймворк go-zero предоставляет нам инструменты параллельной обработки.MapReduce, инструмент готов к использованию "из коробки" и не требует какой-либо инициализации.Давайте посмотрим на сравнение затрат времени между использованием MapReduce и его отсутствием на следующем рисунке:
Для тех же зависимостей последовательная обработка занимает 200 мс. Потребляемая по времени после использования Mapreccuce равен максимальной трудоемке всех зависимостей 100 мс. Видно, что Mapreatuce может значительно снизить время обслуживания, и эффект будет больше Очевидно, что и зависимости увеличиваются, снижая трудоемкую обработку без увеличения давления сервера
Инструменты параллельной обработкиMapReduce
MapReduceЭто программная архитектура, предложенная Google для параллельных вычислений крупномасштабных наборов данных.Инструмент MapReduce в go-zero основан на этой архитектурной идее.
Инструмент MapReduce в структуре go-zero в основном используется для одновременной обработки пакетных данных для повышения производительности службы.
Мы демонстрируем использование MapReduce на нескольких примерах.
MapReduce в основном имеет три параметра. Первый параметр генерирует для создания данных, второй параметр — преобразователь для обработки данных, а третий параметр — редуктор для агрегирования и возврата данных после преобразователя.Опция opts устанавливает количество потоков для одновременной обработки.
Сцена первая: некоторые функциональные результаты часто должны полагаться на более чем одну службу, например, списки результатов часто зависят от обслуживания клиентов, запасов, услуг, заказов на обслуживание и т. д., как правило, зависимые услуги предоставляются в форме иностранного rpc. , чтобы уменьшить, мы склонны полагаться на отнимающую много времени необходимость полагаться на параллельную обработку
func productDetail(uid, pid int64) (*ProductDetail, error) {
var pd ProductDetail
err := mr.Finish(func() (err error) {
pd.User, err = userRpc.User(uid)
return
}, func() (err error) {
pd.Store, err = storeRpc.Store(pid)
return
}, func() (err error) {
pd.Order, err = orderRpc.Order(pid)
return
})
if err != nil {
log.Printf("product detail error: %v", err)
return nil, err
}
return &pd, nil
}
В этом примере возврат сведений о продукте зависит от нескольких служб для получения данных, поэтому параллельная обработка зависимостей значительно повысит производительность интерфейса.
Сценарий 2: Много раз нам нужно обработать пакет данных, например, для пакета идентификаторов пользователей, чтобы проверить легитимность каждого пользователя, и если в процессе проверки возникает ошибка, проверка считается не пройденной, и возвращаемый результат является действительным идентификатором пользователя.
func checkLegal(uids []int64) ([]int64, error) {
r, err := mr.MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
uid := item.(int64)
ok, err := check(uid)
if err != nil {
cancel(err)
}
if ok {
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var uids []int64
for p := range pipe {
uids = append(uids, p.(int64))
}
writer.Write(uids)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}
return r.([]int64), nil
}
func check(uid int64) (bool, error) {
// do something check user legal
return true, nil
}
В этом примере, если в процессе проверки возникает ошибка, метод отмены используется для завершения процесса проверки, и возвращается ошибка. Весь процесс проверки завершается. Если результат проверки uid оказывается ложным, окончательный результат не возвращает uid
Меры предосторожности при использовании MapReduce
- Отмена может быть вызвана как в маппере, так и в редюсере, параметром является ошибка, он вернется сразу после вызова, а результат возврата равен нулю, ошибка
- Если в маппере не вызывается write.Write, элемент не будет агрегироваться редюсером в конце
- Если в редюсере не вызывается Writer.Wirte, результат равен нулю, ErrReduceNoOutput
- Редьюсер однопоточный, и здесь последовательно агрегируются результаты всех картографов.
Анализ принципа реализации:
В MapReduce сначала сгенерируйте данные, выполнив generate (параметр — небуферизованный канал) через метод buildSource, и верните небуферизованный канал, картограф будет считывать данные из канала
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})
go func() {
defer close(source)
generate(source)
}()
return source
}
Метод отмены определен в методе MapReduceWithSource.Этот метод можно вызывать как в маппере, так и в редукторе.После вызова основной поток вернется сразу же после получения сигнала закрытия.
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
// 默认的error
retErr.Set(ErrCancelWithNil)
}
drain(source)
// 调用close(ouput)主线程收到Done信号,立马返回
finish()
})
ExecuteMappers вызывается в методе mapperDispatcher. ExecuteMappers использует данные, сгенерированные buildSource. Каждый элемент запускает горутину для его обработки отдельно. Максимальное число одновременных операций по умолчанию — 16, которое можно установить с помощью WithWorkers.
var wg sync.WaitGroup
defer func() {
wg.Wait() // 保证所有的item都处理完成
close(collector)
}()
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // 将mapper处理完的数据写入collector
for {
select {
case <-done: // 当调用了cancel会触发立即返回
return
case pool <- lang.Placeholder: // 控制最大并发数
item, ok := <-input
if !ok {
<-pool
return
}
wg.Add(1)
go func() {
defer func() {
wg.Done()
<-pool
}()
mapper(item, writer) // 对item进行处理,处理完调用writer.Write把结果写入collector对应的channel中
}()
}
}
Единая горутина редуктора обрабатывает данные, записанные преобразователем в сборщик.Если редюсер вручную не вызывает write.Write, в конечном итоге будет выполнен метод finish, чтобы закрыть вывод, чтобы избежать взаимоблокировки.
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
}()
Инструментарий также предоставляет множество методов для различных бизнес-сценариев.Принцип реализации аналогичен принципу MapReduce.Заинтересованные студенты могут просмотреть исходный код, чтобы узнать
- Функция MapReduceVoid похожа на MapReduce, но результат не возвращается, возвращается только ошибка
- Finish обрабатывает фиксированное количество зависимостей, возвращает ошибку и немедленно возвращается, если возникает ошибка.
- FinishVoid похож на метод Finish и не имеет возвращаемого значения.
- Карта только генерирует и обрабатывает карту и возвращает канал
- MapVoid похож на карту, без возврата
В этом документе представлен инструмент фреймворка Go-Zero MapReduce, который в реальном проекте очень практичен. Хороший инструмент для повышения производительности службы и эффективности разработки очень полезен, я надеюсь, что эта статья принесет нам пользу.