Простая реализация реализации MapReduce

Java Большие данные

Недавно мне посчастливилось прочитать три основные статьи о распределенной системе Google.Основываясь на принципе, что хорошая память не так хороша, как плохое письмо, я хотел бы рассказать о небольшом опыте арендодателя в разработке распределенных систем~

Я полагаю, что учащиеся, которые использовали Hadoop, будут выглядеть так же, ожидая вывода результата.INFO : 2020-01-17 11:44:14,132 Stage-11 map = 0%, reduce = 0%Он показывает процесс выполнения MapReduce.Ниже мы также расширим MapReduce, объясним принцип выполнения MapReduce и реализуем мини-версию MapReduce в соответствии с документом Google.

Что такое MapReduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.

Как указано в статье Google MapReduce, MapReduce — это модель программирования и связанная с ней реализация алгоритмической модели для обработки и создания очень больших наборов данных. Сначала пользователь создает функцию Map для обработкиkey/value pairНабор данных выходного промежуточного продукта основан наkey/value pair, а затем создайте функцию Reduce для объединения всех значений промежуточного значения с одним и тем же значением промежуточного ключа.

Пример MapReduce

Принцип карты и уменьшения

Принцип модели программирования MapReduce таков: используйте вводkey/value pairсбор для получения выводаkey/value pairсобирать. Пользователи библиотеки MapReduce выражают это вычисление с помощью двух функций: Map и Reduce.

Map :Пользовательская функция карты принимает вводkey/value pairзначение, затем производит промежуточноеkey/value pairколлекция значений. Библиотека MapReduce агрегирует все значения промежуточного значения с одним и тем же значением промежуточного ключа и передает его в функцию Reduce.

Reduce :Определяемые пользователем функции сокращения принимают набор значений промежуточного ключа и связанное значение. Функция Reduce объединяет эти значения значений, чтобы сформировать меньший набор значений значений. Обычно за вызов функции Reduce создается только 0 или 1 выходное значение. Обычно мы подаем в функцию Reduce промежуточные значения через итератор, чтобы мы могли обрабатывать большие наборы значений, которые не все помещаются в память.

Пример применения Map и Reduce

  • Подсчитайте количество вхождений каждого слова в документе: функция Map обрабатывает документ, разбивает слова в документе и выводит (слово, 1). Функция Reduce складывает значения значений одного и того же слова и выдает результат (слово, общее количество записей).
  • Инвертированный индекс: функция «Карта» анализирует каждый документ и выводит список (слово, номер документа), вход функции «Уменьшить» — это все (слово, номер документа) заданного слова, сортирует все номера документов и выводит (слово, номер документа). список (номер документа)). Все выходные наборы образуют простой инвертированный индекс, который отслеживает положение слов в документах с помощью простого алгоритма.

Схема процесса выполнения MapReduce

Execution overview
На приведенной выше диаграмме показан полный поток выполнения в нашей реализации MapReduce. Когда пользователь вызывает функцию MapReduce, происходит следующая последовательность действий (следующие порядковые номера соответствуют порядковым номерам на рисунке выше):

  1. Библиотека MapReduce, вызываемая пользовательской программой, сначала делит входной файл на M фрагментов данных, и размер каждого фрагмента данных обычно составляет от 16 МБ до 64 МБ (размером каждого фрагмента данных можно управлять с помощью дополнительных параметров). Затем пользовательская программа создает множество копий программы в кластере.
  2. Одна из этих копий программы имеет специальную программуmaster. Остальные программы в копииworkerпрограмма, поmasterЗадания. Есть задачи M Map и задачи R Reduce, которые нужно назначить,masterВыделить задачу «Карта» или «Уменьшить» до простояworker.
  3. Назначено задаче картыworkerПрограмма считывает соответствующий фрагмент входных данных и анализирует его из фрагмента входных данных.key/value pair, затем поставьтеkey/value pairПередается определяемой пользователем функции Map, которая генерируется и выводится функцией Map.key/value pair, и кэшируется в памяти.
  4. в кэшеkey/value pairОн делится на R областей с помощью функции разделения, а затем периодически записывается на локальный диск. кэшированныйkey/value pairМесто хранения на локальном диске будет возвращеноmaster,Зависит отmasterотвечает за повторную передачу этих мест хранения вReduce worker.
  5. когдаReduce workerпрограмма полученаmasterПосле того, как программа отправит информацию о месте хранения данных, используйте RPC изMap workerКэшированные данные считываются с диска хоста. когдаReduce workerПосле считывания всех промежуточных данных данные с одинаковым значением ключа агрегируются путем сортировки ключа. Поскольку множество разных значений ключа сопоставляются с одной и той же задачей Reduce, необходимо упорядочивать. Если промежуточные данные слишком велики для сортировки в памяти, то сортировка выполняется извне.
  6. Reduce workerПрограмма просматривает отсортированные промежуточные данные для каждого уникального значения промежуточного ключа,Reduce workerПрограмма передает набор этого значения ключа и связанного с ним значения промежуточного значения пользовательской функции сокращения. Вывод функции Reduce добавляется к выходному файлу раздела-владельца.
  7. Когда все задачи Map и Reduce выполнены,masterРазбудите пользовательскую программу. В этот момент вызов MapReduce в пользовательской программе возвращается.

