Обучающая серия Hadoop (4) по принципу MapReduce

Java алгоритм Hadoop

Оригинальная ссылка:pengtuo.tech/большие данные исследований и разработок/2018/…

Эта статья познакомитHadoopважная вычислительная базаMapReduce.

полныйMapReduceКаркас состоит из двух частей:

  1. Алгоритмический логический уровень, а именноmap,shuffleа такжеreduceТри важных компонента алгоритма, эта статья представит этот уровень;
  2. Фактический уровень работы, т. е. в какой форме и в каком процессе выполняется задание логики алгоритма на распределенном узле, поскольку автоматическийMapReduce version2В дальнейшем задания сдаютсяYARNуправления, поэтому эта часть не будет рассматриваться в этой статье.

Среди других статей серии:

1. Что такое MapReduce?

MapReduceЭто среда параллельных распределенных вычислений, основанная на Java. Приложения для обработки данных, написанные с ее использованием, могут работать на больших коммерческих аппаратных кластерах для обработки больших наборов данных.РаспараллеливаниеПроблема в том, что обработка данных может происходить с данными, хранящимися в файловой системе (неструктурированные) или в базе данных (структурированные).MapReduceМестоположение данных может быть использовано для обработки данных рядом с местом их хранения, чтобы свести к минимуму коммуникационные издержки.

MapReduceПлатформа выполняет различные задачи параллельно, объединяя распределенные серверы, и управляет всей связью и передачей данных между различными частями системы; она также может автоматически выполнять параллельную обработку вычислительных задач, автоматически разделять вычислительные данные и вычислительные задачи и автоматически разделять вычисления. данные и вычислительные задачи на узлах кластера.Он автоматически назначает и выполняет задачи и собирает результаты вычислений, а также назначает множество сложных деталей в нижней части системы, участвующих в параллельных вычислениях, таких как хранение распределения данных, передача данных и отказоустойчивая обработка для система обработки, снижающая нагрузку на разработчиков.

MapReduceЭто также модель и метод параллельного программирования (Модель и методология программирования). С помощью идеи дизайна функционального языка программирования Lisp он обеспечивает простой метод параллельного программирования, который сильно абстрагирует сложный процесс параллельных вычислений, выполняемый в крупномасштабных кластерах, на две функции: Map и Reduce. Основные задачи параллельных вычислений реализуется двумя функциональными программами Map и Reduce, а абстрактные операции и интерфейсы параллельного программирования обеспечивают простое и удобное завершение программирования и вычислительной обработки крупномасштабных данных.

2. Алгоритм

Каркас MapReduce обычно состоит из трех операций (или шагов):

  1. Map: каждый рабочий узел будетmapФункция применяется к локальным данным и записывает вывод во временное хранилище. Главный узел гарантирует, что обрабатывается только одна копия избыточных входных данных.
  2. Shuffle: Рабочие узлы в соответствии с выходным ключом (поmapгенерация функции) для перераспределения данных, сортировки, группировки и копирования карты данных, чтобы все данные, принадлежащие ключу, находились на одном и том же рабочем узле.
  3. Reduce: рабочие узлы теперь обрабатывают каждый набор выходных данных для каждого ключа параллельно.

Блок-схема MapReduce:

MapReduceРазрешить распределенную работуMapоперация до тех пор, пока каждыйMapдействовать независимо от другихMapоперации могут выполняться параллельно.

Другой, более подробный, будетMapReduceОн разделен на 5 шагов, чтобы понять:

  1. Prepare the Map() input:MapReduceкадр первыйMapпроцессор, которому затем назначаются входные данные для обработки -- пары ключ-значениеK1, и предоставить процессору все входные данные, связанные с ключом;
  2. Run the user-provided Map() code:Map()существуетK1Запустите один раз для пары ключ-значение, сгенерировавK2вывод указанной пары ключ-значение;
  3. Shuffle the Map output to the Reduce processors: преобразовать ранее сгенерированныйK2Пара ключ-значение перемещается на один и тот же рабочий узел в зависимости от того, является ли «ключ» одним и тем же;
  4. Run the user-provided Reduce() code: для каждого рабочего узлаK2пара ключ-значениеReduce()действовать;
  5. Produce the final output:MapReduceколлекция рамок всеReduceвывод и нажмитеK2Отсортируйте его, чтобы получить окончательный результат для вывода.

В реальной производственной среде данные, скорее всего, будут разбросаны по каждому серверу.При исходном методе обработки больших данных данные отправляются в то место, где находится код для обработки, что очень неэффективно и занимает много времени. полоса пропускания В этом случаеMapReduceФреймворк обрабатывает это, помещаяMap()операция илиReduce()Отправляйте на сервер, где находятся данные, и используйте «мобильные вычисления вместо мобильных данных», чтобы ускорить работу всего фреймворка.Большая часть вычислений происходит на узлах с данными на локальных дисках, тем самым сокращая сетевой трафик.

Mapper

ОдинMapФункция заключается в выполнении определенной операции над каждым элементом концептуального списка, состоящего из независимых элементов, поэтому каждый элемент работает независимо, а исходный список не изменяется, поскольку здесь создается новый список для сохранения нового ответа. Это означает, что,MapОперации легко распараллеливаются

MapReduceобрамленныйMapиReduceфункции основаны на(key, value)Определена структура данных формы.Mapв поле данных (Data Domain), чтобы получить пару ключ-значение и вернуть список пар ключ-значение:

