Урок 12 "Учись быстро" - Каналы

Java задняя часть Go контейнер
Урок 12 "Учись быстро" - Каналы

Существует два способа взаимодействия между разными параллельными сопрограммами: один — через общие переменные, а другой — через очереди. Язык Go поощряет использование очередей для связи и настраивает специальный синтаксис для обмена данными очереди между сопрограммами — каналами.

Каналы — это вход и выход сопрограммы. Как результат сопрограммы, канал представляет собой контейнер, содержащий данные. В качестве входных данных для сопрограммы канал является производителем, который предоставляет данные для сопрограммы. Как контейнер, канал имеет ограниченный размер: когда он заполнен, в него нельзя записать, а когда он пуст, его нельзя прочитать. Канал также имеет свой собственный тип, который может определять тип данных, поступающих в канал.

图片

Создать канал

Существует только один синтаксис для создания канала, который представляет собой глобальную функцию make, которая предоставляет первый параметр типа для определения типа данных, которые может содержать канал, а затем предоставляет второй целочисленный параметр в качестве размера контейнера канала. Параметр размера является необязательным. Если он не заполнен, пропускная способность этого канала равна нулю, который называется "небуферизованный канал". Небуферизованный канал должен гарантировать, что сопрограмма пытается прочитать текущий канал, иначе операция записи будет block Пока не появятся другие сопрограммы для чтения из канала. Небуферизованные каналы всегда заполнены и пусты. Соответствующий канал с ограниченным размером является буферным каналом. В языке Go нет неограниченных каналов, каждый канал имеет ограниченную максимальную пропускную способность.

// 缓冲型通道,里面只能放整数
var bufferedChannel = make(chan int, 1024)
// 非缓冲型通道
var unbufferedChannel = make(chan int)

читать и писать канал

Язык Go разработал специальный синтаксический сахар стрелки

package main

import "fmt"

func main() {
	var ch chan int = make(chan int, 4)
	for i:=0; i<cap(ch); i++ {
		ch <- i   // 写通道
	}
	for len(ch) > 0 {
		var value int = <- ch  // 读通道
		fmt.Println(value)
	}
}

Канал используется как контейнер, его можно использовать как слайс, используя глобальные функции cap() и len() для получения пропускной способности канала и текущего количества элементов в нем. Каналы обычно используются в качестве среды для связи между различными сопрограммами, и они также могут использоваться в одной и той же сопрограмме.

блокировка чтения и записи

Когда канал заполнен, операция записи будет заблокирована, сопрограмма перейдет в спящий режим и не будет разбужена до тех пор, пока другая сопрограмма не прочитает канал из свободного места. Если операции записи нескольких сопрограмм заблокированы, операция чтения активирует только одну сопрограмму.

Когда канал пуст, операция чтения будет заблокирована, а сопрограмма перейдет в спящий режим и не будет разбужена до тех пор, пока другая сопрограмма не напишет в канал и не загрузит данные. Если операции чтения нескольких сопрограмм заблокированы, операция записи активирует только одну сопрограмму.

package main

import "fmt"
import "time"
import "math/rand"

func send(ch chan int) {
	for {
		var value = rand.Intn(100)
		ch <- value
		fmt.Printf("send %d\n", value)
	}
}

func recv(ch chan int) {
	for {
		value := <- ch
		fmt.Printf("recv %d\n", value)
		time.Sleep(time.Second)
	}
}

func main() {
	var ch = make(chan int, 1)
	// 子协程循环读
	go recv(ch)
	// 主协程循环写
	send(ch)
}

--------
send 81
send 87
recv 81
recv 87
send 47
recv 47
send 59

закрыть канал

Каналы в языке Go немного похожи на файлы, которые не только поддерживают операции чтения и записи, но и поддерживают закрытие. Чтение закрытого канала немедленно возвращает «нулевое значение» типа канала, а запись в закрытый канал вызывает исключение. Если элементы в канале являются целыми числами, операции чтения не могут использовать возвращаемое значение, чтобы определить, закрыт ли канал.

package main

import "fmt"

