A, Профиль желоба
Apache Flume — это распределенная высокодоступная система сбора данных. Он может собирать данные из разных источников данных и отправлять их в систему хранения после агрегации, которая обычно используется для сбора данных журналов. Flume разделен на две версии: NG и OG (до 1.0), NG был полностью переработан на основе OG и на данный момент является наиболее широко используемой версией. Следующие описания основаны на NG.
2. Архитектура Flume и основные концепции
На следующем рисунке показана базовая архитектура Flume:
2.1 Базовая архитектура
Внешние источники данных отправляются в Flume в определенном формате.events
(Событие), когдаsource
полученоevents
, он сохраняет его в одном или несколькихchannel
,channe
будет держатьevents
пока это неsink
потребляется.sink
Основная функция отchannel
читать вevents
, и сохранить его во внешней системе хранения или переслать на следующийsource
, после успешногоchannel
удалено вevents
.
2.2 Основные понятия
1. Event
Evnet
Это основная единица передачи данных Flume NG. Аналогично сообщениям в JMS и системах обмена сообщениями. ОдинEvnet
Состоит из заголовка и тела: первое представляет собой карту ключ/значение, второе представляет собой произвольный массив байтов.
2. Source
Компонент сбора данных собирает данные из внешних источников данных и сохраняет их в Channel.
3. Channel
Channel
Это канал между источником и приемником, используемый для временного хранения данных. Может быть в памяти или постоянной файловой системой:
-
Memory Channel
: Преимущество использования памяти в том, что она работает быстро, но данные могут быть потеряны (например, при внезапном простое); -
File Channel
: Преимущество использования постоянной файловой системы заключается в том, что данные не теряются, но это медленно.
4. Sink
Sink
Основная функция отChannel
читать вEvnet
, и сохранить его во внешней системе хранения или переслать на следующийSource
, после успешногоChannel
удалено вEvent
.
5. Agent
это автономный (JVM) процесс, который содержитSource
,Channel
,Sink
и другие компоненты.
2.3 Типы компонентов
Каждый компонент в Flume предоставляет расширенные типы для различных сценариев:
-
Тип источника: встроены десятки типов, например
Avro Source
,Thrift Source
,Kafka Source
,JMS Source
; -
Тип раковины:
HDFS Sink
,Hive Sink
,HBaseSinks
,Avro Sink
Ждать; -
Тип канала:
Memory Channel
,JDBC Channel
,Kafka Channel
,File Channel
Ждать.
При использовании Flume, если нет особых потребностей, большинство потребностей можно удовлетворить, комбинируя различные типы встроенных источников, приемников и каналов. существуетОфициальный сайт FlumeПараметры конфигурации всех типов компонентов подробно описаны в таблице выше, а образцы конфигурации прилагаются, при этом параметры разных версий могут немного отличаться, поэтому рекомендуется выбирать Руководство пользователя соответствующего версия официального сайта в качестве основной версии.
3. Режим архитектуры Flume
Flume поддерживает различные архитектурные шаблоны, которые описаны ниже.
3.1 multi-agent flow
Flume поддерживает передачу данных между несколькими агентами, что требует, чтобы и приемник предыдущего агента, и источник следующего агента былиAvro
Тип Sink указывает на имя хоста (или IP-адрес) и порт, на котором находится источник (см. Вариант 3 ниже для подробной настройки).
3.2 Consolidation
Часто для сбора журналов используется большое количество клиентов (например, распределенных веб-сервисов).Flume поддерживает использование нескольких агентов для сбора журналов по отдельности, а затем объединяет их с помощью одного или нескольких агентов перед сохранением в файловой системе.
3.3 Multiplexing the flow
Flume поддерживает передачу событий от одного источника к нескольким каналам, то есть к нескольким приемникам.Эта операция называетсяFan Out
(Разветвление). по умолчаниюFan Out
копируется во все каналыEvent
, то есть данные, полученные всеми Каналами, одинаковы. В то же время Flume также поддерживаетSource
Настройте селектор мультиплексирования для реализации пользовательских правил маршрутизации.
4. Формат конфигурации Flume
Настройка Flume обычно требует следующих двух шагов:
- Определите источники, приемники и каналы агента соответственно, а затем привяжите источники и приемники к каналу. Следует отметить, что источник может быть настроен на несколько каналов, а приемник может быть настроен только на один канал. Основной формат следующий:
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
- Определите конкретные свойства Источника, Приемника и Канала соответственно. Основной формат следующий:
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>
Пять, установка и развертывание Flume
Для удобства, поздний обзор, этот склад устанавливает все программное обеспечение одни главу, монтаж рулет, см.:
Установка и развертывание Flume в среде Linux
6. Корпус использования Flume
Представьте несколько вариантов использования Flume:
- Вариант 1. Используйте Flume для отслеживания изменений содержимого файла и вывода нового добавленного содержимого на консоль.
- Вариант 2. Используйте Flume для мониторинга указанного каталога и сохранения вновь добавленных файлов в каталоге в HDFS.
- Вариант 3: Используйте Avro для отправки данных журнала, собранных этим сервером, на другой сервер.
6.1 Случай 1
Требования: Отслеживайте изменения содержимого файла и выводите новое добавленное содержимое на консоль.
Реализация: в основном используетсяExec Source
Сотрудничатьtail
реализация команды.
1. Конфигурация
Новый файл конфигурацииexec-memory-logger.properties
Содержание следующее:
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
#将sources与channels进行绑定
a1.sources.s1.channels = c1
#配置sink
a1.sinks.k1.type = logger
#将sinks与channels进行绑定
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
2. Запуск
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console
3. Тест
Добавить данные в файл:
Дисплей консоли:
6.2 Случай 2
Требования: Отслеживайте указанный каталог и сохраняйте вновь добавленные файлы в каталоге в HDFS.
Реализация: использованиеSpooling Directory Source
а такжеHDFS Sink
.
1. Конфигурация
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type =spooldir
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
#将sources与channels进行绑定
a1.sources.s1.channels =c1
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinks与channels进行绑定
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
2. Старт
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console
3. Тест
Скопируйте любой файл в каталог для прослушивания, вы можете увидеть путь файла, загруженного в HDFS, из журнала:
# cp log.txt logs/
Проверьте, соответствует ли содержимое файла, загруженного в HDFS, локальному:
# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801
6.3 Случай 3
Требования: Отправьте данные, собранные этим сервером, на другой сервер.
Реализация: использованиеavro sources
а такжеavro Sink
выполнить.
1. Настройте сбор логов Flume
Новая конфигурацияnetcat-memory-avro.properties
, прослушайте изменения в содержимом файла, а затем передайте новое содержимое файла черезavro sink
Отправьте на порт 8888 сервера hadoop001:
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1
#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
2. Настройте агрегацию журналов Flume
использоватьavro source
Прослушайте порт 8888 сервера hadoop001 и выведите полученный контент в консоль:
#指定agent的sources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2
#配置sources属性
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888
#将sources与channels进行绑定
a2.sources.s2.channels = c2
#配置sink
a2.sinks.k2.type = logger
#将sinks与channels进行绑定
a2.sinks.k2.channel = c2
#配置channel类型
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
3. Старт
Запускаем агрегацию логов Flume:
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console
Соберите Flume в журналах запуска:
flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console
Рекомендуется начинать в указанном выше порядке, т.к.avro.source
Сначала он будет привязан к порту, так чтоavro sink
При подключении он не сообщит об исключении, что невозможно подключиться. Но неважно, если это начнется не по порядку,sink
Он будет продолжать попытки, пока соединение не будет установлено.
4. Тест
податьtmp/log.txt
Добавить содержимое:
Вы можете видеть, что контент отслеживался с порта 8888 и успешно выводился на консоль:
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным