ElasticSearch + Logstash для синхронизации базы данных

Elasticsearch

вводить

Когда мы используем mysql и elasticsearch в комбинации, могут возникнуть некоторые требования к синхронизации.На самом деле существует много способов синхронизировать базу данных и elasticsearch.

Вы можете использовать канал, который в основном отслеживает журнал binlog mysql и может отслеживать некоторые изменения в данных.Если данные изменяются, какую логику нам нужно делать, это можно реализовать искусственно.Он имитирует себя как подчиненный узел. Когда данные главного узла изменяются, он может видеть изменения в данных. Тем не менее, недостатки также очевидны.Поскольку он реализован на java, он относительно тяжелый, и для управления узлами канала необходимо использовать инструменты управления кластером, такие как zookeeper, поэтому в этой статье этот метод пока не рассматривается.

В этой статье в основном рассказывается об использовании Logstash JDBC для достижения синхронизации, которую относительно просто синхронизировать. Конечно, у него есть некоторые недостатки, то есть он потребляет мало памяти (если памяти много, я этого не говорил).

отображение финального эффекта

  1. Давайте сначала посмотрим на данные ElasticSearch, в которых лучше различать эффект, Из результатов ответа мы видим, что есть три фрагмента данных с идентификаторами 1, 2 и 3.

Выполнить оператор запроса

GET /myapp/_search
{
  "_source": "id"
}

результат ответа

{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1,
    "hits": [
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "2",
        "_score": 1,
        "_source": {
          "id": 2
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "3",
        "_score": 1,
        "_source": {
          "id": 3
        }
      }
    ]
  }
}
  1. Теперь давайте изменим и добавим часть данных, чтобы увидеть изменения в данных es

Вот текущие данные базы, видно что в ней 3 записи.

mysql> select * from user;
+------+----------+-------------+------------+-------+------------+---------+
| id   | name     | phone       | password   | title | content    | article |
+------+----------+-------------+------------+-------+------------+---------+
|    1 | zhnagsan | 181222      | 123123     | ???   | ??????     | ???IE   |
|    2 | lishi    | 181222113   | 232123123  | 23??? | 234??????  | 4???IE  |
|    3 | wangwu   | 18111214204 | 1337547531 | ????? | lc content | Java    |
+------+----------+-------------+------------+-------+------------+---------+
3 rows in set (0.00 sec)
mysql>

Теперь мы выполняем sql, чтобы добавить в него часть данных.

mysql> insert into user (id, name, phone, password, title, content, article) values (4, "lc", "123456789", "123456789", "测试", "测试内容", "Java") 
Query OK, 1 row affected (0.00 sec)
mysql>
  1. Выполните оператор запроса в следующий раз, посмотрите на изменения данных es, и вы увидите, что есть дополнительные данные с идентификатором 4, и по умолчанию будет задержка в 1 минуту в средней синхронизации.

выполнить поиск

GET /myapp/_search
{
  "_source": "id"
}

результат ответа

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1,
    "hits": [
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "2",
        "_score": 1,
        "_source": {
          "id": 2
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "4",
        "_score": 1,
        "_source": {
          "id": 4
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "3",
        "_score": 1,
        "_source": {
          "id": 3
        }
      }
    ]
  }
}

окрестности

Виртуальная машина: VMware 11.0.2 Система оператора: CentOS версии 6.9 (финальная) Эластичный поиск: 6.4.0 Версия Кибаны: 6.4.0 Версия LogStash: 6.6.1 Версия JDK: 1.8.0_181 Версия MySQL: 5.1.73 (эта версия устанавливается непосредственно с yum. На самом деле, это руководство имеет мало общего с версией mysql, потому что в то время для подключения к базе данных использовался пакет драйвера jdbc) пакет драйвера logstash jdbc версии 5.1.46

Получите необходимую среду

