Оригинальная ссылка:pengtuo.tech/большие данные исследований и разработок/2018/…
Эта статья познакомитHadoop
важная вычислительная базаMapReduce
.
полныйMapReduce
Каркас состоит из двух частей:
- Алгоритмический логический уровень, а именно
map
,shuffle
а такжеreduce
Три важных компонента алгоритма, эта статья представит этот уровень; - Фактический уровень работы, т. е. в какой форме и в каком процессе выполняется задание логики алгоритма на распределенном узле, поскольку автоматический
MapReduce version2
В дальнейшем задания сдаютсяYARN
управления, поэтому эта часть не будет рассматриваться в этой статье.
Среди других статей серии:
- Обучающая серия Hadoop (1) Создание псевдораспределенной среды Hadoop
- Подробный анализ HDFS в обучающей серии Hadoop (2)
- Подробный анализ YARN в обучающей серии Hadoop (3)
1. Что такое MapReduce?
MapReduce
Это среда параллельных распределенных вычислений, основанная на Java. Приложения для обработки данных, написанные с ее использованием, могут работать на больших коммерческих аппаратных кластерах для обработки больших наборов данных.РаспараллеливаниеПроблема в том, что обработка данных может происходить с данными, хранящимися в файловой системе (неструктурированные) или в базе данных (структурированные).MapReduce
Местоположение данных может быть использовано для обработки данных рядом с местом их хранения, чтобы свести к минимуму коммуникационные издержки.
MapReduce
Платформа выполняет различные задачи параллельно, объединяя распределенные серверы, и управляет всей связью и передачей данных между различными частями системы; она также может автоматически выполнять параллельную обработку вычислительных задач, автоматически разделять вычислительные данные и вычислительные задачи и автоматически разделять вычисления. данные и вычислительные задачи на узлах кластера.Он автоматически назначает и выполняет задачи и собирает результаты вычислений, а также назначает множество сложных деталей в нижней части системы, участвующих в параллельных вычислениях, таких как хранение распределения данных, передача данных и отказоустойчивая обработка для система обработки, снижающая нагрузку на разработчиков.
MapReduce
Это также модель и метод параллельного программирования (Модель и методология программирования). С помощью идеи дизайна функционального языка программирования Lisp он обеспечивает простой метод параллельного программирования, который сильно абстрагирует сложный процесс параллельных вычислений, выполняемый в крупномасштабных кластерах, на две функции: Map и Reduce. Основные задачи параллельных вычислений реализуется двумя функциональными программами Map и Reduce, а абстрактные операции и интерфейсы параллельного программирования обеспечивают простое и удобное завершение программирования и вычислительной обработки крупномасштабных данных.
2. Алгоритм
Каркас MapReduce обычно состоит из трех операций (или шагов):
-
Map
: каждый рабочий узел будетmap
Функция применяется к локальным данным и записывает вывод во временное хранилище. Главный узел гарантирует, что обрабатывается только одна копия избыточных входных данных. -
Shuffle
: Рабочие узлы в соответствии с выходным ключом (поmap
генерация функции) для перераспределения данных, сортировки, группировки и копирования карты данных, чтобы все данные, принадлежащие ключу, находились на одном и том же рабочем узле. -
Reduce
: рабочие узлы теперь обрабатывают каждый набор выходных данных для каждого ключа параллельно.
Блок-схема MapReduce:
MapReduce
Разрешить распределенную работуMap
операция до тех пор, пока каждыйMap
действовать независимо от другихMap
операции могут выполняться параллельно.
Другой, более подробный, будетMapReduce
Он разделен на 5 шагов, чтобы понять:
-
Prepare the Map() input:
MapReduce
кадр первыйMap
процессор, которому затем назначаются входные данные для обработки -- пары ключ-значениеK1
, и предоставить процессору все входные данные, связанные с ключом; -
Run the user-provided Map() code:
Map()
существуетK1
Запустите один раз для пары ключ-значение, сгенерировавK2
вывод указанной пары ключ-значение; -
Shuffle the Map output to the Reduce processors: преобразовать ранее сгенерированный
K2
Пара ключ-значение перемещается на один и тот же рабочий узел в зависимости от того, является ли «ключ» одним и тем же; -
Run the user-provided Reduce() code: для каждого рабочего узла
K2
пара ключ-значениеReduce()
действовать; -
Produce the final output:
MapReduce
коллекция рамок всеReduce
вывод и нажмитеK2
Отсортируйте его, чтобы получить окончательный результат для вывода.
В реальной производственной среде данные, скорее всего, будут разбросаны по каждому серверу.При исходном методе обработки больших данных данные отправляются в то место, где находится код для обработки, что очень неэффективно и занимает много времени. полоса пропускания В этом случаеMapReduce
Фреймворк обрабатывает это, помещаяMap()
операция илиReduce()
Отправляйте на сервер, где находятся данные, и используйте «мобильные вычисления вместо мобильных данных», чтобы ускорить работу всего фреймворка.Большая часть вычислений происходит на узлах с данными на локальных дисках, тем самым сокращая сетевой трафик.
Mapper
ОдинMap
Функция заключается в выполнении определенной операции над каждым элементом концептуального списка, состоящего из независимых элементов, поэтому каждый элемент работает независимо, а исходный список не изменяется, поскольку здесь создается новый список для сохранения нового ответа. Это означает, что,Map
Операции легко распараллеливаются
MapReduce
обрамленныйMap
иReduce
функции основаны на(key, value)
Определена структура данных формы.Map
в поле данных (Data Domain), чтобы получить пару ключ-значение и вернуть список пар ключ-значение:
Map(k1,v1) → list(k2,v2)
Map
Функция вызывается параллельно, применяется к каждой паре ключ-значение во входном наборе данных (с ключом K1). Затем каждый вызов возвращает список пар ключ-значение (с ключом K2). после,MapReduce
Рамка собирается из всех списков с одинаковымиkey
(здесь k2) все пары ключ-значение и сгруппировать их вместе для каждогоkey
Создайте группу.
Reducer
иReduce
правильное слияние элементов списка. Хотя не какMap
Функции настолько параллельны, но поскольку у редукции всегда есть простой ответ, а крупномасштабные операции относительно независимы, функции редукции также полезны в высокопараллельных средах.Reduce
Функция применяется к каждой группе параллельно, в результате получается набор значений в одной предметной области:
Reduce(k2, list (v2)) → list(v3)
Reduce
Терминал получает упорядоченные группы данных из разных задач. В настоящее времяReduce()
Это будет соответствовать логике кода, написанной программистом.reduce
Операции, такие как подсчет и суммирование по одной и той же паре ключ-значение и т. д. еслиReduce
Если количество данных, принимаемых терминалом, достаточно мало, они сохраняются непосредственно в памяти, если количество данных превышает определенную долю размера буфера, данные будут объединены, а затем записаны на диск.
Partitioner
Как упоминалось ранее,Map
На этапе есть операция разделения на группы Процесс разделения данныхPartition
, а класс java, отвечающий за разбиение,Partitioner
.
Partitioner
компоненты позволяютMap
правильноKey
раздел, так что разные разделыKey
по-разномуReduce
обработка, тем самымPartitioner
сумма равнаReducer
число, аPartitioner
соответствует одномуReduce
работу, которую можно считатьReduce
Разделение входных данных можно запрограммировать и контролировать в соответствии с реальной бизнес-ситуацией, улучшаяReduce
эффективности или балансировки нагрузки.MapReduce
Встроенная перегородка естьHashPartition
.
Всегда полезно иметь несколько разбиений, потому что время, затрачиваемое на обработку разбиений, мало по сравнению со временем, затрачиваемым на обработку всего ввода. Когда разделение небольшое, балансировка нагрузки может выполняться лучше, но оно не должно быть слишком маленьким.Если оно слишком мало, время, затрачиваемое на управление разделением и загрузку задач, будет составлять большую часть общего времени выполнения.
На картинке нижеmap
задачи иreduce
Схематическая диаграмма задачи:
3. Пример подсчета слов
Вот код Java для подсчета частоты слов:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// 继承 Mapper 类,实现自己的 map 功能
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// map 功能必须实现的函数
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// 继承 Reducer 类,实现自己的 reduce 功能
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// 初始化Configuration,读取mapreduce系统配置信息
Configuration conf = new Configuration();
// 构建 Job 并且加载计算程序 WordCount.class
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
//指定 Mapper、Combiner、Reducer,也就是我们自己继承实现的类
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 设置输入输出数据
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Приведенный выше код будет найден при указанииMapper
а такжеReducer
, также указаноCombiner
своего рода,Combiner
является локализованнымreduce
операции (так мы видимWordCount
класс используетсяreduce
загружен), этоmap
последующее выполнение операции, сmap
на том же хосте, в основном наmap
Перед расчетом промежуточного файла выполните простую операцию слияния и повторения значения ключа, чтобы уменьшить размер промежуточного файла, чтобы в последующемShuffle
Это может снизить стоимость сетевой передачи и повысить эффективность сетевой передачи.
ОтправитьMR
Команда для работы:
hadoop jar {程序的 jar 包} {任务名称} {数据输入路径} {数据输出路径}
Например:
hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output
Схематическая диаграмма приведенного выше кода:
Промежуточные результаты Map -> Shuffle -> Reduce, включая конечный результат, сохраняются на локальном диске.
4. Преимущества и недостатки MapReduce
MapReduce
Два основных преимущества:
1) Параллельная обработка:
существуетMapReduce
В , мы делим задание на несколько узлов, и каждый узел обрабатывает часть задания одновременно. следовательно,MapReduce
Основанный на парадигме «разделяй и властвуй», он помогает нам обрабатывать данные, используя разные машины. Поскольку данные обрабатываются параллельно несколькими машинами, а не одной машиной, время, необходимое для обработки данных, намного меньше.
2) Расположение данных:
Переносим расчет наMapReduce
данные в кадре вместо перемещения данных в вычислительную часть. Данные распределяются между несколькими узлами, где каждый узел обрабатывает ту часть данных, которая находится на нем.
Это дает следующие преимущества:
- Перемещение блоков обработки туда, где хранятся данные, снижает затраты на сеть;
- Сокращение времени обработки, так как все узлы обрабатывают часть своих данных параллельно;
- Каждый узел получает часть данных для обработки, поэтому вероятность перегрузки узла отсутствует.
Однако MapReduce также имеет свои ограничения:
- Потоковые вычисления и вычисления в реальном времени не могут выполняться, можно вычислять только автономные данные;
- Промежуточные результаты сохраняются на диске, что увеличивает нагрузку ввода-вывода на диск, а скорость чтения относительно низкая;
- проблемы развития, такие как
wordcount
функция требует много настроек и кода, иSpark
будет очень просто.