Логи, собранные Meitu, нужно очищать и регулировать через программу ETL, а затем высаживать на HDFS/Hive персистентно, что удобно для последующего унифицированного анализа и обработки.
Что такое ЭТЛ?
ETL расшифровывается как Extract-Transform-Load, который используется для описания передачи данных из источника.извлекать(извлекать),конвертировать(преобразование),нагрузка(загрузить) процесс в место назначения. Термин ETL чаще используется в хранилищах данных, но его объекты не ограничиваются хранилищами данных.
В уникальной бизнес-среде Meitu ETL должна соответствовать следующим требованиям:
1. Большой объем данных, эффективная очистка и посадка.Meitu имеет множество предприятий, большую базу пользователей и огромное количество данных.Кроме того, бизнес-сторона надеется, что данные можно будет быстро запросить после сбора данных.
2. Гибкая конфигурация для работы с различными форматами данных.Из-за постоянного доступа к новым услугам, когда есть доступ к данным новых деловых сторон, он должен быть гибким и общим, а новые бизнес-данные могут быть очищены и реализованы путем добавления информации о конфигурации; в то же время форматы данных каждой деловой стороны различны, ETL должна быть совместима с различными распространенными форматами данных для удовлетворения потребностей различных предприятий (например, json, avro, DelimiterText и т. д.).
3. Ограничения и нормы.Он должен соответствовать спецификациям хранилища базы данных, и данные размещаются в соответствии с разными уровнями (уровень STG, уровень ODS и т. д.), разными библиотеками (default.db, meipai.db и т. д.) и разными разделами (разделы времени). необходимо указать).
4. Отказоустойчивость.Учитывая, что в сборе бизнес-журнала могут быть некоторые грязные данные, при достижении определенного порога необходимо выдавать сигнал тревоги; и могут возникнуть различные условия, такие как сбой кластера Hadoop и сбой Kafka, поэтому необходимо поддерживать повторный запуск и восстановление данных.
ETL существует в двух формах: Live Streaming ETL и Offline ETL.
Как показано ниже,Прямая трансляция ETLОбычно есть две формы: одна — собирать журналы на стороне сервера через Flume, а затем напрямую передавать через HDFS; другая — сначала собирать данные в Kafka, а затем попадать в HDFS через потоковую передачу Storm или Spark и потоковую передачу в реальном времени. ETL в случае сбоя Восстановление воспроизведения затруднено. В настоящее время Meitu использует потоковую ETL только в реальном времени для вставки и очистки данных.
Согласно структуре Lambda, в случае сбоя ETL потокового вещанияАвтономный ETLбыть исправленным. Автономный ETL должен извлекать сообщения из Kafka, а затем отправлять их из HDFS после ETL. Чтобы повысить производительность в режиме реального времени и уменьшить нагрузку на данные, автономный ETL запланирован на 05 минут в час для очистки данных за предыдущий час. Чтобы уменьшить нагрузку на NameNode HDFS и уменьшить размер небольших файлов, данные той же темы и раздела в разделе даты добавляются в один и тот же файл журнала.
Архитектурный дизайн и принцип реализации Offline ETL
Offline ETL использует фреймворк MapReduce для обработки и очистки данных различных предприятий, в основном используя идею «разделяй и властвуй», которая может горизонтально расширить возможности очистки данных;
Как показано на рисунке выше, автономный ETL разделен на три модуля:
Ввод (Формат ввода): он в основном анализирует и сегментирует источник данных (данные Kafka) и распределяет его между различными процессами Map для обработки в соответствии с определенной стратегией; создает RecordReader, который используется для чтения и анализа сегментированных данных, генерации значения ключа и передачи его для последующей обработки.
Карта (картограф): Обработка данных типа "ключ-значение".
Выход (формат вывода): Создайте RecordWriter для обработки обработанных данных типа "ключ-значение" в соответствии с библиотекой, таблицей и разделом; наконец, проверьте целостность обработки сообщения на этапе фиксации.
Автономный рабочий процесс ETL
На приведенном выше рисунке показан основной рабочий процесс автономного ETL:
1. kafka-etl абстрагирует общедоступную информацию о конфигурации в процессе очистки бизнес-данных в схему etl, представляющую разные данные каждого бизнеса;
2. Когда kafka-etl запускается, информация о теме и схеме бизнес-данных, подлежащих обработке, будет извлечена из zookeeper;
3. kafka-etl получает данные смещения (beginOffset, endOffset), которые будут использоваться для каждых бизнес-данных по теме и разделу, и сохраняет их в mysql;
4. kafka-etl абстрагирует информацию смещения нужного на этот раз топика&partition в kafkaEvent, а затем шардирует эти kafkaEvents по определенной стратегии, то есть каждый маппер обрабатывает часть kafkaEvent;
5. RecordReader будет использовать эту информацию о смещении, анализировать и декодировать в данные типа «ключ-значение» и передавать их последующей обработке очистки;
6. Очищенный ключ-значение унифицируется с помощью данных RecordWriter и помещается в HDFS.
Модульная реализация Offline ETL
Разделение данных (разделение)
Получаем наибольшее смещение текущей темы и раздела от kafka и отсечное смещение последнего потребления, формируем [beginOffset, endOffset] kafkaEvent, который будет потребляться на этот раз, kafkaEvent будет разбросан по каждому мапперу для обработки, и, наконец, эта информация о смещении будет сохранена в таблице mysql.
Так как же убедиться, что данные не искажены? Сначала настройте количество пользовательских картографов и создайте соответствующее количество ETLSplits. Поскольку kafkaEevent содержит смещение, используемое перед отдельной темой и разделом, и максимальное смещение, которое необходимо использовать, можно получить общее количество сообщений, которые необходимо использовать для каждого kafkaEvent. Наконец, все kafkaEevents просматриваются, и текущий kafkaEevent добавляется к текущему минимальному ETLSplit (который можно получить, сравнивая общий объем потребляемых данных).Сгенерированный таким образом ETLSplit может максимально обеспечить баланс данных.
Анализ и очистка данных (чтение)
Как показано на рисунке выше, во-первых, у каждого сегмента будет соответствующий объект RecordReader для анализа, а RecordReade содержит несколько объектов KafkaConsumerReader, которые должны потреблять каждое событие KafkaEevent. Каждое KafkaEevent будет соответствовать KafkaConsumer, который необходимо декодировать и десериализовать после извлечения сообщения байтовых данных, которое включает структуру MessageDecoder. В настоящее время MessageDecoder поддерживает три формата:
Формат |
вовлекающие темы |
---|---|
Avro |
андроид, ios, ad_sdk_android... |
Json |
сервер приложений для каждой строки, защита от спама... |
DelimiterText |
сервер-приложений-правый глаз, сервер-приложений-правый глаз-IM... |
Когда MessageDecoder получает ключ и значение Kafka, он десериализует их и, наконец, генерирует ETLKey и ETLValue. В то же время MessageDecoder содержит Injector, который в основном делает следующие вещи:
Ввести помощь: для данных журнала, собранных агентом arachnia, проанализируйте уникальный идентификатор журнала внедрения KafkaKey Aid;
Внедрить информацию GeoIP: анализировать информацию об IP-адресе в соответствии с GeoIP и вводить географическую информацию (например, country_id, провинция_id, город_id);
Внедрить SdkDeviceInfo: ETL потоковой передачи в реальном времени сам вводит информацию, такую как gid, is_app_new и т. д., но автономный ETL проверяет полноту информации для дальнейшей защиты.
В процессе также задействован DebugFilter, который фильтрует журналы устройства отладки SDK и не попадает в HDFS.
Многофайловый лендинг (Запись)
Поскольку RecordWriter MapReduce сам по себе не поддерживает единую посадку нескольких файлов, требуется специальная обработка, а файлы HDFS не поддерживают записи и добавления нескольких процессов (потоков), поэтому мыKafkaKey+ бизнес-раздел + раздел времени + раздел KafkaОпределите уникальный файл, каждый файл будет содержать информацию о разделе kafka. Создайте RecordWriter для каждого файла одновременно.
Как показано на рисунке выше, каждый RecordWriter содержит несколько модулей записи, и каждый модуль записи соответствует файлу, что позволяет избежать многопоточного чтения и записи одного и того же файла. В настоящее время количество писателей поддерживается через кеш guava.Если писателей слишком много или слишком долго нет доступа для записи, будет запущено действие закрытия, и будет создана следующая партия сообщений kafka с соответствующими каталогами. и автор будет добавлен. Таким образом, мы можем записывать и добавлять несколько файлов в одну и ту же карту.
Проверка целостности потребления данных (фиксация)
Счетчик MapReduce предоставляет нам окно для наблюдения и подсчета различных деталей периода выполнения задания MapReduce. И он поставляется со многими счетчиками по умолчанию, которые могут определить, полностью ли израсходованы данные:
reader_records: количество успешно проанализированных сообщений;
decode_records_error: количество сообщений, которые не удалось проанализировать;
author_records: количество успешно написанных сообщений;
...
Наконец, подтвердите, завершено ли потребление сообщения, проверив, согласованы ли количество смещений темы, read_records и write_records, которые будут использованы на этот раз.
* Разрешить определенный процент грязных данных, если предел превышен, будет сгенерировано SMS-предупреждение
Основные характеристики системы ETL
Повторный запуск данных и их оптимизация
Как ETL реализует сбор и оптимизацию данных? Сначала разберитесь со сценариями, которые необходимо повторить:
* Существует три этапа, когда пользователь вызывает приложение kill: 1) убить SIGTERM(-15) pid; 2) заснуть на 250 мс; 3) убить SIGKILL(-9) pid.
Итак, каковы способы повторного запуска?
На следующем рисунке показан общий процесс третьего метода повторного запуска. ETL запланирован на ежечасной основе. Во-первых, данные записываются во временный каталог на ежечасной основе. текущего часа будет перезапущен. Если посадка прошла успешно, она будет объединена с целевым файлом каталога хранилища.Если слияние не удалось, он также будет уведомлен сигналом тревоги и повторно запустится вручную, чтобы объединить небольшие файлы в целевой файл.
Оптимизированный анализ ситуации повторного запуска показан в следующей таблице:
Автоматическое горизонтальное масштабирование
Теперь в автономном режиме Kafka-ETL запланирован на 05 минут в час.Каждый запланированный ETL будет получать текущее последнее и самое большое последнее смещение каждой темы и раздела и объединять его со смещением отсечки, использованным в предыдущем часе, для использования kafkaEvent, которое будет использовано. локально. Поскольку последнее смещение, полученное каждый раз, является неконтролируемым, в некоторых случаях смещение сообщений некоторых топиков и разделов растет очень быстро, и количество разделов топика кафки не может быть скорректировано вовремя, что приводит к задержке обработки потребления ETL и влияет на нижестоящий поток бизнес-процессов:
Из-за расширения, сбоя и других причин необходимо восполнить недостающие данные или исторические данные.В этом случае смещение сообщений раздела темы и& растет очень быстро, и полагаться только на раздел темы kafka для расширения ненадежно. перегородка;
В пиковые периоды пользовательского трафика, такие как пиковые выходные, праздничные дни, 6.18 и Double Eleven, собранные данные о поведении пользователей будут в несколько или десятки раз выше, чем обычно.
Может ли Kafka ETL автоматически масштабироваться по горизонтали, не сильно зависит от количества разделов темы kafka. Если данные, которые должны обрабатываться темой kafkaEvent, слишком велики, оцените максимальное количество элементов, которые один преобразователь может использовать в течение разумного диапазона времени, а затем разделите kafkaEvent по горизонтали на несколько подчиненных kafkaEvents и назначьте их каждому преобразователю для обработка, чтобы избежать одного маппера.Маппер должен обрабатывать слишком большие kafkaEvents за раз, что приводит к задержкам и улучшает возможность масштабирования по горизонтали. Логика разделения показана на следующем рисунке:
В будущем мы оптимизируем автоматическое горизонтальное расширение для следующих двух точек:
Если общий объем данных сообщений, обрабатываемых одним преобразователем, относительно велик, он рассмотрит возможность увеличения числа преобразователей и создания сегментов для балансировки нагрузки.
Скорость обработки сообщений у каждого формата разная, и у некоторых мапперов может быть большая нагрузка при распределении, для каждого формата будет настроен определенный вес, и kafkaEvent будет распределяться в соответствии с количеством сообщений и весами.