func main() {
	var ch = make(chan int, 4)
	ch <- 1
	ch <- 2
	close(ch)

	value := <- ch
	fmt.Println(value)
	value = <- ch
	fmt.Println(value)
	value = <- ch
	fmt.Println(value)
}

-------
1
2
0

В настоящее время необходимо ввести новую точку знаний - использование сахара синтаксиса диапазона для обхода каналов.

Мы много раз видели синтаксис for range, он универсален, помимо обхода массивов, срезов, словарей, он также может проходить по каналам, заменяя оператор стрелки. Когда канал будет пуст, цикл приостановит блокировку, а когда канал будет закрыт, блокировка прекратится и цикл завершится. Когда цикл завершен, мы знаем, что канал закрыт.

package main

import "fmt"

func main() {
	var ch = make(chan int, 4)
	ch <- 1
	ch <- 2
	close(ch)

 // for range 遍历通道
	for value := range ch {
		fmt.Println(value)
	}
}

------
1
2

Если канал явно не закрыт, когда он больше не используется программой, он будет автоматически закрыт и собран мусор. Однако элегантные программы должны рассматривать каналы как ресурсы, и хорошей практикой является явное закрытие каждого ресурса, который больше не используется.

безопасность записи канала

Выше упоминалось, что выполнение операции записи в канал, который был закрыт, вызовет исключение, а это означает, что мы должны убедиться, что канал не закрыт при записи в канал.

package main

import "fmt"

func send(ch chan int) {
	i := 0
	for {
		i++
		ch <- i
	}
}

func recv(ch chan int) {
	value := <- ch
	fmt.Println(value)
	value = <- ch
	fmt.Println(value)
	close(ch)
}

func main() {
	var ch = make(chan int, 4)
	go recv(ch)
	send(ch)
}

---------
1
2
panic: send on closed channel

goroutine 1 [running]:
main.send(0xc42008a000)
	/Users/qianwp/go/src/github.com/pyloque/practice/main.go:9 +0x44
main.main()
	/Users/qianwp/go/src/github.com/pyloque/practice/main.go:24 +0x66
exit status 2

Как это обеспечить? В Go нет встроенной функции, которая может определить, закрыт ли канал. Даже если такая функция существует, когда вы считаете, что канал не закрыт, это не значит, что его нельзя закрывать при записи данных в канал, в параллельной среде она может быть закрыта другими сопрограммами в любой момент .

Лучший способ обеспечить безопасность записи в канал — это чтобы сопрограмма, отвечающая за запись в канал, закрывала сам канал, а сопрограмма, читающая канал, не должна закрывать канал.

package main

import "fmt"

func send(ch chan int) {
 ch <- 1
 ch <- 2
 ch <- 3
 ch <- 4
 close(ch)
}

func recv(ch chan int) {
 for v := range ch {
  fmt.Println(v)
 }
}

func main() {
 var ch = make(chan int, 1)
 go send(ch)
 recv(ch)
}

-----------
1
2
3
4

Этот метод действительно может решить сценарий с однократным письмом и множественным чтением, но что нам делать, если мы столкнемся с ситуациями многократного письма и однократного чтения? Любая сопрограмма, которая читает и записывает канал, не может закрыть канал по своему желанию, иначе это заставит другие сопрограммы писать в канал и вызывать исключение. В это время другим несвязанным сопрограммам должно быть разрешено это делать.Эта сопрограмма должна дождаться завершения выполнения всех сопрограмм канала записи, прежде чем закрыть канал. Как другие сопрограммы могут узнать, что все каналы записи завершили работу? Для этого требуется использовать объект WaitGroup, предоставляемый встроенным пакетом синхронизации, который использует счетчик для ожидания завершения указанного события.

package main

import "fmt"
import "time"
import "sync"

func send(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 计数值减一
	i := 0
	for i < 4 {
		i++
		ch <- i
	}
}

func recv(ch chan int) {
	for v := range ch {
		fmt.Println(v)
	}
}

