Исходный код графического канала Golang

Go

предисловие

Во-первых, давайте возьмем схему расположения каналов.Нижний слой канала на самом деле не сложный, и не используются глубокие знания.В основном он вращается вокруг круговой очереди и двух связанных списков. Я верю, что прочитав эту статью, вы сможете освоить реализацию канала.

HD-адрес

hchan结构

Введение в канал

  • Канал — это тип канала, через который сообщения могут отправляться и приниматься между подпрограммами.
  • Метод связи между подпрограммами, предоставляемый уровнем языка go.

В повседневной разработке вы должны быть знакомы с использованием канала, но после понимания основного использования вам интересно узнать о его базовой реализации, почему он может реализовать связь между параллельными подпрограммами? С этим любопытством давайте изучим реализацию исходного кода внизу канала!

использование канала

Вот самое простое использование канала:

package main
import "fmt"

func main() {
  c := make(chan int)
  go func() {
    // 发送数据到channel
    c <- 1
  }()
  // 从channel取出数据
  x := <- c
  close(c)
  fmt.Println(x)
}

ввод исходного кода канала

Make,

go tool compile -N -l -S main.go>hello.s

Просмотрите часть основного содержимого с инструкцией CALL следующим образом:

0x0043 00067 (main.go:42) CALL  runtime.makechan(SB)
0x006a 00106 (main.go:44) CALL  runtime.newproc(SB)
0x008b 00139 (main.go:47) CALL  runtime.chanrecv1(SB)
0x0032 00050 (main.go:45) CALL  runtime.chansend1(SB)
0x00a3 00163 (main.go:48) CALL  runtime.closechan(SB)

Соответствие можно предположить:

  • make(chan int) соответствует: функция runtime.makechan
  • Создание сопрограммы: функция runtime.newproc
  • ch
  • x :=
  • close(c) Закрыть оператор канала, соответствующий: функция runtime.closechan

Связанный исходный код нужно только перейти к пакету времени выполнения, глобальный поиск можно найти в файле runtime/chan.go

func makechan(t *chantype, size int) *hchan {}
func chansend1(c *hchan, elem unsafe.Pointer) {}
func chanrecv1(c *hchan, elem unsafe.Pointer) {}
func closechan(c *hchan) {}

Анализ исходного кода

Все три приведенные выше функции используют параметр типа hchan, который является основной структурой данных канала.Давайте сначала проанализируем hchan.

При отладке с точками останова в IDE вы также можете увидеть внутреннюю структуру данных chan

  • Местонахождение: src/runtime/chan.go

структура данных чан

  • Внутренняя структура данных канала представляет собой двунаправленный циклический список фиксированной длины.
  • Запишите данные, введенные гладкими, заполненными и с этого момента начали писать 0
  • Два важных компонента тян:bufиwaitq, все поведения и реализации вращаются вокруг двух компонентов

Эта картинка, предоставленная Go Yedu на github, более яркая и прямо процитирована.

type hchan struct {
  // 当前队列中总元素个数
  qcount   uint           // total data in the queue
  // 环形队列长度,即缓冲区大小(申明channel时指定的大小)
  dataqsiz uint           // size of the circular queue
  // 环形队列指针
  buf      unsafe.Pointer // points to an array of dataqsiz elements
  // buf中每个元素的大小
  elemsize uint16
  // 当前通道是否处于关闭状态,创建通道时该字段为0,关闭时字段为1
  closed   uint32
  // 元素类型,用于传值过程的赋值
  elemtype *_type // element type
  // 环形缓冲区中已发送位置索引
  sendx    uint   // send index
  // 环形缓冲区中已接收位置索引
  recvx    uint   // receive index
  // 等待读消息的groutine队列
  recvq    waitq  // list of recv waiters
  // 等待写消息的groutine队列
  sendq    waitq  // list of send waiters
  // 互斥锁,为每个读写操作锁定通道(发送和接收必须互斥)
  lock mutex
}

// 等待读写的队列数据结构,保证先进先出
type waitq struct {
  first *sudog
  last  *sudog
}

создать канал

Обзор:

При создании канала вы можете помещать в канал разные типы данных, и пространство, занимаемое разными типами данных, также отличается, что определяет, сколько места для хранения необходимо открыть для полей hchan и buf в hchan. Различные ситуации обрабатываются по-разному в исходном коде go. Возможны три ситуации:

Общий принцип таков: общий объем памяти = объем памяти, требуемый hchan + объем памяти, требуемый элементами.

  • Очередь пуста или размер элемента равен 0: только пространство памяти, которое необходимо открыть, является размером самого hchan
  • Элемент не является указателем: объем памяти, который нужно открыть = размер самого hchan + размер каждого элемента * длина применяемой очереди
  • Элемент имеет тип указателя: в этом случае buf нужно освобождать место отдельно, а размер памяти, занимаемой buf, равен размеру каждого элемента * длине очереди, применяемой для

