Глубокое понимание канала Голанга

задняя часть Go
Глубокое понимание канала Голанга

предисловие

В Golang есть два основных инструмента параллельного программирования, а именно:channelиgoroutine, в этой статье поговорим оchannel. Любой, кто знаком с Golang, знает известную поговорку: «Используйте общение, чтобы делиться памятью, а не делиться памятью, чтобы общаться». Это предложение имеет два значения, язык Go действительно находится вsyncТрадиционный запорный механизм предусмотрен в упаковке, но более рекомендуетсяchannelдля решения проблемы параллелизма. Эта статья начнется сchannelиспользование,channelПринцип двухчастной парыchannelСделать более глубокое исследование.

использование канала

что такое канал

В прямом смысле,channelСмысл наверное в смысле пайплайна.channelэто безопасная очередь сообщений, используемая сопрограммами go для получения или отправки сообщений,channelЭто похоже на канал между двумя сопрограммами go для синхронизации различных ресурсов. Это можно проиллюстрировать следующей схемой:

channelИспользование простое:

func main() {
    ch := make(chan int, 1) // 创建一个类型为int,缓冲区大小为1的channel
    ch <- 2 // 将2发送到ch
    n, ok := <- ch // n接收从ch发出的值
    if ok {
        fmt.Println(n) // 2
    }
    close(ch) // 关闭channel
}

использоватьchannelСледует отметить несколько моментов:

  • кnil channelОтправка сообщения всегда блокируется;
  • к закрытомуchannelОтправка сообщения вызовет панику во время выполнения(panic);
  • channelПосле закрытия вы не можете продолжатьchannelотправить сообщение, но можно продолжить сchannelполучать сообщения;
  • когдаchannelПри закрытии и пустом буфере продолжить сchannelПри получении сообщения будет получено нулевое значение соответствующего типа.

Небуферизованные каналы против буферизованных каналов

Unbuffered channelsозначает, что размер буфера равен 0channel, этоchannelПолучатель блокируется, пока сообщение не будет получено, а отправитель блокируется, пока получатель не получит сообщение.Этот механизм можно использовать для двух целей.goroutineвыполнять синхронизацию состояний;Buffered channelsИмея буфер, когда буфер заполнен, отправитель блокируется, когда буфер пуст, блокируется получатель.

ЦитироватьThe Nature Of Channels In GoДве фигуры вUnbuffered channelsиBuffered channels, очень ярко, читатели могут испытать это на себе:

Unbuffered channels:

Unbuffered channels
Unbuffered channels

Buffered channels:

Buffered channels
Buffered channels

Обход каналов

for range

channelслужба поддержкиfor rangeИтерации в пути:

package main  

import "fmt"  

func main() {  
    ci := make(chan int, 5)  
    for i := 1; i <= 5; i++ {
        ci <- i
    }    
    close(ci)  

    for i := range ci {  
        fmt.Println(i)  
    }  
}  

Стоит отметить, что при обходе, еслиchannelЕсли он не закрыт, он будет ждать и появитсяdeadlockошибка; если при обходеchannelБыл закрыт, а затем автоматически выйти из обхода после обхода данных. Это,for rangeМетод обхода является блокирующим методом обхода.

for select

selectМожет обрабатывать неблокирующую отправку, получение и мультиплексирование сообщений.

package main  

import "fmt"  

func main() {  
    ci := make(chan int, 2)
    for i := 1; i <= 2; i++ {
        ci <- i
    }
    close(ci)

    cs := make(chan string, 2)
    cs <- "hi"
    cs <- "golang"
    close(cs)

    ciClosed, csClosed := false, false
    for {
        if ciClosed && csClosed {
            return
        }
        select {
        case i, ok := <-ci:
            if ok {
                fmt.Println(i)
            } else {
                ciClosed = true
                fmt.Println("ci closed")
            }
        case s, ok := <-cs:
            if ok {
                fmt.Println(s)
            } else {
                csClosed = true
                fmt.Println("cs closed")
            }
        default:
            fmt.Println("waiting...")
        }
    }
}  

