предисловие
В предыдущей статье мы узнали о процессах чтения и записи HDFS, решений высокой доступности, федерации и Sequence Files, краткий обзорПроцесс написания HDFSБар
Клиент вызывает метод создания распределенной файловой системы. Этот процесс заключается в удаленном вызове метода создания узла имен. В это время узел имен выполняет четыре действия.
-
Проверьте, правильно ли он работает
-
Определите, существует ли файл для записи в HDFS
-
Проверьте, есть ли у клиента разрешение на создание
-
Зарегистрируйте эту операцию (журнал редактирования)
В этот момент метод create вернет OutputStream, который также должен взаимодействовать с NameNode, и вызовет метод addBlock() NameNode, чтобы узнать, на какие узлы данных должен быть записан блок.
При записи данных они сначала записываются на чаке с 4-байтовой контрольной суммой, всего 516 байт, а затем эти чаки записываются в пакет большей структуры.После того, как пакет заполнен несколькими чаками, поставьте пакет в очередь называется очередью данных, а затем сделать две вещи
-
Пакеты в очереди данных передаются узлу данных DataNode, и порядок передачи соответствует списку, возвращаемому методом addBlock() узла NameNode.
-
При передаче в DataNode также передается в очередь подтверждений ack queue
-
Сделайте контрольную сумму для данных, переданных в DataNode, и сравните ее с исходной контрольной суммой перед упаковкой.
-
Если проверка прошла успешно, удалите пакет из очереди подтверждения очереди подтверждения, иначе пакет будет повторно вставлен в очередь данных для повторной передачи
После завершения NameNode может узнать, что реплика была создана с помощью механизма пульса, а затем вызвать метод addBlock() для записи файла после этого.
Ненормальная ситуация не будет повторно объяснена, вы можете сразу перейти ко второй статье для просмотра.
1. Модель программирования MapReduce
MapReduce — это распределенная вычислительная среда, разработанная с идеей «разделяй и властвуй».
Когда один сервер не может выполнять сложные или ресурсоемкие задачи, его можно разделить на небольшие задачи, которые выполняются параллельно на разных серверах, а результаты каждой небольшой задачи могут быть суммированы в конце.
MapReduce состоит из двух этапов:Разделите стадию «Карта» на небольшие задачи, а стадию «Уменьшение» — на подведение итогов., как показано на рисунке ниже, следует отметить, что три небольшие задачи могут выполняться параллельно
1.1 Этап карты
Ввод функции map() — это пара ключ-значение, вывод — серия пар ключ-значение, а результат вывода записывается на локальный диск.
1.2 Ступень уменьшения
Ввод функции reduce() — это пара ключ-значение (то есть вывод функции map()), а вывод — серия пар ключ-значение, которые в конечном итоге записываются в HDFS.
Общая логика очень ясна на рисунке ниже, а процесс перемешивания будет объяснен позже.
2. Пример программирования MapReduce
Статистика частоты слов, которую невозможно избежать, подсчитывает количество вхождений каждого слова в статье.
2.1 Схематический анализ
Слева направо идет файл, HDFS хранит его в блоках, и каждый блок тоже можно рассматривать как сплит, и тогда он дает пару kv (0, Дорогая Медвежья река), Почему ключ 0? 0 здесь на самом деле является смещением, которое будет меняться в зависимости от размера байтов данных в файле. В текущем примере мы пока не можем его использовать.Все, что нам нужно сделать, это разделить Дорогой Медведь Ривер как значение, а затем выполнить статистику.После того, как статистика будет завершена, начните читать Дорогой Автомобиль во второй строке, и выводить то же самое.
После подсчета трех блоков, разделенных на этот файл, одно и то же слово агрегируется на тот же узел для статистики, и можно получить результат.
Вопросы, о которых следует знать
1. Мы видим, что на приведенном выше рисунке 4 слова и 4 задачи редукции, но количество задач редукции определяется самими разработчиками, это просто вопрос SetReduceNum(4).
2. Почему сокращение может знать, сколько слов есть, давайте поговорим о случайном порядке.
3. Осторожно, вы должны обнаружить, что после перетасовки 4 (Уважаемый, 1), а разве не должен быть только один ключ?
2.2 код картографа
public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
// 每个单词出现1次,作为中间结果输出
context.write(new Text(word), new IntWritable(1));
}
}
}
здесьLongWritable соответствует типу Long в Java, а Text соответствует типу String., поскольку существуют проблемы сериализации и десериализации, когда данные передаются от одного узла к другому в распределенной среде, сам Hadoop предоставляет некоторые классы с функциями сериализации, которые мы можем использовать, что мы обычно и видим. Пара ключ-значение (Long, String ), который здесь становится (LongWritable, Text).
После этого метод map() переопределяется для достижения сегментации слов, а затем каждое слово используется в качестве ключа и выводится в состоянии (слово, 1).
Если вы хотите увидеть эти методы API, вы можете перейти кофициальный сайт хаупПроверьте, я все еще использую здесь 2.7.3, студенты, которые читали предыдущую статью, также должны знать
Здесь два маппера, потому что первый маппер — это старый маппер, а теперь используется новый. После нажатия кнопки «Метод» вы увидите метод map(), который вы только что использовали.
2.3 Код редуктора
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/*
key: hello
value: List(1, 1, ...)
*/
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : values) {
sum = sum + count.get();
}
context.write(key, new IntWritable(sum));// 输出最终结果
};
}
С основанием предыдущего 2.2 этот код больше не поясняется, то есть значение накапливается, а потом получается сумма.Ключ по-прежнему ссылается на слово, а потом выводится в состоянии (слово , сумма).
Дополнение: когда список в значении очень большой, он выберет увеличение памяти кластера или установит некоторые ограничения при чтении предложений (пользовательский класс InputFormat, по умолчанию MapReduce — TextInputFormat), чтобы уменьшить размер данных.
2.4 Метод main() выполнения программы
В основном каждый из основных методов здесь напрямую копируется, а затем заполняется параметрами установленного метода для непосредственного использования.
public class WordMain {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
if (args.length != 2 || args == null) {
System.out.println("please input Path!");
System.exit(0);
}
Configuration configuration = new Configuration();
// 生成一个job实例
Job job = Job.getInstance(configuration, WordMain.class.getSimpleName());
// 打jar包之后,找程序入口用
job.setJarByClass(WordMain.class);
// 通过job设置输入/输出格式
// MR的默认输入格式就是TextInputFormat,所以注释掉也没问题
//job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入/输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置处理Map/Reduce阶段的类
job.setMapperClass(WordMap.class);
job.setReducerClass(WordReduce.class);
//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型
//job.setMapOutputKeyClass(.class)
// 设置最终输出key/value的类型m
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 提交作业
job.waitForCompletion(true);
}
}
Режим работы можно запустить локально, в кластере, или в maven запаковать и запустить, а результаты работы можно посмотреть через пряжу, так как учитывая, что вы можете не успеть собрать кластер, чтобы играть здесь, я буду не отображать его здесь Я поделюсь кратким ответом позже, когда у меня будет возможность Кластер из 3 узлов построен.
2.5 combiner
Локальная агрегация на стороне карты, независимо от того, сколько операций объединителя выполняется, это не повлияет на конечный результат
Примечание. Не все программы MapReduce подходят для использования, например, для поиска среднего значения.
WordCountMap与WordCountReduce代码不变
WordCountMain中,增加job.setCombinerClass(WordCountReduce.class);
Пара ключ-значение выглядит как на первой картинке в начале.Теперь когда мы просто проходим Mapping будет большое количество пар ключ-значение.Они будут передаваться в соответствующий Reduction по сети.Если они все согласно (слово, 1) После того, как формат передан, объем передаваемых данных становится очень большим, поэтому лучшее решение в это время — делать сводку по определенному слову локально, то есть операцию объединения, как показано на цифра, два (Дорогой, 1) изменить Становится один (Дорогой, 2), 2 (Автомобиль, 1) становится (Автомобиль, 2) и т.д...
2.6 Процесс перемешивания
Когда задача карты выводится, она будет выводиться в кольцевой буфер.Каждый кольцевой буфер имеет размер 100 МБ.При непрерывном чтении и записи данных память кольцевого буфера достигает 80%.В это время он будет вызвать переполнение для записи на диск.Эти файлы записываются на диск, и эта операция записи на диск будет проходить через 3 процесса
Первый — это секционирование.По умолчанию ключ используется для операций секционирования.Среда MapReduce предоставляет HashPartitioner для операций секционирования.
Пара kv кольцевого буфера должна вызывать метод getPartition() перед попаданием на диск, в данный момент мы видим, что он использует более хитрый метод:Сначала вычисляем хэш-код этого ключа, а затем модулируем количество редукторов.В это время мы смотрим на картинку выше, количество редукторов равно 4, затем мы модулируем 4 числом, и результат будет только 4 Это 0,1,2,3, поэтому эти четыре результата будут соответствовать разным буферам.
Остальное — это задача сокращения для извлечения данных, которые будут помещены в память в начале и будут переполнены и записаны на диск, когда их нельзя будет записать.
Конечно, если в начале выполнить операцию setCombine, она станет (Уважаемый, 4).На рисунке, потому что мы пример, на практике каждый раздел имеет много разных слов, которые будут выполняться во время операции сокращения Операция слияния, то есть одинаковые ключи объединяются, а затем сортируются по алфавиту.
объединение, слияние и окончательное сокращение задачи, эти функции одинаковы, но этапы действия разные, что удобно для повышения производительности. Пока бизнес-требования выполняются, иногда карта может удовлетворить требования, а иногда требуются две фазы карты и сокращения.
После этого результат каждой задачи редукции будет записываться в файл в HDFS. Когда задача карты будет завершена, будет appMaster, когда пряжа будет упомянута позже, и будет сделано подтверждение опроса, а после завершения подтверждения задача сокращения будет уведомлена о необходимости извлечения с локального диска. формируется относительно ясное понятие, что тоже очень нормально.
2.7 Вторичная сортировка
В MapReduce сортировка разделов и группировка выполняются по ключу.Если нужно настроить тип ключа и правило сортировки ключа, как это реализовать (совместно с объяснением кода)
public class Person implements WritableComparable<Person> {
private String name;
private int age;
private int salary;
public Person() {
}
public Person(String name, int age, int salary) {
//super();
this.name = name;
this.age = age;
this.salary = salary;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getSalary() {
return salary;
}
public void setSalary(int salary) {
this.salary = salary;
}
@Override
public String toString() {
return this.salary + " " + this.age + " " + this.name;
}
//先比较salary,高的排序在前;若相同,age小的在前
public int compareTo(Person o) {
int compareResult1= this.salary - o.salary;
if(compareResult1 != 0) {
return -compareResult1;
} else {
return this.age - o.age;
}
}
//序列化,将NewKey转化成使用流传送的二进制
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeInt(age);
dataOutput.writeInt(salary);
}
//使用in读字段的顺序,要与write方法中写的顺序保持一致
public void readFields(DataInput dataInput) throws IOException {
//read string
this.name = dataInput.readUTF();
this.age = dataInput.readInt();
this.salary = dataInput.readInt();
}
}
Содержание объяснения
2.8 Перекос данных
Перекос данных является распространенным явлением в данных. Выбросы неизбежно появятся в данных и приведут к искажению данных. Эти выбросы могут значительно замедлить выполнение MapReduce. Общие искажения данных делятся на следующие категории:
- Частота данных искажена — количество данных в одной области намного больше, чем в других областях. (уменьшить наклон)
- Неравномерность размера данных — некоторые записи намного больше среднего. (наклон карты)
Перекос данных может возникать как на стороне карты, так и на стороне уменьшения. Перекос данных на стороне карты снижает эффективность обработки различных наборов данных. Перекос данных на стороне сокращения часто возникает из-за разделителя MapReduce по умолчанию.
Перекос данных значительно продлит время выполнения карты и уменьшит количество задач, а также приведет к тому, что операции, требующие кэширования наборов данных, будут потреблять больше ресурсов памяти.
2.8.1 Как диагностировать перекос данных
- Обратите внимание на проблему перекоса частоты данных в выходных данных карты.
- Как диагностировать, какие ключи в выводе карты имеют перекос данных?
-
Добавьте функцию записи деталей ключа вывода карты в методе сокращения.
-
После обнаружения искаженных данных необходимо диагностировать те ключи, которые вызывают искаженные данные. Простой способ — реализовать отслеживание каждого ключа в коде.максимальное значение. Чтобы уменьшить объем отслеживания, вы можете установить порог объема данных, отслеживать только те ключи, объем данных которых больше порога, и выводить их в журнал.
-
8.2 Замедление перекоса данных на стороне уменьшения
-
Уменьшение перекоса данных обычно относится к ситуации, когда в выходных данных карты существует перекос частоты данных, то есть объем данных некоторых выходных ключей намного больше, чем объем данных других выходных ключей.
-
Как уменьшить потерю производительности из-за перекоса данных на стороне уменьшения?
① Выборка и разделение диапазона
Hadoop默认的分区器是基于map输出键的哈希值分区。这仅在数据分布比较均匀时比较好。在有数据倾斜时就很有问题。
使用分区器需要首先了解数据的特性。**TotalOrderPartitioner**中,可以通过对原始数据进行抽样得到的结果集来预设分区边界值。TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。
② Пользовательский раздел
另一个抽样和范围分区的替代方案是基于输出键的背景知识进行自定义分区。例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。
③ Объединить
使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。在技术48种介绍了combine。
④ Соединение со стороны карты и полусоединение
如果连接的数据集太大而不能在map端的连接中使用。那么可以考虑第4章和第7章中介绍的超大数据集的连接优化方案。
⑤ Пользовательская стратегия с искажением размера данных
在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响,乃至导致OutOfMemoryError异常。处理这种情况并不容易。可以参考以下方法。
- 设置mapred.linerecordreader.maxlength来限制RecordReader读取的最大长度。RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。
- 通过org.apache.hadoop.contrib.utils.join设置缓存的数据集的记录数上限。在reduce中默认的缓存记录数上限是100条。
- 考虑使用有损数据结构压缩数据,如Bloom过滤器。
finally
МР не разбит на главы, и объем очень большой, надеюсь, вы сможете терпеливо его прочитать.
Согласно заказу, следующий — Yarn, после процесса больших данных.