Изучение больших данных с нуля — оптимизация производительности Hive

Hadoop
Изучение больших данных с нуля — оптимизация производительности Hive

Я часто использую hive в своей работе, а также много писал HiveQL. Здесь некоторые оптимизации производительности, обычно используемые Hive, резюмируются по трем аспектам.

Оптимизация уровня дизайна таблицы

Использование оптимизации секционированных таблиц

Таблица разделовОн предназначен для классификации и хранения данных в одном или нескольких измерениях, и один раздел соответствует одному каталогу. Если в условии фильтра есть поле раздела, то Hive нужно только просмотреть файлы в соответствующем каталоге раздела и не нужно просматривать глобальные данные, что значительно уменьшает объем обрабатываемых данных и повышает эффективность запросов.

Когда в большинстве случаев запрос таблицы Hive будет отфильтрован по определенному полю, очень удобно создать таблицу разделов.

Используйте оптимизацию таблицы сегментов

После указания количества сегментов при сохранении данных хэшируйте их в соответствии с определенным полем, чтобы определить, в каком сегменте хранить. Цель этого аналогична цели таблицы разделов. Это также делает ненужным глобальный обход всех данных. во время фильтрации, а нужно всего лишь пройтись по всем данным.где ведро на нем.

Выберите подходящий формат хранения файлов

Apache Hive поддерживает несколько знакомых форматов файлов, используемых в Apache Hadoop.

TextFileФормат по умолчанию, если он не указан при создании таблицы, по умолчанию используется этот формат.

Способ хранения: рядовое хранение.

Каждая строка представляет собой запись, и каждая строка заканчивается новой строкой.\nконец. Когда данные не сжаты, нагрузка на диск будет относительно большой, и затраты на синтаксический анализ данных также будут относительно большими.

можно комбинироватьGzip,Bzip2Его можно использовать вместе с другими методами сжатия (система будет автоматически проверять и распаковывать при запросе), но для некоторых алгоритмов сжатия hive не будет разделять данные, поэтому он не может выполнять параллельные операции с данными.

SequenceFile

Двоичный файл, предоставляемый API Hadoop, который прост в использовании, делится и сжимается.

Поддерживаются три варианта сжатия: NONE, RECORD, BLOCK. RECORD имеет низкую степень сжатия, и обычно рекомендуется использовать сжатие BLOCK.

RCFile

Метод хранения: данные разделены на строки, а каждый блок хранится в столбцах.

  • Во-первых, данные делятся на строки, чтобы гарантировать, что одна и та же запись находится в блоке, что позволяет избежать необходимости чтения нескольких блоков при чтении записи.
  • Во-вторых, блочные данные хранятся в формате столбцов, что способствует сжатию данных и быстрому доступу к столбцам.

ORC

Метод хранения: данные разбиты на строки, а каждый блок хранится в столбцах

Новый формат, предоставленный Hive, представляет собой обновленную версию RCFile.Производительность была значительно улучшена, данные можно сжимать и сохранять с быстрым сжатием и быстрым доступом к столбцам.

Parquet

Режим хранения: хранение столбцов

Паркет эффективен для типа масштабного запроса. Для сканирования определенного запроса определенные столбцы в таблице, паркет особенно полезны. Паркет, как правило, используется сжатие Gzip. По умолчанию Snappy.

Parquet поддерживает механизм запросов Impala.

Формат хранения файлов таблицы должен быть максимально адаптированParquetилиORC, что не только уменьшает объем хранилища, но и оптимизирует производительность запросов, сжатия, сопоставления таблиц и т. д.;

Выберите правильный метод сжатия

Оператор Hive, наконец, преобразуется в программу MapReduce для выполнения, а узкое место производительности MapReduce связано ссетевой ввод-вывода такжеДисковый ввод-вывод, чтобы решить узкое место в производительности, самое главноеУменьшить объем данных, сжатие данных — хороший способ. Хотя сжатие уменьшает объем данных, процесс сжатия потребляет ЦП, но в Hadoop узким местом производительности часто является не ЦП, а нагрузка на ЦП невелика, поэтому сжатие полностью использует относительно простаивающий ЦП.

