предисловие
в предыдущей статье«Golang реализует простую структуру сканера (3) — простая параллельная версия»В , мы реализовали простейший одновременный поисковый робот с планировщиком для каждогоRequest
Создаватьgoroutine
, каждыйgoroutine
прошлоеWorker
Распределяйте задачи в очереди и заканчивайте их выполнение. всеWorker
хватают одногоchannel
задачи в. Но на этом пути есть еще некоторые недостатки, такие как слабый контроль: все рабочие борются за одно и то же.channel
В задаче у нас нет возможности контролировать, какая рабочая задача дается.
По сути, мы можем сами сделать механизм распределения задач, мы сами решаем, на какого работника распределить
Примечание: Этот параллелизм изменен на основе простой реализации параллелизма из предыдущей статьи, поэтому публикуется не весь код, а только некоторые измененные части. Чтобы просмотреть полный код проекта, вы можете просмотреть предыдущую статью или скачать его с гитхабаПросмотр исходного кода проекта
1. Архитектура проекта
На основе реализации простого параллелизма в предыдущей статье мы изменим следующиеScheduler
механизм распределения задач
- когда
Scheduler
получилRequest
, нельзя отправить напрямуюWorker
, ни для каждогоRequest
Создаватьgoroutine
, поэтому здесь используется очередь запросов - В то же время мы хотим
Worker
Реализует еще один элемент управления, который может решить, на какую задачу отправитьWorker
, так что здесь нам также понадобитсяWorker
очередь - когда есть
Request
а такжеWorker
, мы можем отправить выбранный Запрос выбранномуWorker
2. Очередь реализует планировщик задач
Создайте файл queued.go в каталоге планировщика.
package scheduler
import "crawler/engine"
// 使用队列来调度任务
type QueuedScheduler struct {
requestChan chan engine.Request // Request channel
// Worker channel, 其中每一个Worker是一个 chan engine.Request 类型
workerChan chan chan engine.Request
}
// 提交请求任务到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
s.requestChan <- request
}
func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
panic("implement me")
}
// 告诉外界有一个 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueuedScheduler) Run() {
// 生成channel
s.workerChan = make(chan chan engine.Request)
s.requestChan = make(chan engine.Request)
go func() {
// 创建请求队列和工作队列
var requestQ []engine.Request
var workerQ []chan engine.Request
for {
var activeWorker chan engine.Request
var activeRequest engine.Request
// 当requestQ和workerQ同时有数据时
if len(requestQ) > 0 && len(workerQ) > 0 {
activeWorker = workerQ[0]
activeRequest = requestQ[0]
}
select {
case r := <-s.requestChan: // 当 requestChan 收到数据
requestQ = append(requestQ, r)
case w := <-s.workerChan: // 当 workerChan 收到数据
workerQ = append(workerQ, w)
case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务
requestQ = requestQ[1:]
workerQ = workerQ[1:]
}
}
}()
}
3. Гусеничный двигатель
Модифицированный файл concurrent.go выглядит следующим образом.
package engine
import (
"log"
)
// 并发引擎
type ConcurrendEngine struct {
Scheduler Scheduler
WorkerCount int
}
// 任务调度器
type Scheduler interface {
Submit(request Request) // 提交任务
ConfigMasterWorkerChan(chan Request)
WorkerReady(w chan Request)
Run()
}
func (e *ConcurrendEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
// 创建 goruntine
for i := 0; i < e.WorkerCount; i++ {
createWorker(out, e.Scheduler)
}
// engine把请求任务提交给 Scheduler
for _, request := range seeds {
e.Scheduler.Submit(request)
}
itemCount := 0
for {
// 接受 Worker 的解析结果
result := <-out
for _, item := range result.Items {
log.Printf("Got item: #%d: %v\n", itemCount, item)
itemCount++
}
// 然后把 Worker 解析出的 Request 送给 Scheduler
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(out chan ParseResult, s Scheduler) {
// 为每一个Worker创建一个channel
in := make(chan Request)
go func() {
for {
s.WorkerReady(in) // 告诉调度器任务空闲
request := <-in
result, err := worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
4. основная функция
package main
import (
"crawler/engine"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
e := engine.ConcurrendEngine{
Scheduler: &scheduler.QueuedScheduler{},// 这里调用并发调度器
WorkerCount: 50,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
Результаты приведены ниже:
5. Резюме
В этой статье мы используем очередь для реализации планирования одновременных задач, тем самым реализуя контроль над Worker. Теперь у нас есть две реализации параллелизма, но их методы планирования различны.Чтобы унифицировать код, содержание следующей статьи:
- Сделайте изоморфизм в проект
- Добавьте модуль хранения данных.
Если вы хотите получитьИнженеры Google подробно объясняют язык GoДля видеоресурсов вы можете оставить свой адрес электронной почты в области комментариев.
Проэктисходный кодОн был размещен на Github. Для каждой версии есть записи. Вы можете проверить это. Не забудьте поставить звезду. Заранее спасибо.
Если вы считаете, что блог хорош, пожалуйста, поставьте лайк, пожалуйста,