Golang — горутина (coroutine) и канал (pipe)

Go

сопрограмма (горутина)

协程(goroutine)Это часть параллельной обработки приложения в Go, которая может выполнять эффективные параллельные операции.

  • Сопрограммы легкие и дешевле, чем потоки. Его можно создать в памяти, используя 4 КБ стековой памяти.
  • Стек можно сегментировать для динамического увеличения или уменьшения использования памяти. Управление стеком будет автоматически освобождено после выхода сопрограммы.
  • Стек сопрограммы будет расширяться и сжиматься по мере необходимости, и переполнения стека не будет.

Использование сопрограмм

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("In main()")
	go longWait()
	go shortWait()
	fmt.Println("About to sleep in main()")

	//time.Sleep(4 * 1e9)
	time.Sleep(10 * 1e9)
	fmt.Println("At the end of main()")
}

func longWait() {
	fmt.Println("Beginning longWait()")
	time.Sleep(5 * 1e9)
	fmt.Println("End of longWait()")
}

func shortWait() {
	fmt.Println("Beginning shortWait()")
	time.Sleep(2 * 1e9)
	fmt.Println("End of shortWait()")
}

Используется в Гоgoключевое слово для запуска сопрограммы, гдеmainФункцию также можно рассматривать как сопрограмму.

Нетрудно понять, что вывод приведенного выше кода таков:

In main()
About to sleep in main()
Beginning shortWait()
Beginning longWait()
End of shortWait()
End of longWait()
At the end of main()

Но когда мы ставимmainВыход изменился, когда время сна было установлено на 4 с.

In main()
About to sleep in main()
Beginning shortWait()
Beginning longWait()
End of shortWait()
At the end of main()

программа не выводитEnd of longWait(), причина в том,longWait()иmain()Работая в разных сопрограммах, они асинхронны. То есть уже вlongWait()До конца main завершился, и, естественно, никаких выходных данных не видно.

канал

通道(channel)Это особый тип данных в Go, который может взаимодействовать между сопрограммами, отправляя через них типизированные данные, избегая проблем, связанных с разделением памяти.

Способ связи канала обеспечивает синхронизацию, и одновременно к данным может обращаться только одна сопрограмма, которая не появится数据竞争.

Возьмем в качестве примера конвейерную ленту фабрики. Машина размещает предмет (сопрограмма производителя), проходит через конвейерную ленту и достигает следующей машины, чтобы упаковать коробку (сопрограмма потребителя).

использование каналов

Прежде чем научиться использовать трубы, давайте посмотрим на «трагедию».

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("Reveal romantic feelings...")
	go sendLove()
	go responseLove()
	waitFor()
	fmt.Println("Leaving ☠️....")
}

func waitFor() {
	for i := 0; i < 5; i++ {
		fmt.Println("Keep waiting...")
		time.Sleep(1 * 1e9)
	}
}

func sendLove() {
	fmt.Println("Love you, mm ❤️")
}

func responseLove() {
	time.Sleep(6 * 1e9)
	fmt.Println("Love you, too")
}

Со знаниями, полученными выше, это нетрудно увидеть. . . действительно плохо

Reveal romantic feelings...
Love you, mm ❤️
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Leaving ☠️....

Минмин получил ответ от девушки, в которую он был влюблен, но подумал, что другая сторона не примет его чувства, и ушел в слезах. 【ТАТ】

Видно, сколько непонимания будет вызвано тем, что сопрограммы не общаются друг с другом. К счастью, у нас естьchannel, а теперь давайте вместе перепишем концовку истории~

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	var answer string

	fmt.Println("Reveal fomantic feelings...")
	go sendLove()
	go responseLove(ch)
	waitFor()
	answer = <-ch

	if answer != "" {
		fmt.Println(answer)
	} else {
		fmt.Println("Dead ☠️....")
	}

}

func waitFor() {
	for i := 0; i < 5; i++ {
		fmt.Println("Keep waiting...")
		time.Sleep(1 * 1e9)
	}
}

func sendLove() {
	fmt.Println("Love you, mm ❤️")
}

func responseLove(ch chan string) {
	time.Sleep(6 * 1e9)
	ch <- "Love you, too"
}

Результат:

Reveal fomantic feelings...
Love you, mm ❤️
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Love you, too

