Данные подтаблицы подтаблицы микросервиса MySQL для решения синхронизации MongoDB

база данных Микросервисы MySQL MongoDB

При выполнении анализа больших данных в существующей архитектуре первым шагом является изменение данных из реляционной базы данных в нереляционную базу данных.В Интернете есть много решений.Мы также провели много исследований и испытали три набора Решения Практика, закончилось использованием канала. Это статья, написанная коллегой из нашего отдела больших данных Чжаном Тунжуем, чтобы поделиться с вами.Если вы заинтересованы, вы можете представить ее позже.

фон спроса

В последние годы концепция микросервисов продолжает оставаться актуальной, и в Интернете появляется все больше и больше дискуссий о микросервисах и монолитных архитектурах.Сталкиваясь с растущими потребностями бизнеса, многие компании предпочитают использовать микросервисы при обновлении своей технической архитектуры. Моя компания также выбрала это направление для обновления своей технической архитектуры, чтобы обеспечить более широкий доступ и более удобное расширение бизнеса.

проблема найдена

Существует два основных способа разделения микросервисов: разделение бизнес-системы без разделения базы данных, разделение бизнес-системы и разделение библиотеки. Если размер данных небольшой, нет необходимости разделять базу данных, потому что разделение данных неизбежно приведет к таким проблемам, как многомерный запрос данных и межпроцессные транзакции. Однако с развитием бизнеса моей компании один экземпляр базы данных уже не может удовлетворить потребности бизнеса, поэтому я выбрал режим разделения бизнес-системы и разделения базы данных одновременно, поэтому я также столкнулся с вышеуказанными проблемами. В этой статье в основном представлено решение запросов в реальном времени для многомерных данных. Текущая архитектура системы и структура хранения следующие:

Решения

  • Чтобы запросить данные из нескольких баз данных, вам сначала необходимо синхронизировать базы данных вместе, чтобы упростить запрос.

  • Чтобы удовлетворить спрос на большие объемы данных, в качестве библиотеки синхронизации предпочтительнее использовать базу данных NOSQL.

  • Базы данных NOSQL в основном не могут выполнять связанные запросы, поэтому необходимо объединить реляционные данные в нереляционные данные.

  • Многомерные бизнес-запросы требуют производительности в реальном времени, поэтому вам нужно выбрать базу данных с относительно хорошей производительностью в реальном времени в NOSQL: MongoDB.

На основании вышеуказанных идей основная архитектура интеграции данных показана на следующем рисунке:

решение

В настоящее время некоторые случаи синхронизации данных в Интернете делятся на два типа: синхронизация сообщений MQ и синхронизация чтения данных binlog.

Давайте сначала поговорим о синхронизации сообщений MQ.Моя компания некоторое время пробовала этот метод синхронизации и обнаружила следующие проблемы:

  • Данные передаются по всему бизнесу, а сообщения MQ отправляются для критически важных для бизнеса операций с данными, которые сильно зависят от бизнес-системы.

  • Данные о запасах в базе данных необходимо обрабатывать отдельно

  • Для таблицы инструментов также необходимо поддерживать синхронизацию отдельно

  • Каждый раз, когда добавляется новая таблица данных, необходимо повторно добавлять логику MQ.

Учитывая вышеперечисленные проблемы, оптимальное решение для синхронизации данных с помощью MQ

Существуют некоторые зрелые решения для использования метода чтения данных бинарного журнала, такие как вольфрамовый репликатор, но эти инструменты синхронизации могут обеспечить репликацию данных только 1: 1. Добавление пользовательской логики в процесс репликации данных затруднительно и не поддерживает операция сбора данных подбазы данных и подтаблицы. Таким образом, оптимальное решение должно состоять в том, чтобы обрабатывать последующую логику данных самостоятельно после чтения бинарного журнала. В настоящее время наиболее зрелым решением в инструменте чтения бинлога должен быть канал с открытым исходным кодом Alibaba.

