Flume создает систему сбора логов

Apache Flume

title: Flume создает систему сбора логов дата: 2018-02-03 19:45 тэги: [кафка, флюм]

1. Введение в Flume

1. Особенности Flume
  • Flume — это распределенная, надежная и высокодоступная массивная коллекция журналов. , системы агрегации и передачи
  • Модель потока данных: источник-канал-приемник
  • Механизм транзакций обеспечивает надежность доставки сообщений
  • Встроенные богатые плагины, легко интегрируемые с другими системами
  • Реализация Java, отличный дизайн системы, понятные модули, простота разработки

2. Схема прототипа лотка

Flume原型图.png

3. Основные компоненты лотка

  • Событие: основная единица сообщения, состоящая из заголовка и тела.
  • Агент: процесс JVM, отвечающий за пересылку сообщений, созданных внешним источником на одном конце, внешнему получателю на другом конце.
    • Источник: чтение событий из внешних источников и запись в канал
    • Канал: временный компонент события, после записи исходника событие всегда будет сохраняться,
    • Приемник: чтение события из канала и запись в пункт назначения

3. Поток событий Flume

Flume事件流.png

4. Поток данных Flumes

Flume数据流.png
Flume数据流2.png

2. Сборка лотка

1. Загрузите бинарный установочный пакет

ссылка для скачивания:Flume.apache.org/download. Договор ...

2. Установите Flume

Разархивируйте файл установочного пакета

[hadoop@hadoop01 apps]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz 
[hadoop@hadoop01 apps]$ cd apache-flume-1.8.0-bin/
[hadoop@hadoop01 apache-flume-1.8.0-bin]$ ll
总用量 148
drwxr-xr-x.  2 hadoop hadoop    62 1月  21 14:31 bin
-rw-r--r--.  1 hadoop hadoop 81264 9月  15 20:26 CHANGELOG
drwxr-xr-x.  2 hadoop hadoop   127 1月  21 14:31 conf
-rw-r--r--.  1 hadoop hadoop  5681 9月  15 20:26 DEVNOTES
-rw-r--r--.  1 hadoop hadoop  2873 9月  15 20:26 doap_Flume.rdf
drwxr-xr-x. 10 hadoop hadoop  4096 9月  15 20:48 docs
drwxr-xr-x.  2 hadoop hadoop  8192 1月  21 14:31 lib
-rw-r--r--.  1 hadoop hadoop 27663 9月  15 20:26 LICENSE
-rw-r--r--.  1 hadoop hadoop   249 9月  15 20:26 NOTICE
-rw-r--r--.  1 hadoop hadoop  2483 9月  15 20:26 README.md
-rw-r--r--.  1 hadoop hadoop  1588 9月  15 20:26 RELEASE-NOTES
drwxr-xr-x.  2 hadoop hadoop    68 1月  21 14:31 tools
[hadoop@hadoop01 apache-flume-1.8.0-bin]$ 

3. Создайте мягкое соединение [этот шаг можно пропустить]

[root@hadoop01 bin]# ln -s /home/hadoop/apps/apache-flume-1.8.0-bin /usr/local/flume

4. Настройте переменные среды

Отредактируйте файл /etc/profile и добавьте следующее:

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${HIVE_HOME}/bin:${FLUME_HOME}/bin

4. Запустить лоток

Используйте example.conf Запустите файл конфигурации экземпляра

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Команда запуска выглядит следующим образом:

[root@hadoop01 conf]# pwd
/home/hadoop/apps/apache-flume-1.8.0-bin/conf
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file  example.conf --name a1 -Dflume.root.logger=INFO,console

После успешного запуска отображается следующий рисунок:

........略
18/01/27 18:17:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
18/01/27 18:17:25 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@20470f counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/01/27 18:17:25 INFO node.Application: Starting Channel c1
18/01/27 18:17:25 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/01/27 18:17:26 INFO node.Application: Starting Sink k1
18/01/27 18:17:26 INFO node.Application: Starting Source r1
18/01/27 18:17:26 INFO source.NetcatSource: Source starting
18/01/27 18:17:26 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

отправить данные через телнет

[root@hadoop01 apps]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Are you OK ?
OK

Консоль печатает следующим образом:

Impl[/127.0.0.1:44444]
18/01/27 18:21:00 INFO sink.LoggerSink: Event: { headers:{} body: 41 72 65 20 79 6F 75 20 4F 4B 20 3F 0D          Are you OK ?. }

Если вы не можете использовать telnet, сначала установите инструмент telnet.

[root@hadoop01 apps]# yum -y install telnet

3. Практика на флюме

1. Список исходных компонентов

  • Источник: подключайтесь к различным внешним источникам данных и отправляйте собранные события в канал. Один источник может отправлять события в несколько каналов. Flume имеет очень богатый встроенный источник, и пользователи могут настраивать источник