Map(k1,v1) → list(k2,v2)

MapФункция вызывается параллельно, применяется к каждой паре ключ-значение во входном наборе данных (с ключом K1). Затем каждый вызов возвращает список пар ключ-значение (с ключом K2). после,MapReduceРамка собирается из всех списков с одинаковымиkey(здесь k2) все пары ключ-значение и сгруппировать их вместе для каждогоkeyСоздайте группу.

Reducer

иReduceправильное слияние элементов списка. Хотя не какMapФункции настолько параллельны, но поскольку у редукции всегда есть простой ответ, а крупномасштабные операции относительно независимы, функции редукции также полезны в высокопараллельных средах.ReduceФункция применяется к каждой группе параллельно, в результате получается набор значений в одной предметной области:

Reduce(k2, list (v2)) → list(v3)

ReduceТерминал получает упорядоченные группы данных из разных задач. В настоящее времяReduce()Это будет соответствовать логике кода, написанной программистом.reduceОперации, такие как подсчет и суммирование по одной и той же паре ключ-значение и т. д. еслиReduceЕсли количество данных, принимаемых терминалом, достаточно мало, они сохраняются непосредственно в памяти, если количество данных превышает определенную долю размера буфера, данные будут объединены, а затем записаны на диск.

Partitioner

Как упоминалось ранее,MapНа этапе есть операция разделения на группы Процесс разделения данныхPartition, а класс java, отвечающий за разбиение,Partitioner.

Partitionerкомпоненты позволяютMapправильноKeyраздел, так что разные разделыKeyпо-разномуReduceобработка, тем самымPartitionerсумма равнаReducerчисло, аPartitionerсоответствует одномуReduceработу, которую можно считатьReduceРазделение входных данных можно запрограммировать и контролировать в соответствии с реальной бизнес-ситуацией, улучшаяReduceэффективности или балансировки нагрузки.MapReduceВстроенная перегородка естьHashPartition.

Всегда полезно иметь несколько разбиений, потому что время, затрачиваемое на обработку разбиений, мало по сравнению со временем, затрачиваемым на обработку всего ввода. Когда разделение небольшое, балансировка нагрузки может выполняться лучше, но оно не должно быть слишком маленьким.Если оно слишком мало, время, затрачиваемое на управление разделением и загрузку задач, будет составлять большую часть общего времени выполнения.

На картинке нижеmapзадачи иreduceСхематическая диаграмма задачи:

3. Пример подсчета слов

Вот код Java для подсчета частоты слов:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // 继承 Mapper 类,实现自己的 map 功能
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // map 功能必须实现的函数
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // 继承 Reducer 类,实现自己的 reduce 功能
  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // 初始化Configuration,读取mapreduce系统配置信息
    Configuration conf = new Configuration();

    // 构建 Job 并且加载计算程序 WordCount.class
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);

    //指定 Mapper、Combiner、Reducer,也就是我们自己继承实现的类
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // 设置输入输出数据
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Приведенный выше код будет найден при указанииMapperа такжеReducer, также указаноCombinerсвоего рода,Combinerявляется локализованнымreduceоперации (так мы видимWordCountкласс используетсяreduceзагружен), этоmapпоследующее выполнение операции, сmapна том же хосте, в основном наmapПеред расчетом промежуточного файла выполните простую операцию слияния и повторения значения ключа, чтобы уменьшить размер промежуточного файла, чтобы в последующемShuffleЭто может снизить стоимость сетевой передачи и повысить эффективность сетевой передачи.

ОтправитьMRКоманда для работы:

hadoop jar {程序的 jar 包} {任务名称} {数据输入路径} {数据输出路径}

Например:

hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output

Схематическая диаграмма приведенного выше кода:

Промежуточные результаты Map -> Shuffle -> Reduce, включая конечный результат, сохраняются на локальном диске.

4. Преимущества и недостатки MapReduce

MapReduceДва основных преимущества:

1) Параллельная обработка:

существуетMapReduceВ , мы делим задание на несколько узлов, и каждый узел обрабатывает часть задания одновременно. следовательно,MapReduceОснованный на парадигме «разделяй и властвуй», он помогает нам обрабатывать данные, используя разные машины. Поскольку данные обрабатываются параллельно несколькими машинами, а не одной машиной, время, необходимое для обработки данных, намного меньше.

2) Расположение данных:

Переносим расчет наMapReduceданные в кадре вместо перемещения данных в вычислительную часть. Данные распределяются между несколькими узлами, где каждый узел обрабатывает ту часть данных, которая находится на нем.

Это дает следующие преимущества:

  • Перемещение блоков обработки туда, где хранятся данные, снижает затраты на сеть;
  • Сокращение времени обработки, так как все узлы обрабатывают часть своих данных параллельно;
  • Каждый узел получает часть данных для обработки, поэтому вероятность перегрузки узла отсутствует.

Однако MapReduce также имеет свои ограничения:

  1. Потоковые вычисления и вычисления в реальном времени не могут выполняться, можно вычислять только автономные данные;
  2. Промежуточные результаты сохраняются на диске, что увеличивает нагрузку ввода-вывода на диск, а скорость чтения относительно низкая;
  3. проблемы развития, такие какwordcountфункция требует много настроек и кода, иSparkбудет очень просто.