[Go Concurrent Programming] Часть 2 — Горутины и каналы

задняя часть Go
[Go Concurrent Programming] Часть 2 — Горутины и каналы

Goroutines

Горутина — самая основная единица выполнения в Go. На самом деле в каждой программе Go есть по крайней мере одна горутина: основная горутина. Он создается автоматически при запуске программы.

На самом деле горутина использует модель fork-join.

sayHello := func() {
	fmt.Println("hello")
}

go sayHello()

Итак, как мы присоединяемся к горутинам? Необходимо ввести операцию ожидания:

var wg sync.WaitGroup()
sayHello := func() {
	defer wg.Done()
	fmt.Println("hello")
}

wg.Add(1)
go sayHello()
wa.Wait()

Channel

читать и писать каналы

Goroutine Базовая единица планирования в GO, а каналы связи - это механизм между ними. Оператор

// 创建一个双向channel
ch := make(chan interface{})

interface{} означает, что chan может быть любого типа

Канал имеет две основные операции: отправку и получение. Операции отправки и получения используют оператор

// 发送操作
ch <- x 
// 接收操作
x = <-ch 
// 忽略接收到的值,合法
<-ch     

Мы не можем ошибиться в направлении канала:

writeStream := make(chan<- interface{})
readStream := make(<-chan interface{})

<-writeStream
readStream <- struct{}{}

Приведенный выше оператор вызывает следующую ошибку:

invalid operation: <-writeStream (receive from send-only type chan<- interface {}) invalid operation: readStream <- struct {} literal (send to receive-only type <-chan interface {})

закрыть канал

Канал поддерживает операцию закрытия, которая используется для закрытия канала, и любые последующие операции отправки в канале приведут кС исключением паники.对一个已经被close过的channel进行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。

Читать с закрытого канала:

intStream := make(chan int) 
close(intStream)
integer, ok := <- intStream
fmt.Pritf("(%v): %v", ok, integer)
// (false): 0

В приведенном выше примере возвращаемое значение ok используется для определения того, закрыт ли канал Мы также можем обрабатывать закрытый канал более элегантным способом через диапазон:

intStream := make(chan int) 
go func() {
	defer close(intStream) 
	for i:=1; i<=5; i++{ 
		intStream <- i 
	}
}()

for integer := range intStream { 
	fmt.Printf("%v ", integer)
}
// 1 2 3 4 5

буферизованный канал

Создает буферизованный канал, который может содержать три строковых элемента:

ch = make(chan string, 3)

Мы можем непрерывно отправлять три значения во вновь созданный канал без блокировки:

ch <- "A"
ch <- "B"
ch <- "C"

В этот момент внутренняя буферная очередь канала будет заполнена, и четвертая операция отправки будет заблокирована.

Если мы получим значение:

fmt.Println(<-ch) // "A"

Тогда буферная очередь канала не будет ни полной, ни пустой, поэтому ни операции отправки, ни операции приема, выполняемые на канале, не будут блокироваться. Таким образом, буфер очереди канала разделяет горутины приема и отправки.

Буферизованные каналы можно использовать как семафоры, например, для ограничения пропускной способности. В этом случае входящий запрос передается дескриптору, который получает значение из канала, обрабатывает запрос и отправляет значение обратно в канал, чтобы «семафор» был готов к следующему запросу. Емкость буфера канала определяет верхний предел количества одновременных обращений к процессу.

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1 // 等待活动队列清空。
    process(r)  // 可能需要很长时间。
    <-sem    // 完成;使下一个请求可以运行。
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // 无需等待 handle 结束。
    }
}

Однако у него есть проблема с дизайном: Serve создает новую горутину для каждого входящего запроса, хотя одновременно могут выполняться только горутины MaxOutstanding. В результате программа может бесконечно потреблять ресурсы, если запросы поступают быстро. Чтобы компенсировать это, мы можем ограничить создание процессов Go, изменив Serve, что является очевидным решением, но остерегайтесь ошибок, которые появляются после того, как мы их исправим.

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // 这儿有 Bug,解释见下。
            <-sem
        }()
    }
}