canal

canal — это компонент добавочной подписки и потребления binlog базы данных mysql от Alibaba. Alibaba Cloud DRDS, вторичный индекс Alibaba TDDL и репликация небольших таблиц основаны на канале и широко используются. Принцип канала относительно прост:

  • canal имитирует протокол взаимодействия подчиненного сервера mysql, притворяется подчиненным сервером mysql и отправляет протокол дампа мастеру mysql.

  • Мастер mysql получает запрос на дамп и начинает передавать двоичный журнал подчиненному (то есть каналу)

  • канал анализирует двоичный объект журнала (оригинал - поток байтов)

Введение в канал: https://github.com/alibaba/canal/wiki

Я использую канал в режиме HA, выбор доступных экземпляров zookeeper, каждый экземпляр базы данных, конфигурация сервера выглядит следующим образом:

содержание:

conf
    database1
        -instance.properties
    database2
        -instance.properties
    canal.propertiesскопировать код

instance.properties

canal.instance.mysql.slaveId = 1001
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =  скопировать код

canal.properties

canal.id= 1
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 ...
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xmlскопировать код

Поток данных развертывания выглядит следующим образом:

tip:Несмотря на то, что canal поддерживает журналы binlog как смешанного, так и строкового типа, если данные строки представляют собой журнал смешанного типа, имя таблицы невозможно получить, поэтому это решение пока поддерживает только формат строки binlog.

синхронизация данных

Создайте клиентское приложение Canal, чтобы подписаться на данные Binlog, прочитанные каналом Canal

1. Включите подписку на несколько экземпляров и подпишитесь на несколько экземпляров.

public void initCanalStart() {
    List<String> destinations = canalProperties.getDestination();
    final List<CanalClient> canalClientList = new ArrayList<>();
    if (destinations != null && destinations.size() > 0) {
        for (String destination : destinations) {
            // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
            CanalClient client = new CanalClient(destination, connector);
            canalClientList.add(client);
            client.start();
        }
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try {
                logger.info("## stop the canal client");
                for (CanalClient canalClient : canalClientList) {
                    canalClient.stop();
                }
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }
    });
}скопировать код

Обработка сообщений о подписке

private void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size > 0) {
                    saveEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }}скопировать код

Обработка сообщений в соответствии с событиями базы данных, фильтрация списка сообщений и обработка изменений данных.Используемая информация:

  • insert :schemaName,tableName,beforeColumnsList

  • update :schemaName,tableName,afterColumnsList

  • delete :schemaName,tableName,afterColumnsList

RowChange rowChage = null;
    try {
        rowChage = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    EventType eventType = rowChage.getEventType();
    logger.info(row_format,
            entry.getHeader().getLogfileName(),
            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
            entry.getHeader().getTableName(), eventType,
            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
        logger.info(" sql ----> " + rowChage.getSql());
        continue;
    }
    DataService dataService = SpringUtil.getBean(DataService.class);
    for (RowData rowData : rowChage.getRowDatasList()) {
        if (eventType == EventType.DELETE) {
            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.INSERT) {
            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.UPDATE) {
            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else {
            logger.info("未知数据变动类型:{}", eventType);
        }
    }
}скопировать код

ColumnsList MongoTemplate преобразован в полезные классы данных: DBObject, способ преобразования типа данных

public static DBObject columnToJson(List<CanalEntry.Column> columns) {
    DBObject obj = new BasicDBObject();
    try {
        for (CanalEntry.Column column : columns) {
            String mysqlType = column.getMysqlType();
            //int类型,长度11以下为Integer,以上为long
            if (mysqlType.startsWith("int")) {
                int lenBegin = mysqlType.indexOf('(');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
                    if (length > 10) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
            } else if (mysqlType.startsWith("bigint")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
            } else if (mysqlType.startsWith("decimal")) {
                int lenBegin = mysqlType.indexOf('(');
                int lenCenter = mysqlType.indexOf(',');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
                    if (length == 0) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("date")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("time")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
            } else {
                obj.put(column.getName(), column.getValue());
            }
        }
    } catch (ParseException e) {
        e.printStackTrace();        }
    return obj;
}скопировать код

tip:Если объект DBObject используется для одновременного сохранения исходных данных и комбинированных данных или других данных, он должен глубоко скопировать объект для создания копии, а затем использовать копию

сшивание данных

После того, как мы получим данные базы данных, мы выполняем операцию сплайсинга, например двух пользовательских таблиц:

user_info:{id,user_no,user_name,user_password}
user_other_info:{id,user_no,idcard,realname}скопировать код

Данные монго после сплайсинга:

user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})скопировать код