selectимеютcaseкодовый блок дляchannelОтправлять или получать сообщения, либоcaseКогда блок кода готов, выполняется соответствующее ему содержимое;caseКогда блок кода будет готов, выберите его наугад.caseкодовый блок и выполнить; всеcaseЕсли ни один блок кода не готов, подождите; также может бытьdefaultкодовый блок, всеcaseВыполняется, когда ни один блок кода не готовdefaultкодовый блок.

принцип канала

Сначала вставьтеchannelизАдрес источника, читатель может проверить это.

структура данных

Первый взглядchannelструктура:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // channel中元素大小
    elemsize uint16 
    // 是否已关闭
    closed   uint32
    // channel中元素类型
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

channelБуфер на самом деле представляет собой циклическую очередь,qcountпредставляет количество элементов в очереди,dataqsizпредставляет собой общий размер кольцевой очереди,bufпредставляет указатель на круговой массив;sendxиrecvxИспользуется для определения положения отправленных и полученных в данный момент элементов в циклической очереди соответственно;recvqиsendqОба являются списком, соответственно используемым для хранения в настоящее время ожидающих получения и ожидающих отправки.Goroutine.

посмотри сноваwaitqСтруктура данных:

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    // 当前goroutine
    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

вsudogУказывает список ожиданияGoroutineИнкапсуляция, которая содержит некоторую контекстуальную информацию,firstиlastсоответственно указать на начало списка ожиданияGoroutine.

Компиляция анализа

в анализеchannelПеред принципом мы сначала используемgo toolПроанализируйте следующий код, чтобы увидетьchannelКакие методы времени выполнения вызываются под капотом для различных операций:

ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
    fmt.Println(n)
}
close(ch)

компилировать

go build test.go
go tool objdump -s "main\.main" test | grep CALL

ПучокCALLОтфильтровано:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)
  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)
  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)
  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)
  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)
  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)
  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)
  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)
  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)

Создайте

Из приведенного выше анализа компиляции видно, что созданиеchannelКогда вызывается метод времени выполненияmakechan:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    // 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素中包含指针,为hchan和缓冲区分别分配空间
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

makechanЛогика кода относительно проста, сначала проверьте тип элемента и размер буферного пространства, а затем создайтеhchan, выделить необходимое пространство. Здесь возможны три случая: когда размер буфера равен 0 илиchannelКогда размер элемента равен 0, просто выделитеchannelнеобходимое пространство; когдаchannelКогда тип элемента не является указателем, он должен быть толькоhchanВыделите часть непрерывного пространства памяти с буфером, размер пространства равен пространству буферного массива плюсhchanНеобходимое место; по умолчанию буфер содержит указатели, его необходимоhchanи буферная память выделяются отдельно. последнее обновлениеhchanдругих областях, в том числеelemsize,elemtype,dataqsiz.

Отправить