входить:

  • Chantype: тип канала
  • Размер: размер канала

вывод:

  • Создан объект хчан

Основной процесс:

  • Проверка различных параметров
  • присвоение данных
  • Создать буферную память (различить три случая, когда элемент пустой, элемент имеет указатель и элемент не имеет указателя)

HD-адрес

channel创建

// 对应的源码为 c := make(chan int, size)
// c := make(chan int) 这种情况下,size = 0
func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // 总共需要的buff大小 = channel中创建的这种元素类型的大小(elem.size)* size
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))

  var c *hchan
  // 下面是为buf创建并分配存储空间
  switch {
  case mem == 0:
    // size为0,或者每个元素占用的大小为0
    // 这时为buf分配大小时,只需要分配hchan结构体本身占用的大小即可
    // hchanSize是一个常量,表示空的hchan需要占用的字节大小
    // hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // raceaddr内部实现为:return unsafe.Pointer(&c.buf)
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // 如果队列中不存在指针,那么每个元素都需要被存储并占用空间,占用大小为前面乘法算出来的mem
    // 同时还要加上hchan本身占用的空间大小,加起来就是整个hchan占用的空间大小
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    // 把buf指针指向空的hchan占用空间大小的末尾
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // Elements contain pointers.
    // 如果chan中的元素是指针类型的数据,为buf单独开辟mem大小的空间,用来保存所有的数据
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }
  // 设置chan的总大小
  c.elemsize = uint16(elem.size)
  // 元素类型
  c.elemtype = elem
  // 环形队列的大小,即用户创建时设置的大小
  c.dataqsiz = uint(size)
  return c
}

отправить данные на канал

Обзор:

При отправке данных в канал интуитивно понятно, что нужно поместить данные в циклическую очередь chan, но go сделал некоторые оптимизации: сначала определите, есть ли подпрограмма, ожидающая получения данных, и если да, отправьте данные непосредственно в groutine, разбудить groutine, а не ставить в очередь. Конечно, есть и другая ситуация: если очередь заполнена, его можно только поставить в очередь и ждать, пока данные не заберут, прежде чем их можно будет отправить.

входить:

  • чан объект
  • данные для отправки
  • блокировать ли
  • Перезвони

вывод: нет

Основная логика:

  1. Если recvq не пуст, выньте подпрограмму, ожидающую получения данных от recvq, и отправьте данные в подпрограмму.
  2. Если recvq пуст, поместите данные в buf
  3. Если buf заполнен, упакуйте данные для отправки и текущую подпрограмму в объект Sudog и поместите его в sendq, а подпрограмму переведите в состояние ожидания.

Есть подпрограммы, ожидающие получения данных

HD-адрес

channel发送-有receive

Нет подпрограмм, ожидающих получения данных, кольцевая очередь не заполнена

HD-адрес

channel发送-无等待groutine-队列未满

Подпрограммы, ожидающие получения данных, отсутствуют, кольцевая очередь заполнена

HD-адрес

channel发送-无等待groutine-队列已满

Отправить источник данных

// ep指向要发送数据的首地址
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

  // 先上锁
  lock(&c.lock)

  // 如果channel已经关闭,抛出错误
  // 下面这个错误经常会遇到,都是对channel使用不当报出来的
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

  // 从接收队列中取出元素,如果取到数据,就将数据传过去
  if sg := c.recvq.dequeue(); sg != nil {
    // 调用send方法,将值传过去
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }

  // 走到这里,说明没有等待接收数据的Groutine
  // 如果缓冲区没有满,直接将要发送的数据复制到缓冲区
  if c.qcount < c.dataqsiz {
    // c.sendx是已发送的索引位置,这个方法通过指针偏移找到索引位置
    // 相当于执行c.buf(c.sendx)
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }

    // 复制数据,内部调用了memmove,是用汇编实现的
    // 通知接收方数据给你了,将接收方协程由等待状态改成可运行状态,
    // 将当前协程加入协程队列,等待被调度
    typedmemmove(c.elemtype, qp, ep)

    // 数据索引前移,如果到了末尾,又从0开始
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }

    // 元素个数加1,释放锁并返回
    c.qcount++
    unlock(&c.lock)
    return true
  }

  // 走到这里,说明缓冲区也写满了
  // 同步非阻塞的情况,直接返回
  if !block {
    unlock(&c.lock)
    return false
  }

  // 以下为同步阻塞的情况
  // 此时会将当前的Groutine以及要发送的数据放入到sendq队列中,并且切换出该Groutine
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil

  // 将Groutine放入sendq队列
  c.sendq.enqueue(mysg)

  // Groutine转入 waiting 状态,gopark是调度相关的代码
  // 在用户看来,向channel发送数据的代码语句会阻塞
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  KeepAlive(ep)

  // G被唤醒
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if gp.param == nil {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
  gp.param = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  mysg.c = nil

  // G被唤醒,状态改成可执行状态,从这里开始继续执行
  releaseSudog(mysg)
  return true
}