Есть много полученных данных, как я могу просто вызвать строчку данных?

Сначала посмотрите на информацию, которую мы можем получить: schemaName, tableName, DBObject, Event (вставка, обновление, удаление)

Объединение этих информационных идентификаторов вместе, чтобы увидеть: /schemaName/tableName/Event(DBObject), да, это стандартная ссылка для отдыха. Пока мы реализуем простой SpringMVC, мы можем автоматически получать необходимую информацию о данных для операций сплайсинга.

Сначала реализуйте @Controller, определите имя как Schema, а значение соответствует schemaName.

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public  @interface Schema {
 String value() default "";
}скопировать код

Затем реализуйте @RequestMapping, определите имя как Table и напрямую используйте EventType в Canal для соответствия RequestMethod.

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public  @interface Table {
    String value() default "";
    CanalEntry.EventType[] event() default {};
}скопировать код

Затем создайте springUtil, реализуйте интерфейс ApplicationContextAware и инициализируйте две карты при запуске и загрузке приложения: instanceMap, handlerMap

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    if (SpringUtil.applicationContext == null) {
        SpringUtil.applicationContext = applicationContext;
        //初始化instanceMap数据
        instanceMap();
        //初始化handlerMap数据
        handlerMap();
    }
}private void instanceMap() {
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class<?> clazz = bean.getClass();
        Object instance = applicationContext.getBean(clazz);
        Schema schema = clazz.getAnnotation(Schema.class);
        String key = schema.value();
        instanceMap.put(key, instance);
        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
    }
}private void handlerMap(){  ...}скопировать код

Метод вызова:

public static void doEvent(String path, DBObject obj) throws Exception {
    String[] pathArray = path.split("/");
    if (pathArray.length != 4) {
        logger.info("path 格式不正确:{}", path);
        return;
    }
    Method method = handlerMap.get(path);
    Object schema = instanceMap.get(pathArray[1]);
    //查找不到映射Bean和Method不做处理
    if (method == null || schema == null) {
        return;    }
    try {
        long begin = System.currentTimeMillis();
        logger.info("integrate data:{},{}", path, obj);
        method.invoke(schema, new Object[]{obj});
        logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
    } catch (Exception e) {
        logger.error("调用组合逻辑异常", e);
        throw new Exception(e.getCause());
    }
}скопировать код

Обработка сообщений объединения данных:

@Schema("demo_user")public class UserService {
    @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
    public void saveUser_UserInfo(DBObject userInfo) {
        String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
        DBCollection collection = completeMongoTemplate.getCollection("user");
        DBObject queryObject = new BasicDBObject("user_no", userNo);
        DBObject user = collection.findOne(queryObject);
        if (user == null) {
            user = new BasicDBObject();
            user.put("user_no", userNo);
            user.put("userInfo", userInfo);
            collection.insert(user);
        } else {                        DBObject updateObj = new BasicDBObject("userInfo", userInfo);
            DBObject update = new BasicDBObject("$set", updateObj);
            collection.update(queryObject, update);
        }
    }
}скопировать код

Пример исходного кода

https://github.com/zhangtr/canal-mongo

Первоисточник: http://www.torry.top/2017/10/22/canal-mongodb/

Рекомендуемое чтение