Тип источника Type использовать
Avro Source avro Запустите сервер Avro, который может подключаться к агенту верхнего уровня.
HTTP Source http Запустите HTTP-сервер
Exec Source exec Выполните команду UNIX, получите стандартный вывод, например tail -f
Taildir Source TAILDIR Прослушать каталог или файл
Spooling Directory Source spooldir Мониторинг новых файлов в каталоге
Kafka Source org.apache.flume.sourc e.kafka.KafkaSource Чтение данных Кафки
JMS Source jms Чтение данных из источника JMS

2.Исходный агент avro и исходный агент Exec

  • Для настройки avroagent файл конфигурации avrosource.conf выглядит следующим образом:
//avrosource.conf
avroagent.sources = r1
avroagent.channels = c1
avroagent.sinks = k1 
avroagent.sources.r1.type = avro
avroagent.sources.r1.bind = 192.168.43.20
avroagent.sources.r1.port = 8888
avroagent.sources.r1.threads= 3
avroagent.sources.r1.channels = c1
avroagent.channels.c1.type = memory
avroagent.channels.c1.capacity = 10000 
avroagent.channels.c1.transactionCapacity = 1000
avroagent.sinks.k1.type = logger
avroagent.sinks.k1.channel = c1
  • Начать агент Avrosource
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file avrosource.conf  --name avroagent -Dflume.root.logger=INFO,console

Запуск прошел успешно, как показано на следующем рисунке:

...略
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/01/27 18:46:36 INFO node.Application: Starting Sink k1
18/01/27 18:46:36 INFO node.Application: Starting Source r1
18/01/27 18:46:36 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 192.168.43.20, port: 8888 }...
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
18/01/27 18:46:37 INFO source.AvroSource: Avro source r1 started
  • Настройте execAgent для установления последовательного соединения с sourceAgent.Файл конфигурации execsource.conf выглядит следующим образом:
execagent.sources = r1 
execagent.channels = c1
execagent.sinks = k1
execagent.sources.r1.type = exec 
execagent.sources.r1.command = tail -F /home/hadoop/apps/flume/execsource/exectest.log
execagent.sources.r1.channels = c1
execagent.channels.c1.type = memory
execagent.channels.c1.capacity = 10000 
execagent.channels.c1.transactionCapacity = 1000
execagent.sinks.k1.type = avro
execagent.sinks.k1.channel = c1
execagent.sinks.k1.hostname = 192.168.43.20
execagent.sinks.k1.port = 8888
  • Запустите execAgent и внедрите execagent для отслеживания изменений файлов, sourceAgent для получения изменений.

запустить execAgent

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file execsource.conf --name execagent

Запуск прошел успешно, как показано на следующем рисунке:

18/01/27 18:58:43 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
18/01/27 18:58:43 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: 192.168.43.20, port: 8888
18/01/27 18:58:43 INFO sink.AvroSink: Attempting to create Avro Rpc client.
18/01/27 18:58:43 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
18/01/27 18:58:44 INFO sink.AbstractRpcSink: Rpc sink k1 started.

Запишите содержимое в файл, отслеживаемый execAgent, и посмотрите, получает ли исходный агент измененное содержимое.

[root@hadoop01 execsource]# echo 222 > exectest.log 
[root@hadoop01 execsource]# echo 5555 >> exectest.log 
[root@hadoop01 execsource]# cat exectest.log 
222
5555

В исходном агенте управления печатной аудиторией сообщения монитора просмотра выглядят следующим образом:

18/01/27 18:58:50 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 33                                        123 }
18/01/27 18:59:55 INFO sink.LoggerSink: Event: { headers:{} body: 35 35 35 35                                     5555 }

Это означает, что два последовательных агента успешно передают информацию.
инструкция:Начальное имя элемента конфигурации в файле конфигурации avroagent должно совпадать с именем запуска службы -имя.

3. Исходный компонент — Буферизация источника каталога

  • Чтобы настроить источник каталога буферизации, содержимое файла конфигурации spooldirsource.conf выглядит следующим образом:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/apps/flume/spoolDir
a1.sources.r1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

/home/hadoop/apps/flume/spoolDir должен быть создан с правами пользователя на чтение и запись.

Запустить SpoolDirsourceAgent

[hadoop@hadoop01 conf]$ flume-ng agent --conf conf --conf-file spooldirsource.conf  --name a1 -Dflume.root.logger=INFO,console

Создайте файл в папке spoolDir и запишите содержимое файла, а также обратите внимание на сообщение консоли:

18/01/28 17:06:54 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/flume/spoolDir/test to /home/hadoop/apps/flume/spoolDir/test.COMPLETED
18/01/28 17:06:55 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/apps/flume/spoolDir/test} body: 32 32 32                                        222 }

