Golang реализует эссе ForkJoin

Go

Как реализовать простой фреймворк ForkJoin с помощью Golang

Киньте адрес моего проекта

go-fork-join

Простой принцип

  • Что такое ForkJoin

    войти в контакт сForkJoinФреймворки для обученияJavaсерединаStreamПараллельный поток в параллельном потоке, нижний слой параллельного потокаForkJoinРамка

    ForkJoinФреймворк больше подходит для машин с многоядерными процессорами, как правило, используется для обработки большой задачи, которую можно разделить на несколько небольших задач, не имеющих зависимости друг от друга. постоянно сокращается, и эти небольшие задачи распределяются по центральному процессору.В ядре подзадачи выполняются параллельно, что значительно увеличивает скорость обработки задач.

    Многие из конкретных блогов очень хороши, и я не буду здесь вдаваться в подробности, позвольте мне дать вам несколько адресов блогов, которые я узнал в то время.

  • кража задач

    Алгоритм кражи задач на самом делеWorkerЕго можно получить из головы соответствующей рабочей очереди или другогоWorkerПолучить элементы в конце рабочей очереди.

    Каждый раз, когда вы опрашиваете очередь задач, начинайте с каждогоWorkerПолучить задачи из соответствующей очереди задач.Если обнаружится, что в это время в очереди задач нет ожидающих выполнения задач, то в это время будет принята стратегия случайного выбора, и одна будет выбрана случайным образом.WorkerСоответствующая рабочая очередь, чтобы украсть ее задачи

  • Присоединить результаты подзадачи

    В Java необходимо постоянно получать статус выполнения задачи.Если задача выполнена, то будет возвращен результат обработки задачи.В Golang, благодаря наличию chan, Java's Future mode очень легко реализовать ,только при присоединении задачи.Достаточно прочитать канал,потому что когда мы ставим шапку чан в 1,если в канале нет данных,читающая сторона будет заблокирована в ожидании

func (f *ForkJoinTask) Join() (bool, interface{}) {
	for {
		select {
		case data, ok := <-f.result:
			if ok {
				return true, data
			}
		case <-f.ctx.Done():
			panic(f.taskPool.err)
		}
	}
}

основной код

очередь задач

Пройдите очередь задач. Очередей задач несколько, но очередей задач несколько.Каждый раз из этих очередей задач будет получена задача.Если задача существует, то задача будет упакована в структуру, после получения задачи выполнение задания будет получено.worker, затем отправьте упакованную задачу вWorkerОтправляйте задачи асинхронно в канале chan

func (fp *ForkJoinPool) run(ctx context.Context) {
	go func() {
		wId := int32(0)
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("here is err")
				fp.err = fp.wp.err
				return
			default:
				hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
				if hasTask {
					fp.wp.Submit(ctx, &struct {
						T Task
						F *ForkJoinTask
						C context.Context
					}{T: job, F: ft, C: ctx})
				}
				wId = (wId + 1) % fp.cap
			}
		}
	}()
}

Получить работника

существуетForkJoinПри инициализации по количеству ядер процессораWorkerПул инициализирован

func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
	...
	wCnt := runtime.NumCPU()
	for i := 0; i < wCnt; i ++ {
		w := newWorker(p)
		w.run(ctx)
		p.workers = append(p.workers, w)
	}
	...
}

Следовательно, задача обработки обязательно нуждается в соответствующем воркере для выполнения, поэтому каждый раз, когда вы получаетеworkerпойдет первымworkerОпределить, есть ли еще свободное место в пулеworker, если он существует, получить его напрямуюworker, в противном случае напрямую создайтеworkerбрать на себя выполнение заданий

func (p *Pool) retrieveWorker(ctx context.Context) *Worker {

	var w *Worker

	idleWorker := p.workers

	if len(idleWorker) >= 1 {
		p.lock.Lock()
		n := len(idleWorker) - 1
		w = idleWorker[n]
		p.workers = idleWorker[:n]
		p.lock.Unlock()
	} else {
		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
			w = cacheWorker.(*Worker)
		} else {
			w = &Worker{
				pool: p,
				job: make(chan *struct {
					T Task
					F *ForkJoinTask
					C context.Context
				}, 1),
			}
		}
		w.run(ctx)
	}
	return w
}

Worker

Объект, который фактически выполняет задачу, каждыйworkerсвязать одинgoruntine, и естьchanканал для получения задач асинхронно и вgoruntineЗадача вынимается и выполняется асинхронно посередине, когда задача выполняется,workerвернуться кworkerбассейн

func (w *Worker) run(ctx context.Context) {
	go func() {

		var tmpTask *ForkJoinTask

		defer func() {
			if p := recover(); p != nil {
				w.pool.panicHandler(p)
				if tmpTask != nil {
					w.pool.err = p
					close(tmpTask.result)
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				fmt.Println("An exception occurred and the task has stopped")
				return
			default:
				for job := range w.job {
					if job == nil {
						w.pool.workerCache.Put(w)
						return
					}
					tmpTask = job.F
					job.F.result <- job.T.Compute()
					w.pool.releaseWorker(w)
				}
			}
		}
	}()
}

достижение

benchtest

области улучшаются

  • Алгоритм кражи задач

    Текущий алгоритм кражи задач версии 0.1 нельзя назвать похожим на Java ForkJoin, который поддерживает двух рабочих процессов для одновременного получения задач из очереди, но блокирует всю очередь при получении задач, поэтому производительность параллелизма не очень хорошая. ● Используйте CAS для замены пессимистичных блокировок, чтобы два исполнителя могли одновременно считывать данные из очереди.WorkerВ то же время, когда элемент получен из очереди задач длиной всего 1, оптимистическая блокировка поднимается до пессимистической блокировки для контроля.

  • Контроль количества рабочих

    Текущее количество воркеров будет постоянно создаваться по мере непрерывной декомпозиции задачи.Если задача декомпозирована слишком глубоко, может быть создано большое количество воркеров.Worker, так что надо дальше разбиратьсяForkJoinПланирование ресурсов потока