оригинальный, здесь для удобства понимания я перевожу worker pools в thread pools.
Что такое буферный канал
Все рассмотренные ранее каналы не имеют буфера, поэтому блокируются как чтение, так и запись. Также возможно создать буферизованный канал, который будет блокироваться только тогда, когда буфер заполнен, а затем запись или чтение из пустого канала.
Для создания буферизованного канала требуется дополнительный параметр capacity для указания размера буфера:
ch := make(chan type, capacity)
в коде вышеcapacityДолжно быть больше 0, если равно 0, это небуферизованный канал, изученный ранее.
пример
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
В приведенном выше примере мы создали канал с пропускной способностью 2, поэтому запись не будет блокироваться до тех пор, пока не будут записаны 2 строки. Затем прочитайте строки 12 и 13 соответственно, и программа выведет следующее:
naveen
paul
другой пример
Давайте посмотрим на другой пример. Мы пишем в параллельных горутинах, а затем читаем в основной горутине. Этот пример помогает нам лучше понять буферные каналы.
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
В приведенном выше коде мы создаем буферный канал емкостью 2 и передаем его в качестве параметра вwrite
функция, затем спать в течение 2 секунд. в то же времяwrite
Параллельное выполнение функций, используемых в функцияхfor
круговое направлениеch
Напишите 0-4. Поскольку емкость равна 2, можно сразу записать в канал 0 и 1 и затем заблокировать, пока не будет прочитано хотя бы одно значение. Так программа сразу выведет следующие 2 строчки:
successfully wrote 0 to ch
successfully wrote 1 to ch
Когда основная функция спит в течение 2 секунд, введитеfor range
Начните читать данные в цикле, а затем продолжайте спать в течение 2 секунд. Таким образом, программа выведет:
read value 0 from ch
successfully wrote 2 to ch
Этот цикл, пока канал не будет закрыт, окончательный вывод программы выглядит следующим образом:
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
тупик
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
В приведенной выше программе мы хотим записать 3 строки в канал емкостью 2. Программа заблокируется, когда дойдет до строки 11, потому что в это время буфер канала заполнен. Если нет других горутин, читающих из него данные, программа заблокируется. Ошибка заключается в следующем:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100
длина и мощность
Емкость относится к тому, сколько данных буферизованный канал может хранить одновременно.Это значение используется при использованииmake
Ключевое слово используется при создании канала. Длина относится к тому, сколько данных было сохранено в текущем канале. Давайте посмотрим на следующий код:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
В приведенном выше коде мы создаем канал емкостью 3, а затем записываем в него 2 строки, поэтому длина канала теперь равна 2. Затем прочитайте 1 строку из канала, поэтому длина теперь равна 1. Вывод программы следующий:
capacity is 3
length is 2
read value naveen
new length is 1
WaitGroup
В следующем разделе мы представим пул потоков (worker pools), чтобы лучше понять, нам нужно представитьWaitGroup
, а затем на его основе реализуем пул потоков.
WaitGroup используется для ожидания завершения выполнения группы горутин, прежде чем программа будет заблокирована. Предположим, у нас есть 3 горутины, основная программа будет ждать завершения выполнения этих 3 горутин перед выходом. Нечего сказать, посмотрите на код:
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
WaitGroupЭто тип структуры, мы создаем группу ожидания по умолчанию в строке 18, которая внутренне реализуется на основе счетчика. мы называемAdd
метод и передать ему число в качестве параметра, счетчик будет увеличиваться на значение переданного параметра. при звонкеDone
метод, счетчик будет уменьшен на 1.Wait
Метод блокирует горутину до тех пор, пока счетчик не достигнет нуля.
В приведенном выше коде, вызывая циклwg.Add(1)
чтобы счетчик стал 3, запустите 3 горутины одновременно, а затем используйтеwg.Wait()
Заблокируйте основную горутину, пока счетчик не обнулится. в функцииprocess
, вызовwg.Done()
чтобы уменьшить счетчик после завершения выполнения трех горутин,wg.Done()
будет выполнено 3 раза, счетчик обнулится, а основная горутина будет разблокирована.
перечислитьwg
Адрес горутине очень важен! Если адрес не передан, то у каждой горутины будет копия, так что каждая горутина не будет уведомлена об окончании.main
функция.
Вывод программы следующий:
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
Ваш вывод может немного отличаться от приведенного выше.
Пулы потоков (рабочие пулы)
Важным применением буферных каналов является реализация пулов потоков.
Вообще говоря, пул потоков — это набор потоков, ожидающих назначения им задач, и после завершения задачи он продолжает ожидать следующей задачи.
Затем мы реализуем пул потоков для вычисления суммы каждой цифры входного числа. Например, если вводится 123, возвращается 9 (1+2+3), а число, вводимое в пул потоков, генерируется псевдослучайным алгоритмом.
Вот основные шаги, которые нам нужны:
- Создайте набор горутин, которые прослушивают буферный канал и ждут задач.
- Добавьте задачи в буферный канал.
- Запишите результат в другой буферный канал после завершения задачи.
- Чтение данных из канала, где результат сохраняется и выводится.
Сначала создадим структуру для хранения задач и результатов:
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
каждыйJob
Имеетсяid
с однимrandomno
Используется для хранения случайных чисел, подлежащих вычислению. а такжеResult
типы включаютJob
свойства иsumofdigits
Сохраните результат.
Затем создайте буферный канал для получения задач и результатов:
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
Горутина получает задачи от заданий и записывает результаты в результаты.
следующееdigits
Функция используется для вычисления суммы и возврата результата, мы передаемSleep
для имитации трудоемких вычислений.
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
Следующая функция создает горутину:
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
чтениемjobs
задачи в созданииResult
структура и функции храненияdigits
вычисленный результат, прежде чем записать его вresults
этот канал. Эта функция получает параметр-указатель типа WaitGroup.wg
, и вызвать после завершения вычисленияwg.Done()
.
createWorkerPool
Эта функция используется для создания пула потоков:
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
Вышеуказанная функция создаетnoOfWorkers
Пул потоков горутин, вызываемый перед созданием горутин.wg.Add(1)
для увеличения счетчика, затем установитеwg
адрес переданworker
функция. После создания используйтеwg.Wait()
Подождите, пока все горутины закончат выполнение, затем снова вызовитеclose
закрыть функциюresults
Этот канал, чтобы никакая горутина не могла писать данные в будущем.
Далее давайте напишем функцию для назначения задач пулу потоков:
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
Приведенная выше функция определяет количество задач, записанных переданными параметрами. Максимальное значение случайного числа равно998
, и использовать счетчик в циклеi
Создать как идентификаторjob
структурировать и писатьjobs
, закрыть, когда закончитеjobs
.
Затем создайте функцию для чтенияresults
канал и распечатать:
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
Вышеупомянутая функция читаетresults
И напечатайте идентификатор, случайное число и результат, и, наконец, отправьтеdone
Запись данных в этот канал означает, что он распечатал все результаты.
Все готово, приступаемmain
функция:
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
Во-первых, мы записываем время, когда программа начинает выполняться, и, наконец, вычисляем время выполнения программы, вычитая время начала из времени окончания, Это время нам нужно, чтобы сравнить разницу между различными номерами пулов потоков.
Создайте файл с именемdone
канал и передать егоresult
функцию, чтобы вы могли распечатать вывод и получать уведомления, когда весь вывод завершен.
Наконец, создается и считывается пул потоков из 10 горутин.done
дождаться завершения расчета.
Полный код выглядит следующим образом:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
Результаты приведены ниже:
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds
Программа будет иметь 100 строк вывода, потому что мы создали 100 заданий, ваш порядок вывода может отличаться от моего, и время также может отличаться, в зависимости от конфигурации оборудования. Всего у меня ушло 20 секунд.
следующее увеличениеnoOfWorkers
К 20 мы увеличили количество горутин в пуле потоков (удвоилось), и время выполнения определенно должно сократиться (почти вдвое). На моей машине вывод программы выглядит следующим образом:
...
total time taken 10.004364685 seconds
Таким образом, мы можем понять, что увеличение горутин в пуле потоков сократит время работы программы. Вы можете настроить его, как вам нравитсяmain
серединаnoOfJobs
а такжеnoOfWorkers
значение для анализа результатов.