Сравнение часто используемых алгоритмов сжатия

Как выбрать метод сжатия

  1. степень сжатия
  2. Скорость сжатия и распаковки
  3. Поддерживать ли раскол

Файлы, которые поддерживают разбиение, могут иметь несколько программ сопоставления параллельно для обработки больших файлов данных, большинство файлов не поддерживают разбиение, потому что эти файлы можно читать только с самого начала.

Оптимизация синтаксиса и параметров на уровне

Обрезка столбца

Когда Hive считывает данные, он может считывать только столбцы, необходимые в запросе, и игнорировать другие столбцы. Это экономит накладные расходы на чтение, хранение промежуточных таблиц и накладные расходы на консолидацию данных. 

set hive.optimize.cp = true; -- 列裁剪,取数只取查询中需要用到的列,默认为真

Обрезка раздела

В процессе запроса выбираются только нужные разделы, что позволяет уменьшить количество считываемых разделов и объем считываемых данных.

set hive.optimize.pruner=true; // 默认为true

Объединение небольших файлов

Слияние ввода карты

При выполнении программы MapReduce общая ситуация такова, что файлу для обработки необходим преобразователь. Но если источник данных представляет собой большое количество небольших файлов, не будет ли он запускать большое количество задач сопоставления, что приведет к трате большого количества ресурсов. Входные небольшие файлы могут быть объединены, чтобы уменьшить количество задач сопоставления.Детальный анализ

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端输入,不合并

Карта / уменьшение вывода слияние

Большое количество мелких файлов будет оказывать давление на HDFS и влиять на эффективность обработки. Эффект можно удалить, объединив файлы результатов Map и Reduce.

set hive.merge.mapfiles=true;  -- 是否合并Map输出文件, 默认值为真
set hive.merge.mapredfiles=true; -- 是否合并Reduce 端输出文件,默认值为假
set hive.merge.size.per.task=25610001000; -- 合并文件的大小,默认值为 256000000

Разумно контролировать количество задач map/reduce

Разумно контролировать количество картографов

Сокращение количества картографов может быть достигнуто за счет объединения небольших файлов. Увеличьте количество картографов, контролируя предыдущее сокращение

Метод расчета количества картографов по умолчанию

输入文件总大小:total_size     
hdfs 设置的数据块大小:dfs_block_size       
default_mapper_num = total_size/dfs_block_size

MapReduce предоставляет следующие параметры для управления количеством задач карты:

set mapred.map.tasks=10;

Буквально кажется, что количество мапперов можно задавать напрямую, но к сожалению нет, эту настройку параметра можно задавать только когда больше чемdefault_mapper_numвступит в силу, когда .

Так что, если нам нужно уменьшить количество мапперов, но размер файла фиксирован?

в состоянии пройтиmapred.min.split.sizeУстановите размер файла, обрабатываемого каждой задачей, этот размер может бытьdfs_block_sizeвступит в силу, когда

split_size=max(mapred.min.split.size, dfs_block_size)          
split_num=total_size/split_size                   
compute_map_num = min(split_num,  max(default_mapper_num, mapred.map.tasks))

Это уменьшает количество картографов.

Обобщим метод управления количеством картографов:

  • Если вы хотите увеличить количество мапперов, вы можете установитьmapred.map.tasksна большее значение
  • Если вы хотите уменьшить количество мапперов, вы можете установитьmaperd.min.split.sizeна большее значение
  • Если на вход поступает большое количество мелких файлов, и вы хотите уменьшить количество мапперов, вы можете установитьhive.input.formatОбъединение небольших файлов

Если вы хотите настроить количество мапперов, то перед настройкой необходимо определить примерный размер обрабатываемого файла и форму файла (большое количество мелких файлов или один большой файл), а затем установить соответствующие параметры.

Разумно контролировать количество редукторов

Если редукторов слишком много, один редуктор создаст результирующий файл, который создаст много маленьких файлов.Если эти результирующие файлы будут использоваться в качестве входных данных для следующего задания, возникнет проблема, связанная с необходимостью объединения небольших файлов. , и запустите Инициализация редуктора требует затрат и ресурсов.