На этом этапе SpoolDirSourceAgent может отслеживать изменения файлов.
Стоит отметить, что:Агент источника каталога буферизации не отслеживает изменения файлов в подпапках и не поддерживает существующие изменения данных обновления файлов..

4. Исходный компонент — Kafka Source

  • Настройка источника Kafa, файл конфигурации kafasource.conf следующим образом:
kafkasourceagent.sources = r1
kafkasourceagent.channels = c1
kafkasourceagent.sinks = k1
kafkasourceagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource 
kafkasourceagent.sources.r1.channels = c1 
kafkasourceagent.sources.r1.batchSize = 100
kafkasourceagent.sources.r1.batchDurationMillis = 1000
kafkasourceagent.sources.r1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092
kafkasourceagent.sources.r1.kafka.topics = flumetopictest1
kafkasourceagent.sources.r1.kafka.consumer.group.id = flumekafkagroupid
kafkasourceagent.channels.c1.type = memory
kafkasourceagent.channels.c1.capacity = 10000 
kafkasourceagent.channels.c1.transactionCapacity = 1000
kafkasourceagent.sinks.k1.type = logger
kafkasourceagent.sinks.k1.channel = c1

Сначала запустите 3 узла службы узла Kafka, выполните на каждом узле Kafka, запустите позже.

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

Создайте настроенный Topic flumetoptest1 на узле kafka, команда выглядит следующим образом:

[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumetopictest1
Created topic "flumetopictest1".

После успешного создания запустите исходный агент kafka с помощью следующей команды:

[root@hadoop01 conf]# flume-ng  agent --conf conf --conf-file kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console

Создайте производителя Kafka для отправки сообщений

root@hadoop03 bin]# ./kafka-console-producer.sh --broker-list 192.168.43.22:9092,192.168.43.23:9092 --topic flumetopictest1

Отправьте сообщение, и kafka сможет получить сообщение в это время:

18/02/03 20:36:57 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1, partition=2, timestamp=1517661413068} body: 31 32 33 31 33 32 32 31                         12313221 }
18/02/03 20:37:09 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1, partition=1, timestamp=1517661428930} body: 77 69 20 61 69 79 6F 75 08 08 08                wi aiyou... }

5. Исходный компонент - исходник Taildir

Отслеживайте папку или файл, сопоставляйте отслеживаемый файл источника данных с помощью регулярного выражения, Taildir Source реализует возобновление точки останова, записывая местоположение отслеживаемого файла в файл, и может гарантировать, что никакие повторяющиеся данные не будут прочитаны.

  • Важные параметры
    тип: тип источника TAILDIR
    positionFile: путь к файлу для сохранения позиции чтения прослушиваемого файла.
    idleTimeout: время задержки закрытия файлов бездействия, если в закрытый файл бездействия добавляется новая запись.
    taildir srouce продолжит открывать файл бездействия, значение по умолчанию — 120000 миллисекунд.
    writePosInterval: интервал времени для записи прочитанной позиции файла в файл, который сохраняет прочитанную позицию, значение по умолчанию 3000 мс
    batchSize: максимальное количество событий, записываемых в канал пакетами, значение по умолчанию — 100.
    maxBackoffSleep: максимальное время задержки, в течение которого последние данные прослушиваемого файла не будут получены при последней попытке, значение по умолчанию — 5000 миллисекунд.
    cachePatternMatching: в отслеживаемой папке может быть много файлов, соответствующих регулярным выражениям. Добавление списка успешно сопоставленных отслеживаемых файлов и порядка чтения списка файлов в кеш может повысить производительность. Значение по умолчанию — true.
    fileHeader : добавлять ли абсолютный путь к файлу в заголовок события, значение по умолчанию — false
    fileHeaderKey: значение ключа, добавляемое к абсолютному пути файла в заголовке события, значение по умолчанию — файл
    файловые группы: список отслеживаемых файловых групп, taildirsource отслеживает несколько каталогов или файлов через файловые группы.
    filegroups.: путь регулярного выражения к файлу или прослушивание указанного пути к файлу
    каналы: название канала, к которому подключен источник
  • Настройте источник taildir, конкретное содержимое файла конфигурации taildirsource.conf выглядит следующим образом:
taildiragent.sources=r1
taildiragent.channels=c1
taildiragent.sinks=k1
taildiragent.sources.r1.type=TAILDIR
taildiragent.sources.r1.positionFile=/home/hadoop/apps/flume/taildir/position/taildir_position.json
taildiragent.sources.r1.filegroups=f1 f2
taildiragent.sources.r1.filegroups.f1=/home/hadoop/apps/flume/taildir/test1/test.log
taildiragent.sources.r1.filegroups.f2=/home/hadoop/apps/flume/taildir/test2/.*log.*
taildiragent.sources.r1.channels=c1
taildiragent.channels.c1.type=memory
taildiragent.channels.c1.transcationCapacity=1000
taildiragent.sinks.k1.type=logger
taildiragent.sinks.k1.channel=c1