В этой статье пока указан только адрес для скачивания (временно краду большой ленивый 😄, порядок установки в порядке ссылки), logstash даст подробную инструкцию по использованию, его не нужно устанавливать, его нужно быть распакован.

Виртуальная машина: VMware 11.0.2

Система оператора: CentOS версии 6.9 (финальная)

Версия JDK: 1.8.0_181

Эластичный поиск: 6.4.0

Версия Кибаны: 6.4.0

Версия логсташа: 6.6.1

Пакет драйверов logStash

используя логсташ

Если окружение установлено, то видно следующее, а если не установлено, то видно😁

Введение в логсташ

Logstash — это механизм сбора данных с открытым исходным кодом с возможностями конвейера в реальном времени. Logstash может динамически объединять данные из разрозненных источников данных и нормализовать данные в соответствии с вашим выбором.

Централизуйте, преобразовывайте и храните свои данные

Logstash — это конвейер обработки данных на стороне сервера с открытым исходным кодом, который одновременно берет данные из нескольких источников данных, преобразует их и отправляет в ваше любимое «хранилище». (Наш любимый, конечно же, Elasticsearch)

Ввод: захват данных различных стилей, размеров и источников.

Данные часто существуют во многих системах в различных формах, разбросанных или централизованных. Logstash поддерживает различные параметры ввода и может одновременно фиксировать события из многих распространенных источников. С легкостью получайте данные из журналов, метрик, веб-приложений, хранилищ данных и различных сервисов AWS в виде непрерывной потоковой передачи.

Фильтры: анализ и преобразование данных в режиме реального времени.

По мере того как данные перемещаются из источника в репозиторий, фильтры Logstash анализируют отдельные события, определяют именованные поля для создания структуры и преобразовывают их в общий формат для более простого и быстрого анализа и повышения ценности для бизнеса. Logstash может динамически преобразовывать и анализировать данные независимо от формата или сложности:

  1. Получение структуры из неструктурированных данных с помощью Grok
  2. Расшифровка географических координат по IP-адресам
  3. Анонимизируйте данные PII и полностью исключите конфиденциальные поля.
  4. Общая обработка не зависит от источника данных, формата или схемы

Экспорт: выберите хранилище, экспортируйте данные

Хотя Elasticsearch является нашим основным направлением вывода и открывает бесконечные возможности для нашего поиска и аналитики, это не единственный вариант. Logstash предлагает множество вариантов вывода, вы можете отправлять данные куда угодно, и он обладает гибкостью, позволяющей разблокировать многочисленные последующие варианты использования.

Установить логсташ

Во-первых, давайте протестируем Logstash, который мы только что установили, с помощью самого простого конвейера Logstash. Конвейеры Logstash имеют два обязательных элемента: ввод и вывод, а также необязательный фильтр элементов. Плагины ввода используют данные из источника данных, плагины фильтров изменяют данные в соответствии с вашими ожиданиями, а плагины вывода записывают данные в места назначения.

Далее введите следующую команду из командной строки

bin/logstash -e 'input { stdin {} } output { stdout {} }'
选项 -e 的意思是允许你从命令行指定配置

Когда запуск будет завершен, он будет ждать вашего ввода, вы можете ввестиhello worldПопробуйте, это даст вам некоторую обратную связь об информации.

Синхронизация Mysql и ElasticSearch с помощью logstash

Подготовьте пакет драйвера JDBC

  1. Во-первых, поместите пакет драйвера jdbc по ссылке для скачивания, которую мы только что дали, в каталог logstash.

  1. распаковать этот файл
[root@localhost logstash-6.6.1]# unzip mysql-connector-java-5.1.46.zip

Сгенерируйте файл mysqlsyn.conf

  1. Войдите в каталог конфигурации и создайте файл mysqlsyn.conf.