func main() {
	var ch = make(chan int, 4)
	var wg = new(sync.WaitGroup)
	wg.Add(2) // 增加计数值
	go send(ch, wg)  // 写
	go send(ch, wg)  // 写
	go recv(ch)
	// Wait() 阻塞等待所有的写通道协程结束
 // 待计数值变成零,Wait() 才会返回
 wg.Wait()
	// 关闭通道
 close(ch)
 time.Sleep(time.Second)
}

---------
1
2
3
4
1
2
3
4

несколько каналов

В реальном мире также существует сценарий передачи сообщений, то есть потребители имеют несколько источников потребления.Пока один источник производит данные, потребители могут считывать данные для потребления. В это время данные нескольких исходных каналов могут быть объединены в целевой канал, а затем использованы в целевом канале.

package main

import "fmt"
import "time"

// 每隔一会生产一个数
func send(ch chan int, gap time.Duration) {
	i := 0
	for {
		i++
		ch <- i
		time.Sleep(gap)
	}
}

// 将多个原通道内容拷贝到单一的目标通道
func collect(source chan int, target chan int) {
	for v := range source {
		target <- v
	}
}

// 从目标通道消费数据
func recv(ch chan int) {
	for v := range ch {
		fmt.Printf("receive %d\n", v)
	}
}


func main() {
	var ch1 = make(chan int)
	var ch2 = make(chan int)
	var ch3 = make(chan int)
	go send(ch1, time.Second)
	go send(ch2, 2 * time.Second)
	go collect(ch1, ch3)
	go collect(ch2, ch3)
	recv(ch3)
}

---------
receive 1
receive 1
receive 2
receive 2
receive 3
receive 4
receive 3
receive 5
receive 6
receive 4
receive 7
receive 8
receive 5
receive 9
....

Однако приведенная выше форма громоздка и требует отдельной сопрограммы агрегации для каждого источника потребления. Язык Go привносит синтаксический сахар "мультиплексирования" в этот сценарий использования, который является оператором select, который будет обсуждаться ниже. Он может управлять несколькими каналами для одновременного чтения и записи. Если все каналы не могут читать и записывать, он блокируется как целом., это будет продолжаться до тех пор, пока есть один канал для чтения и записи. Ниже мы используем оператор select для упрощения приведенной выше логики.

package main

import "fmt"
import "time"

func send(ch chan int, gap time.Duration) {
	i := 0
	for {
		i++
		ch <- i
		time.Sleep(gap)
	}
}

func recv(ch1 chan int, ch2 chan int) {
	for {
		select {
			case v := <- ch1:
				fmt.Printf("recv %d from ch1\n", v)
			case v := <- ch2:
				fmt.Printf("recv %d from ch2\n", v)
		}
	}
}

func main() {
	var ch1 = make(chan int)
	var ch2 = make(chan int)
	go send(ch1, time.Second)
	go send(ch2, 2 * time.Second)
	recv(ch1, ch2)
}

------------
recv 1 from ch2
recv 1 from ch1
recv 2 from ch1
recv 3 from ch1
recv 2 from ch2
recv 4 from ch1
recv 3 from ch2
recv 5 from ch1

Выше показана форма канала чтения мультиплексированного оператора select, а ниже показана форма канала записи.Пока существует канал, который может записывать в него, это нарушит блокировку.

select {
  case ch1 <- v:
      fmt.Println("send to ch1")
  case ch2 <- v:
      fmt.Println("send to ch2")
}

неблокирующее чтение и запись

Чтение и запись, о которых мы говорили ранее, блокируют чтение и запись, а язык Go также обеспечивает неблокирующее чтение и запись каналов. Операции чтения не блокируются, когда канал пуст, а операции записи не блокируются, когда канал заполнен. Неблокирующие операции чтения и записи полагаются на ветвь оператора select по умолчанию. Когда все каналы оператора select недоступны для чтения и записи, если определена ветвь по умолчанию, будет выполняться логика ветвления по умолчанию, что приводит к неблокировке. Ниже мы демонстрируем сценарий с одним производителем и несколькими потребителями. Производитель записывает данные в два канала одновременно и отбрасывает их, если они не могут быть записаны.

package main

import "fmt"
import "time"