Если количество редьюсеров слишком мало, такой редуктор должен обрабатывать большой объем данных, а также может возникнуть проблема перекоса данных, что делает весь запрос трудоемким. По умолчанию количество редукторов, выделяемых ульем, определяется следующими параметрами:

  • Параметр 1:hive.exec.reducers.bytes.per.reducer(по умолчанию 1G)
  • Параметр 2:hive.exec.reducers.max(по умолчанию 999)

Формула расчета редуктора:

N = min(参数2, 总输入数据量/参数1)

Количество редукторов можно контролировать, изменяя значения двух вышеуказанных параметров. также через

set mapred.map.tasks=10; 

Непосредственно управляет количеством редукторов.Если этот параметр установлен, два вышеуказанных параметра будут игнорироваться.

Присоединяйтесь к оптимизации

Сначала отфильтруйте данные

Минимизируйте объем данных на каждом этапе, максимально используйте поля секций для таблицы секций и выбирайте только те столбцы, которые необходимо использовать позже, чтобы свести к минимуму объем данных, задействованных в соединениях.

Принцип объединения малых столов с большими столами

Маленькие таблицы должны объединяться, большие таблицы должны соответствовать принципу объединения больших таблиц с малыми таблицами, потому что операция уменьшения фазы соединения, расположенная слева от объединения, оглавления будет загружена в память, чем меньше записей в таблице слева, может эффективно снизить вероятность переполнения памяти. присоединяйтесь в порядке выполнения слева направо для создания задания, чтобы обеспечить увеличение размера таблицы непрерывной последовательности запросов слева направо.

использовать тот же ключ подключения

В кусте, когда 3 или более таблиц объединены, если условие включения использует одно и то же поле, они будут объединены в задание MapReduce.Используя эту функцию, одно и то же объединение можно поместить в задание, чтобы сэкономить время выполнения.

включить присоединение к карте

Mapjoin напрямую распределяет относительно небольшие таблицы по обеим сторонам соединения в память каждого процесса карты и выполняет операцию соединения в процессе карты, так что шаг сокращения не требуется, тем самым повышая скорость. Только операции соединения могут включать mapjoin.

set hive.auto.convert.join = true; -- 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入内存表的大小(字节)
set hive.mapjoin.maxsize=1000000;  -- Map Join所处理的最大的行数。超过此行数,Map Join进程会异常退出

максимально атомарно

Старайтесь избегать того, чтобы SQL содержал сложную логику, вы можете использовать промежуточную таблицу для завершения сложной логики.

Карта бочкового стола

При объединении двух таблиц с сегментами, если соединение представляет собой поле с сегментами, а количество сегментов маленькой таблицы кратно большой таблице, для повышения эффективности можно включить объединение карт.

set hive.optimize.bucketmapjoin = true; -- 启用桶表 map join

Группировать по оптимизации

По умолчанию данные одного и того же ключа на этапе карты будут распределены на уменьшение, а когда данные ключа слишком велики, они будут сгенерированы.перекос данных. провестиgroup byОперацию можно оптимизировать с учетом следующих двух аспектов:

1. Частичная агрегация на стороне карты

На самом деле не все операции агрегации нужно выполнять на стороне Reduce.Многие операции агрегации можно сначала частично агрегировать на стороне Map, а затем получить окончательный результат на стороне Reduce.

set hive.map.aggr=true; -- 开启Map端聚合参数设置

set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端进行聚合操作的条目数目

2. Балансировка нагрузки при искажении данных

set hive.groupby.skewindata = true; -- 有数据倾斜的时候进行负载均衡(默认是false) 

Если для параметра установлено значение true, результирующий план запроса содержит две задачи MapReduce. В первой задаче MapReduce результаты вывода карты будут случайным образом распределяться по редюсерам, каждый редьюсер будет выполнять операции частичной агрегации и выводить результаты, чтобы результаты обработки были одинаковыми.group by keyМожно распределить по разным редьюсерам для достижения цели балансировки нагрузки; вторая задача MapReduce основана на результатах предварительной обработки данных.group by keyРаспространяется на каждое сокращение и, наконец, завершает окончательную операцию агрегирования.

