Недавно мне посчастливилось прочитать три основные статьи о распределенной системе Google.Основываясь на принципе, что хорошая память не так хороша, как плохое письмо, я хотел бы рассказать о небольшом опыте арендодателя в разработке распределенных систем~
Я полагаю, что учащиеся, которые использовали Hadoop, будут выглядеть так же, ожидая вывода результата.INFO : 2020-01-17 11:44:14,132 Stage-11 map = 0%, reduce = 0%Он показывает процесс выполнения MapReduce.Ниже мы также расширим MapReduce, объясним принцип выполнения MapReduce и реализуем мини-версию MapReduce в соответствии с документом Google.
Что такое MapReduce
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
Как указано в статье Google MapReduce, MapReduce — это модель программирования и связанная с ней реализация алгоритмической модели для обработки и создания очень больших наборов данных. Сначала пользователь создает функцию Map для обработкиkey/value pairНабор данных выходного промежуточного продукта основан наkey/value pair, а затем создайте функцию Reduce для объединения всех значений промежуточного значения с одним и тем же значением промежуточного ключа.
Пример MapReduce
Принцип карты и уменьшения
Принцип модели программирования MapReduce таков: используйте вводkey/value pairсбор для получения выводаkey/value pairсобирать. Пользователи библиотеки MapReduce выражают это вычисление с помощью двух функций: Map и Reduce.
Map :Пользовательская функция карты принимает вводkey/value pairзначение, затем производит промежуточноеkey/value pairколлекция значений. Библиотека MapReduce агрегирует все значения промежуточного значения с одним и тем же значением промежуточного ключа и передает его в функцию Reduce.
Reduce :Определяемые пользователем функции сокращения принимают набор значений промежуточного ключа и связанное значение. Функция Reduce объединяет эти значения значений, чтобы сформировать меньший набор значений значений. Обычно за вызов функции Reduce создается только 0 или 1 выходное значение. Обычно мы подаем в функцию Reduce промежуточные значения через итератор, чтобы мы могли обрабатывать большие наборы значений, которые не все помещаются в память.
Пример применения Map и Reduce
- Подсчитайте количество вхождений каждого слова в документе: функция Map обрабатывает документ, разбивает слова в документе и выводит (слово, 1). Функция Reduce складывает значения значений одного и того же слова и выдает результат (слово, общее количество записей).
- Инвертированный индекс: функция «Карта» анализирует каждый документ и выводит список (слово, номер документа), вход функции «Уменьшить» — это все (слово, номер документа) заданного слова, сортирует все номера документов и выводит (слово, номер документа). список (номер документа)). Все выходные наборы образуют простой инвертированный индекс, который отслеживает положение слов в документах с помощью простого алгоритма.
Схема процесса выполнения MapReduce
- Библиотека MapReduce, вызываемая пользовательской программой, сначала делит входной файл на M фрагментов данных, и размер каждого фрагмента данных обычно составляет от 16 МБ до 64 МБ (размером каждого фрагмента данных можно управлять с помощью дополнительных параметров). Затем пользовательская программа создает множество копий программы в кластере.
- Одна из этих копий программы имеет специальную программу
master. Остальные программы в копииworkerпрограмма, поmasterЗадания. Есть задачи M Map и задачи R Reduce, которые нужно назначить,masterВыделить задачу «Карта» или «Уменьшить» до простояworker. - Назначено задаче карты
workerПрограмма считывает соответствующий фрагмент входных данных и анализирует его из фрагмента входных данных.key/value pair, затем поставьтеkey/value pairПередается определяемой пользователем функции Map, которая генерируется и выводится функцией Map.key/value pair, и кэшируется в памяти. - в кэше
key/value pairОн делится на R областей с помощью функции разделения, а затем периодически записывается на локальный диск. кэшированныйkey/value pairМесто хранения на локальном диске будет возвращеноmaster,Зависит отmasterотвечает за повторную передачу этих мест хранения вReduce worker. - когда
Reduce workerпрограмма полученаmasterПосле того, как программа отправит информацию о месте хранения данных, используйте RPC изMap workerКэшированные данные считываются с диска хоста. когдаReduce workerПосле считывания всех промежуточных данных данные с одинаковым значением ключа агрегируются путем сортировки ключа. Поскольку множество разных значений ключа сопоставляются с одной и той же задачей Reduce, необходимо упорядочивать. Если промежуточные данные слишком велики для сортировки в памяти, то сортировка выполняется извне. -
Reduce workerПрограмма просматривает отсортированные промежуточные данные для каждого уникального значения промежуточного ключа,Reduce workerПрограмма передает набор этого значения ключа и связанного с ним значения промежуточного значения пользовательской функции сокращения. Вывод функции Reduce добавляется к выходному файлу раздела-владельца. - Когда все задачи Map и Reduce выполнены,
masterРазбудите пользовательскую программу. В этот момент вызов MapReduce в пользовательской программе возвращается.
После успешного завершения задачи выходные данные MapReduce сохраняются в выходных файлах R (для каждой задачи Reduce создается один выходной файл, а имя файла задается пользователем). Как правило, пользователям не нужно объединять эти выходные файлы R в один файл, мы часто используем эти файлы в качестве входных данных для другого MapReduce или используем их в другом распределенном приложении, которое может обрабатывать несколько разделенных файлов.
Реализация программы MapReduce
Суть MapReduce заключается в реализации логического кода Map and Reduce, который показывает, что арендодатель завершит реализацию Map и Reduce в процессе выполнения Map и Reduce, описанном выше.
реализовать карту
1. Следующая функция doMap управляет задачей карты: она читает входной файл (inFile), вызывает определяемую пользователем функцию карты (mapF) для содержимого этого файла, а затем разбивает выходные данные mapF на промежуточные файлы nReduce.
2. Каждая задача сокращения соответствует промежуточному файлу. Имя файла включает номер задачи карты и номер задачи сокращения. Используйте имя файла, сгенерированное функцией reduceName, в качестве промежуточного файла для задачи сокращения. Вызовите ihash() для каждого ключевого мода nReduce, чтобы выбрать соответствующую задачу сокращения.
3. mapF — это функция карты, предоставляемая приложением. Первым аргументом должно быть имя входного файла. Второй аргумент должен быть содержимым всего входного файла. mapF() возвращает срез, содержащий пары ключ/значение для сокращения.
4. В следующей программе данные, обработанные mapF, записываются в файл в формате json.Для удобства обработки данных каждый фрагмент данных, обрабатываемых в следующей программе, отделяется символами новой строки.
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
func doMap(
jobName string, // MapReduce 的任务名称
mapTask int, // 当前执行的 mapTask
inFile string, // 输入的的文件
nReduce int, // reduceTask 的数量
mapF func(filename string, contents string) []KeyValue, // 用户自定义的 map 函数
) {
f, err := os.Open(inFile)
if err != nil {
debug("open file err %v", err)
}
defer f.Close()
dat, err := ioutil.ReadAll(f)
if err != nil {
debug("open map file err %v", err)
}
res := mapF(inFile, string(dat))
for _, kv := range res {
hash := ihash(kv.Key)
r := hash % nReduce
// mrtmp.xxx-0-0
fd, err := os.OpenFile(reduceName(jobName, mapTask, r), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
debug("open mrtmp.xxx file err %v", err)
continue
}
enc := json.NewEncoder(fd)
if err := enc.Encode(&kv); err != nil {
debug("encode json err %v", err)
continue
}
fd.Close()
}
}
func ihash(s string) int {
h := fnv.New32a()
h.Write([]byte(s))
return int(h.Sum32() & 0x7fffffff)
}
Реализовать Уменьшить
doReduce управляет задачей редукции: она читает промежуточный файл задачи, сортирует пары данных в промежуточном файле по ключу, вызывает определяемую пользователем функцию reduceF для каждого ключа и записывает выходные данные reduceF на диск.
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
func doReduce(
jobName string, // MapReduce 的任务名称
reduceTask int, // 当前运行的 reduce 任务的任务号
outFile string, // 结果输出的文件路径
nMap int, // map 任务的个数
reduceF func(key string, values []string) string, // 用户的自定义 reduce 函数
) {
kvMap := make(map[string][]string)
for i := 0; i < nMap; i++ {
func() {
inFileName := reduceName(jobName, i, reduceTask)
inFile, err := os.Open(inFileName)
if err != nil {
panic("can't open file:" + inFileName)
}
defer inFile.Close()
// Read and Decoder the file
var kv KeyValue
for decoder := json.NewDecoder(inFile); decoder.Decode(&kv) != io.EOF; {
kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
}
}()
}
var keys []string
// sort by key
for k := range kvMap {
keys = append(keys, k)
}
sort.Strings(keys)
// reduce
outfd, err := os.Create(outFile)
if err != nil {
panic("can't create file:" + outFile)
}
defer outfd.Close()
enc := json.NewEncoder(outfd)
for _, k := range keys {
reducedValue := reduceF(k, kvMap[k])
enc.Encode(KeyValue{Key: k, Value: reducedValue})
}
}
Инкапсуляция doMap и doReduce
Следующая функция предназначена для последовательного вызова doMap и doReduce для генерации результатов задач MapReduce и вывода их в файл результатов.
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
Реализовать статистику частоты слов и инвертированный индекс с помощью MapReduce.
Выше мы упомянули примеры MapReduce в практических приложениях, а ниже мы сделаем простую реализацию этих двух примеров.
Реализовать статистику частоты слов
Чтобы реализовать функцию статистики частотности слов, наша идея использования фреймворка MapReduce заключается в реализации пользовательских карт и функций сокращения: 1, карта: прочитайте документ, извлеките слова из документа одно за другим, создайте пары ключ-значение, такие как (слово, 1), а затем запишите компас данных в промежуточный файл.
2, уменьшить: прочитать промежуточный файл, упорядочить в соответствии с парой значений ключа, агрегировать те же данные, что и ключ, и подсчитать количество слов, которые появляются для каждого слова, а затем записать результат в файл.
package main
import (
"6.824/src/mapreduce"
"fmt"
"os"
"strconv"
"strings"
"unicode"
)
func mapF(filename string, contents string) (res []mapreduce.KeyValue) {
// Your code here (Part II).
f := func(c rune) bool {
return !unicode.IsLetter(c)
}
words := strings.FieldsFunc(contents, f)
for _, w := range words {
kv := mapreduce.KeyValue{Key: w, Value: "1"}
res = append(res, kv)
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part II).
sum := 0
for _, e := range values {
data, err := strconv.Atoi(e)
if err != nil {
fmt.Printf("Reduce err %s%v\n", key, err)
continue
}
sum += data
}
return strconv.Itoa(sum)
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("%s: see usage comments in file\n", os.Args[0])
} else {
var mr *mapreduce.Master
mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
}
}
Реализовать инвертированный индекс
Точно так же, разрабатывая нашу собственную карту и уменьшая методы на основе понимания инвертированного индекса, 1, карта: будет читать документ, использовать слово в документе в качестве ключа и документ, в котором слово находится в качестве значения, и записывать его в промежуточный файл.
2. Сокращение: прочитать промежуточный файл, отсортировать его в соответствии с парой ключ-значение, объединить данные с одним и тем же ключом вместе, объединить имена файлов, в которых встречается слово, и записать их в файл результатов.
package main
import (
"bytes"
"os"
"strconv"
"strings"
"unicode"
)
import "fmt"
import "6.824/src/mapreduce"
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(c rune) bool {
return !unicode.IsLetter(c)
})
for _, w := range words {
res = append(res, mapreduce.KeyValue{Key: w, Value: document})
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part V).
sum := 0
var buffer bytes.Buffer
if key == "www" {
fmt.Println(values)
}
isExist := make(map[string]string)
for _, e := range values {
if _, ok := isExist[e]; !ok {
buffer.WriteString(e)
buffer.WriteString(",")
sum += 1
isExist[e] = e
}
}
iiRes := strconv.Itoa(sum) + " " + strings.TrimRight(buffer.String(), ",")
return iiRes
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("%s: see usage comments in file\n", os.Args[0])
} else {
var mr *mapreduce.Master
mr = mapreduce.Sequential("iiseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
}
}
резюме
Эта статья ссылается на документ Google, реализует автономную версию платформы MapReduce и реализует два простых экземпляра MapReduce.Код в этой статье можно найти в документации арендодателя.GitHubСкачать для просмотра.
Ссылаться на
MapReduce: Simplified Data Processing on Large Clusters