Все счастливы.

Здесь мы используемch := make(chan string)Создается конвейер строкового типа, конечно, мы также можем построить другие типы, такие какch := make(chan int)или даже конвейер функцийfuncChan := chan func().

Мы также используем оператора связи<-.

  • Поток в канал:ch <- content, используйте канал ch для отправки содержимого переменной.

  • Выход из канала:answer := <- chВ переменную Answer поступают данные из канала CH.

  • <- chМожет вызываться отдельно для получения следующего значения канала, текущее значение отбрасывается, но может использоваться для проверки, например:

    if <- ch != 100 {
        /* do something */
    }
    

блокировка канала

  • Для того же канала операция отправки не заканчивается, пока получатель не будет готов. Это означает, что если в небуферизованном канале нет места для приема данных, ввод новых входных данных невозможен, то есть отправитель блокируется.
  • Для того же канала операции получения блокируются до тех пор, пока отправитель не станет доступным. Если в канале нет данных, приемник останется заблокированным.

Вышеуказанные два свойства отражают无缓冲通道Функции:同一时间只允许至多一个数据存在于通道中.

Давайте почувствуем это на примере:

package main

import "fmt"

func main() {
	ch1 := make(chan int)
	go pump(ch1)
	fmt.Println(<-ch1)
}

func pump(ch chan int) {
	for i := 0; ; i++ {
		ch <- i
	}
}

Вывод программы:

0

здесьpump()функция называется生产者.

Разблокировать канал

package main

import "fmt"
import "time"

func main() {
	ch1 := make(chan int)
	go pump(ch1)
	go suck(ch1)
	time.Sleep(1e9)
}

func pump(ch chan int) {
	for i := 0; ; i++ {
		ch <- i
	}
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}

Здесь мы определяемsuckфункция, как接收者, и датьmainВремя работы сопрограммы составляет 1 с, поэтому генерируется выходная мощность [TAT] 70 Вт+.

тупик канала

Два сегмента канала блокируют друг друга, что приводит к тупиковой ситуации. Среда выполнения Go проверяет и паникует, останавливая программу. Небуферизованные каналы блокируются.

package main

import "fmt"

func main() {
	out := make(chan int)
	out <- 2
	go f1(out)
}

func f1(in chan int) {
	fmt.Println(<-in)
}
fatal error: all goroutines are asleep - deadlock!

видимо вout <- 2Когда получателя нет, основной поток блокируется.

Канал синхронизации

В дополнение к обычному небуферизованному каналу существует также специальный буферизованный канал——同步通道.

buf := 100
ch1 := make(chan string, buf)

buf— количество элементов, которые канал может удерживать одновременно, т.е.ch1размер буфера, вbufКанал не будет заблокирован, пока он не будет заполнен.

Если емкость превышает 0, канал асинхронный: до того, как буфер управления коммуникациями не заполнена полным или боковыми блокирующими элементами, будут получены в отправленном порядке.

Синхронизировать:ch := make(chan type, value)

  • значение == 0 --> синхронный, небуферизованный (блокирующий)
  • значение > 0 --> асинхронный, буферизованный (неблокирующий) зависит от элемента значения

Использование буферизации каналов может сделать программы более масштабируемыми.

В первую очередь старайтесь использовать небуферизованные каналы и используйте буферизацию только в неопределенных ситуациях.

package main

import "fmt"
import "time"

func main() {
	c := make(chan int, 50)
	go func() {
		time.Sleep(15 * 1e9)
		x := <-c
		fmt.Println("received", x)
	}()
	fmt.Println("sending", 10)
	c <- 10
	fmt.Println("send", 10)
}

Режим семафора

func compute(ch chan int) {
    ch <- someComputation()
}

func main() {
    ch := make(chan int)
    go compute(ch)
    doSomethingElaseForAWhile()
    result := <-ch
}

Корутины проходят через каналchВведите значение для обработки сигнала окончания.mainожидание потока<-chпока из него не будет получено значение.

Мы можем использовать его для обработки сортировки слайсов:

done := make(chan bool)

doSort := func(s []int) {
    sort(s)
    done <- true
}
i := pivot(s)
go doSort(s[:i])
go doSort(s[i:])
<-done
<-done

