Golang реализует простую структуру сканера (4) — очередь реализует планирование параллельных задач.

Go

предисловие

в предыдущей статье«Golang реализует простую структуру сканера (3) — простая параллельная версия»В , мы реализовали простейший одновременный поисковый робот с планировщиком для каждогоRequestСоздаватьgoroutine, каждыйgoroutineпрошлоеWorkerРаспределяйте задачи в очереди и заканчивайте их выполнение. всеWorkerхватают одногоchannelзадачи в. Но на этом пути есть еще некоторые недостатки, такие как слабый контроль: все рабочие борются за одно и то же.channelВ задаче у нас нет возможности контролировать, какая рабочая задача дается.

По сути, мы можем сами сделать механизм распределения задач, мы сами решаем, на какого работника распределить

Примечание: Этот параллелизм изменен на основе простой реализации параллелизма из предыдущей статьи, поэтому публикуется не весь код, а только некоторые измененные части. Чтобы просмотреть полный код проекта, вы можете просмотреть предыдущую статью или скачать его с гитхабаПросмотр исходного кода проекта

1. Архитектура проекта

На основе реализации простого параллелизма в предыдущей статье мы изменим следующиеSchedulerмеханизм распределения задач

  • когдаSchedulerполучилRequest, нельзя отправить напрямуюWorker, ни для каждогоRequestСоздаватьgoroutine, поэтому здесь используется очередь запросов
  • В то же время мы хотимWorkerРеализует еще один элемент управления, который может решить, на какую задачу отправитьWorker, так что здесь нам также понадобитсяWorkerочередь
  • когда естьRequestа такжеWorker, мы можем отправить выбранный Запрос выбранномуWorker

2. Очередь реализует планировщик задач

Создайте файл queued.go в каталоге планировщика.

package scheduler

import "crawler/engine"

// 使用队列来调度任务

type QueuedScheduler struct {
	requestChan chan engine.Request		// Request channel
    // Worker channel, 其中每一个Worker是一个 chan engine.Request 类型
	workerChan  chan chan engine.Request	
}

// 提交请求任务到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
	s.requestChan <- request
}

func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
	panic("implement me")
}

// 告诉外界有一个 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
	s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
    // 生成channel
	s.workerChan = make(chan chan engine.Request)
	s.requestChan = make(chan engine.Request)
	go func() {
		// 创建请求队列和工作队列
		var requestQ []engine.Request
		var workerQ []chan engine.Request
		for {
			var activeWorker chan engine.Request
			var activeRequest engine.Request
			
            // 当requestQ和workerQ同时有数据时
			if len(requestQ) > 0 && len(workerQ) > 0 {
				activeWorker = workerQ[0]
				activeRequest = requestQ[0]
			}
			
			select {
			case r := <-s.requestChan: // 当 requestChan 收到数据
				requestQ = append(requestQ, r)
			case w := <-s.workerChan: // 当 workerChan 收到数据
				workerQ = append(workerQ, w)
			case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务
				requestQ = requestQ[1:]
				workerQ = workerQ[1:]
			}
		}
	}()
}

3. Гусеничный двигатель

Модифицированный файл concurrent.go выглядит следующим образом.

package engine

import (
	"log"
)

// 并发引擎
type ConcurrendEngine struct {
	Scheduler   Scheduler
	WorkerCount int
}

// 任务调度器
type Scheduler interface {
	Submit(request Request) // 提交任务
	ConfigMasterWorkerChan(chan Request)
	WorkerReady(w chan Request)
	Run()
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

	out := make(chan ParseResult)
	e.Scheduler.Run()

	// 创建 goruntine
	for i := 0; i < e.WorkerCount; i++ {
		createWorker(out, e.Scheduler)
	}

	// engine把请求任务提交给 Scheduler
	for _, request := range seeds {
		e.Scheduler.Submit(request)
	}

	itemCount := 0
	for {
		// 接受 Worker 的解析结果
		result := <-out
		for _, item := range result.Items {
			log.Printf("Got item: #%d: %v\n", itemCount, item)
			itemCount++
		}

		// 然后把 Worker 解析出的 Request 送给 Scheduler
		for _, request := range result.Requests {
			e.Scheduler.Submit(request)
		}
	}
}

func createWorker(out chan ParseResult, s Scheduler) {
    // 为每一个Worker创建一个channel
	in := make(chan Request)
	go func() {
		for {
			s.WorkerReady(in) // 告诉调度器任务空闲
			request := <-in
			result, err := worker(request)
			if err != nil {
				continue
			}
			out <- result
		}
	}()
}

4. основная функция

package main

import (
	"crawler/engine"
	"crawler/scheduler"
	"crawler/zhenai/parser"
)

func main() {
	e := engine.ConcurrendEngine{
		Scheduler:   &scheduler.QueuedScheduler{},// 这里调用并发调度器
		WorkerCount: 50,
	}
	e.Run(engine.Request{
		Url:       "http://www.zhenai.com/zhenghun",
		ParseFunc: parser.ParseCityList,
	})
}

Результаты приведены ниже:

5. Резюме

В этой статье мы используем очередь для реализации планирования одновременных задач, тем самым реализуя контроль над Worker. Теперь у нас есть две реализации параллелизма, но их методы планирования различны.Чтобы унифицировать код, содержание следующей статьи:

  • Сделайте изоморфизм в проект
  • Добавьте модуль хранения данных.

Если вы хотите получитьИнженеры Google подробно объясняют язык GoДля видеоресурсов вы можете оставить свой адрес электронной почты в области комментариев.

Проэктисходный кодОн был размещен на Github. Для каждой версии есть записи. Вы можете проверить это. Не забудьте поставить звезду. Заранее спасибо.

Если вы считаете, что блог хорош, пожалуйста, поставьте лайк, пожалуйста,