Шаблоны проектирования | Шаблоны посредников и типичные приложения

база данных MySQL Шаблоны проектирования Elasticsearch

Основное содержание этой статьи:

  • Знакомство с шаблоном посредника
  • Пример синхронизации данных
  • Резюме промежуточного шаблона
  • Типичное применение шаблона посредника анализа исходного кода
    • Шаблон посредника в таймере Java

Больше материалов можно найти в моем личном блоге:laijianfeng.org
Обратите внимание на общедоступную учетную запись [Xiao Xuanfeng] WeChat и своевременно получайте сообщения в блогах.

长按关注【小旋锋】微信公众号

модель посредника

В мире существуют различные базы данных, и разные базы данных имеют свои собственные сценарии применения.Для одних и тех же данных сначала может использоваться реляционная база данных (например, MySQL) для хранения и запросов, а Redis используется в качестве базы данных кэширования. объем данных велик Запросы с использованием MySQL могут быть медленными, поэтому необходимо синхронизировать данные с Elasticsearch или столбцовой базой данных, такой как Hbase, для запросов больших данных.

Как разработать схему синхронизации данных— важный вопрос. Есть много источников данных и много целей, и если дизайн не хорош, он может «влиять на все тело».

Если спроектировать так: каждый источник данных напрямую синхронизирует данные с целевой базой данных, если баз данных N, то максимально возможные задания синхронизации достигнутN * N, когда какая-либо конфигурация одной из баз данных изменяется, может потребоваться изменение другойN - 1Задание синхронизации базы данных.

Теперь представьте еще одно решение. DataX — это широко используемый автономный инструмент/платформа синхронизации данных в Alibaba Group, включая MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), DRDS и другие эффективные функции синхронизации данных между различными разнородными источниками данных.

DataX

DataX на самом деле является посредником.Он считывает данные из источника данных и записывает их в цель.Источнику данных больше не нужно поддерживать задание синхронизации с целью, а нужно только взаимодействовать с DataX. DataX воплощает идею паттерна медиатора.

Паттерн посредника: Используйте промежуточный объект (посредник) для инкапсуляции серии взаимодействий объектов.Посредник делает так, что объектам не нужно явно ссылаться друг на друга, так что связь является слабой, и взаимодействие между ними может быть изменено независимо. Режим посредника, также известный как режим посредника, представляет собой поведенческий режим объекта.

Роль

Медиатор (абстрактный медиатор): определяет интерфейс, который используется для связи с различными объектами-коллегами.

ConcreteMediator (бетонный посредник): это подкласс абстрактного посредника, который реализует совместное поведение путем координации различных объектов коллеги и поддерживает ссылку на каждый объект коллеги.

Коллега (абстрактный класс коллег): он определяет общедоступные методы каждого класса-коллеги и объявляет некоторые абстрактные методы для реализации подклассами.В то же время он поддерживает ссылку на абстрактный класс-посредник, и его подклассы могут взаимодействовать с посредником через эту ссылку.

ConcreteColleague (конкретный класс коллеги): это подкласс абстрактного класса коллеги; когда каждому объекту коллеги необходимо взаимодействовать с другими объектами коллеги, он сначала связывается с посредником и косвенно завершает связь с другими классами коллег через посредника; абстрактный метод, объявленный в абстрактном коллега класс.

Суть режима посредника заключается во введении класса посредника.Промежуточный класс берет на себя две обязанности:

  • Транзит (структурный): благодаря функции передачи, предоставляемой посредником, каждому объекту-коллеге больше не нужно явно ссылаться на других коллег. Когда ему необходимо общаться с другими коллегами, косвенные вызовы могут быть реализованы через посредника. Эта транзитная роль принадлежит структурной поддержке посредника.
  • Координация (поведенческая): посредник может дополнительно инкапсулировать отношения между коллегами, а коллеги могут взаимодействовать с посредником согласованным образом, не указывая, что посредник должен делать.Запрос далее обрабатывается для разделения и инкапсуляции поведения отношений между коллегами.

Пример

Реализуем упрощенный вариант схемы синхронизации данных, есть три базы данных: Mysql, Redis и Elasticsearch, основная база данных Mysql, при добавлении части данныхнужноСинхронизируется с двумя другими базами данных; Redis действует как кеширующая база данных при добавлении фрагмента данных.ненужныйСинхронизируется с другой базой данных, в то время как Elasticsearch, как база данных запросов больших данных, имеет статистическую функцию при добавлении фрагмента данных.нужно всего лишьСинхронизированы с Mysql, поэтому схема взаимосвязей между ними показана ниже.

