вводить
Когда мы используем mysql и elasticsearch в комбинации, могут возникнуть некоторые требования к синхронизации.На самом деле существует много способов синхронизировать базу данных и elasticsearch.
Вы можете использовать канал, который в основном отслеживает журнал binlog mysql и может отслеживать некоторые изменения в данных.Если данные изменяются, какую логику нам нужно делать, это можно реализовать искусственно.Он имитирует себя как подчиненный узел. Когда данные главного узла изменяются, он может видеть изменения в данных. Тем не менее, недостатки также очевидны.Поскольку он реализован на java, он относительно тяжелый, и для управления узлами канала необходимо использовать инструменты управления кластером, такие как zookeeper, поэтому в этой статье этот метод пока не рассматривается.
В этой статье в основном рассказывается об использовании Logstash JDBC для достижения синхронизации, которую относительно просто синхронизировать. Конечно, у него есть некоторые недостатки, то есть он потребляет мало памяти (если памяти много, я этого не говорил).
отображение финального эффекта
- Давайте сначала посмотрим на данные ElasticSearch, в которых лучше различать эффект, Из результатов ответа мы видим, что есть три фрагмента данных с идентификаторами 1, 2 и 3.
Выполнить оператор запроса
GET /myapp/_search
{
"_source": "id"
}
результат ответа
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "myapp",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"id": 2
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"id": 1
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"id": 3
}
}
]
}
}
- Теперь давайте изменим и добавим часть данных, чтобы увидеть изменения в данных es
Вот текущие данные базы, видно что в ней 3 записи.
mysql> select * from user;
+------+----------+-------------+------------+-------+------------+---------+
| id | name | phone | password | title | content | article |
+------+----------+-------------+------------+-------+------------+---------+
| 1 | zhnagsan | 181222 | 123123 | ??? | ?????? | ???IE |
| 2 | lishi | 181222113 | 232123123 | 23??? | 234?????? | 4???IE |
| 3 | wangwu | 18111214204 | 1337547531 | ????? | lc content | Java |
+------+----------+-------------+------------+-------+------------+---------+
3 rows in set (0.00 sec)
mysql>
Теперь мы выполняем sql, чтобы добавить в него часть данных.
mysql> insert into user (id, name, phone, password, title, content, article) values (4, "lc", "123456789", "123456789", "测试", "测试内容", "Java")
Query OK, 1 row affected (0.00 sec)
mysql>
- Выполните оператор запроса в следующий раз, посмотрите на изменения данных es, и вы увидите, что есть дополнительные данные с идентификатором 4, и по умолчанию будет задержка в 1 минуту в средней синхронизации.
выполнить поиск
GET /myapp/_search
{
"_source": "id"
}
результат ответа
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 4,
"max_score": 1,
"hits": [
{
"_index": "myapp",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"id": 2
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "4",
"_score": 1,
"_source": {
"id": 4
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"id": 1
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"id": 3
}
}
]
}
}
окрестности
Виртуальная машина: VMware 11.0.2 Система оператора: CentOS версии 6.9 (финальная) Эластичный поиск: 6.4.0 Версия Кибаны: 6.4.0 Версия LogStash: 6.6.1 Версия JDK: 1.8.0_181 Версия MySQL: 5.1.73 (эта версия устанавливается непосредственно с yum. На самом деле, это руководство имеет мало общего с версией mysql, потому что в то время для подключения к базе данных использовался пакет драйвера jdbc) пакет драйвера logstash jdbc версии 5.1.46
Получите необходимую среду
В этой статье пока указан только адрес для скачивания (временно краду большой ленивый 😄, порядок установки в порядке ссылки), logstash даст подробную инструкцию по использованию, его не нужно устанавливать, его нужно быть распакован.
Виртуальная машина: VMware 11.0.2
Система оператора: CentOS версии 6.9 (финальная)
используя логсташ
Если окружение установлено, то видно следующее, а если не установлено, то видно😁
Введение в логсташ
Logstash — это механизм сбора данных с открытым исходным кодом с возможностями конвейера в реальном времени. Logstash может динамически объединять данные из разрозненных источников данных и нормализовать данные в соответствии с вашим выбором.
Централизуйте, преобразовывайте и храните свои данные
Logstash — это конвейер обработки данных на стороне сервера с открытым исходным кодом, который одновременно берет данные из нескольких источников данных, преобразует их и отправляет в ваше любимое «хранилище». (Наш любимый, конечно же, Elasticsearch)
Ввод: захват данных различных стилей, размеров и источников.
Данные часто существуют во многих системах в различных формах, разбросанных или централизованных. Logstash поддерживает различные параметры ввода и может одновременно фиксировать события из многих распространенных источников. С легкостью получайте данные из журналов, метрик, веб-приложений, хранилищ данных и различных сервисов AWS в виде непрерывной потоковой передачи.
Фильтры: анализ и преобразование данных в режиме реального времени.
По мере того как данные перемещаются из источника в репозиторий, фильтры Logstash анализируют отдельные события, определяют именованные поля для создания структуры и преобразовывают их в общий формат для более простого и быстрого анализа и повышения ценности для бизнеса. Logstash может динамически преобразовывать и анализировать данные независимо от формата или сложности:
- Получение структуры из неструктурированных данных с помощью Grok
- Расшифровка географических координат по IP-адресам
- Анонимизируйте данные PII и полностью исключите конфиденциальные поля.
- Общая обработка не зависит от источника данных, формата или схемы
Экспорт: выберите хранилище, экспортируйте данные
Хотя Elasticsearch является нашим основным направлением вывода и открывает бесконечные возможности для нашего поиска и аналитики, это не единственный вариант. Logstash предлагает множество вариантов вывода, вы можете отправлять данные куда угодно, и он обладает гибкостью, позволяющей разблокировать многочисленные последующие варианты использования.
Установить логсташ
Во-первых, давайте протестируем Logstash, который мы только что установили, с помощью самого простого конвейера Logstash. Конвейеры Logstash имеют два обязательных элемента: ввод и вывод, а также необязательный фильтр элементов. Плагины ввода используют данные из источника данных, плагины фильтров изменяют данные в соответствии с вашими ожиданиями, а плагины вывода записывают данные в места назначения.
Далее введите следующую команду из командной строки
bin/logstash -e 'input { stdin {} } output { stdout {} }'
选项 -e 的意思是允许你从命令行指定配置
Когда запуск будет завершен, он будет ждать вашего ввода, вы можете ввестиhello world
Попробуйте, это даст вам некоторую обратную связь об информации.
Синхронизация Mysql и ElasticSearch с помощью logstash
Подготовьте пакет драйвера JDBC
- Во-первых, поместите пакет драйвера jdbc по ссылке для скачивания, которую мы только что дали, в каталог logstash.
- распаковать этот файл
[root@localhost logstash-6.6.1]# unzip mysql-connector-java-5.1.46.zip
Сгенерируйте файл mysqlsyn.conf
- Войдите в каталог конфигурации и создайте файл mysqlsyn.conf.
2. Откройте файл с помощью редактора vim, добавьте в него следующее содержимое, сохраните и выйдите.
input {
jdbc {
# jdbc驱动包位置
jdbc_driver_library => "/mnt/logstash-6.6.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
# 要使用的驱动包类,有过java开发经验的应该很熟悉这个了,不同的数据库调用的类不一样。
jdbc_driver_class => "com.mysql.jdbc.Driver"
# myqsl数据库的连接信息
jdbc_connection_string => "jdbc:mysql://0.0.0.0:3306/myapp"
# mysql用户
jdbc_user => "root"
# mysql密码
jdbc_password => "root"
# 定时任务, 多久执行一次查询, 默认一分钟,如果想要没有延迟,可以使用 schedule => "* * * * * *"
schedule => "* * * * *"
# 你要执行的语句
statement => "select * from user"
}
}
output {
# 将数据输出到ElasticSearch中
elasticsearch {
# es ip加端口
hosts => ["0.0.0.0:9200"]
# es文档索引
index => "myusreinfo"
# es文档数据的id,%{id}代表的是用数据库里面记录的id作为文档的id
document_id => "%{id}"
}
}
Запустите logstash для синхронизации
Файл mysqlsyn.conf мы сгенерировали выше.Дальше будем использовать logstash для синхронизации данных.Перед синхронизацией смотрим данные пользовательской таблицы нашей БД.
- Глядя на данные mysql, мы видим, что у нас все еще есть только первые 4 фрагмента данных.
- Проверьте, есть ли в ElasticSearch
myusreinfo
Этот показатель, как видно из рисунка, у нас есть толькоmyapp
этот индекс.
- Запустите logstash с файлом конфигурации
[root@localhost logstash-6.6.1]# ./bin/logstash -f config/mysqlsyn.conf
-f 指定配置文件启动
Запуск прошел успешно, и данные уже синхронизируются.По умолчанию эта синхронизация выполняется раз в минуту.Вы можете видеть, что она выполняется 5 раз за 5 минут.
Проверьте эффект синхронизации
- Синхронизация запущена выше.Теперь посмотрим есть ли данные в ElasticSearch.Из рисунка видно, что myusrinfo синхронизировался с es, и мы видим, что
docs.count
Число — это количество данных в базе данных, которую мы только что получили.
- Добавляем кусок данных в базу, а потом смотрим, изменятся ли данные ElasticSearch
- Проверяем, есть ли только что добавленные данные в ElasticSearch, по рисунку видно, что уже 5 кусков данных
- Сначала посмотрите на данные с идентификатором 5 в ElasticSearch, вы можете увидеть, что имя
yinya
- База данных изменяет часть данных с идентификатором 5 и видит изменения данных ElasticSearch.
- Проверьте, изменились ли данные в ElasticSearch, вы видите, что данные изменились
- Удалите две части данных, чтобы увидеть изменения данных ElasticSearch, удалите две части данных с идентификатором 1 и 2.
- Проверить, изменились ли данные в ElasticSearch, но данные не изменились
Залить холодной водой синхронизацию операций удаления в реальном времени
До сих пор все плагины и информация о синхронизации в реальном времени, найденная в Google, stackoverflow, elastic.co и github, говорят нам, что в настоящее время нет хорошего решения для синхронного удаления. Компромиссное решение выглядит следующим образом: Обсуждение программы:обсудить.эластик.со/он/удалить-плохо…
stackoverflow.com/questions/3…
- Вариант первый
В исходной таблице базы данных mysql добавлен новый статус поля. Значение по умолчанию - ok. Если вы хотите удалить данные, вы можете использовать операцию обновления и изменить статус на удаленный, чтобы его можно было синхронизировать с es. В es значение состояния используется, чтобы определить, существует ли строка данных. удалено означает удалено, ок означает нормально.
- Вариант 2
Синхронизация достигается с помощью плагина go elasticsearch.