Упорядочить по оптимизации

order byЭто можно сделать только в процессе сокращения, поэтому, если большой набор данныхorder by, что приведет к тому, что данные, обрабатываемые в процессе сокращения, будут довольно большими, что приведет к медленному выполнению запроса.

  • на окончательный результатorder by, не сортируйте большой набор данных посередине. Если окончательный результат меньше и может быть отсортирован по уменьшению, сделайте это на конечном наборе результатов.order by.
  • Если это первые N фрагментов данных после десортировки, вы можете использоватьdistribute byа такжеsort byПервые N элементов сортируются при каждом редукции, а затем наборы результатов каждого редукции объединяются и сортируются глобально в одном редукции, а затем берутся первые N элементов, потому что те, которые участвуют в глобальной сортировкеorder byОбъем данных не болееreduce个数 * N, поэтому эффективность выполнения очень высока.

Оптимизация COUNT DISTINCT

-- 优化前(只有一个reduce,先去重再count负担比较大):
select count(distinct id) from tablename;

-- 优化后(启动两个job,一个job负责子查询(可以有多个reduce),另一个job负责count(1)):
select count(1) from (select distinct id from tablename) tmp;

Прочитайте один раз и вставьте много раз

В некоторых сценариях после чтения данных из таблицы ее нужно использовать несколько раз, в этом случае можно использоватьmulti insertграмматика:

from sale_detail
  insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' ) 
  select shop_name, customer_id, total_price where .....
  insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
  select shop_name, customer_id, total_price where .....;

проиллюстрировать:

  • Как правило, один SQL может записывать до 128 каналов вывода.Если записано более 128 каналов, будет сообщено о синтаксической ошибке.
  • В мультивставке:
    • Для многораздельных таблиц одна и та же целевая секция не может появляться более одного раза.
    • Для несекционированных таблиц таблица не может появляться более одного раза.
  • Для разных разделов одной и той же таблицы разделов не может быть одновременныхinsert overwriteа такжеinsert intoоперации, иначе будет возвращена ошибка.

включить сжатие

сжатие вывода карты

set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

Промежуточное сжатие данных

Промежуточное сжатие данных предназначено для сжатия данных между несколькими заданиями, запрашиваемыми ульем. Лучше всего выбрать метод сжатия, который экономит процессорное время. может быть использованsnappyАлгоритм сжатия, эффективность сжатия и распаковки этого алгоритма очень высока.

set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;

Результаты сжатия данных

Окончательные данные результата (выходные данные редуктора) также могут быть сжаты, и можно выбрать хороший эффект сжатия, можно уменьшить размер диска для чтения и записи данных и данных времени; Примечание. Широко используемый алгоритм сжатия gzip, snappy не поддерживает параллельную обработку. Если источником данных являются большие архивные файлы gzip / snappy, для обработки этого файла потребуется только средство сопоставления, это серьезно повлияет на эффективность запроса. Таким образом, если результирующие данные в качестве источника данных нуждаются в других задачах запроса, вы можете выбрать поддержку разделяемогоLZOЭтот алгоритм может не только сжимать файл результата, но и обрабатывать его параллельно, что позволяет значительно повысить скорость выполнения задания. Узнайте, как установить библиотеку сжатия LZO для кластера Hadoop.эта статья.

set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;  
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

Кластеры Hadoop поддерживают следующие алгоритмы:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.GzipCodec
  • org.apache.hadoop.io.compress.BZip2Codec
  • org.apache.hadoop.io.compress.DeflateCodec
  • org.apache.hadoop.io.compress.SnappyCodec
  • org.apache.hadoop.io.compress.Lz4Codec
  • com.hadoop.compression.lzo.LzoCodec
  • com.hadoop.compression.lzo.LzopCodec

Оптимизация уровня архитектуры Hive

Включить прямое получение

У Hive есть два способа чтения данных из HDFS: включить чтение MapReduce и получить их напрямую.

