33. Как вручную реализовать пул сопрограмм?

Go

Привет всем, меня зовут Мин.

Во время самостоятельного изучения Golang я написал подробные учебные заметки и разместил их в своем личном общедоступном аккаунте WeChat «Время программирования Go». вы только изучаете язык Go, уделяйте ему внимание, учитесь и развивайтесь вместе.

Мой онлайн-блог:golang.iswbm.comМой Github: github.com/iswbm/GolangCodingTime


Создать сопрограмму в Golang чрезвычайно просто: вы просто определяете функцию и используете ключевое слово go для ее выполнения.

Если вы столкнулись с другими языками, вы обнаружите, что при использовании потоков, чтобы уменьшить накладные расходы на частое создание и уничтожение потоков, мы обычно используем пулы потоков для повторного использования потоков.

Технология объединения использует мультиплексирование для повышения производительности Вам нужен пул сопрограмм в Golang?

В Golang горутина — это легковесный поток, его создание и планирование выполняются в пользовательском режиме и не требуют входа в ядро, а это означает, что накладные расходы на создание и уничтожение сопрограмм очень малы.

Поэтому я думаю, что в большинстве случаев разработчикам не нужно использовать пул сопрограмм.

Но не исключено, что есть определенные сценарии, где это необходимо, потому что я не скажу, пока не столкнусь с этим.

отложить в сторонуЭто необходимоЭтот вопрос, с чисто технической точки зрения, как мы можем реализовать общий пул сопрограмм?

Давайте учиться писать

Сначала определяют структуру бассейна COROUTINE (бассейн), содержащей два свойства, оба типа Chan.

Одна работа, которая используется для получения задач задачи

Одним из них является sem, который используется для установки размера пула сопрограмм, то есть количества сопрограмм, которые могут выполняться одновременно.

type Pool struct {
    work chan func()   // 任务
    sem  chan struct{} // 数量
}

Затем определите новую функцию для создания объекта пула сопрограмм, есть деталь, на которую следует обратить внимание.

работа - это небуферизованный канал

А sem — это буферный канал, а size — это размер пула сопрограмм

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

Наконец, привяжите две функции к объекту пула сопрограмм.

1,NewTask: добавить задачи в пул сопрограмм.

При первом вызове NewTask для добавления задачи, так как работа — это небуферизованный канал, обязательно возьмет ветку второго случая: используйте go worker для запуска сопрограммы.

func (p *Pool) NewTask(task func()) { 
    select {
        case p.work <- task:
        case p.sem <- struct{}{}:
            go p.worker(task)
    }
}

2,worker: используется для выполнения задач

Чтобы реализовать повторное использование сопрограмм, используется бесконечный цикл for, так что сопрограмма не завершается после выполнения задачи, а продолжает получать новые задачи.

func (p *Pool) worker(task func()) { 
    defer func() { <-p.sem }()
    for {
        task()
        task = <-p.work
    }
}

Эти две функции являются ключевыми функциями, реализуемыми пулом сопрограмм, и логика внутри заслуживает тщательного изучения:

1. Если заданное количество пулов сопрограмм больше 2, в это время задача передается в NewTask во второй раз.При выборе кейса, если первая сопрограмма еще запущена, будет взят второй кейс. воссоздать сопрограмму для выполнения задачи

2. Если количество входящих задач больше, чем установленное количество пулов сопрограмм, и все задачи в это время все еще выполняются, то вызовите NewTask в это время, чтобы передать задачу, оба случая не попадут и всегда будут Заблокируйте до завершения выполнения задачи, рабочий канал в рабочей функции может получать новые задачи и продолжать выполняться.

Выше приведен процесс реализации пула сопрограмм.

Он также очень прост в использовании, посмотрите на следующий код, и вы поймете

func main()  {
    pool := New(128)
    pool.NewTask(func(){
        fmt.Println("run task")
    })
}

Чтобы вы увидели эффект, я установил количество пулов сопрограмм равным 2, запускаю четыре задачи и печатаю текущее время после сна в течение 2 секунд.

func main()  {
    pool := New(2)

    for i := 1; i <5; i++{
        pool.NewTask(func(){
            time.Sleep(2 * time.Second)
            fmt.Println(time.Now())
        })
    }

    // 保证所有的协程都执行完毕
    time.Sleep(5 * time.Second)
}

Результаты выполнения следующие.Вы можете видеть, что всего задач 4. Поскольку размер пула сопрограмм равен 2, 4 задачи выполняются в два пакета (как видно из времени печати)

2020-05-24 23:18:02.014487 +0800 CST m=+2.005207182
2020-05-24 23:18:02.014524 +0800 CST m=+2.005243650
2020-05-24 23:18:04.019755 +0800 CST m=+4.010435443
2020-05-24 23:18:04.019819 +0800 CST m=+4.010499440