func send(ch1 chan int, ch2 chan int) {
	i := 0
	for {
		i++
		select {
			case ch1 <- i:
				fmt.Printf("send ch1 %d\n", i)
			case ch2 <- i:
				fmt.Printf("send ch2 %d\n", i)
			default:
		}
	}
}

func recv(ch chan int, gap time.Duration, name string) {
	for v := range ch {
		fmt.Printf("receive %s %d\n", name, v)
		time.Sleep(gap)
	}
}

func main() {
        // 无缓冲通道
	var ch1 = make(chan int)
	var ch2 = make(chan int)
	// 两个消费者的休眠时间不一样,名称不一样
	go recv(ch1, time.Second, "ch1")
	go recv(ch2, 2 * time.Second, "ch2")
	send(ch1, ch2)
}

------------
send ch1 27
send ch2 28
receive ch1 27
receive ch2 28
send ch1 6708984
receive ch1 6708984
send ch2 13347544
send ch1 13347775
receive ch2 13347544
receive ch1 13347775
send ch1 20101642
receive ch1 20101642
send ch2 26775795
receive ch2 26775795
...

Из вывода видно, что большое количество данных отбрасывается, а данные, считываемые потребителями, являются прерывистыми. Если вы убьете ветку по умолчанию в операторе select и запустите ее снова, результат будет следующим

send ch2 1
send ch1 2
receive ch1 2
receive ch2 1
receive ch1 3
send ch1 3
receive ch2 4
send ch2 4
send ch1 5
receive ch1 5
receive ch1 6
send ch1 6
receive ch1 7

Можно видеть, что данные, считываемые потребителями, непрерывны, но каждые данные предоставляются только одному потребителю. Ветвь оператора select по умолчанию очень важна, она является ключом к определению того, заблокированы ли операции чтения и записи канала или нет.

В Java также есть каналы

На других языках каналы выражаются в виде очередей.В языке Java буферизованные каналы встроены в java.util.concurrent.ArrayBlockingQueue для одновременных пакетов, а небуферизованные каналы также встроены в java.util.concurrent.SynchronousQueue для параллельных пакеты. Внутренняя форма реализации ArrayBlockingQueue представляет собой массив, и многопоточное чтение и запись должны использовать блокировки для управления одновременным доступом. Однако, подобно эффекту мультиплексирования, обеспечиваемому языком Go, язык Java не имеет встроенной реализации.

Внутренняя структура канала

Внутренняя структура канала в языке Go представляет собой кольцевой массив, который управляет отправкой и получением элементов путем чтения и записи смещений. Чтобы обеспечить потокобезопасность, внутри будет глобальная блокировка для контроля параллелизма. Для операций отправки и получения существует очередь для хранения блокирующей сопрограммы.

图片

type hchan struct {
  qcount uint  // 通道有效元素个数
  dataqsize uint   // 通道容量,循环数组总长度
  buf unsafe.Pointer // 数组地址
  elemsize uint16 // 内部元素的大小
  closed uint32 // 是否已关闭 0或者1
  elemtype *_type // 内部元素类型信息
  sendx uint // 循环数组的写偏移量
  recvx uint // 循环数组的读偏移量
  recvq waitq // 阻塞在读操作上的协程队列
  sendq waitq // 阻塞在写操作上的协程队列
  
  lock mutex // 全局锁
}

Эта круговая очередь точно такая же, как структура ArrayBlockingQueue, встроенная в язык Java. Из этой структуры данных мы также можем сделать вывод, что очередь по существу реализована с использованием общих переменных для блокировки, а общие переменные являются сущностью параллельного взаимодействия.

class ArrayBlockingQueue extends AbstractQueue {
  Object[] items;
  int takeIndex;
  int putIndex;
  int count;
  ReentrantLock lock;
  ...
}

Поэтому читатели не должны думать, что канал языка Go волшебный.Язык Go просто разработал набор синтаксического сахара, который легко использовать для канала, делая эту структуру данных доступной. Его внутренняя реализация аналогична параллельным очередям в других языках.

Прочитайте больше глав «Быстро изучите язык Go», нажмите и удерживайте изображение, чтобы определить QR-код, и подпишитесь на общедоступную учетную запись «Code Cave».