Прямая выборка данных выполняется намного быстрее, чем чтение данных в MapReduce, но только несколько операций могут использовать прямую выборку.

в состоянии пройтиhive.fetch.task.conversionПараметры для настройки, при каких обстоятельствах используется метод прямого захвата:

  • minimal:Толькоselect *, на поле разделаwhereфильтр, естьlimitТолько в этих трех сценариях можно включить метод прямого захвата.
  • more:существуетselect,whereфильтр,limit, прямая загрузка включена.
set hive.fetch.task.conversion=more; -- 启用fetch more模式

локализованное исполнение

Когда Hive запрашивает кластер, он по умолчанию запускается на нескольких компьютерах в кластере и требует, чтобы несколько компьютеров работали в координации.Этот метод очень хорошо решает проблему запроса большого объема данных. Однако, когда объем данных, обрабатываемых запросом Hive, относительно невелик, фактически нет необходимости запускать распределенный режим для выполнения, поскольку выполнение проекта распределенным образом для передачи между сетями, координации нескольких узлов и т. д., потребляет ресурсы. Для небольших наборов данных все задачи могут быть обработаны на одной машине через локальный режим, и время выполнения может быть значительно сокращено.

set hive.exec.mode.local.auto=true; -- 打开hive自动判断是否启动本地模式的开关
set hive.exec.mode.local.auto.input.files.max=4; -- map任务数最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map输入文件最大大小

повторное использование JVM

Оператор Hive в конечном итоге будет преобразован в серию задач MapReduce. Каждая задача MapReduce состоит из серии задач Map и Reduce Task. По умолчанию задача Map или задача Reduce в MapReduce запускает процесс JVM и задачу после выполнения. , процесс JVM завершится. Таким образом, если задача занимает короткое время и JVM необходимо запускать несколько раз, время запуска JVM станет относительно большим потреблением, и в настоящее время это можно решить путем повторного использования JVM.

set mapred.job.reuse.jvm.num.tasks=5;

Есть у JVM и недостатки: включение повторного использования JVM всегда будет занимать слот используемой задачи для повторного использования и не будет освобождаться до тех пор, пока задача не будет выполнена. если不平衡的jobЕсли есть несколько задач сокращения, выполнение которых занимает гораздо больше времени, чем другие задачи сокращения, зарезервированные слоты останутся свободными, но не могут использоваться другими заданиями, пока все задачи не будут завершены.

параллельное выполнение

Для некоторых операторов запросов hive преобразует их в одну или несколько стадий, в том числе: стадию MapReduce, стадию выборки, стадию слияния, стадию ограничения и т. д. По умолчанию одновременно выполняется только один этап. Однако некоторые этапы могут выполняться параллельно, если они не являются взаимозависимыми. Многоэтапный параллелизм потребляет больше системных ресурсов.

set hive.exec.parallel=true;  -- 可以开启并发执行。
set hive.exec.parallel.thread.number=16;  -- 同一个sql允许最大并行度,默认为8。

Спекулятивное исполнение

В среде распределенного кластера из-за программных ошибок (включая ошибки самого Hadoop), несбалансированной нагрузки или неравномерного распределения ресурсов скорость выполнения нескольких задач одного и того же задания может быть непостоянной, а скорость выполнения некоторых задач может быть очевидной. , Медленнее, чем другие задачи (например, задача задания выполняется только на 50%, в то время как все остальные задачи завершили выполнение), то эти задачи будут замедлять общий ход выполнения задания. Чтобы избежать этой ситуации, Hadoop использует механизм спекулятивного выполнения (Speculative Execution), который выводит «обратную» задачу в соответствии с определенными правилами и запускает задачу резервного копирования для такой задачи, чтобы задача и исходная задача могли обрабатываться. та же задача в то же время данные, и, наконец, выберите результат расчета первой успешной операции для завершения задачи в качестве конечного результата.

set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;

предложение:

Эти функции можно отключить, если пользователь очень чувствителен к отклонениям во время выполнения. Если пользователю необходимо выполнить длинную карту или уменьшить задачу из-за большого количества входных данных, потери, вызванные запуском спекулятивного выполнения, огромны.



Расширенное чтение