centos7 mysql: Конфигурация установки MySQL5.7журнал:Версия logstash-7.1.1 скачатьЭП:Конфигурация установкирежим logstash-input-jdbc, этот режим поддерживает только добавление и обновление данных, но не поддерживает операции удаления
Подготовить
Установить logstash-input-jdbc
[root@localhost logstash-7.1.1]# bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
Загрузите mysql-connector-java-5.1.46.zip и распакуйте его. wget CDN.MySQL.com//загрузки/… unzip mysql-connector-java-5.1.46.zip
[root@localhost logstash-7.1.1]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
--2019-05-30 19:48:30-- https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
Resolving cdn.mysql.com (cdn.mysql.com)... 104.127.195.16
Connecting to cdn.mysql.com (cdn.mysql.com)|104.127.195.16|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4792694 (4.6M) [application/zip]
Saving to: ‘mysql-connector-java-5.1.46.zip’
100%[================================================================================================================================================>] 4,792,694 963KB/s in 5.2s
2019-05-30 19:48:37 (902 KB/s) - ‘mysql-connector-java-5.1.46.zip’ saved [4792694/4792694]
[root@localhost logstash-7.1.1]# unzip mysql-connector-java-5.1.46.zip
[root@localhost logstash-7.1.1]# ls
bin CONTRIBUTORS Gemfile lib logstash-core modules mysql-connector-java-5.1.46.zip tools x-pack
config data Gemfile.lock LICENSE.txt logstash-core-plugin-api mysql-connector-java-5.1.46 NOTICE.TXT vendor
Подготовьте таблицу данных в базе данных (тестовая таблица table1)
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (1, '测试123');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (2, '测试234');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (3, '测试345');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (4, '测试4');
Создайте файл jdbc.sql в каталоге logstash/config и используйте его позже.
содержание
select * from table1
настроить конфигурацию
- Перейдите в logstash/config и сделайте копию logstash-sample.conf.
cp logstash-sample.conf mylogstash.conf
- редактировать
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
jdbc{
# mysql 数据库链接
# jdbc_connection_string => "jdbc:mysql:192.168.177.128:3306/test_datax_1?characterEncoding=utf8"
jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/test_datax_1?useUnicode=true&characterEncoding=utf-8&useSSL=false"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root123132*"
#驱动
jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
jdbc_default_timezone =>"Asia/Shanghai"
# mysql文件, 也可以直接写SQL语句在此处,如下:
# statement => "select * from t_order where update_time >= :sql_last_value;"
statement_filepath => "./config/jdbc.sql"
# 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
schedule => "* * * * *"
type => "jdbc"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
#record_last_run => true
# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
# use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
# tracking_column => "update_time"
# tracking_column_type => "timestamp"
# last_run_metadata_path => "./logstash_capital_bill_last_id"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
# clean_run => false
# 是否将 字段(column) 名称转小写
lowercase_column_names => false
}
}
output {
# elasticsearch {
# hosts => ["http://localhost:9200"]
# index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
# #user => "elastic"
# #password => "changeme"
# }
#
stdout {
codec => json_lines
}
}
- Выполнить (здесь только печатать)
[root@localhost logstash-7.1.1]# bin/logstash -f config/mylogstash.conf
Вернуть результат после успеха:
{"@timestamp":"2019-05-31T04:02:00.280Z","id":1,"name":"测试123","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:02:00.281Z","id":2,"name":"测试234","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:02:00.281Z","id":3,"name":"测试345","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:02:00.281Z","id":4,"name":"测试4","@version":"1","type":"jdbc"}
[2019-05-30T21:03:00,188][INFO ][logstash.inputs.jdbc ] (0.006211s) SELECT version()
[2019-05-30T21:03:00,201][INFO ][logstash.inputs.jdbc ] (0.010732s) SELECT version()
[2019-05-30T21:03:00,214][INFO ][logstash.inputs.jdbc ] (0.008067s) SELECT count(*) AS `count` FROM (select * from table1
) AS `t1` LIMIT 1
[2019-05-30T21:03:00,232][INFO ][logstash.inputs.jdbc ] (0.006079s) SELECT * FROM (select * from table1
) AS `t1` LIMIT 50000 OFFSET 0
{"@timestamp":"2019-05-31T04:03:00.233Z","id":1,"name":"测试123","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:03:00.234Z","id":2,"name":"测试234","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:03:00.234Z","id":3,"name":"测试345","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:03:00.234Z","id":4,"name":"测试4","@version":"1","type":"jdbc"}
^C[2019-05-30T21:03:45,379][ERROR][org.logstash.Logstash ] org.jruby.exceptions.ThreadKill
Добавьте поля, зависящие от обновления. Например, при добавлении новых данных вы хотите обновить соответствующие данные в ES в соответствии с изменениями в данных базы данных. Как правило, в качестве поля обновления устанавливается время обновления. В это время вы также можете напрямую настроить ввод ЧС
- Изменить настройку
input {
jdbc{
# mysql 数据库链接
# jdbc_connection_string => "jdbc:mysql:192.168.177.128:3306/test_datax_1?characterEncoding=utf8"
jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/test_datax_1?useUnicode=true&characterEncoding=utf-8&useSSL=false"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root123132*"
#驱动
jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
jdbc_default_timezone =>"Asia/Shanghai"
# mysql文件, 也可以直接写SQL语句在此处,如下:
# statement => "select * from t_order where update_time >= :sql_last_value;"
statement_filepath => "./config/jdbc.sql"
# 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
schedule => "* * * * *"
type => "jdbc"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "update_time"
tracking_column_type => "timestamp"
last_run_metadata_path => "./logstash_capital_bill_last_id"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
# clean_run => false
# 是否将 字段(column) 名称转小写
lowercase_column_names => false
}
}
output {
elasticsearch {
hosts => "http://192.168.177.128:9200"
#index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
index => "tabel_test"
# 将表Id作为ES的主键,防止数据重复,这样会影响效率,双刃剑
document_id => "%{id}"
user => "elastic"
password => "123456"
}
# 打印信息,生产环境不需要
stdout {
codec => json_lines
}
}
- Измените конфигурацию vim/jdbc.sql.
select * from table1 WHERE update_time > :sql_last_value
Изменить таблицу базы данных table1, добавить поле update_time
- бегать
[root@localhost logstash-7.1.1]# bin/logstash -f config/mylogstash.conf
Измените время update_time или добавьте новые данные, вы увидите, что информация будет напечатана в фоновом режиме.
- В запросе ES используйте здесьkibanaЗапрос: ПОЛУЧИТЬ /table_test/_search
возвращение
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 8,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "tabel_test",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"update_time" : "2019-05-31T14:00:00.000Z",
"@version" : "1",
"@timestamp" : "2019-05-31T07:32:00.891Z",
"name" : "修改测试1",
"id" : 1,
"type" : "jdbc"
}
},
.....
Это заканчивается здесь
source
Уууу. Кто-нибудь clubs.com/details?ID=…
reference
Elasticsearch — Logstash реализует данные синхронизации mysql для elasticsearch.