简化的数据同步需求

Во-первых, давайте реализуем первыйНе используйСхема синхронизации данных в режиме посредника, каждый источник данных поддерживает собственное задание синхронизации.

абстрактная база данных

public abstract class AbstractDatabase {
    public abstract void add(String data);

    public abstract void addData(String data);
}

Специальная база данных Mysql поддерживает задания синхронизации, синхронизированные с Redis и Elasticsearch.

public class MysqlDatabase extends AbstractDatabase {
    private List<String> dataset = new ArrayList<String>();
    @Setter
    private RedisDatabase redisDatabase;
    @Setter
    private EsDatabase esDatabase;

    @Override
    public void addData(String data) {
        System.out.println("Mysql 添加数据:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.redisDatabase.addData(data);   // 维护同步到Redis的同步作业
        this.esDatabase.addData(data);  // 维护同步到Elasticsearch的同步作业
    }

    public void select() {
        System.out.println("- Mysql 查询,数据:" + this.dataset.toString());
    }
}

Конкретная база данных Redis, нет необходимости синхронизации с другими базами данных

public class RedisDatabase extends AbstractDatabase {
    private List<String> dataset = new LinkedList<String>();

    @Override
    public void addData(String data) {
        System.out.println("Redis 添加数据:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data); // 不同步到其它数据库
    }

    public void cache() {
        System.out.println("- Redis 缓存的数据:" + this.dataset.toString());
    }
}

Elasticsearch, просто нужно синхронизироваться с Mysql

public class EsDatabase extends AbstractDatabase {
    private List<String> dataset = new CopyOnWriteArrayList<String>();
    @Setter
    private MysqlDatabase mysqlDatabase;
    @Override
    public void addData(String data) {
        System.out.println("ES 添加数据:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mysqlDatabase.addData(data);   // 维护同步到MySQL的同步作业
    }

    public void count() {
        int count = this.dataset.size();
        System.out.println("- Elasticsearch 统计,目前有 " + count + " 条数据,数据:" + this.dataset.toString());
    }
}

Протестируйте клиент, добавьте некоторые данные в три базы данных, чтобы проверить эффект синхронизации.

public class Client {
    public static void main(String[] args) {
        MysqlDatabase mysqlDatabase = new MysqlDatabase();
        RedisDatabase redisDatabase = new RedisDatabase();
        EsDatabase esDatabase = new EsDatabase();

        mysqlDatabase.setRedisDatabase(redisDatabase);
        mysqlDatabase.setEsDatabase(esDatabase);
        esDatabase.setMysqlDatabase(mysqlDatabase);

        System.out.println("\n---------mysql 添加数据 1,将同步到Redis和ES中-----------");
        mysqlDatabase.add("1");

        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------Redis添加数据 2,将不同步到其它数据库-----------");
        redisDatabase.add("2");

        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------ES 添加数据 3,只同步到 Mysql-----------");
        esDatabase.add("3");

        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();
    }
}

выходной результат

---------mysql 添加数据 1,将同步到Redis和ES中-----------
Mysql 添加数据:1
Redis 添加数据:1
ES 添加数据:1
- Mysql 查询,数据:[1]
- Redis 缓存的数据:[1]
- Elasticsearch 统计,目前有 1 条数据,数据:[1]

---------Redis添加数据 2,将不同步到其它数据库-----------
Redis 添加数据:2
- Mysql 查询,数据:[1]
- Redis 缓存的数据:[1, 2]
- Elasticsearch 统计,目前有 1 条数据,数据:[1]

---------ES 添加数据 3,只同步到 Mysql-----------
ES 添加数据:3
Mysql 添加数据:3
- Mysql 查询,数据:[1, 3]
- Redis 缓存的数据:[1, 2]
- Elasticsearch 统计,目前有 2 条数据,数据:[1, 3]

На самом деле, это удовлетворило наши потребности, ноесть некоторые проблемы:

  • Структура системы сложная, а степень связи высокая.. Источник данных должен поддерживать ссылку на целевую базу данных, чтобы завершить синхронизацию данных.
  • Плохая возможность повторного использования компонентов. Из-за сильной связи между каждым источником данных и целью без поддержки цели этот компонент трудно повторно использовать в другой системе или модуле.
  • Плохая масштабируемость системы: Если вам нужно добавить, изменить или удалить одну из баз данных, потребуется изменить исходный код нескольких классов, что нарушает принцип «открыто-закрыто» и имеет плохую масштабируемость и гибкость.

насРефакторинг с использованием шаблона медиатора, Перенесите функцию синхронизации данных на посредника, а посредник управляет заданием синхронизации данных

Во-первых, это все-таки абстрактный класс базы данных (абстрактный класс-коллега), который поддерживает промежуточный

public abstract class AbstractDatabase {
    public static final String MYSQL = "mysql";
    public static final String REDIS = "redis";
    public static final String ELASTICSEARCH = "elasticsearch";

    protected AbstractMediator mediator;    // 中介者

    public AbstractDatabase(AbstractMediator mediator) {
        this.mediator = mediator;
    }

    public abstract void addData(String data);

    public abstract void add(String data);
}

База данных Mysql (конкретный класс коллег)

public class MysqlDatabase extends AbstractDatabase {
    private List<String> dataset = new ArrayList<String>();

    public MysqlDatabase(AbstractMediator mediator) {
        super(mediator);
    }

    @Override
    public void addData(String data) {
        System.out.println("Mysql 添加数据:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mediator.sync(AbstractDatabase.MYSQL, data); // 数据同步作业交给中介者管理
    }

    public void select() {
        System.out.println("Mysql 查询,数据:" + this.dataset.toString());
    }
}

База данных Redis (конкретный класс коллег)

public class RedisDatabase extends AbstractDatabase {
    private List<String> dataset = new LinkedList<String>();

    public RedisDatabase(AbstractMediator mediator) {
        super(mediator);
    }

    @Override
    public void addData(String data) {
        System.out.println("Redis 添加数据:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mediator.sync(AbstractDatabase.REDIS, data);    // 数据同步作业交给中介者管理
    }

    public void cache() {
        System.out.println("Redis 缓存的数据:" + this.dataset.toString());
    }
}

Elasticsearch (конкретный коллегиальный класс)

public class EsDatabase extends AbstractDatabase {
    private List<String> dataset = new CopyOnWriteArrayList<String>();

    public EsDatabase(AbstractMediator mediator) {
        super(mediator);
    }

    @Override
    public void addData(String data) {
        System.out.println("ES 添加数据:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mediator.sync(AbstractDatabase.ELASTICSEARCH, data);    // 数据同步作业交给中介者管理
    }

    public void count() {
        int count = this.dataset.size();
        System.out.println("Elasticsearch 统计,目前有 " + count + " 条数据,数据:" + this.dataset.toString());
    }
}

абстрактный посредник

@Data
public abstract class AbstractMediator {
    protected MysqlDatabase mysqlDatabase;
    protected RedisDatabase redisDatabase;
    protected EsDatabase esDatabase;