Ошибка в цикле for в Go, переменная цикла повторно используется на каждой итерации, поэтому переменная req является общей для всех горутин, а это не то, что нам нужно. Нам нужно убедиться, что req уникален для каждой горутины. Один из способов сделать это — передать значение req в качестве аргумента для закрытия горутины:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

Другое решение — создать новую переменную с таким же именем, как в примере:

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // 为该 Go 程创建 req 的新实例。
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}

Давайте посмотрим на другой пример языковой библии. Он одновременно делает запросы на три зеркальных места, которые географически рассеиваются. Они соответственно отправляют полученные ответы на буферизованный канал, и, наконец, приемник принимает только первый полученный ответ, который является самым быстрым ответом. Таким образом, функция MirroredQuery может быть возвращена результатами до двух других медленных зеркальных сайтов.

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    // 仅仅返回最快的那个response
    return <-responses 
}

func request(hostname string) (response string) { /* ... */ }

Если мы используем небуферизованные каналы, то две медленные горутины будут убиты, потому что никто не получаетзастрял навсегда. Эта ситуация называетсяутечка горутин, это будет ошибка. В отличие от мусорных переменных, утекшие горутины не собираются автоматически, поэтому важно убедиться, что каждая горутина, которая больше не нужна, завершается корректно.

Channels of channels

Наиболее важной особенностью Go является то, что каналы являются первоклассными значениями, которые можно назначать и передавать, как и любое другое значение. Эта функция часто используется для реализации безопасного параллельного демультиплексирования.

Мы можем использовать эту функцию для реализации простого RPC. Ниже приведено приблизительное определение типа запроса.

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

Клиент предоставляет функцию и ее аргументы, кроме того есть объект запроса в ответ на полученный канал.

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 发送请求
clientRequests <- request
// 等待回应
fmt.Printf("answer: %d\n", <-request.resultChan)

Функция обработчика на стороне сервера:

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

Channels pipeline

Каналы также можно использовать для соединения нескольких горутин вместе, при этом выход одного канала является входом следующего канала. Эта серия Каналов представляет собой так называемый пайплайн (pipeline). Следующая программа объединяет три горутины, используя два канала:

Первая горутина — это счетчик, который генерирует последовательность целых чисел в виде 0, 1, 2, ..., а затем по каналу отправляет последовательность целых чисел второй горутине; вторая горутина — программа для возведения в квадрат, исправление Возведение в квадрат каждого полученного целого числа, а затем отправка результата возведения в квадрат третьей горутине через второй канал; третья горутина — это программа печати, которая печатает каждое полученное целое число.

func counter(out chan<- int) {
	for x := 0; x < 100; x++ {
		out <- x
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func printer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	go counter(naturals)
	go squarer(squares, naturals)
	printer(squares)
}

Выберите мультиплексирование

select используется для выбора одного из набора возможных сообщений для дальнейшей обработки. Если какая-либо коммуникация может быть дополнительно обработана, она выбирается случайным образом и выполняется соответствующий оператор. В противном случае, если нет значения по умолчанию, оператор select будет блокироваться до тех пор, пока не завершится одно из взаимодействий.

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

Как использовать оператор select, чтобы установить ограничение по времени для операции. Код выводит значение переменной news или сообщение об истечении времени ожидания, в зависимости от того, какой из двух операторов приема выполняется первым:

select {
case news := <-NewsAgency:
    fmt.Println(news)
case <-time.After(time.Minute):
    fmt.Println("Time out: no news in one minute.")
}

Следующий оператор select получит значение из канала прерывания, если он имеет значение; если он не имеет значения, он ничего не сделает. Это неблокирующая операция приема, повторение которой называется «опросом канала».

select {
case <-abort:
    fmt.Printf("Launch aborted!\n")
    return
default:
    // do nothing
}

Использованная литература.

  1. Concurrency in Go
  2. gopl
  3. Effective Go
Категории