После успешного завершения задачи выходные данные MapReduce сохраняются в выходных файлах R (для каждой задачи Reduce создается один выходной файл, а имя файла задается пользователем). Как правило, пользователям не нужно объединять эти выходные файлы R в один файл, мы часто используем эти файлы в качестве входных данных для другого MapReduce или используем их в другом распределенном приложении, которое может обрабатывать несколько разделенных файлов.

Реализация программы MapReduce

Суть MapReduce заключается в реализации логического кода Map and Reduce, который показывает, что арендодатель завершит реализацию Map и Reduce в процессе выполнения Map и Reduce, описанном выше.

реализовать карту

1. Следующая функция doMap управляет задачей карты: она читает входной файл (inFile), вызывает определяемую пользователем функцию карты (mapF) для содержимого этого файла, а затем разбивает выходные данные mapF на промежуточные файлы nReduce.

2. Каждая задача сокращения соответствует промежуточному файлу. Имя файла включает номер задачи карты и номер задачи сокращения. Используйте имя файла, сгенерированное функцией reduceName, в качестве промежуточного файла для задачи сокращения. Вызовите ihash() для каждого ключевого мода nReduce, чтобы выбрать соответствующую задачу сокращения.

3. mapF — это функция карты, предоставляемая приложением. Первым аргументом должно быть имя входного файла. Второй аргумент должен быть содержимым всего входного файла. mapF() возвращает срез, содержащий пары ключ/значение для сокращения.

4. В следующей программе данные, обработанные mapF, записываются в файл в формате json.Для удобства обработки данных каждый фрагмент данных, обрабатываемых в следующей программе, отделяется символами новой строки.

func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

func doMap(
	jobName string, //  MapReduce 的任务名称
	mapTask int, // 当前执行的 mapTask
	inFile string, // 输入的的文件
	nReduce int, // reduceTask 的数量
	mapF func(filename string, contents string) []KeyValue, // 用户自定义的 map 函数
) {
	f, err := os.Open(inFile)
	if err != nil {
		debug("open file err %v", err)
	}

	defer f.Close()

	dat, err := ioutil.ReadAll(f)
	if err != nil {
		debug("open map file err %v", err)
	}

	res := mapF(inFile, string(dat))

	for _, kv := range res {

		hash := ihash(kv.Key)
		r := hash % nReduce
		// mrtmp.xxx-0-0
		fd, err := os.OpenFile(reduceName(jobName, mapTask, r), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
		if err != nil {
			debug("open mrtmp.xxx file err %v", err)
			continue
		}

		enc := json.NewEncoder(fd)
		if err := enc.Encode(&kv); err != nil {
			debug("encode json err %v", err)
			continue
		}
		fd.Close()
	}
}

func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

Реализовать Уменьшить

doReduce управляет задачей редукции: она читает промежуточный файл задачи, сортирует пары данных в промежуточном файле по ключу, вызывает определяемую пользователем функцию reduceF для каждого ключа и записывает выходные данные reduceF на диск.

func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

func doReduce(
	jobName string, //  MapReduce 的任务名称
	reduceTask int, // 当前运行的 reduce 任务的任务号
	outFile string, // 结果输出的文件路径
	nMap int, // map 任务的个数
	reduceF func(key string, values []string) string, // 用户的自定义 reduce 函数
) {
	kvMap := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		func() {
			inFileName := reduceName(jobName, i, reduceTask)
			inFile, err := os.Open(inFileName)
			if err != nil {
				panic("can't open file:" + inFileName)
			}
			defer inFile.Close()

			// Read and Decoder the file
			var kv KeyValue
			for decoder := json.NewDecoder(inFile); decoder.Decode(&kv) != io.EOF; {
				kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
			}

		}()
	}
	var keys []string

	// sort by key
	for k := range kvMap {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// reduce
	outfd, err := os.Create(outFile)
	if err != nil {
		panic("can't create file:" + outFile)
	}
	defer outfd.Close()
	enc := json.NewEncoder(outfd)
	for _, k := range keys {
		reducedValue := reduceF(k, kvMap[k])
		enc.Encode(KeyValue{Key: k, Value: reducedValue})
	}
}

Инкапсуляция doMap и doReduce

Следующая функция предназначена для последовательного вызова doMap и doReduce для генерации результатов задач MapReduce и вывода их в файл результатов.

func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF)
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}