    public abstract void sync(String databaseName, String data);
}

конкретный посредник

public class SyncMediator extends AbstractMediator {
    @Override
    public void sync(String databaseName, String data) {
        if (AbstractDatabase.MYSQL.equals(databaseName)) {
            // mysql 同步到 redis 和 Elasticsearch
            this.redisDatabase.addData(data);
            this.esDatabase.addData(data);
        } else if (AbstractDatabase.REDIS.equals(databaseName)) {
            // redis 缓存同步,不需要同步到其他数据库
        } else if (AbstractDatabase.ELASTICSEARCH.equals(databaseName)) {
            // Elasticsearch 同步到 Mysql
            this.mysqlDatabase.addData(data);
        }
    }
}

тестовый клиент

public class Client {
    public static void main(String[] args) {
        AbstractMediator syncMediator = new SyncMediator();
        MysqlDatabase mysqlDatabase = new MysqlDatabase(syncMediator);
        RedisDatabase redisDatabase = new RedisDatabase(syncMediator);
        EsDatabase esDatabase = new EsDatabase(syncMediator);

        syncMediator.setMysqlDatabase(mysqlDatabase);
        syncMediator.setRedisDatabase(redisDatabase);
        syncMediator.setEsDatabase(esDatabase);

        System.out.println("\n---------mysql 添加数据 1,将同步到Redis和ES中-----------");
        mysqlDatabase.add("1");
        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------Redis添加数据 2,将不同步到其它数据库-----------");
        redisDatabase.add("2");
        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------ES 添加数据 3,只同步到 Mysql-----------");
        esDatabase.add("3");
        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();
    }
}

Результат, как и ожидалось

---------mysql 添加数据 1,将同步到Redis和ES中-----------
Mysql 添加数据:1
Redis 添加数据:1
ES 添加数据:1
- Mysql 查询,数据:[1]
- Redis 缓存的数据:[1]
- Elasticsearch 统计,目前有 1 条数据,数据:[1]

---------Redis添加数据 2,将不同步到其它数据库-----------
Redis 添加数据:2
- Mysql 查询,数据:[1]
- Redis 缓存的数据:[1, 2]
- Elasticsearch 统计,目前有 1 条数据,数据:[1]

---------ES 添加数据 3,只同步到 Mysql-----------
ES 添加数据:3
Mysql 添加数据:3
- Mysql 查询,数据:[1, 3]
- Redis 缓存的数据:[1, 2]
- Elasticsearch 统计,目前有 2 条数据,数据:[1, 3]

Нарисуйте диаграмму классов следующим образом

示例.中介者模式

Резюме промежуточного шаблона

Основные преимущества модели посредника

  • модель посредникаУпрощенное взаимодействие между объектами, он заменяет взаимодействие «многие ко многим» между первоначальными коллегами взаимодействием «один ко многим» между посредником и коллегами, а отношения «один ко многим» легче понять, поддерживать и расширять, а также преобразовывать изначально непонятные сетевую структуру в относительно простую структуру типа звезды.

  • Модель посредника можетРазделение объектов коллеги. Медиатор способствует ослаблению связи между коллегами. Мы можем самостоятельно изменять и повторно использовать каждого коллегу и медиатора. Удобнее добавлять новые медиаторы и новые классы коллег, и это лучше соответствует принципу «открыто-закрыто».

  • МогуУменьшить генерацию подклассов, посредник агрегирует поведение, изначально распределенное между несколькими объектами. Чтобы изменить это поведение, вам нужно всего лишь создать новый подкласс посредника, который позволяет повторно использовать каждый класс коллеги без расширения класса коллеги.

Основные недостатки шаблона посредника

  • существуетКонкретный класс посредника содержит большое количество деталей взаимодействия между коллегами., может привести к тому, что конкретный класс посредника будет очень сложным, что затруднит обслуживание системы. (То есть сложность взаимодействия между конкретными классами коллег сосредоточена в классе посредника, и в результате посредник становится самым сложным классом)

Применимая сцена

  • Между объектами в системе существуют сложные референсные отношения, а структура системы хаотична и трудна для понимания.

  • Объект трудно использовать повторно, потому что он ссылается на множество других объектов и напрямую взаимодействует с этими объектами.

  • Хотите инкапсулировать поведение в нескольких классах через промежуточный класс, не создавая слишком много подклассов. Этого можно добиться введением класса-посредника, в котором определяется общедоступное поведение взаимодействия объектов, и могут быть добавлены новые конкретные классы-посредники, если поведение необходимо изменить.

Типичные применения шаблона медиатора

Шаблон посредника в таймере Java

постучать одинjava.util.TimerДемо

два класса задач

public class MyOneTask extends TimerTask {
    private static int num = 0;
    @Override
    public void run() {
        System.out.println("I'm MyOneTask " + ++num);
    }
}

public class MyTwoTask extends TimerTask {
    private static int num = 1000;
    @Override
    public void run() {
        System.out.println("I'm MyTwoTask " + num--);
    }
}

Тест клиента, начало выполнения через 3 секунды, период цикла 1 секунда

public class TimerTest {
    public static void main(String[] args) {
        // 注意:多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,
        // 其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题
        Timer timer = new Timer();
        timer.schedule(new MyOneTask(), 3000, 1000); // 3秒后开始运行,循环周期为 1秒
        timer.schedule(new MyTwoTask(), 3000, 1000);
    }
}

выход

I'm MyOneTask 1
I'm MyTwoTask 1000
I'm MyTwoTask 999
I'm MyOneTask 2
I'm MyOneTask 3
I'm MyTwoTask 998
I'm MyTwoTask 997
I'm MyOneTask 4
I'm MyOneTask 5
I'm MyTwoTask 996
I'm MyTwoTask 995
I'm MyOneTask 6
...

TimerЧасть исходного кода ключа выглядит следующим образом

public class Timer {