Запустите агент taildirSource, код выглядит следующим образом:

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console

Начните записывать файлы в папки test1 и test2 и наблюдайте за приемом сообщений агента.

6. Компонент канала

  • Канал: Канал разработан как промежуточная область передачи событий, в которой хранятся события, собранные источником и не потребляемые приемником.Чтобы сбалансировать скорость сбора данных источника и чтения данных приемника, его можно рассматривать как очередь сообщений внутри Flume.
  • Канал является потокобезопасным и транзакционным, поддерживая такие операции, как сбой записи источника, повторная запись и сбой чтения приемника, повторное чтение и т. д.
  • Обычно используемые типы каналов: канал памяти, файловый канал, Канал Kafka, канал JDBC и т. д.

7. Компонент канала — канал памяти

  • Канал памяти: используя память в качестве канала, канал памяти имеет высокую скорость чтения и записи, но объем хранимых данных мал.Процесс Flume зависает, выключение или перезапуск сервера приведет к потере данных. Его можно использовать в сценариях, когда онлайн-сервер, на котором развернут Flume Agent, имеет достаточные ресурсы памяти и не заботится о потере данных. ключевой параметр:
type :channel类型memory
capacity :channel中存储的最大event数,默认值100
transactionCapacity :一次事务中写入和读取的event最大数,默认值100。
keep-alive:在Channel中写入或读取event等待完成的超时时间,默认值3秒
byteCapacityBufferPercentage:缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位百分比)
byteCapacity :Channel占用内存的最大容量,默认值为Flume堆内存的80%

8. Компонент канала — файловый канал

  • Файловый канал: запись событий в файлы на диске.По сравнению с каналом памяти емкость хранилища больше, и нет риска потери данных.
  • Путь к хранилищу данных File Channel позволяет настроить многодисковые пути к файлам для повышения производительности записи файлов.
  • Flume записывает последовательность событий в конец файла File Channel и передает ее через файл конфигурации. Установите верхний предел размера файла данных, задав параметр maxFileSize.
  • Когда событие в закрытом файле данных только для чтения полностью прочитано и приемник зафиксировал завершенную транзакцию чтения, Flume удалит файл данных, в котором хранится файл данных.
  • Установив контрольные точки и контрольные точки резервного копирования, данные в файловом канале можно быстро последовательно воспроизвести в памяти после перезапуска агента. Ключевые параметры следующие:
 type:channel类型为file 
 checkpointDir:检查点目录,默认在启动flume用户目录下创建,建 议单独配置磁盘路径 
 useDualCheckpoints:是否开启备份检查点,默认false,建议设置为true开启备份检查点,备份检查点的作用是当Agent意外出错导致写 入检查点文件异常,在重新启动File  Channel时通过备份检查点将数据回放到内存中,如果不开启备份检查点,在数据回放的过程中发现检查点文件异常会对所数据进行全回放,全回放的过程相当耗时 
 backupCheckpointDir:备份检查点目录,最好不要和检查点目录在同 一块磁盘上 
 checkpointInterval:每次写检查点的时间间隔,默认值30000毫秒 
 dataDirs:数据文件磁盘存储路径,建议配置多块盘的多个路径,通过磁盘的并行写入来提高file channel性能,多个磁盘路径用逗号隔开
 transactionCapacity:一次事务中写入和读取的event最大数,默认值 10000
 maxFileSize:每个数据文件的最大大小,默认值:2146435071字节
 minimumRequiredSpace:磁盘路径最小剩余空间,如果磁盘剩余空 间小于设置值,则不再写入数据
 capacity:file channel可容纳的最大event数
 keep-alive:在Channel中写入或读取event等待完成的超时时间,默认值3秒

Чтобы настроить FileChannel, содержимое конфигурации filechannel.conf выглядит следующим образом:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/hadoop/apps/flume/filechannel/data
a1.channels.c1.checkpointDir = /home/hadoop/apps/flume/filechannel/checkpoint 
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /home/hadoop/apps/flume/filechannel/backup
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Чтобы запустить FileChannel, команда запуска выглядит следующим образом:

[root@hadoop01 bin]# flume-ng agent --conf conf --conf-file filechannle.conf --name a1 -Dflume.root.logger=INFO,console

Отправьте данные на порт файла конфигурации 44444 и просмотрите запись канала.

telnet localhost asdfasd

На этом этапе вы можете наблюдать за результатами мониторинга печати консоли.