Реализовать статистику частоты слов и инвертированный индекс с помощью MapReduce.

Выше мы упомянули примеры MapReduce в практических приложениях, а ниже мы сделаем простую реализацию этих двух примеров.

Реализовать статистику частоты слов

Чтобы реализовать функцию статистики частотности слов, наша идея использования фреймворка MapReduce заключается в реализации пользовательских карт и функций сокращения: 1, карта: прочитайте документ, извлеките слова из документа одно за другим, создайте пары ключ-значение, такие как (слово, 1), а затем запишите компас данных в промежуточный файл.

2, уменьшить: прочитать промежуточный файл, упорядочить в соответствии с парой значений ключа, агрегировать те же данные, что и ключ, и подсчитать количество слов, которые появляются для каждого слова, а затем записать результат в файл.

package main

import (
	"6.824/src/mapreduce"
	"fmt"
	"os"
	"strconv"
	"strings"
	"unicode"
)

func mapF(filename string, contents string) (res []mapreduce.KeyValue) {
	// Your code here (Part II).
	f := func(c rune) bool {
		return !unicode.IsLetter(c)
	}

	words := strings.FieldsFunc(contents, f)
	for _, w := range words {
		kv := mapreduce.KeyValue{Key: w, Value: "1"}
		res = append(res, kv)
	}
	return res
}

func reduceF(key string, values []string) string {
	// Your code here (Part II).
	sum := 0
	for _, e := range values {
		data, err := strconv.Atoi(e)
		if err != nil {
			fmt.Printf("Reduce err %s%v\n", key, err)
			continue
		}
		sum += data
	}
	return strconv.Itoa(sum)
}

func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else {
		var mr *mapreduce.Master
		mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
		mr.Wait()
	}
}

Реализовать инвертированный индекс

Точно так же, разрабатывая нашу собственную карту и уменьшая методы на основе понимания инвертированного индекса, 1, карта: будет читать документ, использовать слово в документе в качестве ключа и документ, в котором слово находится в качестве значения, и записывать его в промежуточный файл.

2. Сокращение: прочитать промежуточный файл, отсортировать его в соответствии с парой ключ-значение, объединить данные с одним и тем же ключом вместе, объединить имена файлов, в которых встречается слово, и записать их в файл результатов.

package main

import (
	"bytes"
	"os"
	"strconv"
	"strings"
	"unicode"
)
import "fmt"
import "6.824/src/mapreduce"

func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Your code here (Part V).
	words := strings.FieldsFunc(value, func(c rune) bool {
		return !unicode.IsLetter(c)
	})
	for _, w := range words {
		res = append(res, mapreduce.KeyValue{Key: w, Value: document})
	}
	return res
}

func reduceF(key string, values []string) string {
	// Your code here (Part V).
	sum := 0
	var buffer bytes.Buffer
	if key == "www" {
		fmt.Println(values)
	}
	isExist := make(map[string]string)
	for _, e := range values {
		if _, ok := isExist[e]; !ok {
			buffer.WriteString(e)
			buffer.WriteString(",")
			sum += 1
			isExist[e] = e
		}
	}
	iiRes := strconv.Itoa(sum) + " " + strings.TrimRight(buffer.String(), ",")
	return iiRes
}

func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else {
		var mr *mapreduce.Master
		mr = mapreduce.Sequential("iiseq", os.Args[3:], 3, mapF, reduceF)
		mr.Wait()
	}
}

резюме

Эта статья ссылается на документ Google, реализует автономную версию платформы MapReduce и реализует два простых экземпляра MapReduce.Код в этой статье можно найти в документации арендодателя.GitHubСкачать для просмотра.

Ссылаться на

MapReduce: Simplified Data Processing on Large Clusters

Подписывайтесь на нас

关注我们