[Перевод] GO Language Study Notes — Буферные каналы и пул потоков

Go

оригинальный, здесь для удобства понимания я перевожу worker pools в thread pools.

Что такое буферный канал

Все рассмотренные ранее каналы не имеют буфера, поэтому блокируются как чтение, так и запись. Также возможно создать буферизованный канал, который будет блокироваться только тогда, когда буфер заполнен, а затем запись или чтение из пустого канала.

Для создания буферизованного канала требуется дополнительный параметр capacity для указания размера буфера:

ch := make(chan type, capacity)

в коде вышеcapacityДолжно быть больше 0, если равно 0, это небуферизованный канал, изученный ранее.

пример

package main

import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

В приведенном выше примере мы создали канал с пропускной способностью 2, поэтому запись не будет блокироваться до тех пор, пока не будут записаны 2 строки. Затем прочитайте строки 12 и 13 соответственно, и программа выведет следующее:

naveen  
paul

другой пример

Давайте посмотрим на другой пример. Мы пишем в параллельных горутинах, а затем читаем в основной горутине. Этот пример помогает нам лучше понять буферные каналы.

package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}

В приведенном выше коде мы создаем буферный канал емкостью 2 и передаем его в качестве параметра вwriteфункция, затем спать в течение 2 секунд. в то же времяwriteПараллельное выполнение функций, используемых в функцияхforкруговое направлениеchНапишите 0-4. Поскольку емкость равна 2, можно сразу записать в канал 0 и 1 и затем заблокировать, пока не будет прочитано хотя бы одно значение. Так программа сразу выведет следующие 2 строчки:

successfully wrote 0 to ch  
successfully wrote 1 to ch

Когда основная функция спит в течение 2 секунд, введитеfor rangeНачните читать данные в цикле, а затем продолжайте спать в течение 2 секунд. Таким образом, программа выведет:

read value 0 from ch  
successfully wrote 2 to ch

Этот цикл, пока канал не будет закрыт, окончательный вывод программы выглядит следующим образом:

successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch

тупик

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

В приведенной выше программе мы хотим записать 3 строки в канал емкостью 2. Программа заблокируется, когда дойдет до строки 11, потому что в это время буфер канала заполнен. Если нет других горутин, читающих из него данные, программа заблокируется. Ошибка заключается в следующем:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100

длина и мощность

Емкость относится к тому, сколько данных буферизованный канал может хранить одновременно.Это значение используется при использованииmakeКлючевое слово используется при создании канала. Длина относится к тому, сколько данных было сохранено в текущем канале. Давайте посмотрим на следующий код:

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

В приведенном выше коде мы создаем канал емкостью 3, а затем записываем в него 2 строки, поэтому длина канала теперь равна 2. Затем прочитайте 1 строку из канала, поэтому длина теперь равна 1. Вывод программы следующий:

capacity is 3  
length is 2  
read value naveen  
new length is 1

WaitGroup

В следующем разделе мы представим пул потоков (worker pools), чтобы лучше понять, нам нужно представитьWaitGroup, а затем на его основе реализуем пул потоков.

WaitGroup используется для ожидания завершения выполнения группы горутин, прежде чем программа будет заблокирована. Предположим, у нас есть 3 горутины, основная программа будет ждать завершения выполнения этих 3 горутин перед выходом. Нечего сказать, посмотрите на код:

package main