2. Откройте файл с помощью редактора vim, добавьте в него следующее содержимое, сохраните и выйдите.

	input {
			jdbc {
			 		# jdbc驱动包位置
					jdbc_driver_library => "/mnt/logstash-6.6.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
					# 要使用的驱动包类,有过java开发经验的应该很熟悉这个了,不同的数据库调用的类不一样。
					jdbc_driver_class => "com.mysql.jdbc.Driver"
					 # myqsl数据库的连接信息
					jdbc_connection_string => "jdbc:mysql://0.0.0.0:3306/myapp"
					 # mysql用户
					jdbc_user => "root"
					 # mysql密码
					jdbc_password => "root"
					# 定时任务, 多久执行一次查询, 默认一分钟,如果想要没有延迟,可以使用 schedule => "* * * * * *"
					schedule => "* * * * *"
					 # 你要执行的语句
					statement => "select * from user"
			}
	}

	output {
			# 将数据输出到ElasticSearch中
			  elasticsearch {
			  		# es ip加端口
					hosts => ["0.0.0.0:9200"]
					# es文档索引
					index => "myusreinfo"
					# es文档数据的id,%{id}代表的是用数据库里面记录的id作为文档的id
					document_id => "%{id}"
	  }
	}

Запустите logstash для синхронизации

Файл mysqlsyn.conf мы сгенерировали выше.Дальше будем использовать logstash для синхронизации данных.Перед синхронизацией смотрим данные пользовательской таблицы нашей БД.

  1. Глядя на данные mysql, мы видим, что у нас все еще есть только первые 4 фрагмента данных.

  1. Проверьте, есть ли в ElasticSearchmyusreinfoЭтот показатель, как видно из рисунка, у нас есть толькоmyappэтот индекс.

  1. Запустите logstash с файлом конфигурации
[root@localhost logstash-6.6.1]# ./bin/logstash -f config/mysqlsyn.conf
-f 指定配置文件启动

Запуск прошел успешно, и данные уже синхронизируются.По умолчанию эта синхронизация выполняется раз в минуту.Вы можете видеть, что она выполняется 5 раз за 5 минут.

Проверьте эффект синхронизации

  1. Синхронизация запущена выше.Теперь посмотрим есть ли данные в ElasticSearch.Из рисунка видно, что myusrinfo синхронизировался с es, и мы видим, чтоdocs.countЧисло — это количество данных в базе данных, которую мы только что получили.

  1. Добавляем кусок данных в базу, а потом смотрим, изменятся ли данные ElasticSearch

  1. Проверяем, есть ли только что добавленные данные в ElasticSearch, по рисунку видно, что уже 5 кусков данных

  1. Сначала посмотрите на данные с идентификатором 5 в ElasticSearch, вы можете увидеть, что имяyinya

  1. База данных изменяет часть данных с идентификатором 5 и видит изменения данных ElasticSearch.

  1. Проверьте, изменились ли данные в ElasticSearch, вы видите, что данные изменились

  1. Удалите две части данных, чтобы увидеть изменения данных ElasticSearch, удалите две части данных с идентификатором 1 и 2.

  1. Проверить, изменились ли данные в ElasticSearch, но данные не изменились

Залить холодной водой синхронизацию операций удаления в реальном времени

До сих пор все плагины и информация о синхронизации в реальном времени, найденная в Google, stackoverflow, elastic.co и github, говорят нам, что в настоящее время нет хорошего решения для синхронного удаления. Компромиссное решение выглядит следующим образом: Обсуждение программы:обсудить.эластик.со/он/удалить-плохо…

stackoverflow.com/questions/3…

  1. Вариант первый

В исходной таблице базы данных mysql добавлен новый статус поля. Значение по умолчанию - ok. Если вы хотите удалить данные, вы можете использовать операцию обновления и изменить статус на удаленный, чтобы его можно было синхронизировать с es. В es значение состояния используется, чтобы определить, существует ли строка данных. удалено означает удалено, ок означает нормально.

  1. Вариант 2

Синхронизация достигается с помощью плагина go elasticsearch.