функция отправки

// 要发送的数据ep,被拷贝到接收者sg中,之后sg被唤醒继续执行
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

  // 拷贝数据
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  // 放入调度队列,等待被调度
  goready(gp, skip+1)
}

читать данные

Обзор:

Процесс чтения данных из канала аналогичен процессу отправки, который в основном является обратным операции отправки.

При чтении данных из канала вместо прямого обращения к циклической очереди для получения данных сначала определите, есть ли подпрограмма, ожидающая отправки данных. вверх по засыпке. Если нет подпрограммы, ожидающей отправки данных, извлеките данные из циклической очереди.

входить:

  • чан объект
  • Указатель, получающий данные
  • блокировать ли

Выход: успешен ли прием

Основная логика:

  1. Если есть подпрограмма, ожидающая отправки данных, удалите подпрограмму, ожидающую отправки данных из sendq, и выньте данные.
  2. Если нет ожидающей подпрограммы и есть данные в циклической очереди, взять данные из очереди
  3. Если нет ожидающей подпрограммы и нет данных в циклической очереди, заблокируйте подпрограмму, упакуйте подпрограмму как sudogo и добавьте ее в очередь ожидания recevq.

В sendq есть ожидающие подпрограммы

HD-адрес

发送数据-有等待的groutine

В sendq нет ожидающих подпрограмм, очередь не пуста

HD-адрес

发送数据-无等待的groutine-队列不为空

В sendq нет ожидающих подпрограмм, очередь пуста

HD-адрес

发送数据-无等待的groutine-队列为空

читать источник данных

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

  // 上锁
  lock(&c.lock)
  // 优先从发送队列中取数据,如果有等待发送数据的groutine,直接从发送数据的协程中取出数据
  if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

  // chan环形队列中如果有有数据
  if c.qcount > 0 {
    // 从接收数据的索引出取出数据
    // 等价于 c.buf[c.recvx]
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    // 将数据拷贝到接收数据的协程
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    // 接收数据的索引前移
    c.recvx++
    // 环形队列,如果到了末尾,再从0开始
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    // 发送数据的索引移动位置
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

  // 同步非阻塞,协程直接返回
  if !block {
    unlock(&c.lock)
    return false, false
  }

  // 同步阻塞
  // 如果代码走到这,说明没有任何数据可以获取到,阻塞住协程,并加入channel的接收队列中
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil

  // 添加到接收队列中
  c.recvq.enqueue(mysg)
  // 调度
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

  // someone woke us up
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  closed := gp.param == nil
  gp.param = nil
  mysg.c = nil

  // G被唤醒,从这里继续执行
  releaseSudog(mysg)
  return true, !closed
}

закрыть канал

Вход: канал

вывод: нет

Основной процесс:

  • выключенное состояние
  • Разбудите все сопрограммы, ожидающие чтения канала
  • Все сопрограммы, ожидающие записи в канал, выдают исключение
func closechan(c *hchan) {
  // channel为空,抛出异常
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  // 上锁
  lock(&c.lock)

  // 如果channel已经被关闭,抛出异常
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }

  // 设置关闭状态的值
  c.closed = 1

  // 申明一个存放g的list,把所有的groutine放进来
  // 目的是尽快释放锁,因为队列中可能还有数据需要处理,可能用到锁
  var glist gList

  // release all readers
  // 唤醒所有等待读取chanel数据的协程
  for {
    sg := c.recvq.dequeue()
    // 等待队列处理完毕,退出
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    // 加入临时队列
    glist.push(gp)
  }

  // release all writers (they will panic)
  // 处理所有要发送数据的协程,抛出异常
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    // 加入临时队列
    glist.push(gp)
  }
  unlock(&c.lock)

  // Ready all Gs now that we've dropped the channel lock.
  // 处理临时队列中所有的groutine
  for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0

    // 放入调度队列,等待被调度
    goready(gp, 3)
  }
}

Суммировать

При использовании CHANNEL это кажется очень сложным.Это любопытство потребовалось для изучения реализации исходного кода.После прочтения это было не так уж сложно, и логика базовой реализации была очень ясной. В этом документе объединена логика базового способа, включая создание канала, отправку данных, получение данных и т.п. Конечно, это также включает в себя планирование и другие знания, которые будут проанализированы в конце.

Ссылаться на