18/02/04 21:15:44 INFO sink.LoggerSink: Event: { headers:{} body: 61 64 66 61 64 66 61 64 66 61 73 66 0D          adfadfadfasf. }
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/hadoop/apps/flume/filechannel/checkpoint/checkpoint, elements to sync = 1
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1517749968978, queueSize: 0, queueHead: 0
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Attempting to back up checkpoint.
18/02/04 21:15:48 INFO file.Serialization: Skipping in_use.lock because it is in excludes set
18/02/04 21:15:48 INFO file.Serialization: Deleted the following files: , checkpoint, checkpoint.meta, inflightputs, inflighttakes.
18/02/04 21:15:48 INFO file.Log: Updated checkpoint for file: /home/hadoop/apps/flume/filechannel/data/log-2 position: 170 logWriteOrderID: 1517749968978
18/02/04 21:15:49 INFO file.EventQueueBackingStoreFile: Checkpoint backup completed.

9. Компонент канала — Kafka Channel

Канал Kafka: использование распределенной очереди сообщений kafka в качестве канала имеет большую емкость хранилища и более высокую отказоустойчивость, чем канал памяти и файловый канал, что компенсирует недостатки двух других каналов.Если производительность Kafka используется разумно, она может достичь удвоенный результат при половинных усилиях. . Ключевые параметры следующие:

type:Kafka Channel类型org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers:Kafka broker列表,格式为ip1:port1, ip2:port2…,建 议配置多个值提高容错能力,多个值之间用逗号隔开
kafka.topic:topic名称,默认值“flume-channel”
kafka.consumer.group.id:Consumer Group Id,全局唯一
parseAsFlumeEvent:是否以Avro FlumeEvent模式写入到Kafka Channel中,  默认值true,event的header信息与event body都写入到kafka中
pollTimeout:轮询超时时间,默认值500毫秒
kafka.consumer.auto.offset.reset:earliest表示从最早的偏移量开始拉取,latest表示从最新的偏移量开始拉取,none表示如果没有发现该Consumer组之前拉 取的偏移量则抛异常

Чтобы настроить KafkaChannel, содержимое конфигурации kafkachannel.conf выглядит следующим образом:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.channels.c1.kafka.topic = flumechannel2
a1.channels.c1.kafka.consumer.group.id = flumecgtest1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Запустите службу kafak и создайте тему kafka с помощью следующих команд:

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumechannel2

Просмотр информации о созданной теме

[root@hadoop03 bin]# ./kafka-topics.sh --list --zookeeper 192.168.43.20:2181
__consumer_offsets
flumechannel2
topicnewtest1

Запустите агент kafka и используйте telnet для отправки данных

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console
[root@hadoop01 flume]# clear
[root@hadoop01 flume]# telnet localhost 44444 
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK

Информация мониторинга выглядит следующим образом:

18/02/04 21:39:33 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D                                     abc. }

10. Компонент раковины

  • Приемник: из потребления события канала, вывода во внешнее хранилище или вывода агенту следующего этапа.
  • Прием только из потребительского события в канале
  • Когда приемник успешно записывает событие, он отправляет транзакцию в канал. Транзакция приемника успешно отправлена, и обработанное событие будет удалено каналом. В противном случае канал будет ждать, пока приемник повторно использует неудачное событие.
  • Flume предоставляет богатые компоненты приемников, такие как Avro Sink, HDFS Sink, Kafka Sink, File Roll Sink, HTTP Sink и т. д.

11. Компонент раковины — раковина Avro

  • Avro Sink часто используется для подключения к источнику Avro следующего уровня и отправки события в источник Avro следующего уровня путем отправки запроса RPC.
  • Чтобы уменьшить большое количество сетевых ресурсов, занимаемых передачей событий, Avro Sink обеспечивает сквозную массовую передачу сжатых данных.

Описание ключевых параметров

type:Sink类型为avro。
hostname:绑定的目标Avro Souce主机名称或者IP
port:绑定的目标Avro Souce端口号
batch-size:批量发送Event数,默认值100
compression-type:是否使用压缩,如果使用压缩设则值为
“deflate”, Avro Sink设置了压缩那么Avro Source也应设置相同的 压缩格式,目前支持zlib压缩,默认值none
compression-level:压缩级别,0表示不压缩,从1到9数字越大压缩
效果越好,默认值6

12. Сонки компоненты - раковина HDFS

  • Приемник HDFS записывает события в HDFS для постоянного хранения
  • HDFS Sink предоставляет мощную функцию выхода временной метки в соответствии с информацией заголовка события.
  • Информация о метках времени преобразуется в формат даты и хранится иерархически в HDFS в виде каталога даты.

Информация о ключевом параметре поясняется следующим образом:

type:Sink类型为hdfs。
hdfs.path:HDFS存储路径,支持按日期时间分区。
hdfs.filePrefix:Event输出到HDFS的文件名前缀,默认前缀FlumeData
hdfs.fileSuffix:Event输出到HDFS的文件名后缀
hdfs.inUsePrefix:临时文件名前缀
hdfs.inUseSuffix:临时文件名后缀,默认值.tmp
hdfs.rollInterval:HDFS文件滚动生成时间间隔,默认值30秒,该值设置 为0表示文件不根据时间滚动生成

Настройте файл hdfsink.conf, содержимое конфигурации выглядит следующим образом:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000 
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /data/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hdfssink
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.callTimeout = 60000

Запустите агент hdfssink, команда выглядит следующим образом:

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file hdfssink.conf --name a1 -Dflume.root.logger=INFO,console

Используйте telnet для отправки данных на 44444 и наблюдайте результат записи данных

[hadoop@hadoop01 root]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK
2323444
OK

В этот момент консоль печатает и создается временный файл в файловой системе HDFS.

8/02/04 22:41:52 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/04 22:41:52 INFO hdfs.BucketWriter: Creating /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Closing /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Renaming /data/flume/20180204/hdfssink.1517755312242.tmp to /data/flume/20180204/hdfssink.1517755312242
18/02/04 22:42:24 INFO hdfs.HDFSEventSink: Writer callback called.

Стоит отметить, что:Пожалуйста, используйте пользователя hadoop для создания агента и отправки сообщения, чтобы избежать записи файла HDFS из-за разрешений.

13. Компоненты - кафка мойка

Flume записывает Event в назначенные Kafka темы через Kafkasink Основные параметры следующие:

 type:Sink类型,值为KafkaSink类路径  org.apache.flume.sink.kafka.KafkaSink。
 kafka.bootstrap.servers:Broker列表,定义格式host:port,多个Broker之间用逗号隔开,可以配置一个也可以配置多个,用于Producer发现集群中的Broker,建议配置多个,防止当个Broker出现问题连接 失败。
 kafka.topic:Kafka中Topic主题名称,默认值flume-topic。
 flumeBatchSize:Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。 默认值100。
 kafka.producer.acks:设置Producer端发送消息到Borker是否等待接收Broker返回成功送达信号。0表示Producer发送消息到Broker之后不需要等待Broker返回成功送达的信号,这种方式吞吐量高,但是存 在数据丢失的风险。1表示Broker接收到消息成功写入本地log文件后向Producer返回成功接收的信号,不需要等待所有的Follower全部同步完消息后再做回应,这种方式在数据丢失风险和吞吐量之间做了平衡。all(或者-1)表示Broker接收到Producer的消息成功写入本 地log并且等待所有的Follower成功写入本地log后向Producer返回成功接收的信号,这种方式能够保证消息不丢失,但是性能最差。默 认值1。
 useFlumeEventFormat:默认值false,Kafka Sink只会将Event body内 容发送到Kafka Topic中。如果设置为true,Producer发送到KafkaTopic中的Event将能够保留Producer端头信息

Настройте kafkasink.conf, конкретная конфигурация выглядит следующим образом:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000 
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = FlumeKafkaSinkTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

Запустите узел kafka Broker 22 и узел Broker 23.

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties 

Создать информацию о теме по профилю

