Я уже говорил о сопрограммах golang и обнаружил, что они кажутся очень теоретическими, особенно с точки зрения безопасности параллелизма, поэтому я объединил несколько примеров из Интернета, чтобы проверить магию канала, выбора и контекста в подпрограмме go.
Сценарий — вызов микросервиса
мы используемgin(веб-фреймворк) В качестве инструмента для обработки запросов требования следующие: Запрос X будет вызывать три метода A, B и C параллельно и добавлять результаты, возвращенные тремя методами, в качестве ответа на запрос X. Но наш ответ требует времени (время ответа не может превышать 5 секунд),
Может случиться так, что один или оба из A, B и C имеют очень сложную логику обработки, или объем данных слишком велик, что приводит к тому, что время обработки превышает ожидаемое. Затем мы немедленно отсекаем его и возвращаем сумму всех полученных результатов.
Сначала определим основную функцию:
func main() {
r := gin.New()
r.GET("/calculate", calHandler)
http.ListenAndServe(":8008", r)
}
Очень простое, обычное принятие запроса и определение обработчика. где calHandler — это функция, которую мы используем для обработки запроса.
Определите три поддельных микросервиса соответственно, третий из которых будет нашим тайм-аутом~
func microService1() int {
time.Sleep(1*time.Second)
return 1
}
func microService2() int {
time.Sleep(2*time.Second)
return 2
}
func microService3() int {
time.Sleep(10*time.Second)
return 3
}
Далее посмотрим, что именно есть в calHandler
func calHandler(c *gin.Context) {
...
c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
return
}
Типичный ответ джина, нам все равно, какая сумма.
Пункт 1 - одновременные вызовы
Просто используйте go напрямую~ Итак, в начале мы могли бы написать что-то вроде этого:
go microService1()
go microService2()
go microService3()
Это очень просто, но подождите, как мне получить возвращаемое значение, которое считается хорошим? Чтобы получать результаты обработки параллельно, легко придумать использование каналов для захвата. Поэтому мы изменили службу вызова на это:
var resChan = make(chan int, 3) // 因为有3个结果,所以我们创建一个可以容纳3个值的 int channel。
go func() {
resChan <- microService1()
}()
go func() {
resChan <- microService2()
}()
go func() {
resChan <- microService3()
}()
Если есть что подключать, должен быть способ вычислить это, поэтому мы добавляем метод, который перебирает результаты в resChan и вычисляет:
var resContainer, sum int
for {
resContainer = <-resChan
sum += resContainer
}
Таким образом, у нас есть сумма для вычисления результата от resChan каждый раз.
Пункт 2 - сигнал тайм-аута
Это еще не конец, как насчет обработки тайм-аута? Чтобы реализовать обработку тайм-аута, нам нужно ввести одну вещь, а именно контекст.Что такое контекст?Здесь мы используем только одну особенность контекста — уведомление о тайм-ауте (на самом деле эту функцию можно полностью заменить каналом).
Видно, что мы передали c *gin.Context в качестве параметра при определении calHandler, поэтому нам не нужно объявлять его самостоятельно. gin.Context просто понимается как контейнер контекста, который проходит через весь цикл объявления gin, что немного похоже на клон или ощущение квантовой запутанности.
С помощью этого gin.Context мы можем работать с контекстом в одном месте, и другие функции или методы, использующие контекст, также почувствуют изменения, сделанные контекстом.
ctx, _ := context.WithTimeout(c, 3*time.Second) //定义一个超时的 context
Пока время истекло, мы можем использовать ctx.Done() для получения канала тайм-аута (уведомления), а затем другие места, использующие этот ctx, остановятся и отпустят ctx. Обычно ctx.Done() используется вместе с select. Итак, нам нужен еще один цикл для прослушивания ctx.Done().
for {
select {
case <- ctx.Done():
// 返回结果
}
Теперь, когда у нас есть два for, можем ли мы их объединить?
for {
select {
case resContainer = <-resChan:
sum += resContainer
fmt.Println("add", resContainer)
case <- ctx.Done():
fmt.Println("result:", sum)
return
}
}
Эй, хорошо выглядит. Но как нам вывести результат, когда вызов микросервиса завершился нормально? Похоже, нам тоже нужен флаг
var count int
for {
select {
case resContainer = <-resChan:
sum += resContainer
count ++
fmt.Println("add", resContainer)
if count > 2 {
fmt.Println("result:", sum)
return
}
case <- ctx.Done():
fmt.Println("timeout result:", sum)
return
}
}
Мы добавляем счетчик, потому что мы вызываем микросервис только 3 раза, поэтому, когда count больше 2, мы должны завершить и вывести результат.
Пункт 3 — Ожидание параллелизма
Таймер выше — это ленивый подход, потому что мы знаем, сколько раз мы вызывали микросервис, что, если мы этого не знаем или добавим его позже? Было бы слишком неэлегантно каждый раз вручную изменять пороговое значение подсчета? На этом этапе мы можем добавить пакет синхронизации. Одна из функций синхронизации, которую мы будем использовать, — это группа ожидания. Его роль — дождаться запуска набора сопрограмм, а затем выполнить следующие шаги.
Изменим блок кода предыдущего вызова микросервиса:
var success = make(chan int, 1) // 成功的通道标识
wg := sync.WaitGroup{} // 创建一个 waitGroup 组
wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
go func() {
resChan <- microService1()
wg.Done() // 完成一个,Done()一个
}()
go func() {
resChan <- microService2()
wg.Done()
}()
go func() {
resChan <- microService3()
wg.Done()
}()
wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
success <- 1 // 我们发送一个成功信号到通道中
注意
: Если мы поместим приведенный выше код непосредственно в calHandler, возникнет проблема, WaitGroup заблокирует наш нормальный вывод, несмотря ни на что (смерть и жизнь сделают вас тайм-аутом).
Итак, давайте выделим приведенный выше код, относящийся к бизнес-логике, и завершим его.
// rc 是结果 channel, success 是成功与否的 flag channel
func MyLogic(rc chan<- int, success chan<- int) {
wg := sync.WaitGroup{} // 创建一个 waitGroup 组
wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
go func() {
rc <- microService1()
wg.Done() // 完成一个,Done()一个
}()
go func() {
rc <- microService2()
wg.Done()
}()
go func() {
rc <- microService3()
wg.Done()
}()
wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
success <- 1 // 我们发送一个成功信号到通道中
}
В конечном счете, этот MyLogic должен работать как сопрограмма. (Спасибо @TomorrowWu и @chenqinghe за напоминание)
Теперь, когда у нас есть сигнал успеха, добавьте его в цикл мониторинга for и внесите некоторые изменения, чтобы удалить исходную часть оценки подсчета.
for {
select {
case resContainer = <-resChan:
sum += resContainer
fmt.Println("add", resContainer)
case <- success:
fmt.Println("result:", sum)
return
case <- ctx.Done():
fmt.Println("result:", sum)
return
}
}
Три корпуса, с четким разделением труда,
case resContainer = <-resChan:
Используется для получения результата вывода логики и расчета
case <- success:
идеальный нормальный выход
case <- ctx.Done():
это вывод в случае тайм-аута
Немного доработаем и поставим последние два случаяfmt.Println("result:", sum)
Переход на стандартный http-ответ джина
c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
return
На этом весь основной код завершен. Ниже полная версия
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
)
// 一个请求会触发调用三个服务,每个服务输出一个 int,
// 请求要求结果为三个服务输出 int 之和
// 请求返回时间不超过3秒,大于3秒只输出已经获得的 int 之和
func calHandler(c *gin.Context) {
var resContainer, sum int
var success, resChan = make(chan int), make(chan int, 3)
ctx, cancel := context.WithTimeout(c, 5*time.Second)
defer cancel()
// 真正的业务逻辑
go MyLogic(resChan, success)
for {
select {
case resContainer = <-resChan:
sum += resContainer
fmt.Println("add", resContainer)
case <- success:
c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
return
case <- ctx.Done():
c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
return
}
}
}
func main() {
r := gin.New()
r.GET("/calculate", calHandler)
http.ListenAndServe(":8008", r)
}
func MyLogic(rc chan<- int, success chan<- int) {
wg := sync.WaitGroup{} // 创建一个 waitGroup 组
wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
go func() {
rc <- microService1()
wg.Done() // 完成一个,Done()一个
}()
go func() {
rc <- microService2()
wg.Done()
}()
go func() {
rc <- microService3()
wg.Done()
}()
wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
success <- 1 // 我们发送一个成功信号到通道中
}
func microService1() int {
time.Sleep(1*time.Second)
return 1
}
func microService2() int {
time.Sleep(2*time.Second)
return 2
}
func microService3() int {
time.Sleep(6*time.Second)
return 3
}
Приведенная выше программа просто описывает сценарий обработки, в котором время ожидания вызова других микрослужб истекает. В самом процессе нужно добавить много приправ, чтобы обеспечить внешнюю целостность интерфейса.