Узнайте больше о MapReduce в одной статье

Большие данные

Решение Hadoop для крупномасштабных распределенных вычислений данных — это MapReduce. MapReduce — это и модель программирования, и вычислительная среда. Другими словами, разработчики должны разрабатывать программы на основе модели программирования MapReduce, а затем распространять программы в кластере Hadoop для выполнения через вычислительную среду MapReduce. Давайте сначала рассмотрим MapReduce как модель программирования.


Модель программирования MapReduce

MapReduce — очень простая, но очень мощная модель программирования.

Простота заключается в том, что его модель программирования включает в себя только два процесса: карту и уменьшение.Основной вход карты — пара значений .После расчета карты пара значений выводятся; затем те же ключи объединяются, чтобы сформировать ; затем вводят этот в сокращение и выводят ноль или более пар после вычисления.

Но в то же время MapReduce очень мощен: будь то операции реляционной алгебры (вычисления SQL) или матричные операции (вычисления графов), почти все вычислительные требования в области больших данных могут быть достигнуты с помощью программирования MapReduce.

Возьмем в качестве примера программу WordCount. WordCount в основном решает проблему статистики частотности слов при обработке текста, которая заключается в подсчете количества вхождений каждого слова в тексте. Если вы просто посчитаете частоту слов в статье, от десятков K до нескольких M данных, то напишите программу, прочитайте данные в память и постройте хеш-таблицу для записи количества вхождений каждого слова, как показано ниже.


Статистика частоты слов для небольших объемов данных


Но если вы хотите посчитать частоту слов всех веб-страниц (триллионов) в Интернете в мире (это типичное требование поисковых систем, таких как Google), вы не можете написать программу для чтения всех веб-страниц в мире. мир в память, то вам может понадобиться использовать программирование MapReduce для решения.

Программа MapReduce WordCount выглядит следующим образом.

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}

Его ядром является функция отображения, функция сокращения.

Ввод функции карты в основном представляет собой пару . В этом примере значение представляет собой строку данных во всех текстах, которые нужно подсчитать. Ключ здесь не важен, и мы его игнорируем.

public void map(Object key, Text value, Context context)

Процесс вычисления функции карты заключается в извлечении слов из этой строки текста и выводе пары , такой как , для каждого слова.

Вычислительная среда MapReduce соберет эти и соединит одно и то же слово вместе, чтобы сформировать > Например, data, а затем передайте его функции сокращения.

public void reduce(Text key, Iterable<IntWritable> values,Context context)

Значения входных параметров сокращения здесь представляют собой набор, состоящий из множества единиц, а ключ — это конкретное слово word.

Процесс вычисления функции сокращения заключается в суммировании 1 в этом наборе, а затем в объединении слова (слова) и этой суммы (суммы) для формирования выходных данных (). Каждый выход представляет собой слово и сумму статистики частоты его слов.

Если предположить, что есть два блока текстовых данных, которым нужна статистика частоты слов, процесс расчета MapReduce выглядит следующим образом.


Процесс расчета MapReduce


Функция карты может выполнять операции над частью данных, так что большие данные можно разделить на множество блоков (это то, что делает HDFS), а вычислительная среда MapReduce выделяет функцию карты каждому блоку для вычисления, тем самым реализуя большие данные распределенные вычисления.

Как упоминалось выше, модель программирования MapReduce делит процесс вычисления больших данных на два этапа: сопоставление и уменьшение.На этапе сопоставления задача вычисления сопоставления назначается каждому блоку данных, а затем все ключи, выдаваемые сопоставлением, объединяются. Тот же ключ и соответствующее ему значение отправляются в ту же задачу сокращения для обработки.

В этом процессе есть две ключевые проблемы, которые необходимо решить.

  • Как назначить задачу расчета карты на каждый блок данных, как код отправляет серверу, где находится блок данных, как запустить отправку и как узнать, где находятся данные для расчета в файле после его запуска ( какой идентификатор блока данных)


  • вывода карты с разных серверов, как агрегировать один и тот же ключ вместе и отправить его в задачу сокращения

Эти две ключевые проблемы просто соответствуют двум «обработкам платформы MapReduce» в «процессе расчета MapReduce» в статье.

Обработка двух фреймворков MapReduce в процессе расчета MapReduce


Давайте сначала посмотрим, как MapReduce запускает и обрабатывает задание приложения для обработки больших данных.

Механизм запуска и запуска задания MapReduce

На примере Hadoop1 рабочий процесс MapReduce включает следующие ключевые процессы:

  • Процесс приложения больших данных: запустите основной вход в пользовательскую программу MapReduce, в основном укажите классы Map и Reduce, пути к входным и выходным файлам и т. д., а также отправьте задания в кластер Hadoop.

  • Процесс JobTracker: запускает соответствующее количество задач сопоставления и сокращения процессов в соответствии с объемом входных данных, подлежащих обработке, и управляет планированием задач и мониторингом на протяжении всего жизненного цикла задания. Процесс JobTracker глобально уникален во всем кластере Hadoop.

  • Процесс TaskTracker: отвечает за запуск и управление процессом сопоставления и сокращением. Поскольку каждый блок данных должен иметь соответствующую функцию сопоставления, процесс TaskTracker обычно запускается на том же сервере, что и процесс HDFS DataNode, то есть на большинстве серверов в кластере Hadoop выполняются как процесс DataNode, так и процесс TaskTacker.

