предисловие
В Golang есть два основных инструмента параллельного программирования, а именно:channelиgoroutine, в этой статье поговорим оchannel. Любой, кто знаком с Golang, знает известную поговорку: «Используйте общение, чтобы делиться памятью, а не делиться памятью, чтобы общаться». Это предложение имеет два значения, язык Go действительно находится вsyncТрадиционный запорный механизм предусмотрен в упаковке, но более рекомендуетсяchannelдля решения проблемы параллелизма. Эта статья начнется сchannelиспользование,channelПринцип двухчастной парыchannelСделать более глубокое исследование.
использование канала
что такое канал
В прямом смысле,channelСмысл наверное в смысле пайплайна.channelэто безопасная очередь сообщений, используемая сопрограммами go для получения или отправки сообщений,channelЭто похоже на канал между двумя сопрограммами go для синхронизации различных ресурсов. Это можно проиллюстрировать следующей схемой:
channelИспользование простое:
func main() {
ch := make(chan int, 1) // 创建一个类型为int,缓冲区大小为1的channel
ch <- 2 // 将2发送到ch
n, ok := <- ch // n接收从ch发出的值
if ok {
fmt.Println(n) // 2
}
close(ch) // 关闭channel
}
использоватьchannelСледует отметить несколько моментов:
- к
nilchannelОтправка сообщения всегда блокируется; - к закрытому
channelОтправка сообщения вызовет панику во время выполнения(panic); -
channelПосле закрытия вы не можете продолжатьchannelотправить сообщение, но можно продолжить сchannelполучать сообщения; - когда
channelПри закрытии и пустом буфере продолжить сchannelПри получении сообщения будет получено нулевое значение соответствующего типа.
Небуферизованные каналы против буферизованных каналов
Unbuffered channelsозначает, что размер буфера равен 0channel, этоchannelПолучатель блокируется, пока сообщение не будет получено, а отправитель блокируется, пока получатель не получит сообщение.Этот механизм можно использовать для двух целей.goroutineвыполнять синхронизацию состояний;Buffered channelsИмея буфер, когда буфер заполнен, отправитель блокируется, когда буфер пуст, блокируется получатель.
ЦитироватьThe Nature Of Channels In GoДве фигуры вUnbuffered channelsиBuffered channels, очень ярко, читатели могут испытать это на себе:
Unbuffered channels:
Buffered channels:
Обход каналов
for range
channelслужба поддержкиfor rangeИтерации в пути:
package main
import "fmt"
func main() {
ci := make(chan int, 5)
for i := 1; i <= 5; i++ {
ci <- i
}
close(ci)
for i := range ci {
fmt.Println(i)
}
}
Стоит отметить, что при обходе, еслиchannelЕсли он не закрыт, он будет ждать и появитсяdeadlockошибка; если при обходеchannelБыл закрыт, а затем автоматически выйти из обхода после обхода данных. Это,for rangeМетод обхода является блокирующим методом обхода.
for select
selectМожет обрабатывать неблокирующую отправку, получение и мультиплексирование сообщений.
package main
import "fmt"
func main() {
ci := make(chan int, 2)
for i := 1; i <= 2; i++ {
ci <- i
}
close(ci)
cs := make(chan string, 2)
cs <- "hi"
cs <- "golang"
close(cs)
ciClosed, csClosed := false, false
for {
if ciClosed && csClosed {
return
}
select {
case i, ok := <-ci:
if ok {
fmt.Println(i)
} else {
ciClosed = true
fmt.Println("ci closed")
}
case s, ok := <-cs:
if ok {
fmt.Println(s)
} else {
csClosed = true
fmt.Println("cs closed")
}
default:
fmt.Println("waiting...")
}
}
}
selectимеютcaseкодовый блок дляchannelОтправлять или получать сообщения, либоcaseКогда блок кода готов, выполняется соответствующее ему содержимое;caseКогда блок кода будет готов, выберите его наугад.caseкодовый блок и выполнить; всеcaseЕсли ни один блок кода не готов, подождите; также может бытьdefaultкодовый блок, всеcaseВыполняется, когда ни один блок кода не готовdefaultкодовый блок.
принцип канала
Сначала вставьтеchannelизАдрес источника, читатель может проверить это.
структура данных
Первый взглядchannelструктура:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
// channel中元素大小
elemsize uint16
// 是否已关闭
closed uint32
// channel中元素类型
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
channelБуфер на самом деле представляет собой циклическую очередь,qcountпредставляет количество элементов в очереди,dataqsizпредставляет собой общий размер кольцевой очереди,bufпредставляет указатель на круговой массив;sendxиrecvxИспользуется для определения положения отправленных и полученных в данный момент элементов в циклической очереди соответственно;recvqиsendqОба являются списком, соответственно используемым для хранения в настоящее время ожидающих получения и ожидающих отправки.Goroutine.
посмотри сноваwaitqСтруктура данных:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// 当前goroutine
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
вsudogУказывает список ожиданияGoroutineИнкапсуляция, которая содержит некоторую контекстуальную информацию,firstиlastсоответственно указать на начало списка ожиданияGoroutine.
Компиляция анализа
в анализеchannelПеред принципом мы сначала используемgo toolПроанализируйте следующий код, чтобы увидетьchannelКакие методы времени выполнения вызываются под капотом для различных операций:
ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
fmt.Println(n)
}
close(ch)
компилировать
go build test.go
go tool objdump -s "main\.main" test | grep CALL
ПучокCALLОтфильтровано:
test.go:118 0x1092f55 e81612f7ff CALL runtime.makechan(SB)
test.go:119 0x1092f74 e82714f7ff CALL runtime.chansend1(SB)
test.go:120 0x1092f8e e80d14f7ff CALL runtime.chansend1(SB)
test.go:121 0x1092fa5 e8361ff7ff CALL runtime.chanrecv1(SB)
test.go:122 0x1092fbd e85e1ff7ff CALL runtime.chanrecv2(SB)
test.go:126 0x1092fd7 e8841cf7ff CALL runtime.closechan(SB)
test.go:124 0x1092fea e8b156f7ff CALL runtime.convT64(SB)
print.go:275 0x1093041 e88a98ffff CALL fmt.Fprintln(SB)
test.go:47 0x1093055 e896c1fbff CALL runtime.morestack_noctxt(SB)
Создайте
Из приведенного выше анализа компиляции видно, что созданиеchannelКогда вызывается метод времени выполненияmakechan:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指针,为hchan和缓冲区分别分配空间
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
makechanЛогика кода относительно проста, сначала проверьте тип элемента и размер буферного пространства, а затем создайтеhchan, выделить необходимое пространство. Здесь возможны три случая: когда размер буфера равен 0 илиchannelКогда размер элемента равен 0, просто выделитеchannelнеобходимое пространство; когдаchannelКогда тип элемента не является указателем, он должен быть толькоhchanВыделите часть непрерывного пространства памяти с буфером, размер пространства равен пространству буферного массива плюсhchanНеобходимое место; по умолчанию буфер содержит указатели, его необходимоhchanи буферная память выделяются отдельно. последнее обновлениеhchanдругих областях, в том числеelemsize,elemtype,dataqsiz.
Отправить
channelоперация отправки вызывает метод времени выполненияchansend1, существуетchansend1Внутренний вызов сноваchansend, смотрите прямо наchansendРеализация:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel为nil
if c == nil {
// 如果是非阻塞,直接返回发送不成功
if !block {
return false
}
// 否则,当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 对于非阻塞且channel未关闭,如果无缓冲区且没有等待接收的Goroutine,或者有缓冲区且缓冲区已满,那么都直接返回发送不成功
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果channel已关闭
if c.closed != 0 {
// 解锁,直接panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 除了以上情况,当channel未关闭时,就有以下几种情况:
// 1、当存在等待接收的Goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 那么直接把正在发送的值发送给等待接收的Goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 2、当缓冲区未满时
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 获取指向缓冲区数组中位于sendx位置的元素的指针
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 将当前发送的值拷贝到缓冲区
typedmemmove(c.elemtype, qp, ep)
// sendx索引加一
c.sendx++
// 因为是循环队列,sendx等于队列长度时置为0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 队列中元素总数加一,并解锁,返回发送成功
c.qcount++
unlock(&c.lock)
return true
}
// 3、当既没有等待接收的Goroutine,缓冲区也没有剩余空间,如果是非阻塞的发送,那么直接解锁,返回发送失败
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
// 4、如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 调用goparkunlock将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 被唤醒之后执行清理工作并释放sudog结构体
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
chansendЛогика выполнения , комментарий выше написан очень понятно, давайте разберемся. Для неблокирующей отправки илиchannelЛогика обработки относительно проста для нескольких случаев отказа отправки при закрытом условии, и читатели могут обратиться к комментариям; здесь мы сосредоточимся наchannelНесколько общих ситуаций, когда он не закрыт:
Горутины ждут приема
Если очередь ожидает приемаrecvqсуществуют вGoroutine, затем напрямую отправьте отправляемое значение получателю, ожидающему полученияGoroutine. Схематическая диаграмма выглядит следующим образом:
Посмотрите на это подробно
sendметод:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
// 将发送的值直接拷贝到接收值(比如v = <-ch 中的v)的内存地址
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 获取等待接收数据的Goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒之前等待接收数据的Goroutine
goready(gp, skip+1)
}
Здесь необходимо пояснитьGoroutineНесколько штатов во время процесса планирования:
_Gidle = iota // goroutine刚刚分配,还没有初始化
_Grunnable // goroutine处于运行队列中, 还没有运行,没有自己的栈
_Grunning // goroutine在运行中,拥有自己的栈,被分配了M(线程)和P(调度上下文)
_Gsyscall // goroutine在执行系统调用
_Gwaiting // goroutine被阻塞
_Gdead // goroutine没有被使用,可能是刚刚退出,或者正在初始化中
_Gcopystack // 表示g当前的栈正在被移除并分配新栈
при звонкеgoreadyкогда будетGoroutineстатус от_Gwaitingустановлен в_Grunnable, дождитесь повторного выполнения следующего расписания.
Когда буфер не заполнен
Когда буфер не заполнен, найтиsendxпозиция массива буферов, на которую указывает, копирует значение, отправляемое в эту позицию, и увеличиваетsendxБлокировка указателя и разблокировки, принципиальная схема выглядит следующим образом:
блокировка отправки
Если он блокирует отправку, то текущийGoroutineупаковано в одинsudogструктуру и добавил кchannelотправить очередьsendqвнутри. Схематическая диаграмма выглядит следующим образом:
тогда позвониgoparkunlockпоставить токGoroutineУстановить как_Gwaitingсостояния и разблокировки, войти в состояние блокировки и дождаться пробуждения (вызовgoready); если планировщик разбудил его, выполнить работу по очистке и, наконец, освободить соответствующийsudogструктура.
перенимать
channelСуществует две формы приема:
<-ch
n, ok := <-ch
Эти два способа соответственно вызывают метод времени выполненияchanrecv1иchanrecv2:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
Оба метода в конечном итоге будут вызыватьсяchanrecvметод:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// channel为nil
if c == nil {
// 非阻塞直接返回(false, false)
if !block {
return
}
// 阻塞接收,则当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 非阻塞模式,对于以下两种情况:
// 1、无缓冲区且等待发送队列也为空
// 2、有缓冲区但缓冲区数组为空且channel未关闭
// 这两种情况都是接收失败, 直接返回(false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果channel已关闭,并且缓冲区无元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 有等待接收的变量(即 v = <-ch中的v)
if ep != nil {
//根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值
typedmemclr(c.elemtype, ep)
}
// 返回(true, false),即接收到值,但不是从channel中接收的有效值
return true, false
}
// 除了以上非常规情况,还有有以下几种常见情况:
// 1、等待发送的队列sendq里存在Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// 接收成功
return true, true
}
// 2、缓冲区buf中有元素
if c.qcount > 0 {
// Receive directly from queue
// 从recvx指向的位置获取元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 将从buf中取出的元素拷贝到当前协程
typedmemmove(c.elemtype, ep, qp)
}
// 同时将取出的数据所在的内存清空
typedmemclr(c.elemtype, qp)
// 接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf元素总数 -1
c.qcount--
// 解锁,返回接收成功
unlock(&c.lock)
return true, true
}
// 3、非阻塞模式,且没有数据可以接受
if !block {
// 解锁,直接返回接收失败
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
// 4、阻塞模式,获取当前Goroutine,打包一个sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 加入到channel的等待接收队列recvq中
c.recvq.enqueue(mysg)
// 挂起当前Goroutine,设置为_Gwaiting状态并解锁,进入休眠等待被唤醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
// 被唤醒之后执行清理工作并释放sudog结构体
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
chanrecvЛогика обработки метода иchansendОчень похоже, мы по-прежнему анализируем здесь только несколько распространенных ситуаций, а приведенные выше комментарии для других ситуаций также объясняются более четко, и читатели могут проверить соответствующий код и комментарии.
Есть горутины, ожидающие отправки
Если очередь ожидает отправкиsendqв ожиданииGoroutine, то возможны два случая: текущийchannelнет буфера или в настоящее времяchannelполный. отsendqснять первый заблокированныйGoroutine, а затем позвонитеrecvметод:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 无缓冲区
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲区已满
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable,等待下一次调度。
goready(gp, skip+1)
}
1. Если буфера нет, то напрямую изsenderПолучить данные;
2. Если буфер заполнен, отbufГолова очереди получает данные и помещаетsenderДанные добавляются в конец очереди buf;
3, последний звонокgoreadyФункция будет ждать отправки данныхGoroutineстатус от_Gwaitingустановлен в_Grunnable, ждем следующего расписания.
Следующая диаграмма иллюстрирует, что происходит, когда буфер заполнен:
В буфере buf еще есть данные
если буферbufВ нем еще есть элементы, далее переходим к обычному приему, отbufИзвлеченные элементы копируются в целевой адрес памяти полученных данных текущей сопрограммы. Стоит отметить, что даже в это времяchannelбыл закрыт и все еще может нормально загружаться из буфераbufполучать данные. Эта ситуация относительно проста, и принципиальная схема не рисуется.
блокировка приема
Если он находится в режиме блокировки и нет данных для приема, то текущийGoroutineупаковано в одинsudogпринять участие вchannelочередь ожиданияrecvq, электрический токGoroutineстатус установлен на_Gwaiting, ждет, чтобы проснуться. Схематическая диаграмма выглядит следующим образом:
Если после текущегоGoroutineПросыпайтесь по планировщику, выполняйте работу по очистке и, наконец, выпускайте соответствующийsudogструктура.
закрытие
После отправки и получения данных последний должен закрытьсяchannelв настоящее время:
func closechan(c *hchan) {
// nil channel检查
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 已关闭的channel不能再次关闭
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 设置关闭状态为1
c.closed = 1
var glist glist
// release all readers
// 遍历recvq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 遍历sendq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
1. Закрытьchannel, он пройдетrecvqиsendq(на самом деле толькоrecvqилиsendq),выигратьsudogприостановлено вGoroutineпринять участие вglistсписок и очиститьsudogНекоторая информация и статус на .
2, а затем пройтиglistсписок, для каждогоGoroutineперечислитьgoreadyфункция, всеGoroutineустановлен в_GrunnableСтатус, ожидание расписания.
3. КогдаGoroutineПосле пробуждения он продолжит выполнениеchansendиchanrecvток в функцииGoroutineОставшаяся логика после пробуждения.
Суммировать
Подводя итог, эта статья сначала проходитchannelбазовая пара использованияchannelОпределение и подробности использованияchannelОсновные операции, включая отправку, получение и закрытие, рассматриваются более подробно и подробно. Внимательные читатели также должны найтиchannelОперация тесно связана с планированием сопрограмм, но эта статья посвященаgoroutineРасписание является разовым, и время для последующих действий созрело для изучения этой части контента.