Используйте инкрементную синхронизацию Canal Synchronation MySQL для Elasticsearch

Elasticsearch

В этой статье описывается, как использовать канал для поэтапной синхронизации информации базы данных mysql с ElasticSearch. (Примечание: это инкрементно!!!)

1. Введение

1.1 Введение в канал

Canal — это высокопроизводительная система синхронизации данных, основанная на двоичном журнале MySQL. Canal широко используется в Alibaba Group (включаяwww.taobao.com) для обеспечения надежного инкрементного конвейера данных с малой задержкой, адрес github:Github.com/alibaba/misery...

Canal Server может анализировать бинарный журнал mysql и подписываться на изменения данных, а Canal Client может внедрять изменения для трансляции куда угодно, например в базы данных и Apache Kafka.

Он имеет следующие функции:

  1. Поддерживаются все платформы.
  2. Поддержка детального мониторинга системы на базе Prometheus.
  3. Поддержка парсинга и подписки на MySQL binlog различными способами, например через GTID.
  4. Поддержка высокопроизводительной синхронизации данных в реальном времени. (см. Производительность)
  5. И сервер Canal, и клиент Canal поддерживают высокую доступность/масштабируемость на базе Apache ZooKeeper.
  6. Поддержка докера.

недостаток:

Полное обновление не поддерживается, поддерживается только добавочное обновление.

Полный вики-адрес:GitHub.com/Alibaba/Misery…

1.2 Как это работает

Принцип прост:

  1. Канал аналога интерактивного протокола MySQL ведомого устройства, маскируемого ведомым устройством под MySQL, и передаваемого по протоколу пересылки ведущему серверу MySQL.
  2. Мастер MySQL получает запрос на создание дампа и начинает передавать двоичный журнал подчиненному устройству (т.е. каналу).
  3. Canal анализирует объект двоичного журнала в свой собственный тип данных (необработанный поток байтов).

как показано на рисунке:

image

1.3 Синхронизация

При синхронизации данных с es нужно использовать переходник: canal adapter. На данный момент последняя версия 1.1.3, адрес для скачивания:GitHub.com/Alibaba/Misery….

В настоящее время ES, кажется, поддерживает версию 6.x, но не поддерживает версию 7.x! ! !

2. Подготовка

2.1 ЕС и JDK

Для установки es см.:www.dalaoyang.cn/article/78

Чтобы установить jdk, см.:www.dalaoyang.cn/article/16

2.2 Установить канальный сервер

Скачать Canal.deployer-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

распаковать файлы

tar -zxvf canal.deployer-1.1.3.tar.gz

Перейти к распакованной папке

cd canal.deployer-1.1.3

Измените файл conf / example / instance.properties, обращая внимание на следующие точки:

  • canal.instance.master.address: адрес базы данных, например 127.0.0.1:3306
  • canal.instance.dbUsername: пользователь базы данных
  • canal.instance.dbPassword: пароль базы данных

Полное содержание выглядит следующим образом:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Вернитесь в каталог canal.deployer-1.1.3 и запустите канал:

sh bin/startup.sh

Посмотреть журнал:

vi logs/canal/canal.log

Просмотрите журнал конкретного экземпляра:

 vi logs/example/example.log

закрыть команду

sh bin/stop.sh

2.3 Установка канального адаптера

скачать canal.adapter-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz

распаковывать

tar -zxvf canal.adapter-1.1.3.tar.gz

Перейти в разархивированную папку

cd canal.adapter-1.1.3

Измените файл conf/application.yml, в основном обратите внимание на следующее содержимое, поскольку это файл yml, обратите внимание на имена атрибутов, которые я объяснил здесь:

  • server.port: номер порта адаптера канала
  • Canal.conf.canalserHost: адрес и IP-адрес Canal-сервера
  • canal.conf.srcDataSources.defaultDS.url: адрес базы данных
  • canal.conf.srcDataSources.defaultDS.username: имя пользователя базы данных
  • Canal.conf.srcdatasources.defaultds.password: пароль базы данных
  • canal.conf.canalAdapters.groups.outerAdapters.hosts: адрес хоста es, порт tcp

Полное содержание выглядит следующим образом:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 12345678
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9300
        properties:
         cluster.name: elasticsearch

Кроме того, вам необходимо настроить файлы conf/es/*.yml, и адаптер автоматически загрузит все файлы конфигурации, заканчивающиеся на .yml, в conf/es. Прежде чем вводить конфигурацию, нам нужно представить структуру таблицы, используемую в этом случае, следующим образом:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Вам нужно вручную создать индекс в es, например, используйте es-head, чтобы создать его здесь, как показано ниже:

image

Структура тестового индекса выглядит следующим образом:

{
    "mappings":{
        "_doc":{
            "properties":{
                "name":{
                    "type":"text"
                },
                "address":{
                    "type":"text"
                }
            }
        }
    }
}

Далее создаем test.yml (имя файла произвольное), а содержание понятное. _index — имя индекса, а sql — соответствующий оператор. Содержание такое:

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: test
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id,a.name,a.address from test a"
  commitBatch: 3000

После завершения настройки вернитесь в корневой каталог canal-adapter и выполните команду для запуска

bin/startup.sh

Просмотр журналов

vi logs/adapter/adapter.log

Закройте команду канала-адаптера

bin/stop.sh

3. Тест

После успешного запуска сначала проверьте es-head, как показано на рисунке, сейчас нет данных.

image

Далее вставляем кусок данных в базу для тестирования, инструкция такая:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝阳区');

Затем посмотрите на es-head следующим образом

image

Далее смотрим лог, как показано ниже:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝阳区"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"} 
Affected indexes: test 

Небольшое знание: метод просмотра журнала, описанный выше, может быть не очень полезным.Рекомендуется использовать следующий синтаксис, например, просмотр последних 200 строк журнала:

tail -200f logs/adapter/adapter.log

4. Резюме

1. Полное обновление невозможно, но возможны добавления, удаления и модификации. 2. Обязательно заранее создайте индекс. 3.es настроен с TCP-портом, например, по умолчанию 9300