    private final TaskQueue queue = new TaskQueue();
    private final TimerThread thread = new TimerThread(queue);
    
    public void schedule(TimerTask task, long delay) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        sched(task, System.currentTimeMillis()+delay, 0);
    }
    
    public void schedule(TimerTask task, Date time) {
        sched(task, time.getTime(), 0);
    }
    
    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;

        // 获取任务队列的锁(同一个线程多次获取这个锁并不会被阻塞,不同线程获取时才可能被阻塞)
        synchronized(queue) {
            // 如果定时调度线程已经终止了,则抛出异常结束
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");

            // 再获取定时任务对象的锁(为什么还要再加这个锁呢?想不清)
            synchronized(task.lock) {
                // 判断线程的状态,防止多线程同时调度到一个任务时多次被加入任务队列
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                // 初始化定时任务的下次执行时间
                task.nextExecutionTime = time;
                // 重复执行的间隔时间
                task.period = period;
                // 将定时任务的状态由TimerTask.VIRGIN(一个定时任务的初始化状态)设置为TimerTask.SCHEDULED
                task.state = TimerTask.SCHEDULED;
            }
            
            // 将任务加入任务队列
            queue.add(task);
            // 如果当前加入的任务是需要第一个被执行的(也就是他的下一次执行时间离现在最近)
            // 则唤醒等待queue的线程(对应到上面提到的queue.wait())
            if (queue.getMin() == task)
                queue.notify();
        }
    }
    
    // cancel会等到所有定时任务执行完后立刻终止定时线程
    public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
            queue.notify();  // In case queue was already empty.
        }
    }
    // ...
}

Timerв центреschedulexxxчерез методTaskQueueКоординировать различныеTimerTaskзадания на время,Timerявляется посредником,TimerTaskЭто абстрактный класс коллег, а задача, которую мы пишем сами, — это конкретный класс коллег.

TimerThreadдаTimerОпределение класса потока планирования времени в середине, этот класс всегда будет выполняться как поток для выполнения.Timerзадач в очереди задач.

TimerФункция этого посредника состоит в том, чтобыЗапланируйте различные задачи, которые мы пишем регулярно, добавьте задачу вTaskQueueочередь задач, дайтеTimerThreadВыполнить, отделить задачу от потока выполнения

Другие приложения шаблона посредника

  • java.util.concurrent.Executor#executeа такжеjava.util.concurrent.ExecutorService#submitа такжеTimer#scheduleаналогичный

  • В шаблоне MVC контроллер является посредником, управляющим уровнем модели в соответствии с запросом уровня представления.

Ссылаться на:
Лю Вэй: Шаблоны проектирования Java Edition
MOOC Java Design Patterns Интенсивный метод отладки + анализ памяти
Таймер таймера для интерпретации исходного кода серии java.util

постскриптум

Добро пожаловать, чтобы комментировать, пересылать, делиться, ваша поддержка - моя самая большая мотивация

Рекомендуемое чтение

Шаблоны проектирования | Простые фабричные шаблоны и типовые приложения
Шаблоны проектирования | Шаблоны фабричных методов и типичные приложения
Шаблоны проектирования | Абстрактные фабричные шаблоны и типовые приложения
Шаблоны проектирования | Шаблоны построителей и типичные приложения
Шаблоны проектирования | Шаблоны прототипов и типовые приложения
Шаблоны проектирования | Шаблоны внешнего вида и типичные приложения
Шаблоны проектирования | Шаблоны декораторов и типичные приложения
Шаблоны проектирования | Шаблоны адаптеров и типичные приложения
Шаблоны проектирования | Легковесные шаблоны и типичные приложения
Шаблоны проектирования | Комбинированные шаблоны и типичные приложения
Шаблоны проектирования | Шаблоны методов шаблонов и типичные приложения
Шаблоны проектирования | Шаблоны итераторов и типичные приложения
Шаблоны проектирования | Шаблоны стратегий и типичные приложения
Шаблоны проектирования | Шаблоны Observer и типичные приложения
Шаблоны проектирования | Меморандумные шаблоны и типовые приложения