Предисловие: Можно сказать, что меня действительно привлекает GO, так это параллелизм.После глубокого понимания этого механизма я получил много пользы.Далее я буду использовать свои слабые познания, чтобы рассказать о механизме параллелизма в GO.
Во-первых, процесс инициализации
До этого давайте посмотрим на код сборки в ASM_ARM64.S О логике запуска этого блока
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·hashinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
PUSHQ $runtime·main·f(SB) // entry
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB)
Следующим шагом будет анализ
1. Количество процессоров и размер страницы также получаются через функцию osinit, что достаточно просто
2. Затем посмотрите на функцию schedinit (важный код, относящийся к этому разделу).
func schedinit() {
//获取当前的G
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}
//设置M的最大数量
sched.maxmcount = 10000
//初始化栈空间
stackinit()
//内存空间初始化操作
mallocinit()
//初始化当前的M
mcommoninit(_g_.m)
//将P的数量调整为CPU数量
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
//初始化P
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}
3. Выше мы видим, что функция procrsize вызывается для инициализации P, затем давайте взглянем на функцию procrsize. Этот фрагмент кода слишком длинный, поэтому анализируется в несколько частей (выложен только важный код)
(1) Инициализировать новый P
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
//新建一个P对象
pp = new(p)
pp.id = i
pp.status = _Pgcstop
//保存到allp数组(负责存储P的数组)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
//如果P还没有cache,那么进行分配
if pp.mcache == nil {
if old == 0 && i == 0 {
if getg().m.mcache == nil {
throw("missing mcache?")
}
pp.mcache = getg().m.mcache // bootstrap
} else {
pp.mcache = allocmcache()//分配cache
}
}
}
(2) Выпуск неиспользованного P
for i := nprocs; i < old; i++ {
p := allp[i]
// 将本地任务添加到全局队列中
for p.runqhead != p.runqtail {
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
// 插入全局队列的头部
globrunqputhead(gp)
}
//释放P所绑定的cache
freemcache(p.mcache)
p.mcache = nil
//将当前的P的G复用链接到全局
gfpurge(p)
p.status = _Pdead
// can't free P itself because it can be referenced by an M in syscall
}
После этих двух шагов мы создаем партию P, и простаивающий P будет помещен в список ожидания планировщика Sched.
Во-вторых, процесс создания G
Из вышеприведенного ассемблерного кода видно, что далее будет вызываться функция newproc для создания основного G, а затем функция main будет использоваться для выполнения runtime.main, после чего будет создан поток (этот поток отвечает для мониторинга системы во время работы), а затем введите основную функцию в программе GO для запуска.
Сначала посмотрите на код newproc
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)//获取参数的地址
pc := getcallerpc(unsafe.Pointer(&siz))//获取调用方的PC支
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, 0, pc)//真正创建G的地方
})
}
Далее смотрим на основной код newpro1
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
//从当前P复用链表来获取G
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
//如果获取失败,则新建一个
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
//将得到的G放入P的运行队列中
runqput(_p_, newg, true)
//下面三个条件分别为:是否有空闲的P;M是否处于自旋状态;当前是否创建runteime.main
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && runtimeInitTime != 0 {
wakep()
}
}
Код этой функции wakep() тоже стоит посмотреть, эта идея может быть использована в обычном программировании кода.
func wakep() {
//线程被唤醒后需要绑定一个P,这里使用cas操作,可以避免唤醒过多线程,这里也对应了上面的三个判断条件之一
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
Код Startm остается для читателей, чтобы увидеть сами по себе, в противном случае весь этот блог ощущается в качестве кода. Основная идея состоит в том, чтобы получить простоя P (если входящий P пуст), а затем попытаться получить простаивание M (Fortle M сначала). Управляется планировщиком Schear, эта структура также может быть просмотрен), если вы не можете получить его, создайте M и т. Д.
Три, Канал
Этот немного проще, и кода не так много, но он все равно дает большой выигрыш.
1. Создать канал
Сначала посмотрите на определение структуры (с удалениями)
type hchan struct {
qcount uint // 队列中数据个数
dataqsiz uint // 缓冲槽大小
buf unsafe.Pointer // 指向缓冲槽的指针
elemsize uint16 // 数据大小
closed uint32 // 表示 channel 是否关闭
elemtype *_type // 数据类型
sendx uint // 发送位置索引
recvx uint // 接收位置索引
recvq waitq // 接收等待列表
sendq waitq // 发送等待列表
lock mutex // 锁
}
type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
Приведенный выше recvq на самом деле является списком G, в котором операция чтения заблокирована на канале, а sendq на самом деле является списком G, в котором операция записи заблокирована на канале, поэтому G может быть заблокирован на разных каналах одновременно, поэтому как это решить? В это время была введена sudog, которая на самом деле является оболочкой для G, представляющей G в очереди ожидания.
Далее посмотрим на процесс создания
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 大小不超过64K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
var c *hchan
// 整个创建过程还是简单明了的
if elem.kind&kindNoPointers != 0 || size == 0 {
//一次性分配内存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
//设置数据大小,类型和缓冲槽大小
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
2, отправлено
Код функции отправки немного длинный, и разделение объясняется далее.
(1) Если recvq имеет блокировку G, то взять G из очереди и передать данные в G
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
(2) Если в hchan.buf еще есть место, поместите данные в
//通过比较qcount和datasiz来判断是否还有可用空间
if c.qcount < c.dataqsiz {
// 将数据放入buf中
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
(3) hchan.buf заполнен, тогда он будет заблокирован
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//初始化一些参数
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将当前 goroutine加入等待队列
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
Здесь мы видим, что если он заполнен, появится sudog, который после инициализации войдет в очередь ожидания от имени текущего G.
3. Получить
Точно так же получение также разделено на три случая
(1) в настоящее время находится отправка Goroutine, заблокированная на канале, и Buf заполнен
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
(2) Есть данные в buf
if c.qcount > 0 {
// 直接从队列中接收
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
(3) В buf нет данных, тогда он будет заблокирован
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 同样的,由sudog代表G去排队
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
Резюме: Хоть логика этого кода и не сложная, но все равно много времени уходит на проектирование многих вещей.Теперь я понимаю логику выполнения G в M, но до сих пор не знаю деталей.Буду продолжать изучить его позже. Прочитав его в целом, в первую очередь можно сказать, что вы хорошо разбираетесь в механизме параллелизма, что обязательно пригодится для написания связанного кода в будущем. Во-вторых, я узнал некоторые идеи программирования, такие как операция cas, как лучше инкапсулировать и абстрагироваться.