Как использовать Logstash для синхронизации данных между реляционными базами данных и ElasticSearch

Elasticsearch

Автор: Алекс Марквардт | Адресhow-to-keep-elastichsearch-synchronzied-with-a-relational-database-using-logstash

Предисловие переводчика

Основная работа в последнее время заключается в добавлении функции поиска в приложение компании. Поскольку я также столкнулся с проблемой синхронизации данных в реляционной базе данных с ElasticSearch, мне потребовалось некоторое время, чтобы перевести этот официальный пост в блоге. В последнее время появились мысли о синхронизации данных.

В этой статье основное внимание уделяется не использованию подключаемого модуля Logstash JDBC, а тому, как работать с некоторыми деталями синхронизации данных. На мой взгляд, эти дизайнерские идеи универсальны, независимо от того, какой метод вы используете для синхронизации данных.

перевести текст


Чтобы воспользоваться мощными поисковыми возможностями ElasticSearch, большинство компаний развертывают ElasticSearch на основе реляционных баз данных. В таких сценариях необходимо поддерживать синхронизацию данных между ElasticSearch и реляционными базами данных.

В этом сообщении блога рассказывается, как эффективно реплицировать и синхронизировать данные между MySQL и ElasticSearch через Logstash.

Примечание. Код и методы, продемонстрированные в этой статье, были протестированы в MySQL и теоретически применимы ко всем реляционным базам данных.

В этой статье соответствующая информация о компонентах выглядит следующим образом:

  • MySQL: 8.0.16.
  • Elasticsearch: 7.1.1
  • Logstash: 7.1.1
  • Java: 1.8.0_162-b12
  • JDBC input plugin: v4.3.13
  • JDBC connector: Connector/J 8.0.16

Обзор синхронизации данных

В этой статье будут данные ElasticSearch между MySQL и синхронизированы с помощью Logstash входного плагина JDBC. Концептуально подключаемый модуль JDBC для обнаружения новых и обновленных с момента последней итерации данных путем периодического опроса. Для правильной работы необходимо соблюдение нескольких условий:

Параметр _id в ElasticSearch должен исходить из поля id в MySQL. Он обеспечивает сопоставление данных документа между MySQL и ElasticSearch. Если запись в MySQL обновляется, все документы, связанные с ElasticSearch, должны быть переписаны. Чтобы было ясно, переписывание документа в ElasticSearch так же эффективно, как и операция обновления. Внутри операция обновления состоит из удаления старого документа и создания нового документа.

При вставке или обновлении записи в MySQL необходимо включить поле, чтобы сохранить время вставки или обновления поля. Таким образом, Logstash может понять, что каждый запрос обновляет или вставляет только записи с момента последнего опроса. Каждый раз, когда Logstash опрашивает, он сохраняет последнее время вставки или обновления, прочитанное из MySQL, которое больше, чем последнее время последнего опроса.

Если вышеуказанные условия соблюдены, мы можем настроить Logstash для периодического чтения всех недавно обновленных или вставленных записей из MySQL и записи их в Elasticsearch.

Код конфигурации для Logstash будет приведен далее в этой статье.

Настройки MySQL

Библиотека и таблицы MySQL настроены следующим образом:

CREATE DATABASE es_db

USE es_db

DROP TABLE IF EXISTS es_table

CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Есть несколько моментов в конфигурации, которые необходимо объяснить, а именно:

  • es_table, таблица данных MySQL, мы будем синхронизировать ее данные с ElasticSearch;
  • id, уникальный идентификатор записи. Обратите внимание, что когда id определяется как первичный ключ, он также определяется как уникальный, что гарантирует, что каждый id появляется в таблице только один раз. При синхронизации ElasticSearch он будет преобразован в _id документа;
  • client_name, которое представляет пользовательское поле, используемое для сохранения данных. Чтобы сделать сообщение в блоге кратким, мы определяем только одно поле, а дополнительные поля легко добавить. В следующей демонстрации мы обновим это поле, чтобы проиллюстрировать, что не только вновь вставленные записи будут синхронизированы с MySQL, но и обновленные записи также будут синхронизированы с MySQL;
  • modification_time, который используется для сохранения времени обновления или вставки записей, что позволяет Logstash запрашивать только недавно обновленные записи после последнего опроса в каждом опросе;
  • insertion_time, это поле используется для времени вставки записи, в основном для удобства демонстрации, но не обязательно для синхронизации;