import (  
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

WaitGroupЭто тип структуры, мы создаем группу ожидания по умолчанию в строке 18, которая внутренне реализуется на основе счетчика. мы называемAddметод и передать ему число в качестве параметра, счетчик будет увеличиваться на значение переданного параметра. при звонкеDoneметод, счетчик будет уменьшен на 1.WaitМетод блокирует горутину до тех пор, пока счетчик не достигнет нуля.

В приведенном выше коде, вызывая циклwg.Add(1)чтобы счетчик стал 3, запустите 3 горутины одновременно, а затем используйтеwg.Wait()Заблокируйте основную горутину, пока счетчик не обнулится. в функцииprocess, вызовwg.Done()чтобы уменьшить счетчик после завершения выполнения трех горутин,wg.Done()будет выполнено 3 раза, счетчик обнулится, а основная горутина будет разблокирована.

перечислитьwgАдрес горутине очень важен! Если адрес не передан, то у каждой горутины будет копия, так что каждая горутина не будет уведомлена об окончании.mainфункция.

Вывод программы следующий:

started Goroutine  2  
started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing

Ваш вывод может немного отличаться от приведенного выше.

Пулы потоков (рабочие пулы)

Важным применением буферных каналов является реализация пулов потоков.

Вообще говоря, пул потоков — это набор потоков, ожидающих назначения им задач, и после завершения задачи он продолжает ожидать следующей задачи.

Затем мы реализуем пул потоков для вычисления суммы каждой цифры входного числа. Например, если вводится 123, возвращается 9 (1+2+3), а число, вводимое в пул потоков, генерируется псевдослучайным алгоритмом.

Вот основные шаги, которые нам нужны:

  • Создайте набор горутин, которые прослушивают буферный канал и ждут задач.
  • Добавьте задачи в буферный канал.
  • Запишите результат в другой буферный канал после завершения задачи.
  • Чтение данных из канала, где результат сохраняется и выводится.

Сначала создадим структуру для хранения задач и результатов:

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

каждыйJobИмеетсяidс однимrandomnoИспользуется для хранения случайных чисел, подлежащих вычислению. а такжеResultтипы включаютJobсвойства иsumofdigitsСохраните результат.

Затем создайте буферный канал для получения задач и результатов:

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

Горутина получает задачи от заданий и записывает результаты в результаты.

следующееdigitsФункция используется для вычисления суммы и возврата результата, мы передаемSleepдля имитации трудоемких вычислений.

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

Следующая функция создает горутину:

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

чтениемjobsзадачи в созданииResultструктура и функции храненияdigitsвычисленный результат, прежде чем записать его вresultsэтот канал. Эта функция получает параметр-указатель типа WaitGroup.wg, и вызвать после завершения вычисленияwg.Done().

createWorkerPoolЭта функция используется для создания пула потоков:

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

Вышеуказанная функция создаетnoOfWorkersПул потоков горутин, вызываемый перед созданием горутин.wg.Add(1)для увеличения счетчика, затем установитеwgадрес переданworkerфункция. После создания используйтеwg.Wait()Подождите, пока все горутины закончат выполнение, затем снова вызовитеcloseзакрыть функциюresultsЭтот канал, чтобы никакая горутина не могла писать данные в будущем.

Далее давайте напишем функцию для назначения задач пулу потоков:

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

Приведенная выше функция определяет количество задач, записанных переданными параметрами. Максимальное значение случайного числа равно998, и использовать счетчик в циклеiСоздать как идентификаторjobструктурировать и писатьjobs, закрыть, когда закончитеjobs.

Затем создайте функцию для чтенияresultsканал и распечатать:

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

Вышеупомянутая функция читаетresultsИ напечатайте идентификатор, случайное число и результат, и, наконец, отправьтеdoneЗапись данных в этот канал означает, что он распечатал все результаты.

Все готово, приступаемmainфункция:

func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

Во-первых, мы записываем время, когда программа начинает выполняться, и, наконец, вычисляем время выполнения программы, вычитая время начала из времени окончания, Это время нам нужно, чтобы сравнить разницу между различными номерами пулов потоков.

Создайте файл с именемdoneканал и передать егоresultфункцию, чтобы вы могли распечатать вывод и получать уведомления, когда весь вывод завершен.

Наконец, создается и считывается пул потоков из 10 горутин.doneдождаться завершения расчета.

Полный код выглядит следующим образом:

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

Результаты приведены ниже:

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds

Программа будет иметь 100 строк вывода, потому что мы создали 100 заданий, ваш порядок вывода может отличаться от моего, и время также может отличаться, в зависимости от конфигурации оборудования. Всего у меня ушло 20 секунд.

следующее увеличениеnoOfWorkersК 20 мы увеличили количество горутин в пуле потоков (удвоилось), и время выполнения определенно должно сократиться (почти вдвое). На моей машине вывод программы выглядит следующим образом:

...
total time taken  10.004364685 seconds

Таким образом, мы можем понять, что увеличение горутин в пуле потоков сократит время работы программы. Вы можете настроить его, как вам нравитсяmainсерединаnoOfJobsа такжеnoOfWorkersзначение для анализа результатов.