Как показано ниже.

Механизм запуска и запуска задания MapReduce


Конкретный процесс запуска и расчета задания выглядит следующим образом:

  • Процесс приложения хранит пакеты jar заданий пользователя в HDFS, и эти пакеты jar будут распространяться на серверы в кластере Hadoop для выполнения вычислений MapReduce в будущем.

  • Приложение отправляет задание в JobTracker.

  • JobTacker создает дерево JobInProcess в соответствии с политикой планирования заданий, и каждое задание будет иметь собственное дерево JobInProcess.

  • JobInProcess создает соответствующее количество TaskInProcess в соответствии с количеством сегментов входных данных (обычно количеством блоков данных) и установленным количеством редукторов.

  • Процесс TaskTracker и процесс JobTracker регулярно обмениваются данными.

  • Если у TaskTracker есть свободные вычислительные ресурсы (простаивающие ядра процессора), JobTracker будет назначать ему задачи. При назначении задачи будет вычислять ей задачу по имени сервера TaskTracker, совпадающему с блоком данных на той же машине.Заставляет запущенную вычислительную задачу нормально обрабатывать данные на этой машине.

  • После того, как TaskRunner получит задачу, в соответствии с типом задачи (отобразить или уменьшить), параметрами задачи (путь пакета JAR задания, путь к файлу входных данных, начальная позиция и смещение данных, которые должны быть обработаны в файле, а также DataNode для нескольких резервные копии блоков данных, имя хоста и т. д.), чтобы запустить соответствующую карту или уменьшить процесс.

  • После запуска программы карты или сокращения проверьте, есть ли файл пакета jar для локального выполнения задачи.Если нет, загрузите его из HDFS, а затем загрузите код карты или сокращения, чтобы начать выполнение.

  • Если это процесс сопоставления, считывайте данные из HDFS (обычно блоки данных для чтения хранятся на локальном компьютере). Если это процесс сокращения, запишите данные результата в HDFS.

С помощью описанного выше процесса MapReduce может распределять вычислительные задачи заданий больших данных во всем кластере Hadoop, а данные, которые должны обрабатываться каждой задачей вычисления карты, обычно можно считывать с локального диска. Что нужно сделать пользователю, так это просто написать функцию отображения и функцию сокращения, и ему не нужно заботиться о том, как эти две функции распределяются и запускаются в кластере, а также о том, как блоки данных распределяются для вычислительных задач. Все это делает вычислительная среда MapReduce.

Механизм слияния и объединения данных MapReduce

В примере WordCount для подсчета количества вхождений одного и того же слова во всех входных данных, а карта может обрабатывать только часть данных, популярное слово будет появляться почти во всех картах, и эти слова должны быть объединены вместе для статистика, чтобы получить правильный результат.

На самом деле, почти во всех сценариях обработки больших данных необходимо решать проблему ассоциации данных.Такой простой, как WordCount, вам нужно только объединить ключи, и такой сложный, как операция объединения базы данных, вам нужно иметь дело с двумя типами (или более типов) данных.Подключение по ключу.

Вычислительная среда MapReduce обрабатывает операции слияния и соединения данных между выводом карты и вводом сокращения.Для этого процесса используется специальный словарь, называемый перемешиванием.

Процесс тасования MapReduce

Результат расчета каждой задачи карты будет записан в локальную файловую систему.Когда задача карты будет близка к завершению расчета, вычислительная среда MapReduce запустит процесс перемешивания и вызовет интерфейс Partitioner на стороне карты, и для каждого Выберите раздел сокращения, а затем отправьте его соответствующему процессу сокращения по протоколу http. Таким образом, независимо от того, на каком узле сервера находится карта, один и тот же ключ будет отправлен одному и тому же процессу сокращения. Сторона сокращения сортирует и объединяет полученные , складывает те же ключи вместе, чтобы сформировать , и передает его для сокращения выполнения.

Partitioner по умолчанию платформы MapReduce использует хеш-значение ключа для модуляции количества задач сокращения. Один и тот же ключ должен относиться к одному и тому же идентификатору задачи сокращения. С точки зрения реализации такой код Partitioner нуждается только в одной строке, как показано ниже.

/** Use {@link Object#hashCode()} to partition. */ 
public int getPartition(K2 key, V2 value, int numReduceTasks) { 
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
}

Перемешивание — это место, где происходят чудеса в процессе вычисления больших данных Будь то MapReduce или Spark, если это пакетное вычисление больших данных, должен быть процесс перемешивания для связывания данных, а также неотъемлемая связь и ценность. данных будет раскрыто. Если вы не понимаете shuffle, вы запутаетесь в программировании карты и сокращения, и вы не знаете, как правильно спроектировать вывод карты и ввод сокращения. Перемешивание также является наиболее сложной и требующей высокой производительности частью всего процесса MapReduce.В раннем коде MapReduce половина кода посвящена обработке в случайном порядке.