Всем привет! Давно не видел сериал "Язык Deep Decryption Go". Сегодня поговорим о канале. Желаю вам приятного чтения!
Модель параллелизма
Параллелизм и параллелизм
Всем известен знаменитый закон Мура. В 1965 году Гордон Мур, тогдашний Fairchild, опубликовал статью, предсказывающую, что количество транзисторов и резисторов на полупроводниковых микросхемах будет удваиваться каждый год в течение следующих десяти лет, двух лет». Этот прогноз был в основном верным примерно в 2012 году.
Но закон Мура в конечном итоге перестает действовать, когда транзисторные схемы приближаются к пределу своих возможностей. Повышение производительности компьютера за счет увеличения количества транзисторов больше не работает. В результате люди стали менять свое мышление и использовать другие методы повышения производительности компьютеров, поэтому и появились многоядерные компьютеры.
Это кажется хорошей идеей, но люди столкнулись с ограничением другого закона, то есть закона Амдала, который предлагает модель для измерения повышения эффективности программы в параллельном режиме. Этот закон гласит, что верхняя граница прироста производительности, которую программа может получить за счет параллелизма, зависит от того, сколько кода должно быть написано последовательно.
Например, интерфейсная программа, работающая с пользователем, должна иметь дело с пользователем. Пользователь нажимает кнопку, прежде чем перейти к следующему шагу, который должен выполняться последовательно. Эффективность такого рода программ зависит от скорости взаимодействия с пользователем, сколько у вас ядер — это зря. Пользователь просто не нажимает «Далее», что делать?
С появлением облачных вычислений примерно в 2000 году люди могут легко получать ресурсы в вычислительном облаке, легко расширять свои услуги по горизонтали и легко мобилизовывать ресурсы нескольких машин и даже распределять вычислительные задачи среди машин, распределенных по всему миру. Но это также приносит много проблем и проблем. Например, как общаться между машинами, агрегировать результаты и т. д. Одна из самых сложных задач — найти модель, описывающую параллелизм.
Все мы знаем, что очень сложно иметь параллельный код без ошибок. Некоторые ошибки параллелизма обнаруживаются спустя годы после запуска системы, часто по странным причинам, таким как увеличение числа пользователей до определенного предела.
Проблемы параллелизма обычно включают следующее:
гонка данных. Проще говоря, два или более потока читают и записывают переменную одновременно, что приводит к неожиданным результатам.
атомарность. В определенном контексте атомарные операции неразделимы. Определение контекста очень важно. Какой-то код, выглядишь атомарным в программе, типа простейшего i++, но на машинном уровне этот оператор обычно занимает несколько инструкций для выполнения (Load, Incr, Store), не неделимый, тоже не атомарный. Атомарность позволяет нам уверенно создавать программы, безопасные для параллелизма.
Синхронизация доступа к памяти. Область кода, которая должна контролироваться только одним потоком в каждый момент времени, называется критической секцией. В языке Go мьютекс в пакете синхронизации обычно используется для завершения синхронного управления доступом. Блокировки, как правило, приносят относительно большие потери производительности, поэтому обычно необходимо учитывать, будет ли заблокированная область часто входить, и как контролировать степень детализации блокировки.
тупик. В заблокированной программе каждый поток ожидает других потоков, создавая неловкую ситуацию сквозного соединения, и программа не может продолжать работу.
Лайвлок. Представьте, что вы идете по тропинке, и к вам приближается человек. Вы идете налево, стараясь избежать его, он делает наоборот, идет направо и не может пройти мимо обоих. После этого они оба хотели пойти в направлении, противоположном первоначальному, и результат был тот же. Это livelock, похоже, он работает, но прогресс работы просто не может двигаться вперед.
голод. Параллельный поток не может получить ресурсы, необходимые ему для дальнейшей работы. Обычно есть очень жадный поток, который занимает ресурсы в течение длительного времени и не освобождает их, так что другие потоки не могут получить ресурсы.
Что касается разницы между параллелизмом и параллелизмом, приведите классическое описание:
Параллелизм — это способность иметь дело с несколькими вещами одновременно. Параллелизм — это возможность делать несколько вещей одновременно.
Объяснение к «Заметкам по изучению языка го» учителя Юхэня:
Параллелизм относится к способности логически обрабатывать несколько задач одновременно; параллелизм относится к физическому выполнению нескольких задач одновременно.
Согласно книге «Параллелизм в Go», концепция компьютера является результатом абстракции, и параллелизм и параллелизм не являются исключением. Он описывает разницу между параллелизмом и параллелизмом следующим образом:
Concurrency is a property of the code; parallelism is a property of the running program.
Параллелизм — это свойство кода, а параллелизм — это свойство запущенных программ. Не обращайте внимания на мой плохой перевод. Это новинка, не так ли? Такое заявление я вижу впервые, и, если подумать, оно вполне резонно.
Мы постоянно говорим, что написанный код является параллельным или параллельным, но какие гарантии мы можем предоставить? Если вы запустите параллельный код на машине только с одним ядром, будет ли он все еще параллельным? Каким бы талантливым вы ни были, вы не можете писать параллельные программы. В лучшем случае код выглядит «параллельным», и все.
Конечно, на первый взгляд это кажется параллельным, но это всего лишь уловка ЦП.Несколько потоков делят ресурсы ЦП в режиме разделения времени, что кажется «параллельным» в грубом временном интервале.
Итак, на самом деле мы можем писать только «параллельный», а не «параллельный» код, и мы просто хотим, чтобы параллельный код выполнялся параллельно. Возможность распараллеливания параллельного кода зависит от уровня абстракции: примитивы параллелизма в коде, среда выполнения, операционная система (виртуальная машина, контейнер). Уровень становится все ниже и ниже, а требования все выше и выше. Поэтому, когда мы говорим о параллелизме или параллелизме, мы фактически указываем контекст, то есть уровень абстракции.
Пример приведен в книге «Параллелизм в Go»: если два человека одновременно откроют программу-калькулятор на компьютере, то две программы точно не повлияют друг на друга, что является параллельным. В этом примере контекст — это два человека-машины, а два вычислительных процесса — параллельные элементы.
По мере снижения уровня абстракции модель параллелизма на самом деле становится сложнее и важнее, и чем ниже уровень параллелизма, тем важнее он для нас. Чтобы параллельные программы выполнялись правильно, необходимо углубиться в модель параллелизма.
Перед выпуском языка Go мы писали параллельный код с учетом самой низкой абстракции: системных потоков. После выпуска Go к этой цепочке абстракций добавилась еще одна горутина. И Go позаимствовал концепцию у известного ученого-компьютерщика Тони Хоара: канал. Тони Хоар — автор знаменитой статьи «Сообщение о последовательных процессах».
Похоже, что все становится сложнее, поскольку Go вводит абстракцию более низкого уровня, но это не так. Поскольку горутина не так абстрактна, как кажется, на самом деле она является заменой системным потокам. Когда Gopher пишет код, он не заботится о системных потоках, большую часть времени ему нужно учитывать только горутины и каналы. Конечно, иногда используются некоторые понятия общей памяти, обычно относящиеся к вещам в пакете синхронизации, например sync.Mutex.
Что такое CSP
CSP часто называют ключевым фактором успеха Go в параллельном программировании. Полное название CSP — «Communicating Sequential Processes», это также статья Тони Хоара, опубликованная в ACM в 1978 году. В документе указывается, что язык программирования должен обращать внимание на примитивы ввода и вывода, особенно код параллельного программирования.
В то время, когда была опубликована эта статья, люди изучали идею модульного программирования, и самым горячим вопросом в то время был вопрос о том, использовать операторы goto или нет. В то время идея объектно-ориентированного программирования была на подъеме, и параллельное программирование почти никого не заботило.
В статье CSP также является настраиваемым языком программирования, и автор определяет операторы ввода и вывода для связи между процессами. Считается, что процессы управляются вводом и производят выходные данные для потребления другими процессами.Процессы могут быть процессами, потоками или даже блоками кода. Входная команда: !, используется для записи в процессы, вывод: ?, используется для чтения из процессов. Канал, о котором пойдет речь в этой статье, использует этот дизайн.
Хоар также предложил команду ->, если оператор слева от -> возвращает false, оператор справа от него не будет выполнен.
С помощью этих команд ввода и вывода Хоар доказал, что если общение между процессами является первоочередной задачей в языке программирования, то проблема параллельного программирования становится простой.
Go был первым языком, который представил эти идеи из CSP и развил их. Хотя синхронизация доступа к памяти (изначально — синхронизация доступа к памяти) в некоторых случаях очень полезна, в Go тоже есть соответствующий пакет синхронизации, но он очень подвержен ошибкам в больших программах.
Go с самого начала включил идею CSP в ядро языка, поэтому параллельное программирование становится уникальным преимуществом Go, и его легко понять.
Модель параллельного программирования большинства языков программирования основана на потоках и управлении доступом к синхронизации памяти, модель параллельного программирования Go заменена горутинами и каналами. Горутины похожи на потоки, а каналы — на мьютексы (для управления доступом к синхронизации памяти).
Горутина освобождает программистов и позволяет нам думать о проблемах ближе к бизнесу. Вместо того, чтобы рассматривать всевозможные утомительные низкоуровневые проблемы, такие как библиотеки потоков, накладные расходы потоков, планирование потоков и т. д., горутина создана, чтобы решать их за вас.
Каналы по своей природе комбинируются с другими каналами. Мы можем направить каналы, которые собирают результаты различных подсистем, в один и тот же канал. Канал также можно комбинировать с выбором, отменой, тайм-аутом. А у мьютекса этих функций нет.
Принцип параллелизма в Go очень хорош, а цель проста: максимально использовать каналы, относиться к горутинам как к бесплатным ресурсам и использовать их небрежно.
Чтобы объяснить, содержание первых двух частей взято из английской книги с открытым исходным кодом «Concurrency In Go», которую настоятельно рекомендуется прочитать.
Введение окончено, мы официально начинаем сегодняшний главный герой: канал.
что такое канал
Горутина и канал — два краеугольных камня параллельного программирования на языке Go. Горутины используются для выполнения параллельных задач, а каналы используются для синхронизации и связи между горутинами.
Канал устанавливает конвейер между gouroutines, передает данные в конвейере и реализует связь между gouroutines; поскольку он потокобезопасен, его очень удобно использовать; канал также обеспечивает характеристики «первым пришел — первым обслужен»; также влияет на блокировку и пробуждение горутин.
Я полагаю, вы, должно быть, видели предложение:
Do not communicate by sharing memory; instead, share memory by communicating.
Не общайтесь, делясь памятью, делитесь памятью, общаясь.
Это философия параллелизма Go, которая опирается на модель CSP и реализуется на основе каналов.
Я просто запутался, разве эти два предложения не означают одно и то же?
По содержанию первых двух разделов я лично понимаю это предложение: первая половина предложения — о параллельном программировании через некоторые компоненты в пакете синхронизации; вторая половина предложения — о том, что Go рекомендует использовать каналы для параллельного программирования. И то, и другое действительно необходимо и эффективно. На самом деле, прочитав анализ исходного кода канала после прочтения этой статьи, вы обнаружите, что нижний уровень канала предназначен для управления параллелизмом через мьютекс. Просто каналы представляют собой примитивы параллельного программирования более высокого уровня, которые инкапсулируют больше функций.
Что касается выбора базовых примитивов параллельного программирования в пакете синхронизации или канале, в главе 2 «Философия параллелизма Go» в книге «Параллелизм в Go» есть дерево решений и подробное обсуждение, и я рекомендую вам прочитать ее еще раз. . Я выложил картинку:
канал реализует CSP
Канал — очень важный тип в языке Go и первый объект в Go. Через каналы Go реализует совместное использование памяти посредством связи. Каналы являются важным средством передачи данных и синхронизации между несколькими горутинами.
Использование атомарных функций и блокировок чтения-записи может обеспечить безопасный общий доступ к ресурсам, но использование каналов более элегантно.
channel буквально означает «канал», подобно каналу в Linux. Синтаксис объявления канала следующий:
chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道
объявление одностороннего канала, с<-
чтобы указать, что он указывает направление канала. Пока вы понимаете, что код написан в порядке слева направо, вы можете сразу уловить направление канала.
Поскольку канал является ссылочным типом, его значение равно nil до его инициализации, а канал инициализируется с помощью функции make. Вы можете передать ему значение int, представляющее размер (емкость) буфера канала, и будет построен буферизованный канал; если не передано или 0, построен небуферизованный канал.
Между ними есть некоторые различия: небуферизованный канал не может буферизовать элементы, а определенная последовательность операций на нем такова: «отправить->получить->отправить->получить->...», если 2 элемента отправляются на небуферизованный канал. чан подряд, а если не получен, то блокируется второй раз, для работы буферизованного канала его надо "расслабить", ведь у него есть "буферизирующий" ореол.
почему канал
Go реализует модель связи CSP через каналы, которые в основном используются для передачи сообщений и уведомлений о событиях между горутинами.
С каналами и горутинами параллельное программирование в Go становится чрезвычайно простым и безопасным, что позволяет программистам сосредоточиться на бизнесе и повысить эффективность разработки.
Знайте, что технологии — это не самое главное, это всего лишь инструмент для ведения бизнеса. Эффективный язык разработки позволяет сэкономить время для более важных вещей, таких как написание статей.
Принцип реализации канала
Операции отправки и получения в chan транслируются в базовые функции отправки и получения во время компиляции.
Каналы делятся на два типа: с буферизацией и без буферизации. Операции с небуферизованными каналами на самом деле можно рассматривать как «синхронный режим», а операции с буферизацией — как «асинхронный режим».
В синхронном режиме отправитель и получатель должны быть готовы синхронно, и данные могут передаваться между ними только тогда, когда готовы оба (как вы увидите позже, это фактически копия памяти). В противном случае, если какая-либо из сторон выполнит операцию отправки или получения первой, она будет приостановлена и может быть разбужена только появлением другой стороны.
В асинхронном режиме операции отправки и приема могут выполняться гладко, пока доступен слот буфера (с оставшейся емкостью). В противном случае одна сторона операции (например, запись) также будет приостановлена и не активируется до тех пор, пока не будет выполнена противоположная операция (например, получение).
Подытожим: в синхронном режиме отправитель и получатель должны быть сопряжены для успешной операции, иначе она будет заблокирована, в асинхронном режиме операция будет успешной, только если в слоте буфера осталась свободная емкость, иначе она также будет заблокирован.
структура данных
Перейдите непосредственно к исходному коду (версия 1.9.2):
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
Значение полей написано в комментариях, и остановимся на нескольких полях:
buf
Указывает на базовый кольцевой массив, доступный только для буферизованных каналов.
sendx
,recvx
Оба указывают на базовый круговой массив, указывая текущие значения индекса позиции элемента (относительно базового массива), которые можно отправлять и получать.
sendq
,recvq
Соответственно представлять заблокированные горутины, которые заблокированы из-за попытки чтения или отправки данных в канал.
waitq
даsudog
двусвязный список изsudog
На самом деле это оболочка горутины:
type waitq struct {
first *sudog
last *sudog
}
lock
Используется для обеспечения того, чтобы каждая операция чтения или записи канала была атомарной.
Например, создайте структуру данных канала с емкостью 6 и элементом типа int следующим образом:
Создайте
Мы знаем, что у канала есть два направления: отправка и получение. Теоретически мы можем создать канал, который только отправляет или только получает, но после того, как этот канал будет создан, как им пользоваться? Канал, который может только отправлять, как его получить? Аналогично канал, который может только получать, как на него отправлять данные?
В общем, используйтеmake
Создайте канал, который может отправлять и получать:
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
пройти черезкомпиляцияАнализ, мы знаем, что функция, которая в конечном итоге создает чан,makechan
:
func makechan(t *chantype, size int64) *hchan
Из прототипа функции созданный chan является указателем. Таким образом, мы можем передавать каналы напрямую между функциями без передачи указателя на канал.
В частности, посмотрите на код:
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 省略了检查 channel size,align 的代码
// ……
var c *hchan
// 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
// 只进行一次内存分配
if elem.kind&kindNoPointers != 0 || size == 0 {
// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
// 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
// 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
// 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
// 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
c.buf = unsafe.Pointer(c)
}
} else {
// 进行两次内存分配操作
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循环数组长度
c.dataqsiz = uint(size)
// 返回 hchan 指针
return c
}
После создания нового чан память выделяется в куче, которая выглядит так:
Чтобы объяснить, это изображение взято из PPT на Gopher Con, см. Ссылки для адреса. Этот материал очень понятен и прост для понимания, и я рекомендую вам прочитать его.
Далее давайте воспользуемся примером из справочника [глубоко в нижнем слое каналов], чтобы понять весь процесс создания, отправки и получения.
func goroutineA(a <-chan int) {
val := <- a
fmt.Println("G1 received data: ", val)
return
}
func goroutineB(b <-chan int) {
val := <- b
fmt.Println("G2 received data: ", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
Сначала создайте небуферизованный канал, затем запустите две горутины и передайте в них ранее созданный канал. Затем отправьте данные 3 на этот канал и, наконец, засните на 1 секунду, а затем программа выйдет.
Строка 14 программы создает небуферизованный канал.Мы смотрим только на некоторые важные поля в структуре chan, чтобы увидеть состояние chan в целом.В начале ничего нет:
перенимать
Прежде чем продолжить анализ примеров из предыдущих разделов, давайте взглянем на исходный код, связанный с получением. После того, как конкретный процесс получения ясен, легко понять конкретные примеры.
Существует два способа записи операции приема: один с «ок», который отражает, закрыт ли канал, другой без «ок», таким образом, при получении нулевого значения соответствующего типа невозможно знать, что оно отправлено реальным отправителем.Значение или нулевое значение типа по умолчанию возвращается получателю после закрытия канала. Оба метода записи имеют свои сценарии применения.
После обработки компилятором эти два метода записи окончательно соответствуют двум функциям в исходном коде:
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv1
Функция обрабатывает случай без "ok",chanrecv2
Затем он отразит, закрыт ли канал, вернув поле «получено». Полученное значение является специальным и будет "подставлено" в параметрelem
Указанный адрес очень похож на то, как он написан на C/C++. Если полученное значение опущено в коде, elem здесь равен нулю.
Во всяком случае, в конце концов обратился кchanrecv
функция:
// 位于 src/runtime/chan.go
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 内容 …………
// 如果是一个 nil 的 channel
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
// 否则,接收一个 nil 的 channel,goroutine 挂起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不会执行到这里
throw("unreachable")
}
// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
// 当我们观察到 channel 没准备好接收:
// 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
// 2. 缓冲型,但 buf 里没有元素
// 之后,又观察到 closed == 0,即 channel 未关闭。
// 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
// 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// channel 已关闭,并且循环数组 buf 里没有元素
// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
// 也就是说即使是关闭状态,但在缓冲型的 channel,
// buf 里有元素的情况下还能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
// 解锁
unlock(&c.lock)
if ep != nil {
// 从一个已关闭的 channel 执行接收操作,且未忽略返回值
// 那么接收的值将是一个该类型的零值
// typedmemclr 根据类型清理相应地址的内存
typedmemclr(c.elemtype, ep)
}
// 从一个已关闭的 channel 接收,selected 会返回true
return true, false
}
// 等待发送队列里有 goroutine 存在,说明 buf 是满的
// 这有可能是:
// 1. 非缓冲型的 channel
// 2. 缓冲型的 channel,但 buf 满了
// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲型,buf 里有元素,可以正常接收
if c.qcount > 0 {
// 直接从循环数组里找到要接收的元素
qp := chanbuf(c, c.recvx)
// …………
// 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应位置的值
typedmemclr(c.elemtype, qp)
// 接收游标向前移动
c.recvx++
// 接收游标归零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 数组里的元素个数减 1
c.qcount--
// 解锁
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
unlock(&c.lock)
return false, false
}
// 接下来就是要被阻塞的情况了
// 构造一个 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收数据的地址保存下来
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 进入channel 的等待接收队列
c.recvq.enqueue(mysg)
// 将当前 goroutine 挂起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被唤醒了,接着从这里继续执行一些扫尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
Вышеприведенный код более подробно прокомментирован, вы можете посмотреть исходный код построчно, давайте рассмотрим его подробно.
-
Если канал имеет нулевое значение (nil), в неблокирующем режиме он вернется напрямую. В режиме блокировки функция gopark будет вызываться для приостановки горутины, которая заблокируется навсегда. Потому что, когда канал нулевой, если вы хотите не блокировать, вы можете только закрыть его, но закрытие нулевого канала снова вызовет панику, поэтому шансов разбудить нет. Подробнее можно посмотреть в функции closechan.
-
Как и в случае с функцией отправки, мы затем реализуем операцию, которая быстро обнаруживает сбой и возвращает в неблокирующем режиме без получения блокировки. Кстати, когда мы обычно пишем код, мы находим какие-то граничные условия и быстро возвращаемся, что может сделать логику кода более понятной, потому что следующая нормальная ситуация все меньше и больше фокусируется, а люди, читающие код, могут больше фокусироваться. основная логика кода.
// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
Когда мы наблюдаем, что канал не готов к приему:
- Небуферизованный, никакие горутины не ждут в очереди ожидания отправки.
- Буферизовано, но в buf нет элементов
После этого наблюдается закрыто == 0, то есть канал не закрыт.
Поскольку канал не может быть открыт повторно, канал не закрывается во время предыдущего наблюдения, поэтому в этом случае можно напрямую объявить о сбое приема и быстро вернуть его. Поскольку он не выбран и данные не получены, возвращаемое значение равно (false, false).
-
В следующей операции сначала будет размещена блокировка, а степень детализации относительно велика. Если канал закрыт и массив цикла buf не имеет элементов. В соответствии со случаем небуферизованного закрытия и буферизованного закрытия, но buf не имеет элементов, возвращается нулевое значение соответствующего типа, но полученный флаг равен false, что сообщает вызывающей стороне, что канал был закрыт, и значение, которое вы вынул не был нормально отправлен отправителем.данные. Но если в контексте выбора, этот случай выбран. Здесь представлено множество сценариев, в которых каналы используются в качестве сигналов уведомления.
-
Затем, если есть очередь, ожидающая отправки, канал заполнен, либо небуферизованный канал, либо буферизованный канал, но буфер заполнен. В обоих случаях данные могут быть получены нормально.
Итак, вызовите функцию recv:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是非缓冲型的 channel
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 未忽略接收的数据
if ep != nil {
// 直接拷贝数据,从 sender goroutine -> receiver goroutine
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲型的 channel,但 buf 已满。
// 将循环数组 buf 队首的元素拷贝到接收数据的地址
// 将发送者的数据入队。实际上这时 revx 和 sendx 值相等
// 找到接收游标
qp := chanbuf(c, c.recvx)
// …………
// 将接收游标处的数据拷贝给接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者数据拷贝到 buf
typedmemmove(c.elemtype, qp, sg.elem)
// 更新游标值
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送的 goroutine。需要等到调度器的光临
goready(gp, skip+1)
}
Если он не буферизован, он копируется непосредственно из стека отправителя в стек получателя.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
В противном случае это буферизованный канал и буфер заполнен. Это означает, что курсор-отправитель и курсор-получатель перекрываются, поэтому сначала нужно найти курсор-получатель:
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
Скопируйте туда элемент на адрес получателя. Затем скопируйте данные, которые должны быть отправлены отправителем, в принимающий курсор. На этом операция приема данных и отправки данных завершена. Затем переместите курсоры отправки и получения на единицу соответственно и начните с 0, если происходит «зацикливание».
Наконец, выньте goroutine в sudog, вызовите goready, чтобы изменить его состояние на «runnable», подождите, пока отправитель проснется, и подождите, пока планировщик запланирует.
-
Тогда, если в бафе канала еще есть данные, значит, их можно нормально принимать. Обратите внимание, что сюда можно зайти, даже когда канал закрыт. Этот шаг относительно прост.Обычно данные в принимающем курсоре в buf копируются на адрес, по которому данные были получены.
-
На последнем этапе ситуация, которая приходит сюда, собирается заблокироваться. Конечно, если значение, переданное блоком, ложно, он не будет блокироваться и просто вернется напрямую.
Сначала создайте sudog, а затем сохраните различные значения. Обратите внимание, что здесь будет храниться адрес полученных данных.elem
поле, при пробуждении полученные данные будут сохранены по адресу, на который указывает это поле. Затем добавьте sudog в очередь recvq канала. Вызовите функцию goparkunlock, чтобы приостановить горутину.
Следующий код — это всякие доделки после пробуждения горутины.
Продолжим предыдущий пример. Как упоминалось ранее, строка 14 создает небуферизованный канал, затем строки 15 и 16 соответственно создают горутину и выполняют операцию приема каждая. Из предыдущего анализа исходного кода мы знаем, что эти две горутины (позже называемые G1 и G2) будут заблокированы в операции получения. G1 и G2 будут висеть в очереди recq канала, образуя двойной круговой связанный список.
До строки 17 программы общая структура данных chan выглядит следующим образом:
buf
Указывает на массив длины 0, qcount равен 0, что указывает на отсутствие элементов в канале. Фокусrecvq
иsendq
, они являются структурами waitq, а waitq на самом деле является двусвязным списком.Элементом связанного списка является sudog, который содержитg
поле,g
Представляет горутину, поэтому sudog можно рассматривать как горутину. recvq хранит горутины, которые пытаются прочитать канал, но заблокированы, а sendq хранит горутины, которые пытаются писать в канал, но заблокированы.
На данный момент мы видим, что в recvq висят две горутины, а именно G1 и G2, которые были запущены ранее. G1 и G2 заблокированы, потому что нет горутины для приема, а канал не буферизован. В sendq нет заблокированных горутин.
recvq
Структура данных следующая. Вот прямая ссылка на картинку в статье, которая использует трехмерные элементы и хорошо прорисована:
Давайте посмотрим на состояние тян в это время в целом:
G1 и G2 приостановлены, и статусWAITING
. Планировщик горутин сегодня не в центре внимания, конечно, статьи по теме обязательно будут написаны позже. Позвольте мне кратко сказать здесь, что горутины — это сопрограммы пользовательского режима, управляемые средой выполнения Go, в отличие от потоков ядра, которыми управляет ОС. Горутины более легкие, поэтому мы можем легко создать десятки тысяч горутин.
Поток ядра может управлять несколькими горутинами.Когда одна из горутин заблокирована, поток ядра может запланировать выполнение других горутин, а сам поток ядра не будет блокироваться. Это то, что мы обычно говоримM:N
Модель:
M:N
Модель обычно состоит из трех частей: M, P, G. M — это поток ядра, отвечающий за запуск горутины; P — контекст, который сохраняет контекст, необходимый для запуска горутины, а также поддерживает список готовых к выполнению горутин; G — горутина, которую нужно запустить. M и P являются базисами, на которых работает G.
Вернитесь к примеру. Предположим, что у нас есть только одно M, когда G1 (go goroutineA(ch)
) бежать кval := <- a
, он переходит из исходного состояния выполнения в состояние ожидания (результат после вызова gopark):
G1 покидает отношения с M, но планировщик не позволит M бездействовать, поэтому он затем запланирует запуск другой горутины:
У G2 такой же опыт. Теперь и G1, и G2 приостановлены, ожидая, пока отправитель отправит данные в канал, прежде чем их можно будет спасти.
Отправить
В приведенном выше примере и G1, и G2 теперь находятся в очереди recvq.
ch <- 3
Строка 17 отправляет элемент 3 в канал.
Операция отправки в конечном итоге преобразуется вchansend
Функция, непосредственно в исходном коде, также в основном аннотирована, и вы можете понять основной процесс:
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞,直接返回 false,表示未发送成功
if !block {
return false
}
// 当前 goroutine 被挂起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相关……
// 对于不阻塞的 send,快速检测失败场景
//
// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
// 2. channel 是缓冲型的,但循环数组已经装满了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 锁住 channel,并发安全
lock(&c.lock)
// 如果 channel 关闭了
if c.closed != 0 {
// 解锁
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 对于缓冲型的 channel,如果还有缓冲空间
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// ……
// 将数据从 ep 处拷贝到 qp
typedmemmove(c.elemtype, qp, ep)
// 发送游标值加 1
c.sendx++
// 如果发送游标值等于容量值,游标值归 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区的元素数量加一
c.qcount++
// 解锁
unlock(&c.lock)
return true
}
// 如果不需要阻塞,则直接返回错误
if !block {
unlock(&c.lock)
return false
}
// channel 满了,发送方会被阻塞。接下来会构造一个 sudog
// 获取当前 goroutine 的指针
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 当前 goroutine 进入发送等待队列
c.sendq.enqueue(mysg)
// 当前 goroutine 被挂起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 从这里开始被唤醒了(channel 有机会可以发送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被唤醒后,channel 关闭了。坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上绑定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
Приведенный выше код более подробно прокомментирован, давайте рассмотрим его подробнее.
-
Если канал окажется пустым, текущая горутина будет приостановлена.
-
Для неблокирующей операции отправки, если канал не закрыт и нет лишнего буферного пространства (Примечание: а. канал небуферизован, и в очереди ожидания на получение нет горутины; б. канал буферизован, но круговой массив состоит из полных элементов)
Для этого в исходном коде рантайма есть много комментариев. Это суждение предназначено для быстрого обнаружения сбоя отправки без блокировки отправки, чтобы быстро вернуться.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
Комментарий в основном говорит о том, почему эту штуку можно разблокировать, и я объясню это подробно.if
Сначала в условии считываются две переменные: block и c.closed. block является параметром функции и не изменится, c.closed может быть изменен другими горутинами, так как блокировки нет, это первые два выражения условия "и".
В последнем пункте участвуют три переменные: c.dataqsiz, c.recvq.first, c.qcount.c.dataqsiz == 0 && c.recvq.first == nil
Ссылается на небуферизованный канал, и в recvq нет горутины, ожидающей получения;c.dataqsiz > 0 && c.qcount == c.dataqsiz
Ссылается на буферизованный канал, но циклический массив заполнен. здесьc.dataqsiz
На самом деле не будет модифицирован, на момент создания были определены. Разблокировать реальное воздействиеc.qcount
иc.recvq.first
.
Условий для этой части два.word-sized read
, который должен читать две словесные операции:c.closed
иc.recvq.first
(небуферизованный) илиc.qcount
(буферный тип).
когда мы нашлиc.closed == 0
верно, то есть канал не был закрыт, и при обнаружении состояния третьей части наблюдается, чтоc.recvq.first == nil
илиc.qcount == c.dataqsiz
когда (здесь игнорируетсяc.dataqsiz
), делается вывод, что эту операцию отправки следует рассматривать как неудачную, и быстро возвращается значение false.
Здесь задействованы два наблюдения: канал не закрыт, канал не готов к отправке. Оба они приведут к противоречивым наблюдениям из-за отсутствия блокировок. Например, я сначала заметил, что канал не закрыт, а потом заметил, что канал не готов к отправке, в это время я думал, что условие if может быть выполнено, но если c.closed становится 1 в это время, значит условие не выполнено.кто тебе дал разлочить!
Однако, поскольку закрытый канал не может изменить состояние канала с «готов к отправке» на «не готов к отправке», канал не закрывается, когда я наблюдаю «не готов к отправке». Несмотря на тоc.closed == 1
, то есть между этими двумя наблюдениями канал закрыт, что также означает, что между этими двумя наблюдениями канал удовлетворяет двум условиям:not closed
иnot ready for sending
, в настоящее время для меня не проблема вернуть false напрямую.
Эта часть объяснения довольно запутана, на самом деле, цель этого — получить меньше блокировок и повысить производительность.
-
Если он обнаружит, что канал был закрыт, немедленно паникуйте.
-
Если вы можете удалить sudog (представляющий горутину) из очереди ожидания recvq, это означает, что канал пуст и в нем нет элементов, поэтому будут ожидающие получатели. В это время будет вызвана функция отправки, чтобы скопировать элементы непосредственно из стека отправителя в стек получателя.
sendDirect
функция завершена.
// send 函数处理向一个空的 channel 发送操作
// ep 指向被发送的元素,会被直接拷贝到接收的 goroutine
// 之后,接收的 goroutine 会被唤醒
// c 必须是空的(因为等待队列里有 goroutine,肯定是空的)
// c 必须被上锁,发送操作执行完后,会使用 unlockf 函数解锁
// sg 必须已经从等待队列里取出来了
// ep 必须是非空,并且它指向堆或调用者的栈
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 省略一些用不到的
// ……
// sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
if sg.elem != nil {
// 直接拷贝内存(从发送者到接收者)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// sudog 上绑定的 goroutine
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒接收的 goroutine. skip 和打印栈相关,暂时不理会
goready(gp, skip+1)
}
Продолжай смотретьsendDirect
функция:
// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
// 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
// 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
// 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
// 直接进行内存"搬迁"
// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在读和写之前加上一个屏障
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
Это включает операцию одной горутины, непосредственно записывающей стек другой горутины.Вообще говоря, стеки разных горутин уникальны друг для друга. И это также нарушает некоторые допущения GC. Во избежание проблем в процессе записи добавляется барьер записи, чтобы гарантировать правильное завершение операции записи. Преимущество этого заключается в том, что одна копия в памяти уменьшается: нет необходимости сначала копировать buf в канал, отправитель напрямую в получатель, нет посредника, чтобы иметь значение, эффективность повышается, и это идеально .
Затем разблокируйте, разбудите приемник, дождитесь прибытия планировщика, и приемник снова увидит солнце и сможет продолжить выполнение кода после операции приема.
- если
c.qcount < c.dataqsiz
, указывающий, что буфер доступен (это должен быть буферизованный канал). Сначала используйте функцию, чтобы получить позицию, в которой должен находиться отправляемый элемент:
qp := chanbuf(c, c.sendx)
// 返回循环队列里第 i 个元素的地址处
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
c.sendx
Указывает на позицию следующего элемента для отправки в циклическом массиве, а затем вызываетtypedmemmove
Функция копирует его в массив цикла. послеc.sendx
Добавьте 1 к общему количеству элементов на 1:c.qcount++
и, наконец, разблокировать и вернуться.
-
Если вышеуказанные условия не выполняются, канал заполнен. Независимо от того, буферизован канал или нет, отправитель «закрыт» (горутина заблокирована). Если блок ложный, разблокируйте напрямую и верните ложь.
-
Последний случай — это тот случай, когда вас действительно нужно заблокировать. Сначала создайте sudog и поставьте его в очередь (поле sendq канала). тогда позвони
goparkunlock
Приостановите текущую горутину, разблокируйте ее и дождитесь подходящего времени, чтобы проснуться.
После пробуждения изgoparkunlock
Следующая строка кода запускается и продолжает выполняться.
Здесь есть некоторые операции привязки, sudog связывает горутину через поле g, а горутина связывает sudog через ожидание, а sudog также связывает черезelem
поле связывает адрес элемента для отправки, иc
Полевые привязки здесь «изрыты» в канале.
Поэтому адрес отправляемого элемента фактически хранится в структуре sudog, которая является текущей горутиной.
Ну, читайте исходный код. Давайте разберем пример дальше. Я думаю, что все почти забыли пример. Я снова вставлю код:
func goroutineA(a <-chan int) {
val := <- a
fmt.Println("goroutine A received data: ", val)
return
}
func goroutineB(b <-chan int) {
val := <- b
fmt.Println("goroutine B received data: ", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
ch1 := make(chan struct{})
}
В разделе отправки мы сказали, что G1 и G2 теперь приостановлены, ожидая, пока отправитель их спасет. В строке 17 основная сопрограмма отправляет элемент 3 в ch, посмотрим, что будет дальше.
По результатам предыдущего анализа исходного кода мы знаем, что когда отправитель обнаруживает, что в recvq ch есть получатель, ожидающий приема, он удаляет из очереди sudog, «рекомендует» sudo первого указателя в recvq и добавьте его в очередь горутин Runnable P.
Затем отправитель копирует отправляющий элемент на адрес элемента sudog и, наконец, вызывает goready, чтобы разбудить G1, и состояние становится работоспособным.
Когда планировщик посещает G1, он переводит G1 в рабочее состояние и выполняет следующий код goroutineA. G представляет другие возможные горутины.
На самом деле это включает в себя операцию, когда одна сопрограмма записывает другой стек сопрограмм. На одной стороне канала с нетерпением ждут два получателя. В это время отправитель на другой стороне канала готов отправить данные в канал. Чтобы быть эффективным, нет необходимости «передавать» данные через буфер канала один раз, и копировать данные непосредственно с адреса источника в канал.Адрес назначения может быть, и эффективность высока!
Вышеприведенное изображение представляет собой схематическую диаграмму,3
Будет скопировано в место в стеке G1, которое является адресом val, и сохранено в поле elem.
закрытие
Закрытие канала выполнит функциюclosechan
:
func closechan(c *hchan) {
// 关闭一个 nil channel,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 上锁
lock(&c.lock)
// 如果 channel 已经关闭
if c.closed != 0 {
unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// …………
// 修改关闭状态
c.closed = 1
var glist *g
// 将 channel 所有等待接收队列的里 sudog 释放
for {
// 从接收队列里出队一个 sudog
sg := c.recvq.dequeue()
// 出队完毕,跳出循环
if sg == nil {
break
}
// 如果 elem 不为空,说明此 receiver 未忽略接收数据
// 给它赋一个相应类型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 取出 goroutine
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 相连,形成链表
gp.schedlink.set(glist)
glist = gp
}
// 将 channel 等待发送队列里的 sudog 释放
// 如果存在,这些 goroutine 将会 panic
for {
// 从发送队列里出队一个 sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 发送者会 panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 形成链表
gp.schedlink.set(glist)
glist = gp
}
// 解锁
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍历链表
for glist != nil {
// 取最后一个
gp := glist
// 向前走一步,下一个唤醒的 g
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 唤醒相应 goroutine
goready(gp, 3)
}
}
Логика закрытия относительно проста: для канала заблокированный отправитель и получатель хранятся в recvq и sendq соответственно. После закрытия канала для ожидающего приемника принимается нулевое значение соответствующего типа. Для ожидающих отправителей он будет паниковать напрямую. Поэтому нельзя опрометчиво закрывать канал, не зная, есть у канала приемник или нет.
Функция закрытия сначала блокирует большой замок, затем соединяет всех отправителей и получателей на этом канале в связанный список sudog, а затем разблокирует их. Наконец, снова разбудите все судоги.
Что делать после пробуждения. Отправитель продолжит выполнение кода после функции goparkunlock в функции chansend.К сожалению, он обнаруживает, что канал был закрыт, паника. Приемнику повезло больше, после доделки он возвращается. Здесь selected возвращает true, а полученное возвращаемое значение возвращает разные значения в зависимости от того, закрыт ли канал. получено ложно, если канал закрыт, и истинно в противном случае. В данном случае мы проанализировали, полученное возвращает false.
канал продвинутый
Подведем итоги работы канала:
действовать | nil channel | closed channel | not nil, not closed channel |
---|---|---|---|
close | panic | panic | изящное завершение работы |
читать | блокировать | прочитать нулевое значение соответствующего типа | Блокировать или читать данные в обычном режиме. Буферизованный канал пуст или небуферизованный канал блокируется, когда нет ожидающих отправителей. |
написать ч | блокировать | panic | Блокируйте или записывайте данные в обычном режиме. Небуферизованные каналы блокируются, когда не ждут приемников или когда буфер буферизованного канала заполнен. |
Подводя итог, можно сказать, что существует три ситуации, в которых возникает паника: запись в закрытый канал, закрытие нулевого канала, повторное закрытие канала.
Чтение и запись в нулевой канал будут заблокированы.
Характер отправки и получения элементов
В чем суть отправляющих и принимающих элементов канала? На справочный материал [глубоко в низ канала] отвечают так:
Remember all transfer of value on the go channels happens with the copy of value.
Другими словами, операции отправки и получения канала по сути являются «копией значения», будь то из стека горутины отправителя в chan buf, или из chan buf в горутину получателя, или непосредственно из горутины отправителя в получатель. горутина.
Вот еще один пример из текста, и я добавлю более подробное объяснение. Кстати, это англоязычный блог, который хорошо написан. Там нет такого большого анализа исходного кода, как наша статья. Он разделяет ситуацию в коде и описывает ее отдельно. У каждого есть свои преимущества и недостатки. Рекомендуется читать оригинальный текст, впечатления от чтения лучше.
type user struct {
name string
age int8
}
var u = user{name: "Ankur", age: 25}
var g = &u
func modifyUser(pu *user) {
fmt.Println("modifyUser Received Vaule", pu)
pu.name = "Anand"
}
func printUser(u <-chan *user) {
time.Sleep(2 * time.Second)
fmt.Println("printUser goRoutine called", <-u)
}
func main() {
c := make(chan *user, 5)
c <- g
fmt.Println(g)
// modify g
g = &user{name: "Ankur Anand", age: 100}
go printUser(c)
go modifyUser(g)
time.Sleep(5 * time.Second)
fmt.Println(g)
}
результат операции:
&{Ankur 25}
modifyUser Received Vaule &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}
Вот хорошийshare memory by communicating
пример.
Вначале строится структура u, адрес 0x56420, а ее содержимое находится над адресом на рисунке. тогда поставь&u
назначить указательg
, адрес g — 0x565bb0, а его содержимое — адрес, указывающий на u.
В основной программе сначала пошлите g в c, согласноcopy value
Суть того, что входит в чан буф0x56420
, которое является значением указателя g (а не тем, на что он указывает), поэтому при печати элемента, полученного из канала,&{Ankur 25}
. Поэтому указатель g не "отправляется" в канал, а копируется его значение.
Снова:
Remember all transfer of value on the go channels happens with the copy of value.
утечка ресурсов
Каналы могут вызывать утечки горутин.
Причина утечки в том, что после того, как горутина оперирует каналом, он находится в состоянии блокировки отправки или получения, в то время как канал находится в заполненном или пустом состоянии и не был изменен. В то же время сборщик мусора не будет восстанавливать такие ресурсы, из-за чего горутина будет все время оставаться в очереди ожидания.
В разделе «Утечка ресурсов» главы 8 «Заметок по изучению языка Go» г-на Юкена приводится пример, вы можете увидеть его сами.
happened before
Определение из Википедии:
In computer science, the happened-before relation (denoted: ->) is a relation between the result of two events, such that if one event should happen before another event, the result must reflect that, even if those events are in reality executed out of order (usually to optimize program flow).
Проще говоря, если существует связь «произошло до» между событием а и событием b, то есть а -> b, то результаты а и b должны отражать эту связь. Поскольку современные компиляторы и ЦП будут выполнять различные оптимизации, включая перераспределение компилятора, перераспределение памяти и т. д., в параллельном коде, ограничение «произошло до» очень важно.
Согласно параллельному обмену программами г-на Хуанга Юепаня на Gopher China 2019, соотношение между отправкой канала, завершением отправки, получением и завершением приема было следующим:
- энный
send
долженhappened before
энныйreceive finished
, будь то буферизованный или небуферизованный канал. - Для буферизованного канала пропускной способности m n-й
receive
долженhappened before
п+месsend finished
. - Для небуферизованных каналов n-й
receive
долженhappened before
энныйsend finished
. - канал закрыть должен
happened before
получатель уведомлен.
Давайте объясним это один за другим.
Первое, мы тоже правы с точки зрения исходного кода, отправлять не обязательноhappened before
получить, потому что иногда горутина сначала принимается, затем горутина приостанавливается, а затем пробуждается отправителем, и отправка происходит после получения. Но в любом случае, чтобы завершить прием, его нужно сначала отправить.
Второй, буферизованный канал, когда происходит n+m-я отправка, возможны следующие две ситуации:
Если n-й прием не происходит. В этот момент канал заполнен и отправка заблокирована. Затем, когда произойдет n-й прием, горутина отправителя будет разбужена, а затем процесс отправки продолжится. Таким образом, энныйreceive
долженhappened before
п+месsend finished
.
Если n-й прием уже произошел, это напрямую удовлетворяет требованию.
Статью 3 также легче понять. Если n-я отправка заблокирована, горутина отправителя зависает, и n-я отправка приходит в это время, прежде чем n-я отправка завершится. Если n-ая отправка не заблокирована, значит, там уже ждал n-й прием, причем не только до завершения отправки, но и до отправки.
В-четвертых, вызовите исходный код, сначала установите Close = 1, затем разбудите ожидающий приемник и скопируйте нулевое значение в приемник.
Ссылка [Совместное использование параллельного программирования Bird's Nest] В области комментариев этого сообщения в блоге есть ссылка для скачивания PPT Это выступление г-на Чао на конференции Gopher 2019.
О произошедших ранее, вот пример, упомянутый в «Go Language Advanced Programming» в Chai Big and Cao Da's новой книги.
В разделе 1.5 книги сначала рассказывается о модели последовательной согласованной памяти, которая является основой параллельного программирования.
Перейдем непосредственно к примеру:
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "hello, world"
done <- true
}
func main() {
go aGoroutine()
<-done
println(msg)
}
Сначала определите готовый канал и строку для печати. В основной функции запустите горутину, дождитесь получения значения от done и выполните операцию печати msg. Если нет основной функции<-done
В этой строке кода напечатанное msg пустое, потому что у горутины нет времени на планирование, и прежде чем она сможет присвоить значение msg, основная программа завершит работу. В Go основная сопрограмма завершает работу, не дожидаясь других сопрограмм.
добавлять<-done
После этой строки кода он заблокируется здесь. После того, как aGoroutine отправит значение done, она проснется и продолжит печатать msg. А до этого было назначено msg, так что распечатаетhello, world
.
Произошедшее до того, как здесь полагается, является первым пунктом, упомянутым выше. Первая отправка должна произойти до завершения первого приема, т.е.done <- true
до<-done
происходит, что означает, что основная функция завершила выполнение<-done
затем выполнитьprintln(msg)
В этой строке кода msg уже присвоено значение, поэтому желаемый результат будет напечатан.
В книге я также использовал третье правило, упомянутое выше, для изменения кода:
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "hello, world"
<-done
}
func main() {
go aGoroutine()
done <- true
println(msg)
}
Также получить тот же результат, почему? Согласно третьему правилу, для небуферизованных каналов первый прием должен произойти до завершения первой отправки. Это,
существуетdone <- true
перед окончанием,<-done
Это уже произошло, а это значит, что msg было присвоено значение, и оно в конечном итоге распечатаетhello, world
.
Как красиво закрыть канал
Эта часть контента в основном взята из англоязычной статьи на Go 101. Справочный материал [Как изящно закрыть канал] может перейти непосредственно к исходному тексту.
В статье сначала «выплевываются» некоторые проблемы при проектировании канала Go, а затем приводится несколько примеров того, как изящно закрыть канал в нескольких разных ситуациях. По соглашению я дам свою интерпретацию на основе оригинального авторского содержания.После прочтения этого раздела вы можете оглянуться на оригинальный английский текст, это будет очень интересно.
По поводу использования каналов есть несколько неудобств:
- Невозможно узнать, закрыт ли канал, не изменив состояние самого канала.
- Закрытие закрытого канала вызовет панику. Поэтому очень опасно, если сторона, закрывающая канал, закрывает канал, не зная, закрыт ли канал.
- Отправка данных в закрытый канал вызовет панику. Поэтому, если сторона, отправляющая данные в канал, не знает, закрыт канал или нет, то необдуманно отправлять данные в канал очень опасно.
Текст действительно дает функцию проверки, закрыт ли канал:
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
Глядя на код, проблем на самом деле много. Во-первых, функция IsClosed — это функция с побочными эффектами. Каждый раз, когда он вызывается, элемент в канале считывается, изменяя состояние канала. Это нехорошая функция, просто работай, когда работаешь, и заботься об овцах!
Во-вторых, результат, возвращаемый функцией IsClosed, представляет собой только момент вызова, и нет гарантии, что другие горутины будут выполнять над ним какие-то операции после вызова, изменяя его состояние. Например, функция IsClosed возвращает true, но другая горутина в это время закрывает канал, а вы отправляете ей данные с этим устаревшим сообщением «канал не закрыт», из-за чего возникает паника. Разумеется, дважды канал не закроется, если результат, возвращаемый функцией IsClosed, истинен, значит, канал действительно закрыт.
Существует широко распространенный принцип закрытия каналов:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
Не закрывайте канал со стороны получателя и не закрывайте канал при наличии нескольких отправителей.
Легко понять, что отправитель — это тот, кто отправляет элементы в канал, поэтому отправитель может решить, когда не отправлять данные и закрыть канал. Однако, если отправителей несколько, один отправитель не может определить положение других отправителей, и канал не может быть закрыт необдуманно.
Но сказанное выше не самое существенное, самый существенный принцип только один:
don't close (or send values to) closed channels.
Есть два менее изящных способа закрыть канал:
-
Используйте механизм отложенного восстановления, чтобы безопасно закрыть канал или отправить данные в канал. Даже если есть паника, за ней стоит отложенное восстановление.
-
Используйте sync.Once, чтобы гарантировать только одно закрытие.
Я не буду публиковать код, просто перейдите к исходному тексту.
Изюминка этого раздела здесь, как изящно закрыть канал?
По количеству отправителя и получателя бывают следующие ситуации:
- отправитель, получатель
- один отправитель, M получателей
- N отправителей, один получатель
- N отправителей, M получателей
Для 1 и 2 само собой отправитель только один, просто закройте его прямо со стороны отправителя, проблем нет. Сосредоточьтесь на случаях 3 и 4.
В третьем случае способ изящного закрытия канала таков: единственный получатель говорит «пожалуйста, прекратите посылать больше», закрывая дополнительный сигнальный канал.
Решение состоит в том, чтобы добавить канал, который передает сигнал закрытия, а приемник выдает команду на закрытие канала данных через сигнальный канал. отправители прекращают отправку данных после прослушивания сигнала закрытия. Я изменил код, чтобы сделать его более кратким:
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <- stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// the receiver
go func() {
for value := range dataCh {
if value == Max-1 {
fmt.Println("send stop signal to senders.")
close(stopCh)
return
}
fmt.Println(value)
}
}()
select {
case <- time.After(time.Hour):
}
}
StopCh здесь — это сигнальный канал, который сам имеет только одного отправителя, поэтому его можно закрыть напрямую. После того, как отправители получают сигнал закрытия, выбирается ветвь выбора «case
Следует отметить, что приведенный выше код явно не закрывает dataCh. В языке Go для канала, если в конце нет горутины для ссылки на него, независимо от того, закрыт канал или нет, он в конечном итоге будет восстановлен gc. Поэтому в данном случае так называемое изящное закрытие канала заключается в том, чтобы не закрывать канал и позволить gc сделать это за вас.
В последнем случае способ изящно закрыть канал: любой из них говорит «давайте закончим игру», уведомляя модератора о закрытии дополнительного сигнального канала.
В отличие от третьего случая, здесь есть M получателей.Если третье решение будет принято напрямую, и стопЧ закроется непосредственно получателем, канал будет закрываться неоднократно, вызывая панику. Следовательно, необходимо добавить посредника, и М приемников отправить ему «запрос» на закрытие dataCh.После того как посредник получит первый запрос, он сразу выдаст указание на закрытие dataCh (закрывая stopCh, повторное закрытие не произойдет при на этот раз. , потому что у отправителя stopCh есть только один посредник). Кроме того, N отправителей здесь также могут отправлять запросы посреднику на закрытие канала данных.
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// It must be a buffered channel.
toStop := make(chan string, 1)
var stoppedBy string
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
for {
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}
select {
case <- time.After(time.Hour):
}
}
В коде toStop — это роль посредника, который используется для получения запросов на близкие данные, отправленных отправителями и получателями.
Здесь toStop объявлен как буферизованный канал. Предполагая, что toStop объявляет небуферизованный канал, первый отправленный запрос на закрытие dataCh может быть потерян. Поскольку и отправитель, и получатель отправляют запросы через оператор select, если горутина, в которой находится посредник, не готова, оператор select не будет выбран, и будет использоваться параметр по умолчанию, и ничего не будет сделано. Таким образом, первый запрос на закрытие dataCh теряется.
Если мы объявим емкость toStop как Num (отправителей) + Num (получателей), часть, которая отправляет запрос dataCh, может быть изменена на более сжатую форму:
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
Отправьте запрос непосредственно в toStop, потому что емкость toStop достаточно велика, поэтому вам не нужно беспокоиться о блокировке, и, естественно, вам не нужно добавлять случай по умолчанию в оператор select, чтобы избежать блокировки.
Как видите, dataCh здесь тоже не особо закрыт, что то же самое, что и в третьем случае.
Выше приведены некоторые из самых основных ситуаций, но они могут охватывать почти все ситуации и их варианты. Просто помни:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
И более существенный принцип:
don't close (or send values to) closed channels.
Закрытый канал все еще может считывать данные
Чтение данных из буферизованного канала может по-прежнему считывать действительные значения, когда канал закрыт. Прочитанные данные недействительны только тогда, когда возвращенное ok является ложным.
func main() {
ch := make(chan int, 5)
ch <- 18
close(ch)
x, ok := <-ch
if ok {
fmt.Println("received: ", x)
}
x, ok = <-ch
if !ok {
fmt.Println("channel closed, data invalid.")
}
}
результат операции:
received: 18
channel closed, data invalid.
Создается буферизованный канал, в него отправляется элемент, и канал закрывается. После двух попыток чтения данных из канала значение все еще может быть нормально прочитано с первого раза. Подтверждение, возвращенное во второй раз, является ложным, что указывает на то, что канал закрыт и в нем нет данных.
приложение канала
Комбинация Channel и goroutine — убийца параллельного программирования в Go. Практическое применение Канала также часто бросается в глаза.В сочетании с выбором, отменой, таймером и т. д. он может реализовывать различные функции. Далее разберемся с применением канала.
стоп-сигнал
В предыдущем разделе о том, как изящно закрыть канал, уже было рассмотрено многое, поэтому я пропущу этот.
Есть еще довольно много сценариев, где каналы используются для остановки сигналов, часто канал закрывается или в канал отправляется элемент, чтобы сторона, принимающая канал, могла знать эту информацию, а затем делать какие-то другие операции.
время задачи
В сочетании с таймером обычно есть два способа игры: реализовать контроль тайм-аута и реализовать регулярное выполнение задачи.
Иногда вам нужно выполнить операцию, но вы не хотите, чтобы она занимала слишком много времени, можно сделать последний таймер:
select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}
После ожидания в течение 100 мс, если s.stopc не прочитал данные или закрыт, он сразу же завершится. Это пример из исходного кода etcd, который можно найти повсюду.
Также относительно просто выполнять задачу на регулярной основе:
func worker() {
ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行定时任务
fmt.Println("执行 1s 定时任务")
}
}
}
Каждую 1 секунду выполнять запланированную задачу.
Разделите производителя и потребителя
При запуске сервиса запускается n воркеров как пул рабочих сопрограмм, и эти сопрограммы работают в одномfor {}
В бесконечном цикле потребляйте рабочие задачи из канала и выполняйте:
func main() {
taskCh := make(chan int, 100)
go worker(taskCh)
// 塞任务
for i := 0; i < 10; i++ {
taskCh <- i
}
// 等待 1 小时
select {
case <-time.After(time.Hour):
}
}
func worker(taskCh <-chan int) {
const N = 5
// 启动 5 个工作协程
for i := 0; i < N; i++ {
go func(id int) {
for {
task := <- taskCh
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
}
}(i)
}
}
5 рабочих сопрограмм постоянно берут задачи из рабочей очереди.Производителю нужно только отправить задачи в канал, чтобы разделить производителя и потребителя.
Вывод программы:
finish task: 1 by worker 4
finish task: 2 by worker 2
finish task: 4 by worker 3
finish task: 3 by worker 1
finish task: 0 by worker 0
finish task: 6 by worker 0
finish task: 8 by worker 3
finish task: 9 by worker 1
finish task: 7 by worker 4
finish task: 5 by worker 2
параллелизм управления
Иногда необходимо регулярно выполнять сотни задач, например, выполнять какие-то офлайновые вычислительные задачи по городам каждый день. Однако количество параллелизма не должно быть слишком большим, поскольку процесс выполнения задачи зависит от некоторых сторонних ресурсов, что ограничивает скорость запросов. В это время вы можете контролировать количество параллелизма через канал.
Следующий пример взят из Advanced Programming in Go:
var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}
Создайте буферизованный канал емкостью 3. Затем просмотрите список задач, каждая задача запускает горутину для завершения. Для фактического выполнения задачи действие обращения к третьей стороне завершается в w(), перед выполнением w() необходимо получить "лицензию" из лимита.После получения лицензии можно выполнить w(), а после выполнения задания вернуть "лицензию". Это позволяет вам контролировать количество горутин, работающих одновременно.
здесь,limit <- 1
Поместите его внутри func, а не снаружи, объяснил автор книги Чай Да в группе читателей:
Если он находится на внешнем уровне, он предназначен для управления количеством системных горутин, которые могут блокировать цикл for и влиять на бизнес-логику.
На самом деле лимит не имеет ничего общего с логикой, а только с настройкой производительности, семантика внутреннего слоя и внешнего слоя разная.
Еще один момент, который следует отметить, это то, что если w() паникует, «лицензия» может не быть возвращена, поэтому вам нужно использовать отсрочку, чтобы гарантировать это.
Суммировать
Наконец-то закончил писать, и вы наконец-то прочитали, поздравляю!
Напомним, что эта статья начинается с параллелизма и параллелизма, а затем охватывает CSP.Go использует каналы для реализации CSP. Затем я рассказал о том, что такое канал, зачем он нужен, а затем подробно разобрал принцип реализации канала, что также является самой важной частью полного текста. После этого я рассказал о нескольких продвинутых примерах и, наконец, перечислил несколько сценариев применения канала.
Я надеюсь, что вы сможете использовать эту статью для чтения исходного кода Go. Эта часть исходного кода невелика. Как и пакет контекста, он краток и лаконичен, и его стоит прочитать.
В справочных материалах я перечислил множество статей и книг, многие из которых стоит прочесть, а также упомянул их в тексте.
После того, как вы поймете основные принципы этого канала, вам будет очень интересно читать эти статьи на английском языке. Раньше у меня был менталитет «боязнь трудностей» по отношению к нему.После того, как вы это поймете, вам будет очень интересно читать, потому что вы действительно можете это понять.
Наконец, приятного чтения!
использованная литература
【Параллелизм в Go】GitHub.com/ARP IT Джиндал…
[Книга с открытым исходным кодом для расширенного программирования на языке Go]Разрушение 2010. Талант/продвинутый…
【Кратко и ясно】Аудитория Что/пост/го волну…
[Chai Da && Cao Da 'Advanced Programming на языке Go »]Разрушение 2010. Талант/продвинутый…
[Практика параллельного программирования]book.Douban.com/subject/262…
【Заметки Цао Да голанг】GitHub.com/часто123/потяните…
[Анимация реализации графического канала Internet Technology Nest]Tickets.WeChat.QQ.com/Yes/40U AP Gambling…
[Изучайте Golang вместе, рекомендуемые материалы очень полезны]сегмент fault.com/ah/119000001…
[Как изящно закрыть канал]go101.org/article/check…
【Перейти в конец канала】взрыв co's.IO/diving-deep…
[Очень хороший дизайн канала Кавьи на Gopher Con]спикеру Д. 3 года. Amazon AWS.com/present Вопрос о…
【приложение канала】Уууу. Это 0n net.com/archives/go…
【Примеры применения】Чжу Ясен.com/post/go_but…
【применение】Тони Пендулум.com/2014/09/29/…
【Совместное использование одновременного программирования Bird's Nest】www.colonot.com/2019/04/28/…
[Go-Questions, исходный проект Code Farmer's Peach Blossom]GitHub.com/езда на велосипеде/go-Q U…
[Книга с открытым исходным кодом GitBook Code Farmer Taohuayuan]qcrao91.gitbook.io/go/