channelоперация отправки вызывает метод времени выполненияchansend1, существует
chansend1Внутренний вызов сноваchansend, смотрите прямо наchansendРеализация:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // channel为nil
    if c == nil {
        // 如果是非阻塞,直接返回发送不成功
        if !block {
            return false
        }
        // 否则,当前Goroutine阻塞挂起
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.

    // 对于非阻塞且channel未关闭,如果无缓冲区且没有等待接收的Goroutine,或者有缓冲区且缓冲区已满,那么都直接返回发送不成功
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    // 如果channel已关闭
    if c.closed != 0 {
        // 解锁,直接panic
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 除了以上情况,当channel未关闭时,就有以下几种情况:

    // 1、当存在等待接收的Goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).

        // 那么直接把正在发送的值发送给等待接收的Goroutine
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 2、当缓冲区未满时
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 获取指向缓冲区数组中位于sendx位置的元素的指针
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 将当前发送的值拷贝到缓冲区
        typedmemmove(c.elemtype, qp, ep)
        // sendx索引加一
        c.sendx++
        // 因为是循环队列,sendx等于队列长度时置为0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 队列中元素总数加一,并解锁,返回发送成功
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // 3、当既没有等待接收的Goroutine,缓冲区也没有剩余空间,如果是非阻塞的发送,那么直接解锁,返回发送失败
    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    // 4、如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

    // 调用goparkunlock将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    // 被唤醒之后执行清理工作并释放sudog结构体
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

chansendЛогика выполнения , комментарий выше написан очень понятно, давайте разберемся. Для неблокирующей отправки илиchannelЛогика обработки относительно проста для нескольких случаев отказа отправки при закрытом условии, и читатели могут обратиться к комментариям; здесь мы сосредоточимся наchannelНесколько общих ситуаций, когда он не закрыт:

Горутины ждут приема

Если очередь ожидает приемаrecvqсуществуют вGoroutine, затем напрямую отправьте отправляемое значение получателю, ожидающему полученияGoroutine. Схематическая диаграмма выглядит следующим образом:


Посмотрите на это подробноsendметод:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...

    if sg.elem != nil {
        // 将发送的值直接拷贝到接收值(比如v = <-ch 中的v)的内存地址
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // 获取等待接收数据的Goroutine
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 唤醒之前等待接收数据的Goroutine
    goready(gp, skip+1)
}

Здесь необходимо пояснитьGoroutineНесколько штатов во время процесса планирования:

_Gidle = iota // goroutine刚刚分配,还没有初始化

_Grunnable // goroutine处于运行队列中, 还没有运行,没有自己的栈

_Grunning // goroutine在运行中,拥有自己的栈,被分配了M(线程)和P(调度上下文)

_Gsyscall // goroutine在执行系统调用

_Gwaiting // goroutine被阻塞

_Gdead // goroutine没有被使用,可能是刚刚退出,或者正在初始化中

_Gcopystack // 表示g当前的栈正在被移除并分配新栈

при звонкеgoreadyкогда будетGoroutineстатус от_Gwaitingустановлен в_Grunnable, дождитесь повторного выполнения следующего расписания.

Когда буфер не заполнен

Когда буфер не заполнен, найтиsendxпозиция массива буферов, на которую указывает, копирует значение, отправляемое в эту позицию, и увеличиваетsendxБлокировка указателя и разблокировки, принципиальная схема выглядит следующим образом:

блокировка отправки

Если он блокирует отправку, то текущийGoroutineупаковано в одинsudogструктуру и добавил кchannelотправить очередьsendqвнутри. Схематическая диаграмма выглядит следующим образом:

тогда позвониgoparkunlockпоставить токGoroutineУстановить как_Gwaitingсостояния и разблокировки, войти в состояние блокировки и дождаться пробуждения (вызовgoready); если планировщик разбудил его, выполнить работу по очистке и, наконец, освободить соответствующийsudogструктура.

перенимать

channelСуществует две формы приема:

<-ch
n, ok := <-ch

Эти два способа соответственно вызывают метод времени выполненияchanrecv1иchanrecv2:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

Оба метода в конечном итоге будут вызыватьсяchanrecvметод:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // channel为nil
    if c == nil {
        // 非阻塞直接返回(false, false)
        if !block {
            return
        }
        // 阻塞接收,则当前Goroutine阻塞挂起
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.

    // 非阻塞模式,对于以下两种情况:
    // 1、无缓冲区且等待发送队列也为空
    // 2、有缓冲区但缓冲区数组为空且channel未关闭
    // 这两种情况都是接收失败, 直接返回(false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)
    // 如果channel已关闭,并且缓冲区无元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 有等待接收的变量(即 v = <-ch中的v)
        if ep != nil {
            //根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值
            typedmemclr(c.elemtype, ep)
        }
        // 返回(true, false),即接收到值,但不是从channel中接收的有效值
        return true, false
    }

    // 除了以上非常规情况,还有有以下几种常见情况:

    // 1、等待发送的队列sendq里存在Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        // 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        // 接收成功
        return true, true
    }

    // 2、缓冲区buf中有元素
    if c.qcount > 0 {
        // Receive directly from queue
        // 从recvx指向的位置获取元素
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            // 将从buf中取出的元素拷贝到当前协程
            typedmemmove(c.elemtype, ep, qp)
        }
        // 同时将取出的数据所在的内存清空
        typedmemclr(c.elemtype, qp)
        // 接收索引 +1
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf元素总数 -1
        c.qcount--
        // 解锁,返回接收成功
        unlock(&c.lock)
        return true, true
    }

    // 3、非阻塞模式,且没有数据可以接受
    if !block {
        // 解锁,直接返回接收失败
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    // 4、阻塞模式,获取当前Goroutine,打包一个sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 加入到channel的等待接收队列recvq中
    c.recvq.enqueue(mysg)
    // 挂起当前Goroutine,设置为_Gwaiting状态并解锁,进入休眠等待被唤醒
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    // 被唤醒之后执行清理工作并释放sudog结构体
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

chanrecvЛогика обработки метода иchansendОчень похоже, мы по-прежнему анализируем здесь только несколько распространенных ситуаций, а приведенные выше комментарии для других ситуаций также объясняются более четко, и читатели могут проверить соответствующий код и комментарии.

Есть горутины, ожидающие отправки

Если очередь ожидает отправкиsendqв ожиданииGoroutine, то возможны два случая: текущийchannelнет буфера или в настоящее времяchannelполный. отsendqснять первый заблокированныйGoroutine, а затем позвонитеrecvметод:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        // 无缓冲区
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 缓冲区已满
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable,等待下一次调度。
    goready(gp, skip+1)
}

