Как реализовать простой фреймворк ForkJoin с помощью Golang
Киньте адрес моего проекта
Простой принцип
-
Что такое ForkJoin
войти в контакт с
ForkJoin
Фреймворки для обученияJava
серединаStream
Параллельный поток в параллельном потоке, нижний слой параллельного потокаForkJoin
РамкаForkJoin
Фреймворк больше подходит для машин с многоядерными процессорами, как правило, используется для обработки большой задачи, которую можно разделить на несколько небольших задач, не имеющих зависимости друг от друга. постоянно сокращается, и эти небольшие задачи распределяются по центральному процессору.В ядре подзадачи выполняются параллельно, что значительно увеличивает скорость обработки задач.Многие из конкретных блогов очень хороши, и я не буду здесь вдаваться в подробности, позвольте мне дать вам несколько адресов блогов, которые я узнал в то время.
-
кража задач
Алгоритм кражи задач на самом деле
Worker
Его можно получить из головы соответствующей рабочей очереди или другогоWorker
Получить элементы в конце рабочей очереди.Каждый раз, когда вы опрашиваете очередь задач, начинайте с каждого
Worker
Получить задачи из соответствующей очереди задач.Если обнаружится, что в это время в очереди задач нет ожидающих выполнения задач, то в это время будет принята стратегия случайного выбора, и одна будет выбрана случайным образом.Worker
Соответствующая рабочая очередь, чтобы украсть ее задачи -
Присоединить результаты подзадачи
В Java необходимо постоянно получать статус выполнения задачи.Если задача выполнена, то будет возвращен результат обработки задачи.В Golang, благодаря наличию chan, Java's Future mode очень легко реализовать ,только при присоединении задачи.Достаточно прочитать канал,потому что когда мы ставим шапку чан в 1,если в канале нет данных,читающая сторона будет заблокирована в ожидании
func (f *ForkJoinTask) Join() (bool, interface{}) {
for {
select {
case data, ok := <-f.result:
if ok {
return true, data
}
case <-f.ctx.Done():
panic(f.taskPool.err)
}
}
}
основной код
очередь задач
Пройдите очередь задач. Очередей задач несколько, но очередей задач несколько.Каждый раз из этих очередей задач будет получена задача.Если задача существует, то задача будет упакована в структуру, после получения задачи выполнение задания будет получено.worker
, затем отправьте упакованную задачу вWorker
Отправляйте задачи асинхронно в канале chan
func (fp *ForkJoinPool) run(ctx context.Context) {
go func() {
wId := int32(0)
for {
select {
case <-ctx.Done():
fmt.Printf("here is err")
fp.err = fp.wp.err
return
default:
hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
if hasTask {
fp.wp.Submit(ctx, &struct {
T Task
F *ForkJoinTask
C context.Context
}{T: job, F: ft, C: ctx})
}
wId = (wId + 1) % fp.cap
}
}
}()
}
Получить работника
существуетForkJoin
При инициализации по количеству ядер процессораWorker
Пул инициализирован
func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
...
wCnt := runtime.NumCPU()
for i := 0; i < wCnt; i ++ {
w := newWorker(p)
w.run(ctx)
p.workers = append(p.workers, w)
}
...
}
Следовательно, задача обработки обязательно нуждается в соответствующем воркере для выполнения, поэтому каждый раз, когда вы получаетеworker
пойдет первымworker
Определить, есть ли еще свободное место в пулеworker
, если он существует, получить его напрямуюworker
, в противном случае напрямую создайтеworker
брать на себя выполнение заданий
func (p *Pool) retrieveWorker(ctx context.Context) *Worker {
var w *Worker
idleWorker := p.workers
if len(idleWorker) >= 1 {
p.lock.Lock()
n := len(idleWorker) - 1
w = idleWorker[n]
p.workers = idleWorker[:n]
p.lock.Unlock()
} else {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker)
} else {
w = &Worker{
pool: p,
job: make(chan *struct {
T Task
F *ForkJoinTask
C context.Context
}, 1),
}
}
w.run(ctx)
}
return w
}
Worker
Объект, который фактически выполняет задачу, каждыйworker
связать одинgoruntine
, и естьchan
канал для получения задач асинхронно и вgoruntine
Задача вынимается и выполняется асинхронно посередине, когда задача выполняется,worker
вернуться кworker
бассейн
func (w *Worker) run(ctx context.Context) {
go func() {
var tmpTask *ForkJoinTask
defer func() {
if p := recover(); p != nil {
w.pool.panicHandler(p)
if tmpTask != nil {
w.pool.err = p
close(tmpTask.result)
}
}
}()
for {
select {
case <-ctx.Done():
fmt.Println("An exception occurred and the task has stopped")
return
default:
for job := range w.job {
if job == nil {
w.pool.workerCache.Put(w)
return
}
tmpTask = job.F
job.F.result <- job.T.Compute()
w.pool.releaseWorker(w)
}
}
}
}()
}
достижение
области улучшаются
-
Алгоритм кражи задач
Текущий алгоритм кражи задач версии 0.1 нельзя назвать похожим на Java ForkJoin, который поддерживает двух рабочих процессов для одновременного получения задач из очереди, но блокирует всю очередь при получении задач, поэтому производительность параллелизма не очень хорошая. ● Используйте CAS для замены пессимистичных блокировок, чтобы два исполнителя могли одновременно считывать данные из очереди.
Worker
В то же время, когда элемент получен из очереди задач длиной всего 1, оптимистическая блокировка поднимается до пессимистической блокировки для контроля. -
Контроль количества рабочих
Текущее количество воркеров будет постоянно создаваться по мере непрерывной декомпозиции задачи.Если задача декомпозирована слишком глубоко, может быть создано большое количество воркеров.
Worker
, так что надо дальше разбиратьсяForkJoin
Планирование ресурсов потока