[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic FlumeKafkaSinkTopic1
Created topic "FlumeKafkaSinkTopic1".

Чтобы запустить агент kafkasink, команда запуска выглядит следующим образом:

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkasink.conf --name a1 >/dev/null 2>&1 &

14.Перехватчик-перехватчик

  • Источник вызывает перехватчик перед записью события в канал.
  • Может быть несколько перехватчиков между источником и каналом, а разные перехватывают разные правила для обработки событий
  • Дополнительные, легкие, подключаемые плагины
  • Внедрение пользовательских перехватчиков путем реализации интерфейса Interceptor
  • Встроенные перехватчики: перехватчик временных меток, перехватчик хоста, перехватчик UUID, статический перехватчик, перехватчик фильтрации регулярных выражений и т. д.

15.Timestamp Interceptor

  • Flume использует перехватчик временных меток для добавления информации временной метки к информации заголовка события, ключ — временная метка, значение — временная метка, когда перехватчик перехватывает событие.
  • Роль временной метки информации заголовка. Например, данные, хранящиеся в HDFS, хранятся во временных разделах. Приемник может записывать событие в HDFS в соответствии с временным разделом в соответствии с временной меткой в ​​информации заголовка события.
  • Описание основных параметров:
    • тип: тип перехватчика - отметка времени
    • saveExisting: если в информации заголовка есть информация о временной метке, следует ли сохранить исходную информацию о временной метке, true для сохранения, false для замены существующей временной метки новой временной меткой, значение по умолчанию — false

16.Host Interceptor

  • Flume использует перехватчик штампа хоста, чтобы добавить имя или IP-адрес хоста в заголовок события.
  • Роль перехватчика хоста: например, Источник записывает Событие в разные Каналы в соответствии с именем хоста, чтобы последующий Приемник мог обрабатывать данные в разных Каналах отдельно
  • Описание основных параметров:
    • тип: тип перехватчика хост
    • saveExisting: если в информации заголовка есть информация о временной метке, следует ли сохранить исходную информацию о временной метке, true для сохранения, false для замены существующей временной метки новой временной меткой, значение по умолчанию — false
    • useIP: использовать ли IP в качестве информации о хосте для записи информации, значение по умолчанию — false.
    • hostHeader: установите ключ информации о хосте в информации заголовка, значение по умолчанию — host

17.Host InterceptorStatic Interceptor

  • Flume использует статический перехватчик для добавления статической информации в заголовок evetn.
  • Описание основных параметров:
  • тип: тип перехватчика статический
    • saveExisting: если в информации заголовка есть информация о временной метке, следует ли сохранить исходную информацию о временной метке, true для сохранения, false для замены существующей временной метки новой временной меткой, значение по умолчанию — false
    • ключ: ключ в информации заголовка
    • значение: значение, соответствующее ключу в информации заголовка

18.Селектор выбора

  • Источник вызывает перехватчик перед записью события в канал.Если перехватчик настроен, селектор вызывается после обработки всех перехватчиков. пройти через Селектор определяет, как события записываются в канал.
  • Встроенный селектор каналов репликации, селектор каналов мультиплексирования, селектор каналов мультиплексирования

19.Replicating Channel Selector

  • Если селектор каналов не указан, по умолчанию используется Реплицирующий селектор каналов. То есть источник записывает событие в несколько каналов одновременно реплицированным образом, и разные приемники могут получать одно и то же событие из разных каналов.
  • Описание основных параметров:
    • selector.type: тип селектора каналов реплицируется
    • selector.Optional: определяет необязательный канал. При сбое записи события в необязательный канал он не выдает исключение для источника и продолжает выполнение. Разделите несколько необязательных каналов пробелами

Источник копирует событие в несколько каналов, использует разные каналы через разные приемники и выводит одно и то же событие в разные места. Файл конфигурации: replicating_selector.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#设置复制选择器
a1.sources.r1.selector.type = replicating
#设置required channel
a1.sources.r1.channels = c1 c2
#设置channel c1
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
#设置channel c2
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 1000
#设置kafka sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = FlumeSelectorTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.23.103:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.sinks.k1.kafka.producer.acks = 1
#设置file sink
a1.sinks.k2.channel = c2
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /home/hadoop/apps/flume/selector
a1.sinks.k2.sink.rollInterval = 60

Запись в kafka и файл соответственно

Создать тему FlumeKafkaSinkTopic1

bin/kafka-topics.sh --create --zookeeper 192.168.183.100:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1

запустить агент потока

bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1

отправить данные через телнет

telnet localhost 44444

См. Данные в / Home / Hadoop / Apps / Plume / Selector Path

Просмотр данных темы kafka FlumeSelectorTopic1

bin/kafka-console-consumer.sh --zookeeper 192.168.183.100:2181 --from-beginning --topic FlumeSelectorTopic1

20.Multiplexing Channel Selector

-Селектор каналов мультиплексирования Селектор мультиплексирования не основан на информации заголовка события Те же данные «ключ-значение», чтобы определить, на какой канал должно быть записано событие.

  • Существует три уровня каналов, а именно обязательный канал, необязательный канал и канал по умолчанию.
  • Описание основных параметров:
selector.type:Channel选择器类型为multiplexing
selector.header:设置头信息中用于检测的headerName
selector.default:默认写入的Channel列表
selector.mapping.*:headerName对应的不同值映射的不同Channel列表
selector.optional:可选写入的Channel列表

Файлы конфигурацииmultiplexing_selector.conf, avro_sink1.conf, avro_sink2.conf, avro_sink3.conf Отправлять данные агенту файла конфигурации, соответствующего разным avro_sink.Различные файлы конфигурации avro_sink записывают разные статические данные в информацию заголовка события через статический перехватчик. Multiplexing_selector отправляется в разные места назначения в соответствии с разными статическими типами данных в информации заголовка события.
multiplexing_selector.conf

a3.sources = r1
a3.channels = c1 c2 c3
a3.sinks = k1 k2 k3
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.183.100
a3.sources.r1.port = 8888
a3.sources.r1.threads= 3
#设置multiplexing selector
a3.sources.r1.selector.type = multiplexing
a3.sources.r1.selector.header = logtype
#通过header中logtype键对应的值来选择不同的sink
a3.sources.r1.selector.mapping.ad = c1
a3.sources.r1.selector.mapping.search = c2
a3.sources.r1.selector.default = c3
a3.sources.r1.channels = c1 c2 c3
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 1000
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 1000
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000
#分别设置三个sink的不同输出
a3.sinks.k1.type = file_roll
a3.sinks.k1.channel = c1
a3.sinks.k1.sink.directory = /home/hadoop/apps/flume/multiplexing/k11
a3.sinks.k1.sink.rollInterval = 60
a3.sinks.k2.channel = c2
a3.sinks.k2.type = file_roll
a3.sinks.k2.sink.directory = /home/hadoop/apps/flume/multiplexing/k12
a3.sinks.k2.sink.rollInterval = 60
a3.sinks.k3.channel = c3
a3.sinks.k3.type = file_roll
a3.sinks.k3.sink.directory = /home/hadoop/apps/flume/multiplexing/k13
a3.sinks.k3.sink.rollInterval = 60

avro_sink1.conf

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = localhost
agent1.sources.r1.port = 44444
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = logtype
agent1.sources.r1.interceptors.i1.value = ad
agent1.sources.r1.interceptors.i1.preserveExisting = false
agent1.sources.r1.channels = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 10000 
agent1.channels.c1.transactionCapacity = 1000
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = 192.168.183.100
agent1.sinks.k1.port = 8888

avro_sink2.conf

agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1
agent2.sources.r1.type = netcat
agent2.sources.r1.bind = localhost
agent2.sources.r1.port = 44445
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
agent2.sources.r1.interceptors.i1.key = logtype
agent2.sources.r1.interceptors.i1.value = search
agent2.sources.r1.interceptors.i1.preserveExisting = false
agent2.sources.r1.channels = c1
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000 
agent2.channels.c1.transactionCapacity = 1000
agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = 192.168.183.100
agent2.sinks.k1.port = 8888

avro_sink3.conf

agent3.sources = r1
agent3.channels = c1
agent3.sinks = k1
agent3.sources.r1.type = netcat
agent3.sources.r1.bind = localhost
agent3.sources.r1.port = 44446
agent3.sources.r1.interceptors = i1
agent3.sources.r1.interceptors.i1.type = static
agent3.sources.r1.interceptors.i1.key = logtype
agent3.sources.r1.interceptors.i1.value = other
agent3.sources.r1.interceptors.i1.preserveExisting = false
agent3.sources.r1.channels = c1
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000 
agent3.channels.c1.transactionCapacity = 1000
agent3.sinks.k1.type = avro
agent3.sinks.k1.channel = c1
agent3.sinks.k1.hostname = 192.168.183.100
agent3.sinks.k1.port = 8888

Создайте каталоги k1 k2 k3 отдельно в каталоге /home/hadoop/apps/flume/multiplexing.

bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &

отправить данные через телнет телнет локальный хост 44444

21.Sink Processor

  • Процессор приемника координирует балансировку нагрузки и аварийное переключение между несколькими приемниками.
  • Процессор приемника по умолчанию имеет только один приемник, нет необходимости создавать процессор приемника.
  • Группа приемников: поместите несколько приемников в группу, требуя, чтобы приемник в группе использовал канал.
  • Процессор приемника балансировки нагрузки round_robin (по умолчанию) или случайный
  • Процессор отказоустойчивого приемника (отказоустойчивый процессор) может определять список приоритетов приемника и выбирать приемник, используемый в соответствии с приоритетом.

22.Load-Balancing Sink Processor

Описание основных параметров:

sinks:sink组内的子Sink,多个子sink之间用空格隔开
processor.type:设置负载均衡类型load_balance
processor.backoff:设置为true时,如果在系统运行过程中执行的Sink失败,会将失败的Sink放进一个冷却池中。默认值false
processor.selector.maxTimeOut:失败sink在冷却池中最大驻留时间,默认值30000ms
processor.selector:负载均衡选择算法,可以使用轮询“round_robin”、随机“random”或者是继承AbstractSinkSelector类的自定义负载均衡实现类

示例

23.Failover Sink Processor

Описание основных параметров:

sinks:sink组内的子Sink,多个子sink之间用空格隔开
processor.type:设置故障转移类型“failover”
processor.priority.<sinkName>:指定Sink组内各子Sink的优先级别,优先级从高到低,数值越大优先级越高
processor.maxpenalty:等待失败的Sink恢复的最长时间,默认值30000毫秒

示例

24. Сценарии аварийного переключения приложений

  • Сценарий распределенного сбора журналов
  • Несколько агентов собирают данные журналов одного типа на разных компьютерах. Для обеспечения высокой доступности используется многоуровневое развертывание. Два или более сборщиков развертываются на уровне сбора журналов. Агент использует Failover SinkProcessor, чтобы понять, что отказ любого один из сборщиков не влияет на лог системы.служба сбора
    示例

Суммировать

总结