1. Если буфера нет, то напрямую изsenderПолучить данные;
2. Если буфер заполнен, отbufГолова очереди получает данные и помещаетsenderДанные добавляются в конец очереди buf;
3, последний звонокgoreadyФункция будет ждать отправки данныхGoroutineстатус от_Gwaitingустановлен в_Grunnable, ждем следующего расписания.

Следующая диаграмма иллюстрирует, что происходит, когда буфер заполнен:

В буфере buf еще есть данные

если буферbufВ нем еще есть элементы, далее переходим к обычному приему, отbufИзвлеченные элементы копируются в целевой адрес памяти полученных данных текущей сопрограммы. Стоит отметить, что даже в это времяchannelбыл закрыт и все еще может нормально загружаться из буфераbufполучать данные. Эта ситуация относительно проста, и принципиальная схема не рисуется.

блокировка приема

Если он находится в режиме блокировки и нет данных для приема, то текущийGoroutineупаковано в одинsudogпринять участие вchannelочередь ожиданияrecvq, электрический токGoroutineстатус установлен на_Gwaiting, ждет, чтобы проснуться. Схематическая диаграмма выглядит следующим образом:

Если после текущегоGoroutineПросыпайтесь по планировщику, выполняйте работу по очистке и, наконец, выпускайте соответствующийsudogструктура.

закрытие

После отправки и получения данных последний должен закрытьсяchannelв настоящее время:

func closechan(c *hchan) {
    // nil channel检查
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    // 已关闭的channel不能再次关闭
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
    // 设置关闭状态为1
    c.closed = 1

    var glist glist

    // release all readers
    // 遍历recvq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // 遍历sendq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

1. Закрытьchannel, он пройдетrecvqиsendq(на самом деле толькоrecvqилиsendq),выигратьsudogприостановлено вGoroutineпринять участие вglistсписок и очиститьsudogНекоторая информация и статус на .

2, а затем пройтиglistсписок, для каждогоGoroutineперечислитьgoreadyфункция, всеGoroutineустановлен в_GrunnableСтатус, ожидание расписания.

3. КогдаGoroutineПосле пробуждения он продолжит выполнениеchansendиchanrecvток в функцииGoroutineОставшаяся логика после пробуждения.

Суммировать

Подводя итог, эта статья сначала проходитchannelбазовая пара использованияchannelОпределение и подробности использованияchannelОсновные операции, включая отправку, получение и закрытие, рассматриваются более подробно и подробно. Внимательные читатели также должны найтиchannelОперация тесно связана с планированием сопрограмм, но эта статья посвященаgoroutineРасписание является разовым, и время для последующих действий созрело для изучения этой части контента.

использованная литература

1,The Nature Of Channels In Go
2,Concurrency in Golang