Семафор с буферизованным каналом

Общий механизм синхронизации для реализации блокировок взаимного исключения, когда семафоры используются для ограничения доступа к ресурсам и решения проблем чтения и записи.

  • Емкость буферизованного канала должна быть такой же, как емкость синхронизированного ресурса.
  • Длина канала (количество сохраняемых в данный момент элементов) совпадает с количеством используемых текущих ресурсов.
  • Емкость минус длина канала равняется количеству необработанных ресурсов.
//创建一个长度可变但容量为0的通道
type Empty interface {}
type semaphore chan Empty

Инициализировать семафор

sem = make(semaphore, N)

Работа с семафором и установка мьютекса

func (s semaphore) P (n int) {
    e := new(Empty)
    for i := 0; i < n; i++ {
        s <- e
    }
}

func (a semaphore) V (n int) {
    for i := 0; i < n; i++ {
        <- s
    }
}

/* mutexes */
func (s semaphore) Lock() {
	s.P(1)
}

func (s semaphore) Unlock(){
	s.V(1)
}

/* signal-wait */
func (s semaphore) Wait(n int) {
	s.P(n)
}

func (s semaphore) Signal() {
	s.V(1)
}

Шаблон фабрики каналов

Вместо того, чтобы передавать канал в качестве параметра, сгенерируйте канал внутри функции и верните его.

package main

import (
	"fmt"
	"time"
)

func main() {
	stream := pump()
	go suck(stream)
	time.Sleep(1e9)
}

func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}

Каналы используются для петель

forЦикл отchЗначение продолжает извлекаться из канала до тех пор, пока канал не будет закрыт. (Это означает, что должна быть другая сопрограмма для записиch, и закрывается, когда запись завершена)

for v := range ch {
    fmt.Println("The value is", v)
}
package main

import (
	"fmt"
	"time"
)

func main() {
	suck(pump())
	time.Sleep(1e9)
}

func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	go func() {
		for v := range ch {
			fmt.Println(v)
		}
	}()
}

направление канала

Канал может означать, что он только отправляет или только принимает:

var send_only chan<- int    // channel can only send data
var recv_only <-chan int    // channel can only receive data

Канал только для приема (

Шаблоны каналов и селекторов

Возьмем классический пример筛法求素数научиться этому.

Основная идея этого алгоритма состоит в том, чтобы ввести筛法(алгоритм со временной сложностью O(x * ln(lnx))), сортирует заданное возвращенное положительное целое число от наибольшего к наименьшему, а затем отфильтровывает все непростые числа, тогда наименьшее из оставшихся чисел является простым число, затем удалите кратные этому числу и так далее.

Предположим, что набор положительных целых чисел от 1 до 30 отсортирован от наибольшего к наименьшему.

Первый проход удаляет не простое число 1, а затем наименьшее из оставшихся чисел равно 2.

Поскольку 2 — простое число, выньте его, а затем удалите все числа, кратные 2, так что останется число:

3 5 7 9 11 13 15 17 19 21 23 25 27 29

Среди оставшихся чисел 3 — наименьшее и простое число, выньте и удалите все числа, кратные 3, и повторяйте цикл, пока все числа не будут просеяны.

код показывает, как показано ниже:

// 一般写法
package main

import (
	"fmt"
)

func generate(ch chan int) {
	for i := 2; i < 100; i++ {
		ch <- i
	}
}

func filter(in, out chan int, prime int) {
	for {
		i := <-in
		if i%prime != 0 {
			out <- i
		}
	}
}

func main() {
	ch := make(chan int)
	go generate(ch)
	for {
		prime := <-ch
		fmt.Print(prime, " ")
		ch1 := make(chan int)
		go filter(ch, ch1, prime)
		ch = ch1
	}
}
// 习惯写法
package main

import (
	"fmt"
)

func generate() chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func filter(in chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			if i := <-in; i%prime != 0 {
				out <- i
			}
		}
	}()
	return out
}

func sieve() chan int {
	out := make(chan int)
	go func() {
		ch := generate()
		for {
			prime := <-ch
			ch = filter(ch, prime)
			out <- prime
		}
	}()
	return out
}

func main() {
	primes := sieve()
	for {
		fmt.Println(<-primes)
	}
}