1. MapReduce чтения бумаги
1.1 Что такое MapReduce
Адрес бумаги MapReduce:П ДОС. Участвовал. Персик. Квота/6.824/бумага…
Впервые автор столкнулся с MapReduce во время изучения курса распределенных систем MIT6.824.разделяй и властвуйподумал о.
В качестве простого примера я какМанчестер ЮнайтедФанаты «Манчестер Юнайтед» хотят знать историю «Манчестер Юнайтед», поэтому я нашел соответствующие новостные сообщения о «Манчестер Юнайтед» за почти 30 лет.Чтобы узнать, кто является самой актуальной звездой, я посчитал имена каждой звезды в соответствующих новостных сообщениях. частота. Но было бы ослепительно, если бы мне пришлось считать один отчет за другим, поэтому я решил пригласить своих друзей, чтобы узнать об истории футбола.
Предположим, я приглашаю N друзей, тогда новостные сводки за последние 30 лет разделю на N долей, так что после того, как все пересчитают новостные сводки в руках, мы получимN формы, записывается на каждой формеСколько раз появляется имя каждого игрока. Затем я пригласил друзей M. Каждый из этих M друзей подсчитал общее количество появлений имени звезды в таблицах N. Например, A подсчитывает количество раз, когда имя Сяобэй появляется в таблицах N, а B подсчитывает Лу Сяопан. вхождения в N таблиц и т.д. и т.п. Наконец, мы можем получить табличные данные о частоте имен звезд «Манчестер Юнайтед» за последние три десятилетия!
Из приведенного выше примера видно, что если вы хотите MapReduce, вам сначала нужно достаточно друзей (шутка), давайте узнаем о MapReduce с теоретического уровня. MapReduce — это именномодель программирования.
В приведенном выше примере в реальном сценарии мы можем написать программу для просмотра новостных отчетов или запустить программу в нескольких потоках для просмотра новостей, но это всегда слишком медленно из-за ограничения мощности ЦП. Итак, мы развертываем программу на N машинах, и, естественно, нам нужно написать программу для развертывания на M машинах, чтобы интегрировать выходные результаты N машин. Поскольку это так хлопотно, позвольте MapReduce помочь нам в этом!
MapReduce учитывает некоторые проблемы, о которых мы не хотим думать при написании программ, например, какразделить входные данные,Как планировать в кластере,Обработка ошибок исключений для машин и оборудования,связь между машинамиПодождите, MapReduce абстрагирует эти проблемы,Параллельная реализация, отказоустойчивая обработка, распределение данных, балансировка нагрузкиИ поэтому подробности скрыты. Таким образом, нам нужно только реализовать программу в автономном сценарии, и нам не нужно думать, как разделить входной файл и как его интегрировать.
1.2 Модель программирования
В приведенном выше примере мы видим, что у нас естьВходной набор (новости),выходной набор(табличные данные о частотности имён) иПромежуточный набор результатов(табличные данные каждого из N друзей). Процесс перехода от входного набора к промежуточному набору результатов — это процесс сопоставления, а процесс от промежуточного набора результатов к выходному набору — процесс сокращения.
Мы рассматриваем входной набор какКоллекция пар ключ/значение, ключ — это имя документа каждой новостной статьи, а значение — это содержимое каждой новостной статьи. Точно так же промежуточный набор результатов также являетсяКоллекция пар ключ/значение, после карты ключом здесь становится название звезды, а значением является количество раз появления названия звезды. Поскольку это вывод N промежуточных наборов результатов, для N промежуточных наборов результатов значение, соответствующее ключу, станетсписок(значение), который представляет собой список значений. После ввода этого списка значений для сокращения будет получен результирующий набор значений, включая количество вхождений каждой звезды. Это можно резюмировать так:
Input1 -> Map -> a,1 b,1 c,1
Input2 -> Map -> b,1
Input3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,2
| -----> Reduce -> b,2
---------> Reduce -> a,2
// 可以得到球星a,b,c出现次数都为2
// 表达式可以表达为
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
1.3 Реализация фреймворка
Выше приведена схема процесса выполнения MapReduce, гдеUserPraogramпрограммы, написанные нашими пользователями,masterМожно понимать как планировщик. Слева направо: входной набор, операция карты, промежуточный набор результатов, операция сокращения и выходной набор. Порядковые номера (1)–(6) на рисунке показывают весь процесс планирования и выполнения MapReduce.
(0) Во-первых, функция разделения библиотеки MapReduce в пользовательской программе разделит набор входных файлов на N частей (функция разделения и значение N могут быть указаны пользовательской программой), каждая из которых имеет размер от 16 МБ до 64 МБ. по размеру (также контролируется необязательными параметрами) size).
(1) (2) Затем будет скопировано большое количество пользовательских программ.Одна из скопированных программ является главной, а остальные программыworker. Этот мастер является бригадиром и будет назначать рабочим задачи N Map и M Reduce.
(3) Рабочий процесс считывает один или несколько сегментов в разделенном наборе файлов и анализирует их из входных данных.пара ключ/значение, как в пользовательской программеВходные параметры функции карты, который выводит промежуточный набор результатов и кэширует его вОЗУсередина.
(4) Кэшированный в памяти промежуточный набор результатов будет периодически сохраняться вЛокальный дискВышеупомянутое, через функцию разделения для записи M регионов, расположение этих M регионов будет передано мастеру, и мастер сообщит информацию рабочим, которым назначена задача «Уменьшить».
(5) Рабочие, которым назначено сокращение, будут считывать промежуточный результат, установленный RPC, и объединять содержимое с тем же ключом в отсортированном виде.
(6) После сортировки каждый уникальный ключ в промежуточном наборе результатов и соответствующий набор значений (список (значение)) будут переданы функции сокращения пользовательской программы в качестве входных параметров и, наконец, выведены в набор выходных файлов.
1.4 Отказоустойчивая обработка
После стольких разговоров мы видим, что MapReduce предназначен для больших масштабов.При организации сотен или тысяч рабочих для обработки большого объема данных неизбежны «забастовки» рабочих (сбой сервера или простои), или Мастер ошибается из-за огромного давления со стороны руководства, и все эти вопросы необходимо учитывать при разработке MapReduce.
1.4.1 Сбой работника
Во время текущего процесса, поскольку мастер назначил рабочие задачи, он захочет узнать условия работы рабочих, поэтому мастер будет периодическиpingкаждый рабочий. Если в течение определенного периода времени от работника не получена обратная связь, считается, что работник потерпел неудачу. Что касается задачи, которая была назначена этому работнику ранее, она считается незавершенной и устанавливается в исходное состояние простоя, которое может быть назначено другим работникам для обработки.
Предположим, естьworker Aа такжеworker B, обработчик A обычно дает сбой, и задача обработчика A берется на себя обработчиком B. В настоящее время промежуточный результирующий набор, сгенерированный задачей map рабочего процесса A, хранится локально в worker A, но поскольку worker A потерпел крах и к нему нельзя получить доступ, worker B должен повторно выполнить задачу map, которая была выполнена worker-ом. A, чтобы сгенерировать промежуточный набор результатов, и уведомить других рабочих, которые выполняют сокращение, чтобы они прекратили чтение данных рабочего процесса A и прочитали данные рабочего процесса B; и если это задача сокращения рабочего процесса A, поскольку сгенерированный выходной набор был сохранен в глобальной файловой системе этого делать не нужно.
1.4.2 основной отказ
Сбои основной задачи могут быть зарегистрированы путем периодической регистрацииcheckpoint(Запись основных данных через регулярные промежутки времени) для решения, то есть, если текущая основная задача завершается сбоем, последняя контрольная точка может быть прочитана для запуска другого основного процесса. Конечно, поскольку есть только один мастер, когда мастер выходит из строя, клиент должен быть проинформирован о том, что мастер в настоящее время недоступен, mapreduce прерывается, и клиент может выполнить повторное выполнение в зависимости от ситуации.
1.5 Хранение
В приведенной выше реализации промежуточные результаты и выходные наборы должны храниться. В больших кластерах пропускная способность сети является относительно дефицитным ресурсом. Чтобы не допустить, чтобы пропускная способность сети стала узким местом производительности, она обычно хранится на локальных дисках для экономии пропускной способности сети. .ресурс. Хранилище MapReduce основано на файловой системе GFS (Google File System).Основная реализация состоит в том, чтобы разделить файл на блоки по 64 МБ, и каждый блок будет иметь копию, хранящуюся на другом компьютере, для обеспечения безопасности данных. Когда рабочий процесс MapReduce не может обработать задачу, мастер организует ближайший рабочий компьютер для его выполнения, чтобы обеспечить меньшее потребление ресурсов.
2. Практическое написание MapReduce
Адрес MIT6.824 Lab1 MapReduce:P DOS. Участвовал. Персик. Quota/6.824/labs/…
2.1 Окружающая среда Строительство
2.1.1 Среда Linux
Используемая среда — WSL (система Windows для Linux), ubuntu16.04 используется с xshell.
Настройка WSL и использование справочных статей:zhuanlan.zhihu.com/p/90173113
Используйте справочные статьи с xshell:woohoo.brief.com/afraid/039411's 2 от…
Если вы столкнулись с проблемой, что xshell не может подключиться к ubuntu после перезагрузки, вы можете перезапустить ssh и повторить попытку, то есть открыть ubuntu и ввести
sudo service ssh --full-restart
2.1.2 перейти на местный стандарт
Загрузка официального сайта Golang:golang.org/dl/
1) Откройте официальный сайт, чтобы загрузить соответствующую версию, я скачал здесь 1.14.4.
2) в убунту~
построить следующийgo
папку, загрузите сжатый пакет
mkdir ~/go
cd ~/go
wget https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz
3) Разархивируйте сжатый пакет в/usr/local
Под содержанием
tar -C /usr/local -zxvf go1.14.4.linux-amd64.tar.gz
4) Добавить/usr/loacl/go/bin
каталог в переменную PATH, добавьте в/etc/profile
или$HOME/.profile
, первый настраивается для каждого пользователя системы, второй настраивается только для текущего пользователя, а последний выполняется путем настройки переменных среды по умолчанию.
vim ~/.bashrc
# 在最后一行添加
export GOROOT=/usr/local/go
export PATH=$PATH:$GOROOT/bin
# 保存退出后生效配置文件
source ~/.bashrc
2.2 WordCount — первая программа MapReduce
Согласно нашему предыдущему базовому пониманию MapReduce, программа MapReduce в основном состоит из трех частей: программа Map, программа Reduce и программа планирования MapReduce.
Далее мы начнем писать нашу первую программу MapReduce с примерами кода, предоставленными курсом MIT6.824.
2.2.1 Пример кода — серийная реализация
1) Сначала мы можем клонировать пример кода.
$ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
$ cd 6.824
2) Далее посмотрите на структуру каталогов
src
├─mrapps
│ ├─wc.go
│ ├─ indexer.go
│ └─...
├─main
│ ├─mrsequential.go
│ ├─mrmaster.go
| ├─mrworker.go
| ├─pg*.txt
| └─...
├─mr
├─master.go
├─worker.go
└─rpc.go
- mrapps: включает в себя пример кода, который уже написал карты и программы сокращения, такие как wc.go — программа подсчета, indexer.go — программа текстового индексатора
- main: включает программу последовательного планирования mapreduce mrsequential.go и некоторые входные данные pg*.go
- mr: Мы включаем mster.go, worker.go, rpc.go, параллельную версию планировщика mapreduce, которую нам нужно реализовать.
3) Попробуйте запустить последовательную версию планировщика для реализации программы подсчета
$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0
// go build -buildmode=plugin ../mrapps/wc.go这段会执行构建一个插件wc.so以便mrsequential.go调用。
// go run mrsequential.go wc.so pg*.txt表示编译并运行mrsequential.go。其中wc.so是插件,pg*是输入数据集。
// more mr-out-0可以查看输出数据集,其中mr-out-0即输出数据。
ps: Если вы хотите просто понять язык го, вы можете перейти кtour.go-zh.org/listэтот сайт. Этот веб-сайт эквивалентен простому веб-сайту с языковым справочником, его чтение занимает около 2 часов, и вы можете иметь предварительное понимание.
4) После выполнения приведенного выше оператора вам должно быть очень любопытно, как реализован уровень кода! Давайте сначала посмотрим на реализацию map и reduce в программе подсчета wc.go.
// wc.go
// map程序
// map程序对于每一个输入文件会执行一次
// 第一个参数是输入文件的名称,第二个参数是输入文件的完整内容
// 输出是key/value对的切片集合(相当于数组,数组里每一个对象是key/value对)
func Map(filename string, contents string) []mr.KeyValue {
// 定义一个方法,判断输入r是否是一个字母字符(可以用来切分文章中的单词遇到空格返回false)
// 输入是一个rune的数据类型(等同于int32,常用来处理unicode或utf-8字符)
// 返回一个布尔值,r是字母字符则返回false,否则返回true
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// 将文章内容切分成单词数组
// 例如"hello world"就会切分成["hello" "world"]
// FieldsFunc是一个可以按照自定义规则切分字符的函数
words := strings.FieldsFunc(contents, ff)
// 生成一个key/value对集合,其中key是文章中的单词,value都为1
kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}
// reduce程序
// 所有map任务生成的中间结果集会作为reduce的入参被执行一次
// 入参key是单词,values是key出现次数的集合,values中每一个的值都为1,所以values的大小就是单词key出现的次数
// 可参考下图:
// Input1 -> Map -> a,1 b,1 c,1
// Input2 -> Map -> b,1
// Input3 -> Map -> a,1 c,1
// | | |
// | | -> Reduce -> c,2
// | -----> Reduce -> b,2
// ---------> Reduce -> a,2
func Reduce(key string, values []string) string {
// strconv函数将int转化为string返回
return strconv.Itoa(len(values))
}
Далее посмотрите на планировщик mrsequential.go.
// mrsequential.go
// ByKey是用于将中间结果集进行排序用的
// 排序后key相同的就会排在一起,就方便作为reduce的入参
type ByKey []mr.KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func main() {
// 运行时的参数个数若少于3则报错并退出程序
// 在go run mrsequential.go ../mrapps/wc.so pg*.txt中
// 参数0:mrsequential.go
// 参数1:wc.so
// 参数2:pg*.txt(这里其实输入文件算是多个参数)
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: mrsequential ../mrapps/xxx.so inputfiles...\n")
os.Exit(1)
}
// --------------------------map任务开始--------------------------
// 从参数1(wc.so)中读取map程序和reduce程序
mapf, reducef := loadPlugin(os.Args[1])
// 读取每一个文件作为map程序的入参,并输出中间结果
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
// 打开文件
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
// 读取文件内容
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
// 输入map程序
kva := mapf(filename, string(content))
// 中间结果
intermediate = append(intermediate, kva...)
}
// --------------------------map任务结束--------------------------
// 对中间结果进行排序
sort.Sort(ByKey(intermediate))
// 创建输出文件mr-out-0
oname := "mr-out-0"
ofile, _ := os.Create(oname)
// --------------------------reduce任务开始--------------------------
// 由于中间结果集是有序的,所以相同的key/value对会连续放置在一起
// 只需要将key相同的中间结果集作为reduce程序的输入即可
i := 0
for i < len(intermediate) {
// i表示key相同的单词的第一个的位置
// j表示key相同的单词的最后一个的后一位
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
// 遍历从i到j之间的key/value对,全部都是同样的key并且value都为1
// 并作为reduce的入参
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// 输出reduce的结果到mr-out-0文件中
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
// --------------------------reduce任务结束--------------------------
ofile.Close()
}
// 加载插件中的方法
// 入参插件文件名
// 返回值为map方法和reduce方法
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf("cannot load plugin %v", filename)
}
// 查找插件中Map方法,并赋给mapf
xmapf, err := p.Lookup("Map")
if err != nil {
log.Fatalf("cannot find Map in %v", filename)
}
mapf := xmapf.(func(string, string) []mr.KeyValue)
// 查找插件中的Reduce方法并赋给reducef
xreducef, err := p.Lookup("Reduce")
if err != nil {
log.Fatalf("cannot find Reduce in %v", filename)
}
reducef := xreducef.(func(string, []string) string)
//返回map和reduce方法
return mapf, reducef
}
2.2.2 Реализация распределенного сценария — реализация мастера и исполнителя
Вы можете видеть, что приведенный выше код является простой последовательной реализацией. Но сила mapreduce заключается в реализации master и worker в распределенных сценариях Далее мы покажем это через реализацию master и worker.
Интерактивная демонстрацияследующее:
можно увидетьОсновная мысльДа:
- Передача данных RPC используется для взаимодействия между работником и мастером.Взаимодействия включают в себя:
- reqTask<-->HandleTaskReq: Рабочий запрашивает задачу у мастера.Задача должна включать этап задачи (карта/уменьшение), номер задачи, имя входного файла, общее количество входных файлов и общее количество редуцировать обработку (промежуточный набор результатов нужно разделить на несколько задач редукции для обработки)** и другую информацию.
- reportTask<-->HandleTask: после того, как рабочий процесс обработает задачу map/reduce, он сообщает мастеру о статусе обработки задачи, а возвращаемая информация должна содержать толькоНомер задачи и флаг завершенияВот и все.
- Мастеру сначала нужно запустить серию операций инициализации:
- Инициализировать информацию о задаче.
- Поместить задачу в очередь задач.
- Включите прослушивание RPC.
- Мастер будет слушать RPC-запрос рабочего и обрабатывать его соответствующим образом:
- HandleTaskReq: обрабатывает вызов RPC задачи запроса рабочего процесса, извлекает задачу из очереди задач и отправляет ее рабочему процессу. и записать локальностатус задачизав исполнении, и записьВремя начала выполнения задачиВ случае тайм-аута обработки воркера задача может быть снова помещена в очередь задач.
- HandleTaskReport: Обработка вызова RPC состояния задачи отчета работника и определение того, завершена ли задача. Если она завершена, измените статус задачи локальной записи на завершенную, в противном случае поместите задачу обратно в очередь задач и измените локальную запись. статус задачи в очередь.
- Мастер должен иметь запланированную задачу для обнаружения операции задачи, в том числе:
- Независимо от того, истекло ли время ожидания задачи, если оно истекло, задачу необходимо вернуть в очередь задач.
- Были ли выполнены задачи на этапе сопоставления, если они завершены, перейдите на этап сокращения и выполните операцию инициализации фазы сокращения.
- Выполнены ли задачи на этапе сокращения, и если да, то завершены ли они.
2.2.3 Реализация конкретного кода
Приступим к написанию программы, которая состоит из нескольких программных файлов, а именно:
-
/mr/rpc.go — определение файла передачи запроса RPC
-
/main/mrmaster.go — главный лаунчер
-
/main/mrworker.go — программа запуска воркеров
-
/mr/master.go — конкретная реализация мастера
-
/mr/worker.go - конкретная реализация рабочего
Прежде всего, нам нужно определить структуру запроса RPC, Здесь нам нужно определить три части: сообщение задачи, сообщение задачи запроса и сообщение задачи отчета, которые определяются следующим образом:
// rpc.go
type TaskStatus string
type TaskPhase string
type TimeDuration time.Duration
// 任务状态常量
const (
// 就绪
TaskStatusReady TaskStatus = "ready"
// 队列中
TaskStatusQueue TaskStatus = "queue"
// 执行中
TaskStatusRunning TaskStatus = "running"
// 已完成
TaskStatusFinish TaskStatus = "finish"
// 任务错误
TaskStatusErr TaskStatus = "error"
)
//任务阶段常量
const (
MapPhase TaskPhase = "map"
ReducePhase TaskPhase = "reduce"
)
// 任务定义
type Task struct {
// 操作阶段:map/reduce
TaskPhase TaskPhase
// map个数
MapNum int
// reduce个数
ReduceNum int
// 任务序号
TaskIndex int
// 文件名
FileName string
// 是否完成
IsDone bool
}
// 请求任务参数
type ReqTaskArgs struct {
// 当前worker存活,可以执行任务
WorkerStatus bool
}
// 请求任务返回值
type ReqTaskReply struct {
// 返回一个任务
Task Task
// 是否完成所有任务
TaskDone bool
}
// 报告任务参数
type ReportTaskArgs struct {
// 当前worker存活,可以执行任务
WorkerStatus bool
// 任务序号
TaskIndex int
// 是否完成
IsDone bool
}
// 报告任务返回值
type ReportTaskReply struct {
// master响应是否处理成功
MasterAck bool
}
mrmasterа такжеmrworkerОсновные и рабочие пусковые установки:
- мистер мастер позвонит
/mr/master.go
изMakeMaster()
для запуска мастера входными параметрами являются набор имен файлов иnReduce
(Указывает, что этап карты должен разделить промежуточный ключ наnReduce
уменьшить количество задач). Вызывается циклически после запуска мастераDone()
Определите, выполнена ли задача. - mrworker загрузит карту и уменьшит функции и передаст
Worker()
Zhonglai запускает рабочий.
masterИдеи реализации:
- Определить структуру мастера
- Очередь задач, которая используется для управления запросами задач.
- запись коллекции входных файлов
- Сохранить количество карт/уменьшить
- Стадия задачи, используемая для обозначения стадии выполнения задачи.
- Статус задачи, используемый для записи глобального статуса задачи
- Мьютекс, используемый для блокировки ресурсов
- Понимание реализации RPC на языке Go: по определению
func (m *Master) method(args *Args, reply *Reply) error
Для этого при прослушивании запроса RPC будет введен соответствующий метод в соответствии с именем метода. Достаточно реализовать функцию обработки запроса задачи и отчета задачи. - существует
Done()
Чтобы реализовать оценку различных сценариев в функции, следует отметить, что переключателю в языке Go не нужно заполнять разрыв, и он будет прерываться по умолчанию.
workerИдеи реализации:
- Реализуйте основной поток, зацикливайте задачи запроса, выполняйте задачи и сообщайте о задачах.
- Реализует вызовы RPC, которые запрашивают задачи и выполняют задачи.
- Для реализации исполнителя задачи карты ему необходимо выводить в файлы nReduce в соответствии со значением ключа. Формат имени файла
mr-x-y
, x — номер задачи карты, y —ihash(key)%nreduce
значение после. Кроме того, хранение промежуточных файлов может быть в формате JSON. - Реализуйте исполнитель задачи сокращения, который аналогичен выполнению задачи карты.После чтения содержимого файла отсортируйте значения ключей и выполните функцию сокращения.
Основная реализация выглядит следующим образом:
// master.go
// 任务状态定义
type TaskState struct {
// 状态
Status TaskStatus
// 开始执行时间
StartTime time.Time
}
// Master结构定义
type Master struct {
// 任务队列
TaskChan chan Task
// 输入文件
Files []string
// map数目
MapNum int
// reduce数目
ReduceNum int
// 任务阶段
TaskPhase TaskPhase
// 任务状态
TaskState []TaskState
// 互斥锁
Mutex sync.Mutex
// 是否完成
IsDone bool
}
// 启动Master
func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
// 初始化Master
m.IsDone = false
m.Files = files
m.MapNum = len(files)
m.ReduceNum = nReduce
m.TaskPhase = MapPhase
m.TaskState = make([]TaskState, m.MapNum)
m.TaskChan = make(chan Task, 10)
for k := range m.TaskState {
m.TaskState[k].Status = TaskStatusReady
}
// 开启线程监听
m.server()
return &m
}
// 启动一个线程监听worker.go的RPC请求
func (m *Master) server() {
rpc.Register(m)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", "127.0.0.1:1234")
os.Remove("mr-socket")
l, e := net.Listen("unix", "mr-socket")
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
// 处理任务请求
func (m *Master) HandleTaskReq(args *ReqTaskArgs, reply *ReqTaskReply) error {
fmt.Println("开始处理任务请求...")
if !args.WorkerStatus {
return errors.New("当前worker已下线")
}
// 任务出队列
task, ok := <-m.TaskChan
if ok == true {
reply.Task = task
// 任务状态置为执行中
m.TaskState[task.TaskIndex].Status = TaskStatusRunning
// 记录任务开始执行时间
m.TaskState[task.TaskIndex].StartTime = time.Now()
} else {
// 若队列中已经没有任务,则任务全部完成,结束
reply.TaskDone = true
}
return nil
}
// 处理任务报告
func (m *Master) HandleTaskReport(args *ReportTaskArgs, reply *ReportTaskReply) error {
fmt.Println("开始处理任务报告...")
if !args.WorkerStatus {
reply.MasterAck = false
return errors.New("当前worker已下线")
}
if args.IsDone == true {
// 任务已完成
m.TaskState[args.TaskIndex].Status = TaskStatusFinish
} else {
// 任务执行错误
m.TaskState[args.TaskIndex].Status = TaskStatusErr
}
reply.MasterAck = true
return nil
}
// 循环调用 Done() 来判定任务是否完成
func (m *Master) Done() bool {
ret := false
finished := true
m.Mutex.Lock()
defer m.Mutex.Unlock()
for key, ts := range m.TaskState {
switch ts.Status {
case TaskStatusReady:
// 任务就绪
finished = false
m.addTask(key)
case TaskStatusQueue:
// 任务队列中
finished = false
case TaskStatusRunning:
// 任务执行中
finished = false
m.checkTask(key)
case TaskStatusFinish:
// 任务已完成
case TaskStatusErr:
// 任务错误
finished = false
m.addTask(key)
default:
panic("任务状态异常...")
}
}
// 任务完成
if finished {
// 判断阶段
// map则初始化reduce阶段
// reduce则结束
if m.TaskPhase == MapPhase {
m.initReduceTask()
} else {
m.IsDone = true
close(m.TaskChan)
}
} else {
m.IsDone = false
}
ret = m.IsDone
return ret
}
// 初始化reduce阶段
func (m *Master) initReduceTask() {
m.TaskPhase = ReducePhase
m.IsDone = false
m.TaskState = make([]TaskState, m.ReduceNum)
for k := range m.TaskState {
m.TaskState[k].Status = TaskStatusReady
}
}
// 将任务放入任务队列中
func (m *Master) addTask(taskIndex int) {
// 构造任务信息
m.TaskState[taskIndex].Status = TaskStatusQueue
task := Task{
FileName: "",
MapNum: len(m.Files),
ReduceNum: m.ReduceNum,
TaskIndex: taskIndex,
TaskPhase: m.TaskPhase,
IsDone: false,
}
if m.TaskPhase == MapPhase {
task.FileName = m.Files[taskIndex]
}
// 放入任务队列
m.TaskChan <- task
}
// 检查任务处理是否超时
func (m *Master) checkTask(taskIndex int) {
timeDuration := time.Now().Sub(m.TaskState[taskIndex].StartTime)
if timeDuration > MaxTaskRunTime {
// 任务超时重新加入队列
m.addTask(taskIndex)
}
}
// worker.go
type KeyValue struct {
Key string
Value string
}
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by MapPhase.
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
// Worker 主线程,循环请求任务以及报告任务
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
// 请求任务
reply := ReqTaskReply{}
reply = reqTask()
if reply.TaskDone {
break
}
// 执行任务
err := doTask(mapf, reducef, reply.Task)
if err != nil {
reportTask(reply.Task.TaskIndex, false)
}
// 报告任务结果
reportTask(reply.Task.TaskIndex, true)
}
return
}
// 请求任务
func reqTask() ReqTaskReply {
// 声明参数并赋值
args := ReqTaskArgs{}
args.WorkerStatus = true
reply := ReqTaskReply{}
// RPC调用
if ok := call("Master.HandleTaskReq", &args, &reply); !ok {
log.Fatal("请求任务失败...")
}
return reply
}
// 报告任务结果
func reportTask(taskIndex int, isDone bool) ReportTaskReply {
// 声明参数并赋值
args := ReportTaskArgs{}
args.IsDone = isDone
args.TaskIndex = taskIndex
args.WorkerStatus = true
reply := ReportTaskReply{}
// RPC调用
if ok := call("Master.HandleTaskReport", &args, &reply); !ok {
log.Fatal("报告任务失败...")
}
return reply
}
// 执行任务
func doTask(mapf func(string, string) []KeyValue, reducef func(string, []string) string, task Task) error {
if task.TaskPhase == MapPhase {
err := DoMapTask(mapf, task.FileName, task.TaskIndex, task.ReduceNum)
return err
} else if task.TaskPhase == ReducePhase {
err := DoReduceTask(reducef, task.MapNum, task.TaskIndex)
return err
} else {
log.Fatal("请求任务的任务阶段返回值异常...")
return errors.New("请求任务的任务阶段返回值异常")
}
return nil
}
// 执行map任务
func DoMapTask(mapf func(string, string) []KeyValue, fileName string, mapTaskIndex int, reduceNum int) error {
fmt.Println("开始处理Map任务...")
// 打开文件
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open %v", fileName)
return err
}
// 读取文件内容
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", fileName)
return err
}
file.Close()
// 输入map程序
kva := mapf(fileName, string(content))
for i := 0; i < reduceNum; i++ {
// 中间输出文件名mr-X-Y
intermediateFileName := intermediateName(mapTaskIndex, i)
fmt.Printf("doMap文件名%s创建\n", intermediateFileName)
// 创建中间输出文件,并存储为JSON格式
file, _ := os.Create(intermediateFileName)
enc := json.NewEncoder(file)
for _, kv := range kva {
if ihash(kv.Key)%reduceNum == i {
enc.Encode(&kv)
}
}
file.Close()
}
return nil
}
// 执行reduce任务
func DoReduceTask(reducef func(string, []string) string, mapNum int, reduceTaskIndex int) error {
fmt.Println("开始处理Reduce任务...")
// map:string->[]string
res := make(map[string][]string)
for i := 0; i < mapNum; i++ {
// 打开中间文件
intermediateFileName := intermediateName(i, reduceTaskIndex)
file, err := os.Open(intermediateFileName)
if err != nil {
log.Fatalf("cannot open %v", intermediateFileName)
return err
}
// 反序列化JSON格式文件
dec := json.NewDecoder(file)
// 读取文件内容
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break
}
_, ok := res[kv.Key]
if !ok {
res[kv.Key] = make([]string, 0)
}
res[kv.Key] = append(res[kv.Key], kv.Value)
}
file.Close()
}
// 提取key值,用于排序
var keys []string
for k := range res {
keys = append(keys, k)
}
// key值排序
sort.Strings(keys)
outputFileName := outputName(reduceTaskIndex)
fmt.Printf("doReduce输出%s文件名\n", outputFileName)
outputFile, _ := os.Create(outputFileName)
for _, k := range keys {
output := reducef(k, res[k])
// 输出reduce的结果到mr-out-X文件中
fmt.Fprintf(outputFile, "%v %v\n", k, output)
}
outputFile.Close()
return nil
}
Наконец, тестовый скрипт (/main/test-mr.sh
), чтобы проверить программу, или запустив программу.
// 脚本验证
$ cd ~/6.824
$ cd src/main
$ sh test-mr.sh
// 运行程序验证
$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrmaster.go pg*.txt
$ go run mrworker.go ../mrapps/wc.so
// 程序运行结束后查看结果
$ cat mr-out-* | sort | more
Суммировать
Выше я сделал краткую запись изучения mapreduce в начале, за этот период я прочитал статью, изучил всю идею mapreduce, осознал прелесть распределенного, изучил golang от поверхностного к глубокому и, наконец, завершил Lab1. , что является большим достижением. Наконец, если у вас есть какие-либо сомнения или ошибки, пожалуйста, исправьте и обменяйте!
Прикрепите адрес реализации кода:
Код реализует адрес github:GitHub.com/ЯнусЛаЛаЛа/Ми…