операции MySQL

Выполнив предыдущие настройки, мы можем вставлять записи с помощью следующих команд:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);

Обновите запись с помощью следующей команды:

UPDATE es_table SET client_name = <new client name> WHERE id=<id>;

Используйте следующую команду для обновления и вставки записей:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>

синхронный код

Код конфигурации конвейера Logstash выглядит следующим образом, который реализует описанные выше функции, синхронизацию данных из MySQL в ElasticSearch.

input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<my username>"
    jdbc_password => "<my password>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *",
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  # stdout { codec => "rubydebug" }
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[%metedata][_id]}"
  }
}

Несколько замечаний о конфигурации конвейера:

  • tracking_column

Настроен здесь как "unix_ts_in_secs". Он используется для отслеживания последних записей и хранится в файле .logstash_jdbc_last_run, и следующий опрос будет использовать эту границу для выборки записей. В операторе SELECT значение этого поля конфигурации можно получить через :sql_last_value.

  • unix_ts_in_secs

Генерируется оператором SELECT и представляет собой UNIX TIMESTAMP со значением «modification_time». На него ссылается «track_column», о котором говорилось ранее. Использование UNIX TIMESTAMP, а не других форм времени, снижает сложность и предотвращает несоответствия времени, вызванные часовыми поясами.

  • sql_last_value

Встроенный параметр конфигурации, указывающий начальную позицию каждого опроса. Во входной конфигурации на него можно ссылаться операторами SELECT. Перед началом каждого опроса прочитайте из .logstash_jdbc_last_run самое последнее значение «unix_ts_in_secs». Это гарантирует, что при каждом опросе извлекаются только самые последние вставленные и обновленные записи.

  • schedule

Укажите период выполнения опроса через синтаксис cron, в примере "*/5 * * * * *" означает опрос каждые 5 секунд.

  • modification_time < NOW()

Часть условия запроса оператора SELECT в настоящее время неясна, и подробности будут представлены в следующих главах.

  • filter

Эта конфигурация указывает на копирование идентификатора из MySQL в поле метаданных _id, чтобы гарантировать, что документы в ElasticSearch записываются с правильным _id. Причина, по которой используются метаданные, потому что они временные, не создает новые поля в документе. При этом мы также удалим поля id и @version, которые не хотим писать в Elasticsearch.

  • output

В выходном сегменте OUTPUT мы указываем, что документ должен быть выведен в Elasticsearch, и устанавливаем выходной документ _id на метаданные, созданные сегментом фильтра _ID. Если вам нужно выполнить отладку, можно реализовать RubyDebug части комментариев.

Анализ корректности оператора SELECT

Далее мы начнем объяснять, почему так важно включать в оператор SELECT modification_time

Интуитивная сцена первая

Работает ли это, когда предложение where содержит только UNIX_TIMESTAMP(modification_time) > :sql_last_value, без модификации

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value) ORDER BY modification_time ASC"

На первый взгляд кажется, что проблем нет, и все должно работать нормально. Но на самом деле здесь есть крайние случаи, которые могут привести к потере некоторых документов. В качестве примера предположим, что MySQL вставляет два документа в секунду, а Logstash выполняется каждые 5 секунд. Как показано на рисунке ниже, временной диапазон от T0 до T10, записи данных от R1 до R22.

Первый опрос Logstash происходит в момент времени T5, при этом считываются записи с R1 по R11, что обозначено голубым цветом на рисунке. В этот момент sql_last_value равно T5, полученному из R11.

Если после завершения чтения от MySQL Logstash данных, одновременно T5, но и запись в MySQL. Следующий опрос будет тянуть только к записи больше, чем T5, что означает, что R12 будет потерян. Как показано, голубые и серые области представляют раз, когда предыдущие опрос и записи приобретены.

Обратите внимание, что R12 при таких сценариях никогда не будет записан в Elasticsearch.

Интуитивная сцена вторая

Чтобы решить эту проблему, некоторые люди могут задаться вопросом, возможно ли изменить больше чем (>) в предложении where на больше или равно (>=). Оператор SELECT выглядит следующим образом:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"

