предисловие
В Golang есть два основных инструмента параллельного программирования, а именно:channel
иgoroutine
, в этой статье поговорим оchannel
. Любой, кто знаком с Golang, знает известную поговорку: «Используйте общение, чтобы делиться памятью, а не делиться памятью, чтобы общаться». Это предложение имеет два значения, язык Go действительно находится вsync
Традиционный запорный механизм предусмотрен в упаковке, но более рекомендуетсяchannel
для решения проблемы параллелизма. Эта статья начнется сchannel
использование,channel
Принцип двухчастной парыchannel
Сделать более глубокое исследование.
использование канала
что такое канал
В прямом смысле,channel
Смысл наверное в смысле пайплайна.channel
это безопасная очередь сообщений, используемая сопрограммами go для получения или отправки сообщений,channel
Это похоже на канал между двумя сопрограммами go для синхронизации различных ресурсов. Это можно проиллюстрировать следующей схемой:
channel
Использование простое:
func main() {
ch := make(chan int, 1) // 创建一个类型为int,缓冲区大小为1的channel
ch <- 2 // 将2发送到ch
n, ok := <- ch // n接收从ch发出的值
if ok {
fmt.Println(n) // 2
}
close(ch) // 关闭channel
}
использоватьchannel
Следует отметить несколько моментов:
- к
nil
channel
Отправка сообщения всегда блокируется; - к закрытому
channel
Отправка сообщения вызовет панику во время выполнения(panic)
; -
channel
После закрытия вы не можете продолжатьchannel
отправить сообщение, но можно продолжить сchannel
получать сообщения; - когда
channel
При закрытии и пустом буфере продолжить сchannel
При получении сообщения будет получено нулевое значение соответствующего типа.
Небуферизованные каналы против буферизованных каналов
Unbuffered channels
означает, что размер буфера равен 0channel
, этоchannel
Получатель блокируется, пока сообщение не будет получено, а отправитель блокируется, пока получатель не получит сообщение.Этот механизм можно использовать для двух целей.goroutine
выполнять синхронизацию состояний;Buffered channels
Имея буфер, когда буфер заполнен, отправитель блокируется, когда буфер пуст, блокируется получатель.
ЦитироватьThe Nature Of Channels In GoДве фигуры вUnbuffered channels
иBuffered channels
, очень ярко, читатели могут испытать это на себе:
Unbuffered channels
:
Buffered channels
:
Обход каналов
for range
channel
служба поддержкиfor range
Итерации в пути:
package main
import "fmt"
func main() {
ci := make(chan int, 5)
for i := 1; i <= 5; i++ {
ci <- i
}
close(ci)
for i := range ci {
fmt.Println(i)
}
}
Стоит отметить, что при обходе, еслиchannel
Если он не закрыт, он будет ждать и появитсяdeadlock
ошибка; если при обходеchannel
Был закрыт, а затем автоматически выйти из обхода после обхода данных. Это,for range
Метод обхода является блокирующим методом обхода.
for select
select
Может обрабатывать неблокирующую отправку, получение и мультиплексирование сообщений.
package main
import "fmt"
func main() {
ci := make(chan int, 2)
for i := 1; i <= 2; i++ {
ci <- i
}
close(ci)
cs := make(chan string, 2)
cs <- "hi"
cs <- "golang"
close(cs)
ciClosed, csClosed := false, false
for {
if ciClosed && csClosed {
return
}
select {
case i, ok := <-ci:
if ok {
fmt.Println(i)
} else {
ciClosed = true
fmt.Println("ci closed")
}
case s, ok := <-cs:
if ok {
fmt.Println(s)
} else {
csClosed = true
fmt.Println("cs closed")
}
default:
fmt.Println("waiting...")
}
}
}
select
имеютcase
кодовый блок дляchannel
Отправлять или получать сообщения, либоcase
Когда блок кода готов, выполняется соответствующее ему содержимое;case
Когда блок кода будет готов, выберите его наугад.case
кодовый блок и выполнить; всеcase
Если ни один блок кода не готов, подождите; также может бытьdefault
кодовый блок, всеcase
Выполняется, когда ни один блок кода не готовdefault
кодовый блок.
принцип канала
Сначала вставьтеchannel
изАдрес источника, читатель может проверить это.
структура данных
Первый взглядchannel
структура:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
// channel中元素大小
elemsize uint16
// 是否已关闭
closed uint32
// channel中元素类型
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
channel
Буфер на самом деле представляет собой циклическую очередь,qcount
представляет количество элементов в очереди,dataqsiz
представляет собой общий размер кольцевой очереди,buf
представляет указатель на круговой массив;sendx
иrecvx
Используется для определения положения отправленных и полученных в данный момент элементов в циклической очереди соответственно;recvq
иsendq
Оба являются списком, соответственно используемым для хранения в настоящее время ожидающих получения и ожидающих отправки.Goroutine
.
посмотри сноваwaitq
Структура данных:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// 当前goroutine
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
вsudog
Указывает список ожиданияGoroutine
Инкапсуляция, которая содержит некоторую контекстуальную информацию,first
иlast
соответственно указать на начало списка ожиданияGoroutine
.
Компиляция анализа
в анализеchannel
Перед принципом мы сначала используемgo tool
Проанализируйте следующий код, чтобы увидетьchannel
Какие методы времени выполнения вызываются под капотом для различных операций:
ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
fmt.Println(n)
}
close(ch)
компилировать
go build test.go
go tool objdump -s "main\.main" test | grep CALL
ПучокCALL
Отфильтровано:
test.go:118 0x1092f55 e81612f7ff CALL runtime.makechan(SB)
test.go:119 0x1092f74 e82714f7ff CALL runtime.chansend1(SB)
test.go:120 0x1092f8e e80d14f7ff CALL runtime.chansend1(SB)
test.go:121 0x1092fa5 e8361ff7ff CALL runtime.chanrecv1(SB)
test.go:122 0x1092fbd e85e1ff7ff CALL runtime.chanrecv2(SB)
test.go:126 0x1092fd7 e8841cf7ff CALL runtime.closechan(SB)
test.go:124 0x1092fea e8b156f7ff CALL runtime.convT64(SB)
print.go:275 0x1093041 e88a98ffff CALL fmt.Fprintln(SB)
test.go:47 0x1093055 e896c1fbff CALL runtime.morestack_noctxt(SB)
Создайте
Из приведенного выше анализа компиляции видно, что созданиеchannel
Когда вызывается метод времени выполненияmakechan
:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指针,为hchan和缓冲区分别分配空间
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
makechan
Логика кода относительно проста, сначала проверьте тип элемента и размер буферного пространства, а затем создайтеhchan
, выделить необходимое пространство. Здесь возможны три случая: когда размер буфера равен 0 илиchannel
Когда размер элемента равен 0, просто выделитеchannel
необходимое пространство; когдаchannel
Когда тип элемента не является указателем, он должен быть толькоhchan
Выделите часть непрерывного пространства памяти с буфером, размер пространства равен пространству буферного массива плюсhchan
Необходимое место; по умолчанию буфер содержит указатели, его необходимоhchan
и буферная память выделяются отдельно. последнее обновлениеhchan
других областях, в том числеelemsize
,elemtype
,dataqsiz
.
Отправить
channel
операция отправки вызывает метод времени выполненияchansend1
, существуетchansend1
Внутренний вызов сноваchansend
, смотрите прямо наchansend
Реализация:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel为nil
if c == nil {
// 如果是非阻塞,直接返回发送不成功
if !block {
return false
}
// 否则,当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 对于非阻塞且channel未关闭,如果无缓冲区且没有等待接收的Goroutine,或者有缓冲区且缓冲区已满,那么都直接返回发送不成功
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果channel已关闭
if c.closed != 0 {
// 解锁,直接panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 除了以上情况,当channel未关闭时,就有以下几种情况:
// 1、当存在等待接收的Goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 那么直接把正在发送的值发送给等待接收的Goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 2、当缓冲区未满时
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 获取指向缓冲区数组中位于sendx位置的元素的指针
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 将当前发送的值拷贝到缓冲区
typedmemmove(c.elemtype, qp, ep)
// sendx索引加一
c.sendx++
// 因为是循环队列,sendx等于队列长度时置为0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 队列中元素总数加一,并解锁,返回发送成功
c.qcount++
unlock(&c.lock)
return true
}
// 3、当既没有等待接收的Goroutine,缓冲区也没有剩余空间,如果是非阻塞的发送,那么直接解锁,返回发送失败
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
// 4、如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 调用goparkunlock将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 被唤醒之后执行清理工作并释放sudog结构体
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
chansend
Логика выполнения , комментарий выше написан очень понятно, давайте разберемся. Для неблокирующей отправки илиchannel
Логика обработки относительно проста для нескольких случаев отказа отправки при закрытом условии, и читатели могут обратиться к комментариям; здесь мы сосредоточимся наchannel
Несколько общих ситуаций, когда он не закрыт:
Горутины ждут приема
Если очередь ожидает приемаrecvq
существуют вGoroutine
, затем напрямую отправьте отправляемое значение получателю, ожидающему полученияGoroutine
. Схематическая диаграмма выглядит следующим образом:
Посмотрите на это подробно
send
метод:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
// 将发送的值直接拷贝到接收值(比如v = <-ch 中的v)的内存地址
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 获取等待接收数据的Goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒之前等待接收数据的Goroutine
goready(gp, skip+1)
}
Здесь необходимо пояснитьGoroutine
Несколько штатов во время процесса планирования:
_Gidle = iota // goroutine刚刚分配,还没有初始化
_Grunnable // goroutine处于运行队列中, 还没有运行,没有自己的栈
_Grunning // goroutine在运行中,拥有自己的栈,被分配了M(线程)和P(调度上下文)
_Gsyscall // goroutine在执行系统调用
_Gwaiting // goroutine被阻塞
_Gdead // goroutine没有被使用,可能是刚刚退出,或者正在初始化中
_Gcopystack // 表示g当前的栈正在被移除并分配新栈
при звонкеgoready
когда будетGoroutine
статус от_Gwaiting
установлен в_Grunnable
, дождитесь повторного выполнения следующего расписания.
Когда буфер не заполнен
Когда буфер не заполнен, найтиsendx
позиция массива буферов, на которую указывает, копирует значение, отправляемое в эту позицию, и увеличиваетsendx
Блокировка указателя и разблокировки, принципиальная схема выглядит следующим образом:
блокировка отправки
Если он блокирует отправку, то текущийGoroutine
упаковано в одинsudog
структуру и добавил кchannel
отправить очередьsendq
внутри. Схематическая диаграмма выглядит следующим образом:
тогда позвониgoparkunlock
поставить токGoroutine
Установить как_Gwaiting
состояния и разблокировки, войти в состояние блокировки и дождаться пробуждения (вызовgoready
); если планировщик разбудил его, выполнить работу по очистке и, наконец, освободить соответствующийsudog
структура.
перенимать
channel
Существует две формы приема:
<-ch
n, ok := <-ch
Эти два способа соответственно вызывают метод времени выполненияchanrecv1
иchanrecv2
:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
Оба метода в конечном итоге будут вызыватьсяchanrecv
метод:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// channel为nil
if c == nil {
// 非阻塞直接返回(false, false)
if !block {
return
}
// 阻塞接收,则当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 非阻塞模式,对于以下两种情况:
// 1、无缓冲区且等待发送队列也为空
// 2、有缓冲区但缓冲区数组为空且channel未关闭
// 这两种情况都是接收失败, 直接返回(false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果channel已关闭,并且缓冲区无元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 有等待接收的变量(即 v = <-ch中的v)
if ep != nil {
//根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值
typedmemclr(c.elemtype, ep)
}
// 返回(true, false),即接收到值,但不是从channel中接收的有效值
return true, false
}
// 除了以上非常规情况,还有有以下几种常见情况:
// 1、等待发送的队列sendq里存在Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// 接收成功
return true, true
}
// 2、缓冲区buf中有元素
if c.qcount > 0 {
// Receive directly from queue
// 从recvx指向的位置获取元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 将从buf中取出的元素拷贝到当前协程
typedmemmove(c.elemtype, ep, qp)
}
// 同时将取出的数据所在的内存清空
typedmemclr(c.elemtype, qp)
// 接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf元素总数 -1
c.qcount--
// 解锁,返回接收成功
unlock(&c.lock)
return true, true
}
// 3、非阻塞模式,且没有数据可以接受
if !block {
// 解锁,直接返回接收失败
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
// 4、阻塞模式,获取当前Goroutine,打包一个sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 加入到channel的等待接收队列recvq中
c.recvq.enqueue(mysg)
// 挂起当前Goroutine,设置为_Gwaiting状态并解锁,进入休眠等待被唤醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
// 被唤醒之后执行清理工作并释放sudog结构体
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
chanrecv
Логика обработки метода иchansend
Очень похоже, мы по-прежнему анализируем здесь только несколько распространенных ситуаций, а приведенные выше комментарии для других ситуаций также объясняются более четко, и читатели могут проверить соответствующий код и комментарии.
Есть горутины, ожидающие отправки
Если очередь ожидает отправкиsendq
в ожиданииGoroutine
, то возможны два случая: текущийchannel
нет буфера или в настоящее времяchannel
полный. отsendq
снять первый заблокированныйGoroutine
, а затем позвонитеrecv
метод:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 无缓冲区
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲区已满
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable,等待下一次调度。
goready(gp, skip+1)
}
1. Если буфера нет, то напрямую изsender
Получить данные;
2. Если буфер заполнен, отbuf
Голова очереди получает данные и помещаетsender
Данные добавляются в конец очереди buf;
3, последний звонокgoready
Функция будет ждать отправки данныхGoroutine
статус от_Gwaiting
установлен в_Grunnable
, ждем следующего расписания.
Следующая диаграмма иллюстрирует, что происходит, когда буфер заполнен:
В буфере buf еще есть данные
если буферbuf
В нем еще есть элементы, далее переходим к обычному приему, отbuf
Извлеченные элементы копируются в целевой адрес памяти полученных данных текущей сопрограммы. Стоит отметить, что даже в это времяchannel
был закрыт и все еще может нормально загружаться из буфераbuf
получать данные. Эта ситуация относительно проста, и принципиальная схема не рисуется.
блокировка приема
Если он находится в режиме блокировки и нет данных для приема, то текущийGoroutine
упаковано в одинsudog
принять участие вchannel
очередь ожиданияrecvq
, электрический токGoroutine
статус установлен на_Gwaiting
, ждет, чтобы проснуться. Схематическая диаграмма выглядит следующим образом:
Если после текущегоGoroutine
Просыпайтесь по планировщику, выполняйте работу по очистке и, наконец, выпускайте соответствующийsudog
структура.
закрытие
После отправки и получения данных последний должен закрытьсяchannel
в настоящее время:
func closechan(c *hchan) {
// nil channel检查
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 已关闭的channel不能再次关闭
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 设置关闭状态为1
c.closed = 1
var glist glist
// release all readers
// 遍历recvq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 遍历sendq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
1. Закрытьchannel
, он пройдетrecvq
иsendq
(на самом деле толькоrecvq
илиsendq
),выигратьsudog
приостановлено вGoroutine
принять участие вglist
список и очиститьsudog
Некоторая информация и статус на .
2, а затем пройтиglist
список, для каждогоGoroutine
перечислитьgoready
функция, всеGoroutine
установлен в_Grunnable
Статус, ожидание расписания.
3. КогдаGoroutine
После пробуждения он продолжит выполнениеchansend
иchanrecv
ток в функцииGoroutine
Оставшаяся логика после пробуждения.
Суммировать
Подводя итог, эта статья сначала проходитchannel
базовая пара использованияchannel
Определение и подробности использованияchannel
Основные операции, включая отправку, получение и закрытие, рассматриваются более подробно и подробно. Внимательные читатели также должны найтиchannel
Операция тесно связана с планированием сопрограмм, но эта статья посвященаgoroutine
Расписание является разовым, и время для последующих действий созрело для изучения этой части контента.