logstash-7.1.1 импортировать несколько таблиц в ES

Logstash

Не хотите слушать мою многословность, можете посмотреть прямо на итоговую конфигурацию законченныйШаг за шагом реализовать logstash для синхронизации данных MySQL с ESПосле этого я хотел попробовать сохранить несколько таблиц в ES.Я изначально думал, что это можно сделать, просто скопировав еще несколько JDBC и добавив тип, но трагическое открытие оказалось проблемой, поэтому я буду записывать проблемы, которые здесь нужно внимание.

Конфигурации, требующие внимания

tracking_column

Сообщение об ошибке lastmodifiedTime должно быть в наборе результатов вашего запроса tracking_column => "lastmodifiedTime"

[2019-06-03T00:37:00,342][INFO ][logstash.inputs.jdbc     ] (0.004409s) SELECT version()
[2019-06-03T00:37:00,369][INFO ][logstash.inputs.jdbc     ] (0.002446s) SELECT count(*) AS `count` FROM (SELECT id, nickname FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
) AS `t1` LIMIT 1
[2019-06-03T00:37:00,401][INFO ][logstash.inputs.jdbc     ] (0.008020s) SELECT * FROM (SELECT id, nickname FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
) AS `t1` LIMIT 50000 OFFSET 0
[2019-06-03T00:37:00,402][WARN ][logstash.inputs.jdbc     ] tracking_column not found in dataset. {:tracking_column=>"lastmodified_time"}
{ 3362 rufus-scheduler intercepted an error:
  3362   job:
  3362     Rufus::Scheduler::CronJob "* * * * *" {}
  3362   error:
  3362     3362
  3362     TypeError
  3362     no implicit conversion of NilClass into String
  3362       uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:335:in `_parse'
  3362       uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:734:in `parse'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:73:in `set_value'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/jdbc.rb:237:in `execute_statement'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:277:in `execute_query'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:258:in `block in run'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
  3362       org/jruby/RubyKernel.java:1425:in `loop'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
  3362   tz:
  3362     ENV['TZ']: 
  3362     Time.now: 2019-06-03 00:37:00 -0700

Подсказка: tracking_column не найден в наборе данных. {:tracking_column=>"lastmodified_time"} Добавьте lastmodified_time в slq

SELECT id, nickname,lastmodified_time FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00' 

Выходной индекс не поддерживает формат горба и начинает устанавливаться в sysUser, и сообщается об ошибке

сообщение об ошибке

  [2019-06-03T01:55:00,698][ERROR][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"13", :_index=>"sysUser", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x3e36d2a5>], :response=>{"index"=>{"_index"=>"sysUser", "_type"=>"_doc", "_id"=>"13", "status"=>400, "error"=>{"type"=>"invalid_index_name_exception", "reason"=>"Invalid index name [sysUser], must be lowercase", "index_uuid"=>"_na_", "index"=>"sysUser"}}}}

Совет: не удалось проиндексировать событие в Elasticsearch.

После изменения имени индекса это нормально

      elasticsearch {
         hosts => "http://192.168.177.128:9200"
         index => "user"
         document_id => "%{id}"
         #document_id => "%{userId}"
         user => "elastic"
         password => "123456"
         }

При печати информации о двух конфигурациях ошибка сообщается

output {
  if[type] =="sys_user"{
      stdout {
 	#打印信息的时候 不同配置的id 不能一样
        id=>"%{id}"
           }
      }
  if[type] == "article"{
      stdout {
      #打印信息的时候 不同配置的id 不能一样
       # id=>"%{id}"
          }
      }
}

После исправления не забудьте указать UserID и ArticleID в качестве псевдонима для набора поиска. Когда я думал в первый раз, второе условие индекса типа никогда не импортировало ошибку ES, и это было причиной, а позже тест обнаружил, что эта проблема необъяснима, подозревается, что это проблема формата оболочки или горба. Именование индекса ведет


output {
  if[type] =="sys_user"{
      stdout {
       #打印信息的时候 不同配置的id 不能一样
          id=>"%{userId}"
          }
      
      }
  if[type] == "article"{
      stdout {
     #打印信息的时候 不同配置的id 不能一样
     id=>"%{articleId}"
        }
      }
}

окончательная конфигурация

  • Запустите файл конфигурации myconf.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc{
    # mysql 数据库链接
    jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/vue_article?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    # 用户名和密码
    jdbc_user => "root"
    jdbc_password => "123456*"
    #驱动
    jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
    # 驱动类名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone =>"Asia/Shanghai"
    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement => "SELECT id as articleId,author,title FROM article"
    statement_filepath => "./config/vue_article.sql"
    # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
    schedule => "* * * * *"
    type => "article"
    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    use_column_value => true
    # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
    tracking_column => "lastmodifiedTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_article_last_time"
    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
  jdbc{
    # mysql 数据库链接
    jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/vue_authentication?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    # 用户名和密码
    jdbc_user => "root"
    jdbc_password => "123456*"
    #驱动
    jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
    # 驱动类名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone =>"Asia/Shanghai"
    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement => "SELECT id,nickname FROM sys_user"
    statement_filepath => "./config/vue_authentication_sys_user.sql"
    # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
    schedule => "* * * * *"
    type => "sys_user"
    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    use_column_value => true
    # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键, 
    # lastmodifiedTime一定要在你查询的结果集里面
    tracking_column => "lastmodifiedTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_sys_user_last_time"
    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
}

output {
  if[type] =="sys_user"{
      # stdout {
     #  #打印信息的时候 不同配置的id 不能一样
     #  # id=>"%{id}"
      #    id=>"%{userId}"
      #     }
      #
      elasticsearch {
         hosts => "http://192.168.177.128:9200"
         index => "user"
         document_id => "%{id}"
         #document_id => "%{userId}"
         user => "elastic"
         password => "123456"
         }
      }
  if[type] == "article"{
     # stdout {
     #  #打印信息的时候 不同配置的id 不能一样
     #  # id=>"%{id}"
     #      id=>"%{articleId}"
     #     }
     #
     elasticsearch {
        hosts => "http://192.168.177.128:9200"
        index => "article"
        document_id => "%{id}"
	# 生产最好不同配置的id 
        #document_id => "%{articleId}"
        user => "elastic"
        password => "123456"
        } 
      }
}

  • vue_authentication_sys_user.sql
SELECT id,avatar,avatar_large as avatarLarge, create_date as createDate ,lastmodified_time as lastmodifiedTime, nickname,username,role_id as roleId,city,cover,gender,motto,rank FROM sys_user WHERE lastmodified_time >= :sql_last_value  ORDER BY lastmodified_time
  • vue_article.sql
SELECT id, author,author_id as authorId, avatar,coment_times as comentTimes, content, create_date as createDate, des,image_url as imageUrl, lastmodified_time as lastmodifiedTime, likes,read_times as readTimes, status, tag, tag_id as tagId, title, user_id as userId FROM article WHERE lastmodified_time >= :sql_last_value ORDER BY lastmodified_time

Дополнительные инструкции

  1. Почему эти два SQL должны использовать ORDER BY lastmodified_time?Если есть 1000 частей данных, logstash берет 100 частей данных в соответствии со временем, а время последней части является наименьшим из оставшихся 900 частей времени, я изначально думал, что это так, но это не так, я попросил взглянуть на БД и понять разницу между myisam и innodb, наша база данных в основном innodb, а общий индекс по умолчанию равен id. При нормальных обстоятельствах время создания create_date будет в том же порядке, что и id, но время обновления данных lastmodified_time не обязательно, поэтому здесь требуется ORDER BY.
  1. Зачем использовать больше или равно, если в одно и то же время есть 1000, 101 данных, принимая 100, вы потеряете часть данных, поэтому вам нужно быть больше или равно, что будет генерировать данные за то же время даже если он не будет обновлен, он все равно будет импортирован в ES; поэтому установка размера jdbc_page_size зависит от вашего бизнеса

15 июня 2019 00:40:38 При развёртывании очередного комплекта синхронизации данных обнаружилась большая яма, необъяснимая проблема, упомянутая ранее, опять в норме, а не причина индекса; При печати лога обнаружил, что не могу ввести второго sys_user.Обнаружил, что в базе есть поле type.Logstash фильтрует через регулярные выражения, поэтому не попадет в это условное суждение, для проверки добавил новый условие, Это значение типа базы данных, и обнаружено, что информация действительно печатается... Безмолвие, здесь можно решить, запросив все поля и используя псевдонимы.Если дело не так много, вы можете рассмотреть возможность изменения данных поля

персональный сайт