Этот подход на самом деле не идеален. В этом случае некоторые документы могут быть прочитаны дважды и многократно записаны в ElasticSearch. Хотя это не влияет на правильность результата, но выполняет избыточную работу. Как показано на рисунке ниже, первый опрос Logstash такой же, как и в сценарии 1, а голубая область указывает на прочитанные записи.

Второй опрос Logstash прочитает все записи, большие или равные T5. Как показано на рисунке ниже, обратите внимание, что R11, фиолетовая область, снова будет отправлена ​​в ElasticSearch.

Реализация обоих сценариев не идеальна. Первый сценарий приводит к потере данных, что недопустимо. Сценарий 2, есть проблема повторного чтения и записи, хотя это не влияет на корректность данных, но выполняется избыточный ввод-вывод.

Конечное решение

Впереди две программы невыполнимы, нам нужно продолжать искать другие решения. На самом деле, очень просто, указав (UNIX_TIMESTAMP(modification_time)>:sql_last_value ANDmodification_time

Как показано на рисунке ниже, опрос Logstash происходит в момент времени T5. Поскольку указано время модификации

Начать следующий опрос, текущее время T10.

Поскольку установлено значение UNIX_TIMESTAMP(modification_time) > :sql_last_value, а текущее значение sql_last_value равно T4, этот опрос начнется с T5. А modification_time

Таким образом, каждая запись в MySQL может быть прочитана ровно один раз, что позволяет избежать проблемы потери данных или повторного чтения в текущем интервале времени, которые могут быть вызваны каждым опросом.

Тест системы

Простой тест может помочь нам проверить желаемую конфигурацию. Мы можем записать некоторые данные в базу данных следующим образом:

INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');

Как только подключаемый модуль ввода JDBC инициирует выполнение, все записи будут считаны из MySQL и записаны в ElasticSearch. Мы можем просматривать документы в ElasticSearch с помощью операторов запроса.

GET rdbms_sync_idx/_search

Результат выполнения следующий:

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "insertion_time" : "2019-06-18T12:58:56.000Z",
          "@timestamp" : "2019-06-18T13:04:27.436Z",
          "modification_time" : "2019-06-18T12:58:56.000Z",
          "client_name" : "Jim Carrey"
        }
      },
Etc …

Обновите документ с id=1 следующим образом:

UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;

При _id = 1 достигается правильное обновление документа. Просмотрите документацию, выполнив следующую команду:

GET rdbms_sync_idx/_doc/1

Результат ответа следующий:

{
  "_index" : "rdbms_sync_idx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "insertion_time" : "2019-06-18T12:58:56.000Z",
    "@timestamp" : "2019-06-18T13:09:30.300Z",
    "modification_time" : "2019-06-18T13:09:28.000Z",
    "client_name" : "Jimbo Kerry"
  }
}

Для _версии документа установлено значение 2, время модификации и время_вставки больше не совпадают, а имя_клиента обновлено правильно. И @timestamp, не то, на что нужно обращать внимание, он добавляется Logstash по умолчанию.

Обновите, чтобы добавить оператор выполнения upsert следующим образом:

INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';

Как и прежде, мы можем убедиться в правильности синхронизации, просмотрев соответствующую документацию в ElasticSearch.

Удаление документа

Не знаю, обнаружили ли вы, что если документ удаляется из MySQL, он не будет синхронизирован с ElasticSearch. Что касается этого вопроса, вот несколько вариантов, которые мы можем рассмотреть:

Запись в MySQL может использоваться для указания того, является ли запись действительной, путем включения поля is_deleted. Как только произойдет обновление, is_deleted также будет синхронно обновлен до ElasticSearch. Таким образом, при выполнении запроса MySQL или ElasticSearch нам нужно переписать оператор запроса, чтобы отфильтровать записи, для которых is_deleted имеет значение true. В то же время для удаления этих документов в MySQL и ElasticSearch требуется какой-то фоновый процесс.

Другое необязательное решение заключается в том, что система приложений отвечает за удаление данных в MySQL и ElasticSearch, то есть когда система приложений удаляет данные в MySQL, она также отвечает за удаление соответствующих документов в ElasticSearch.

Суммировать

В этой статье описывается, как синхронизировать данные между реляционной базой данных и ElasticSearch через Logstash. В этой статье в качестве примера используется MySQL, но теоретически продемонстрированные методы и коды должны быть в равной степени применимы и к другим реляционным базам данных.