В эпоху облачных технологий и волны контейнеризации сбор журналов контейнеров является важной темой, которая кажется незаметной, но ее нельзя игнорировать. Для сбора журналов контейнера наши обычно используемые инструменты — это filebeat и fluentd, которые имеют свои преимущества и недостатки.По сравнению с fluentd на основе ruby, учитывая настраиваемость, мы обычно выбираем filbeat из стека технологии golang в качестве основного агента сбора журналов по умолчанию. .
По сравнению с традиционным методом сбора логов контейнеризированный одиночный узел будет запускать больше сервисов, а нагрузка будет иметь более короткий жизненный цикл, что с большей вероятностью будет оказывать давление на агент сбора логов, хотя filebeat достаточно легковесен и высокопроизводителен, но если вы не понимаете механизма filebeat и необоснованно настраиваете filebeat, это также может принести нам неожиданные неприятности и проблемы в реальной производственной среде.
Общая структура
Функция сбора логов не кажется сложной, основная функция — найти сконфигурированный лог-файл, прочитать и обработать его и отправить на соответствующий бэкэнд, такой как elasticsearch, kafka и т. д.
На официальном сайте filebeat есть принципиальная схема, как показано ниже:
filebeat.inputs:
- type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log
Данные журнала, собранные различными горутинами харвестера, будут отправлены в глобальную очередь очереди.Существует два типа реализации очередей: очереди на основе памяти и очереди на диске.В настоящее время очередь на основе диска все еще находится на стадии альфа-тестирования, а filebeat включено по умолчанию Очередь кеша в памяти.
Всякий раз, когда данные в очереди кэшируются до определенного размера или превышают фиксированное время (по умолчанию 1 с), они будут потребляться зарегистрированным клиентом из очереди и отправляться на настроенный сервер. В настоящее время клиенты, которые можно установить, это kafka, elasticsearch, redis и т. д.
Несмотря на то, что все это выглядит просто, в реальном использовании нам все же нужно рассмотреть больше вопросов, таких как:
- Как filbebeat обнаруживает и собирает файлы журналов?
- Как Filebeat гарантирует, что коллекция журналов будет отправлена в удаленное хранилище без потери части данных?
- Если filebeat не работает, как убедиться, что следующая коллекция начинается с последнего состояния без повторного сбора всех журналов?
- Filebeat использует слишком много памяти или процессора, как это проанализировать и решить?
- Как filebeat поддерживает docker и kubernetes и как настроить сбор журналов при контейнеризации?
- Если я хочу отправлять логи, собранные filebeat, в back-end хранилище, если оно не поддерживается нативно, как настроить разработку?
Все это требует более глубокого понимания filebeat.Давайте проследим за исходным кодом filebeat, чтобы изучить механизм реализации.
Как собирается лог
Исходный код filebeat принадлежит проекту beats, и первоначальной целью проекта beats является сбор всех видов данных, поэтому beats абстрагируется от библиотеки libbeat.На основе libbeat мы можем быстро разработать и внедрить инструмент для сбора данных. к filebeat также есть такие инструменты, как metricbeat., packetbeat и другие официальные проекты, которые также входят в проект beats.
Если мы взглянем на код в целом, мы обнаружим, что libbeat реализовал общие функции, такие как memqueue очереди кэша памяти, несколько клиентов, отправляющих журнал вывода, процессор фильтрации и обработки данных, а filebeat нужно только реализовать чтение файла журнала и другие журналы. Логика функций может быть.
С точки зрения реализации кода, filebeat можно условно разделить на следующие модули:
- ввод: найдите настроенный файл журнала и запустите харвестер
- харвестер: читать файлы и отправлять в спулер
- спулер: кешировать данные журнала до тех пор, пока их нельзя будет отправить издателю
- издатель: отправлять журналы на серверную часть и одновременно уведомлять регистратора
- регистратор: записывать состояние собираемых файлов журнала
1. Найдите файл журнала
Для сбора и управления жизненным циклом файлов журналов filebeat абстрагирует структуру Crawler, После запуска filebeat краулер создается в соответствии с конфигурацией, затем перебирает и запускает каждый ввод:
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
}
В логике каждой операции ввода сначала будет получен соответствующий лог-файл согласно конфигурации.Следует отметить, что метод сопоставления здесь не обычный, а правило linux glob, которое все-таки несколько отличается от обычного.
matches, err := filepath.Glob(path)
После получения всех совпадающих файлов журналов он пройдет сложную фильтрацию, например, если настроеноexclude_files
Такие файлы будут проигнорированы, а также будет запрошен статус файла, если время последней модификации файла превышаетignore_older
конфигурации файл не будет собран.
2. Прочтите файл журнала
После сопоставления файлов журналов, которые необходимо собрать окончательно, filebeat запустит горутин-сборщик для каждого файла, непрерывно читает журнал в горутине и отправляет его в очередь кэша памяти memqueue.
существует(h *Harvester) Run()
В методе мы видим вот такой бесконечный цикл, а код, упускающий часть логики, выглядит следующим образом:
for {
message, err := h.reader.Next()
if err != nil {
switch err {
case ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
h.state.Offset = 0
filesTruncated.Add(1)
case ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
case ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
case ErrClosed:
logp.Info("Reader was closed: %s. Closing.", h.state.Source)
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
}
return nil
}
...
if !h.sendEvent(data, forwarder) {
return nil
}
}
Видно, что метод reader.Next() будет непрерывно читать журнал, и если исключение не будет возвращено, данные журнала будут отправлены в очередь кеша.
Существует несколько типов возвращаемых исключений.В дополнение к чтению EOF также будут такие ситуации, как бездействие файла в течение определенного периода времени, что приведет к выходу горутины харвестера, прекращению сбора файла и закрытию файла. ручка.
Чтобы Filebeat не занимал слишком много файловых дескрипторов для сбора файлов журналов, по умолчаниюclose_inactive
Параметр — 5 минут, если лог-файл не модифицируется в течение 5 минут, вышеприведенный код войдет в кейс ErrInactive, после чего горутина харвестера будет закрыта.
В этом сценарии следует также отметить, что если файл удаляется из коллекции журналов, но поскольку дескриптор файла в это время поддерживается filebeat, место на диске, занимаемое файлом, будет сохраняться до тех пор, пока горутина харвестера не завершится.
3. Очередь кэша
Когда memqueue инициализируется, filebeat будет настроен в соответствии сmin_event
Будь то значение больше 1 для создания BufferingEventLoop или DirectEventLoop, обычно значением по умолчанию является BufferingEventLoop, то есть буферизованная очередь.
type bufferingEventLoop struct {
broker *Broker
buf *batchBuffer
flushList flushList
eventCount int
minEvents int
maxEvents int
flushTimeout time.Duration
// active broker API channels
events chan pushRequest
get chan getRequest
pubCancel chan producerCancelRequest
// ack handling
acks chan int // ackloop -> eventloop : total number of events ACKed by outputs
schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked
pendingACKs chanList // ordered list of active batches to be send to the ackloop
ackSeq uint // ack batch sequence number to validate ordering
// buffer flush timer state
timer *time.Timer
idleC <-chan time.Time
}
BufferingEventLoop — это структура, реализующая Broker и имеющая различные каналы, которая в основном используется для отправки журналов потребителям для потребления. Метод run BufferingEventLoop также представляет собой бесконечный цикл, который можно рассматривать как центр планирования событий журнала.
for {
select {
case <-broker.done:
return
case req := <-l.events: // producer pushing new event
l.handleInsert(&req)
case req := <-l.get: // consumer asking for next batch
l.handleConsumer(&req)
case count := <-l.acks:
l.handleACK(count)
case <-l.idleC:
l.idleC = nil
l.timer.Stop()
if l.buf.length() > 0 {
l.flushBuffer()
}
}
}
Каждый раз, когда горутина харвестера считывает данные журнала, они в конечном итоге будут отправлены в bufferingEventLoop.events chan pushRequest
канал, затем активируйте вышеуказанныйreq := <-l.events
В этом случае метод handleInsert добавит данные в буфер буферизацииEventLoop, buf — это очередь, в которой memqueue фактически кэширует данные журнала, если длина буфера превышает настроенное максимальное значение или срабатывает таймер в буфере событийвентлупа.case <-l.idleC
, будет вызван метод flushBuffer().
flushBuffer() сработает сноваreq := <-l.get
case, а затем запустите метод handleConsumer, наиболее важной частью этого метода является следующий код:
req.resp <- getResponse{ackChan, events}
Здесь получается канал ответа потребителя-потребителя, а затем в этот канал отправляются данные. Когда это действительно будет достигнуто, потребление memqueue потребителем будет запущено. Таким образом, на самом деле memqueue потребляется потребителем не постоянно, а потребляется, когда memqueue уведомляет потребителя, что можно понимать как импульсную передачу.
4. Очередь потребления
На самом деле, уже при инициализации filebeat был создан eventConsumer, который пытался получить данные журнала от Broker in the loop методом бесконечного цикла.
for {
if !paused && c.out != nil && consumer != nil && batch == nil {
out = c.out.workQueue
queueBatch, err := consumer.Get(c.out.batchSize)
...
batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)
}
...
select {
case <-c.done:
return
case sig := <-c.sig:
handleSignal(sig)
case out <- batch:
batch = nil
}
}
Вышеупомянутый потребитель.Get заключается в том, что потребитель-потребитель получает данные журнала от брокера, а затем отправляет их в выходной канал для отправки выходным клиентом.Давайте посмотрим на основной код в методе Get:
select {
case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:
case <-c.done:
return nil, io.EOF
}
// if request has been send, we do have to wait for a response
resp := <-c.resp
return &batch{
consumer: c,
events: resp.buf,
ack: resp.ack,
state: batchActive,
}, nil
Структура getRequest выглядит следующим образом:
type getRequest struct {
sz int // request sz events from the broker
resp chan getResponse // channel to send response to
}
Структура getResponse:
type getResponse struct {
ack *ackChan
buf []publisher.Event
}
getResponse содержит данные журнала, а getRequest содержит канал для отправки потребителям.
Параметр, полученный в методе handleConsumer буферной очереди bufferingEventLoop выше, — это getRequest, который содержит канал getResponse, запрошенный потребителем.
Если handleConsumer не отправляет данные, метод Consumer.Get всегда будет блокироваться в select, а канал getResponse потребителя не будет получать данные журнала до сброса Buffer.
5. Отправить журналы
При создании битов будет создан clientWorker.В методе run clientWorker данные журнала будут непрерывно считываться из канала, отправленного потребителем, а затем будет вызываться client.Publish для отправки журналов пакетами.
func (w *clientWorker) run() {
for !w.closed.Load() {
for batch := range w.qu {
if err := w.client.Publish(batch); err != nil {
return
}
}
}
}
Библиотека libbeats содержит несколько клиентов, таких как kafka, elasticsearch и logstash, каждый из которых реализует клиентский интерфейс:
type Client interface {
Close() error
Publish(publisher.Batch) error
String() string
}
Конечно, самое главное — реализовать интерфейс Publish, а затем отправить лог.
На самом деле проектирование потока лог-данных в различных каналах в filebeat относительно сложно и громоздко, автор тоже долго изучал и нарисовал длинную архитектурную схему, чтобы понять логику.
Вот упрощенная схема для справки:
Как гарантировать хотя бы раз
filebeat поддерживает файл реестра на локальном диске, в котором хранится состояние всех собранных файлов журнала.
Фактически, каждый раз, когда данные журнала успешно отправляются на серверную часть, будет возвращено событие подтверждения. filebeat запускает независимую сопрограмму реестра, отвечающую за отслеживание события.После получения события подтверждения он обновит состояние файла журнала до файла реестра.Смещение в состоянии представляет смещение прочитанного файла, поэтому filebeat гарантирует, что данные журнала перед записью смещения должны быть получены внутренним хранилищем журнала.
Структура государства выглядит следующим образом:
type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
}
Данные, записанные в файле реестра, примерно следующие:
[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]
Поскольку файлы могут быть переименованы или перемещены, Filebeat идентифицирует каждый файл журнала на основе его индексного дескриптора и номера устройства.
Если filebeat перезапускается ненормально, файл реестра будет считываться каждый раз при запуске сборщика сбора, и сбор будет продолжаться с последнего записанного состояния, чтобы гарантировать, что все файлы журнала не будут повторно отправлены с самого начала.
Конечно, если filebeat зависнет до возврата ack в процессе отправки лога, файл реестра точно не обновится до последнего состояния, то при следующем сборе лога эта часть лога будет отправляться повторно, так что значит filebeat может гарантировать только один раз и не может гарантировать, что он не будет отправлен повторно.
Другая необычная ситуация заключается в том, что если старый файл удалить под linux и сразу же создать новый файл, то очень вероятно, что они имеют один и тот же индекс, а поскольку filebeat помечает смещение коллекции файловых записей в соответствии с индексом, он будет причина в реестре Запись фактически является состоянием удаленного файла, поэтому новая коллекция файлов начинается со смещения старого файла, в котором будут пропущены данные журнала.
Чтобы максимально избежать повторного использования inode и предотвратить увеличение размера файла реестра с течением времени, рекомендуется использовать конфигурации clean_inactive и clean_remove для удаления состояния файлов, которые не были обновлены или удалены для давно из реестра.
В то же время мы можем обнаружить, что в журнале чтения харвестера состояние реестра будет обновлено для обработки некоторых нештатных ситуаций. Например, если файл журнала очищен, filebeat вернет исключение ErrFileTruncate в следующем методе Reader.Next, установит Offset файла флага inode равным 0, завершит этот сборщик и перезапустит новый сборщик, хотя файл остается неизменным. , смещение реестра в 0 равно 0, и сбор начнется с самого начала.
Важно отметить, что если вы используете контейнер для развертывания filebeat, вам необходимо смонтировать файл реестра на хосте, иначе файл реестра будет потерян после перезапуска контейнера, что приведет к тому, что filebeat будет повторно собирать файлы журналов с хоста. начало.
обновление автоматической перезагрузки filebeat
В настоящее время filebeat поддерживает конфигурацию ввода перезагрузки и конфигурацию модуля, но механизм перезагрузки обновляется только регулярно.
После включения reload.enable в конфигурации вы также можете настроить reload.period, чтобы указать временной интервал для конфигурации автоматической перезагрузки.
Когда filebeat запускается, он создает сопрограмму, предназначенную для перезагрузки. Для каждого работающего харвестера filebeat добавит его в глобальный список Runner.После каждого временного интервала он будет запускать diff-оценку файла конфигурации.Если его нужно остановить, добавьте его в список stopRunner, а затем закройте его по одному. Добавьте в список startRunner и запустите новый Runner.
поддержка файловых битов для kubernetes
Официальная документация filebeat предоставляет метод развертывания на основе набора демонов в Kubernetes.Основная конфигурация выглядит следующим образом:
- type: docker
containers.ids:
- "*"
processors:
- add_kubernetes_metadata:
in_cluster: true
То есть установите входной ввод на тип докера. Поскольку стандартные журналы вывода всех контейнеров по умолчанию находятся в/var/lib/docker/containers/<containerId>/*-json.log
путь, поэтому этот тип файла журнала по существу собирается.
Отличие от традиционного метода развертывания заключается в том, что если служба развернута в kubernetes, измерение просмотра и извлечения журналов не может быть ограничено узлами и службами, но также должно иметь podName, containerName и т. д., поэтому нам нужно пометить каждый log.Метаданные kubernetes добавляются в бэкенд.
Когда в конфигурацию будет добавлен обработчик add_kubernetes_metadata, filebeat запустит сторожевой сервис, который слушает kubernetes, отслеживает изменения всех подов kubernetes, а затем синхронизирует последние события подов, принадлежащих этому узлу, с локальным кешем.
После того, как контейнер будет уничтожен и создан на узле, в каталоге /var/lib/docker/containers/ произойдут изменения. containerId, чтобы получить podName, Label и другие данные и добавить их в поля метаинформации журнала.
В Filebeat также есть бета-версия функции автообнаружения, предназначенной для централизованного управления файлами конфигурации filebeat, разбросанными по разным узлам. В настоящее время kubernetes также поддерживается как провайдер, который, по сути, отслеживает события kubernetes, а затем собирает стандартные выходные файлы docker.
Общая структура выглядит следующим образом:
Однако в реальной производственной среде недостаточно собирать только стандартные журналы вывода контейнера. Нам часто нужно собирать настраиваемый каталог журналов, смонтированный контейнером, а также контролировать метод сбора журналов каждой службы и выполнять дополнительную настройку. , функция.
В Qingzhou Container Cloud мы разработали агент, который отслеживает события kubernetes и автоматически генерирует конфигурации файлов.Через CRD он поддерживает такие функции, как настройка внутреннего каталога журналов контейнера, поддержка настраиваемых полей и поддержка многострочного чтения. В то же время в kubernetes можно единообразно управлять различными конфигурациями журналов, а генерация и обновление конфигурации журналов в различных сценариях могут выполняться автоматически без необходимости для пользователей воспринимать создание, уничтожение и миграцию модулей.
Анализ производительности и настройка
Хотя серия beats в основном легкая, хотя использование памяти файловым битом, написанным на golang, действительно намного лучше, чем у logstash на основе jvm, факты говорят нам, что это не так просто.
Запуск filebeat обычно занимает только 3 или 40 МБ памяти, но иногда в облаке контейнеров Цинчжоу мы также обнаружим, что использование памяти контейнера filebeat на некоторых узлах превышает настроенный предел ограничения пода (обычно установлен на 200 МБ), и Постоянно срабатывает OOM.
По этой причине количество контейнеров, работающих в общем контейнере, особенно количество контейнеров, работающих на голом железе, может быть больше, что приводит к большому количеству комбайнов для сбора бревен. Если нет хорошего файла конфигурации, будет большая вероятность, что память резко поднимется.
Конечно, большая часть памяти filebeat по-прежнему принадлежит memqueue, все собранные логи будут сначала отправляться в memqueue для агрегации, а затем отправляться через output. Данные каждого лога собираются в структуру событий в filebeat.Количество событий в кеше memqueue, настроенном filebeat по умолчанию, равно 4096.queue.mem.events
настраивать. По умолчанию размер самого большого журнала событий ограничен 10 МБ.max_bytes
настраивать.4096 * 10MB = 40GB
, вполне возможно, что в крайних случаях filebeat занимает не менее 40 ГБ памяти. Особенно при настройке многострочного многострочного режима, если многострочная конфигурация неверна, одно событие ошибочно собирается как данные тысяч журналов, что может привести к тому, что memqueue займет большой объем памяти, что приведет к взрыву памяти.
Поэтому необходимо разумно настраивать правила сопоставления лог-файлов, ограничивать размер однострочных логов и настраивать количество кешей memqueue в соответствии с реальной ситуацией, чтобы избежать проблемы чрезмерного использования памяти файловыми битами в фактическое использование.
Как продлить разработку filebeat
В общем, filebeat может удовлетворить большинство требований к сбору журналов, но все еще неизбежно, что некоторые специальные сценарии требуют от нас настройки разработки filebeat.Конечно, сам дизайн filebeat также обеспечивает хорошую масштабируемость.
В настоящее время Beats предоставляет только несколько типов клиентов вывода, таких как elasticsearch, kafka, logstash и т. д. Если мы хотим отправлять filebeat непосредственно на другие серверные части, нам нужно настроить собственный вывод. Точно так же, если вам нужно отфильтровать журналы или добавить метаинформацию, вы также можете создать собственный плагин процессора.
Будь то добавление вывода или запись процессора, общая идея, предоставляемая filebeat, в основном одна и та же. Вообще говоря, есть 3 способа:
- Прямой форк filebeat и разработка на существующем исходном коде. И вывод, и процессор предоставляют интерфейсы, аналогичные Run, Stop и т. д. Вам нужно только реализовать этот тип интерфейса, а затем зарегистрировать соответствующий метод инициализации плагина в методе init. Конечно, поскольку метод init в golang вызывается при импорте пакета, его нужно импортировать вручную в код, инициализирующий filebeat.
- Скопируйте копию main.go filebeat, импортируйте нашу собственную библиотеку плагинов, а затем перекомпилируйте. По сути, он мало чем отличается от метода 1.
- Также filebeat предоставляет механизм плагинов на основе плагина golang, вам нужно скомпилировать самостоятельно разработанный плагин в общую библиотеку ссылок .so, а затем указать путь к библиотеке через -plugin в параметре запуска filebeat. Однако на самом деле, с одной стороны, плагин golang недостаточно зрелый и стабильный, с другой стороны, самостоятельно разработанный плагин все еще должен опираться на ту же версию библиотеки libbeat, а также должен быть скомпилирован с та же версия golang, ям может быть больше, что не рекомендуется.