Как спроектировать и реализовать пул соединений с базой данных?

база данных
Как спроектировать и реализовать пул соединений с базой данных?

Миссия пула соединений!

Будь то пул потоков или пул соединений с базой данных, все они имеют общую функцию:Повторное использование ресурсов, в обычном сценарии мы используем соединение, его жизненный цикл может быть таким:

Соединение, от создания до уничтожения, используется только один раз (здесь один раз относится к использованию в одной области видимости), когда цикл заканчивается, другому вызывающему объекту все еще нужно это соединение для выполнения каких-либо действий, и этот опыт необходимо повторить. жизненного цикла. Поскольку для создания и уничтожения требуется соответствующее время потребления службы и системные ресурсы для обработки, это не только тратит много системных ресурсов, но и приводит к тому, что часть времени тратится в процессе бизнес-ответа на повторное создание и уничтожение. миссия решить эту проблему!

Что должен делать пул соединений?

Как следует из названия, пул соединенийбассейнСлово ярко прояснило свое предназначение, оно употреблено для того, чтобы свести все связи в одну"池子"По сравнению с исходным жизненным циклом пул соединений имеет следующие особенности:

  • Create на самом деле не создает, а выбирает из пула неиспользуемые соединения.
  • Уничтожение на самом деле не уничтожает, а возвращает используемое соединение обратно в пул (логическое закрытие).
  • Реальное создание и уничтожение определяются механизмом функций пула потоков.

Следовательно, при использовании пула соединений наш жизненный цикл использования соединения будет развиваться следующим образом:

План анализа

Некромантия - Портал:GitHub.com/Люблю тебя, Лили/Хорошо…, DEMO реализовано на языке Java!

Предварительно нам нужно закурить, чтобы проанализировать, что должен делать пул соединений:

  • Необходим контейнер для сохранения соединений, кроме того, контейнер должен поддерживать добавление и удаление соединений и обеспечивать потокобезопасность.
  • Нам нужно внести логические коррективы в уничтожение соединения, нам нужно переопределить егоcloseа такжеisClosedметод.
  • Нам нужна запись для управления пулом соединений, например для повторного использования неиспользуемых соединений.

Пул соединений предназначен не только дляConnectionДля контроля жизненного цикла также следует добавить некоторые функции, такие как начальное количество подключений, максимальное количество подключений, минимальное количество подключений, максимальное время простоя и время ожидания получения подключения. также просто поддержите их.

Имея четкую цель, приступайте к работе.

Выбор контейнера пула соединений

Чтобы обеспечить потокобезопасность, мы можем стремиться кJUCВолшебные силы под пакетом, пусть контейнер, который мы хотим, будетx,ТакxОн должен не только соответствовать базовым функциям добавления, удаления, изменения и проверки, но и обеспечивать функцию тайм-аута получения, чтобы гарантировать, что при отсутствии незанятого соединения в пуле в течение длительного времени служба будет отключена. не будет заблокирован и будет немедленно взорван. Кроме того,xДолжна быть обеспечена двусторонняя работа, это необходимо для того, чтобы пул соединений мог выявлять загруженные незанятые соединения и облегчать операции повторного использования.

В итоге,LinkedBlockingDequeнаиболее подходящий вариант, он используетInterruptibleReentrantLockДля обеспечения безопасности потоков используйтеConditionВыполнять блокировку захватных элементов, а также поддерживать двунаправленные операции.

Также мы можем разделить пул соединений на 3 типа:

  • Рабочий пул: Сохраняет используемое соединение.
  • бесплатный бассейн: Сохраняет незанятые соединения.
  • перерабатывающий бассейн: Соединение, которое было восстановлено (физически закрыто).

в,Рабочий пулиперерабатывающий бассейнНет необходимости использовать двустороннее выравнивание, возможно, одностороннюю очередь илиSetможно заменить на:

private LinkedBlockingQueue<HoneycombConnection> workQueue;
private LinkedBlockingDeque<HoneycombConnection> idleQueue;
private LinkedBlockingQueue<HoneycombConnection> freezeQueue;

Оформление соединения

Выход пула соединенийConnection, который представляет соединение с базой данных. После того, как вышестоящая служба использует его для завершения операции, она напрямую вызовет свойcloseспособ разорвать соединение, и что нам нужно сделать, так это изменить логику его закрытия без ведома вызывающего, когда вызовcloseметод, мы помещаем его обратно в очередь простоя, чтобы обеспечить возможность его повторного использования!

Поэтому нам нужноConnectionЧтобы сделать украшение, метод очень простой, но очень утомительный, вот новый класс для достиженияConnectionИнтерфейс, реализующий **"редактируемый"** за счет переопределения всех методов.Connection, мы называем егоConnectionДекоратор:

public class HoneycombConnectionDecorator implements Connection{

    protected Connection connection;
    
    protected HoneycombConnectionDecorator(Connection connection) {
        this.connection = connection;
    }
    
    此处省略对方法实现的三百行代码...
}

После этого нам нужно создать новый собственныйConnectionЧтобы наследовать этот декоратор и переопределить соответствующий метод:

public class HoneycombConnection extends HoneycombConnectionDecorator implements HoneycombConnectionSwitcher{
    @Override
    public void close() { do some things }

    @Override
    public boolean isClosed() throws SQLException { do some things }    
    
    省略...
}

Переопределение источника данных

DataSourceЭто спецификация, определенная JDK для лучшей интеграции и управления источниками данных, для получения записи о соединениях, чтобы мы могли лучше расширять источники данных (например, добавлять специальные атрибуты) на этом уровне и делать наш пул соединений более функциональным. , богатый, нам нужно реализовать один из наших собственныхDataSourceможет:

public class HoneycombWrapperDatasource implements DataSource{
    protected HoneycombDatasourceConfig config;
    省略其它方法的实现...
    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return DriverManager.getConnection(config.getUrl(), username, password);
    }
    省略其它方法的实现...
}

Мы завершили реализацию источника данных, но способ получить соединение здесь — физическое создание, нам нужно выполнить цель объединения и нужно переписатьHoneycombWrapperDatasourceЛогика получения соединения в методе заключается в создании нового класса для переопределения метода родительского класса:

public class HoneycombDataSource extends HoneycombWrapperDatasource{
    private HoneycombConnectionPool pool;
    @Override
    public Connection getConnection() throws SQLException {
        这里实现从pool中取出连接的逻辑
    }
    省略...
}

Расширение функции

При текущей структурной системе наш пул соединений постепенно появился, но этого далеко недостаточно, чтобы мы могли свободно расширяться в рамках этой структуры, чтобы пул соединений мог более гибко контролировать соединения, поэтому мы можем ввестихарактеристикаЭта концепция позволяет нам получить доступ к пулу соединений внутри него и выполнить ряд расширенных операций с пулом соединений:

public abstract class AbstractFeature{
    public abstract void doing(HoneycombConnectionPool pool);
}

AbstractFeatureАбстрактный родительский класс должен быть реализованdoingвнутри метода мы можем управлять пулом соединений, типичным примером которого является переработка простаивающих соединений в пуле:

public class CleanerFeature extends AbstractFeature{
    @Override
    public void doing(HoneycombConnectionPool pool) {
        这里做空闲连接的回收
    }
}

выполнить план

После приведенного выше анализа для создания пула соединений требуется взаимодействие этих модулей.Общий процесс выглядит следующим образом:

Шаг 1. Задайте свойства источника данных

в инициализацииDataSourceРаньше нам нужно было установить каждое свойство, здесь мы используемHoneycombWrapperDatasourceсерединаHoneycombDatasourceConfigнести свойства:

public class HoneycombDatasourceConfig {

    //db url
    private String url;

    //db user
    private String user;

    //db password
    private String password;

    //driver驱动
    private String driver;

    //初始化连接数,默认为2
    private int initialPoolSize = 2;

    //最大连接数,默认为10
    private int maxPoolSize = 10;

    //最小连接数,默认为2
    private int minPoolSize = 2;
    
    //获取连接时,最大等待时长,默认为60s
    private long maxWaitTime = 60 * 1000;

    //最大空闲时长,超出要被回收,默认为20s
    private long maxIdleTime = 20 * 1000;
    
    //特性列表
    private List<AbstractFeature> features;
    
    public HoneycombDatasourceConfig() {
        features = new ArrayList<AbstractFeature>(5);
    }
    
    省略getter、setter....

Шаг 2. Инициализируйте пул соединений

После настройки свойств нам нужно завершить инициализацию пула соединений.HoneycombDataSourceизinitРеализовано в методе:

private void init() throws ClassNotFoundException, SQLException {
    //阻塞其他线程初始化操作,等待初始化完成
    if(initialStarted || ! (initialStarted = ! initialStarted)) {
        if(! initialFinished) {
            try {
                INITIAL_LOCK.lock();
                INITIAL_CONDITION.await();
            } catch (InterruptedException e) {
            } finally {
                INITIAL_LOCK.unlock();
            }
        }
        return;
    }
    
    //config参数校验
    config.assertSelf();
    
    Class.forName(getDriver());
    
    //实例化线程池
    pool = new HoneycombConnectionPool(config);
    
    //初始化最小连接
    Integer index = null;
    for(int i = 0; i < config.getInitialPoolSize(); i ++) {
        if((index =  pool.applyIndex()) != null) {
            pool.putLeisureConnection(createNativeConnection(pool), index);
        }
    }
    
    //触发特性
    pool.touchFeatures();
    
    //完成初始化并唤醒其他阻塞
    initialFinished = true;
    try {
        INITIAL_LOCK.lock();
        INITIAL_CONDITION.signalAll();
    }catch(Exception e) {
    }finally {
        INITIAL_LOCK.unlock();
    }
}

Шаг 3. Создайте начальное подключение

существуетinitметод, еслиinitialPoolSizeЕсли оно больше 0, будет создано и помещено в пул соединений указанное количество физических подключений.Количество созданных подключений должно быть меньше максимального количества подключений.maxPoolSize:

public HoneycombConnection createNativeConnection(HoneycombConnectionPool pool) throws SQLException {
    return new HoneycombConnection(super.getConnection(), pool);
}

После завершения инициализации следующим шагом будет получение соединения.

Шаг 4: Получите из бесплатного пула

Ранее мы разделили пул соединений на три, которыебесплатный бассейн,Рабочий пулиперерабатывающий бассейн.

мы можем пройтиHoneycombDataSourceизgetConnectionспособ получения соединений, когда нам нужно получить, первое соображение заключается в том, есть ли незанятые соединения в пуле бездействия, что позволяет избежать создания и активации новых соединений:

@Override
public Connection getConnection() throws SQLException {
    try {
    	//初始化连接池
        init();
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
    
    HoneycombConnection cn = null;
    Integer index = null;
    
    if(pool.assignable()) {
    	//空闲池可分配,从空闲池取出
        cn = pool.getIdleConnection();
    }else if(pool.actionable()) {
    	//回收池可分配,从回收池取出
        cn = pool.getFreezeConnection();
    }else if((index =  pool.applyIndex()) != null) {
    	//如果连接数未满,创建新的物理连接
        cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
    }
    
    if(cn == null) {
    	//如果无法获取连接,阻塞等待空闲池连接
        cn = pool.getIdleConnection();
    }
    
    if(cn.isClosedActive()) {
    	//如果物理连接关闭,则获取新的连接
        cn.setConnection(super.getConnection());
    }
    return cn;
}

Шаг 5: Получите его из пула вторичной переработки

Если свободный пул не может быть выделен, это означает, что поставки соединений в дефиците.Возможно, какие-то простаивающие соединения были рекультивированы (физически закрыты) ранее, поэтому перед созданием нового соединения мы можем перейти в пул повторного использования, чтобы увидеть, если есть какие-либо исправленные соединения, и удалите их напрямую, если они существуют:

else if(pool.actionable()) {
	//回收池可分配,从回收池取出
    cn = pool.getFreezeConnection();
}

Шаг 6: Создайте новое подключение

Если рециркуляционный пул тоже нераспределяемый, необходимо определить, достигло ли количество соединений в пуле соединений максимальное количество соединений, если нет, то создать новое физическое соединение и добавить его непосредственно в рабочий пул:

else if((index =  pool.applyIndex()) != null) {
	//如果连接数未满,创建新的物理连接,添加到工作池
    cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}

Шаг 7: Дождитесь подключения из бесплатного пула

Если ни одно из трех вышеперечисленных условий не выполнено, то остается только ждать освобождения остальных подключений из свободного пула:

if(cn == null) {
	//如果无法获取连接,阻塞等待空闲池连接
    cn = pool.getIdleConnection();
}

Конкретная логика заключена вHoneycombConnectionPoolизgetIdleConnectionВ методе:

public HoneycombConnection getIdleConnection() {
    try {
    	//获取最大等待时间
        long waitTime = config.getMaxWaitTime();
        while(waitTime > 0) {
            long beginPollNanoTime = System.nanoTime();
            
            //设置超时时间,阻塞等待其他连接的释放
            HoneycombConnection nc = idleQueue.poll(waitTime, TimeUnit.MILLISECONDS);
            if(nc != null) {
            	//状态转换
                if(nc.isClosed() && nc.switchOccupied() && working(nc)) {
                    return nc;
                }
            }
            long timeConsuming = (System.nanoTime() - beginPollNanoTime) / (1000 * 1000);
            
            //也许在超时时间内获取到了连接,但是状态转换失败,此时刷新超时时间
            waitTime -= timeConsuming;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
    }
    throw new RuntimeException("获取连接超时");
}

Шаг 8: Активируйте соединение

Наконец, определите, закрыто ли соединение физически.Если это так, нам нужно открыть новое соединение, чтобы заменить соединение, которое было переработано:

if(cn.isClosedActive()) {
	//如果物理连接关闭,则获取新的连接
    cn.setConnection(super.getConnection());
}

переработка соединения

Если объем нашего бизнеса резко возрастет в течение определенного периода времени, будет много связей, которые должны работать одновременно.Через некоторое время объем нашего бизнеса падает, а ранее созданные связи явно насыщаются.AbstractFeatureПул соединений входящей операции.

Для повторного использования этой операции мы передаемCleanerFeatureреализовать:

public class CleanerFeature extends AbstractFeature{

    private Logger logger = LoggerFactory.getLogger(CleanerFeature.class);

    public CleanerFeature(boolean enable, long interval) {
       //enable表示是否启用
       //interval表示扫描间隔
       super(enable, interval);
    }

    @Override
    public void doing(HoneycombConnectionPool pool) {
        LinkedBlockingDeque<HoneycombConnection> idleQueue = pool.getIdleQueue();
        Thread t = new Thread() {
            @Override
            public void run() {
                while(true) {
                    try {
                        //回收扫描间隔
                    	Thread.sleep(interval);
                        
                    	//回收时,空闲池上锁
                        synchronized (idleQueue) {
                            logger.debug("Cleaner Model To Start {}", idleQueue.size());
                            //回收操作
                            idleQueue.stream().filter(c -> { return c.idleTime() > pool.getConfig().getMaxIdleTime(); }).forEach(c -> {
                                try {
                                    if(! c.isClosedActive() && c.idle()) {
                                        c.closeActive();
                                        pool.freeze(c);
                                    }
                                } catch (SQLException e) {
                                    e.printStackTrace();
                                } 
                            });
                            logger.debug("Cleaner Model To Finished {}", idleQueue.size());
                        }
                    }catch(Throwable e) {
                        logger.error("Cleaner happended error", e);
                    }
                }
            }
        };
        t.setDaemon(true);
        t.start();
    }
}

Операция здесь очень проста. Заблокируйте пул бездействия, просканируйте все соединения и освободите соединения, время простоя которых превышает установленное максимальное время простоя. Обновляем соединение, когда оно помещается в пул простоя. Его точка времени простоя, тогда текущее время простоя равно текущему времени минус время начала простоя:

idleTime = nowTime - idleStartTime

Обновить время начала простоя, когда состояние переключения находится в режиме ожидания:

 @Override
public boolean switchIdle() {
    return unsafe.compareAndSwapObject(this, statusOffset, status, ConnectionStatus.IDLE) && flushIdleStartTime();
}

пройти тест

Самый быстрый способ испытать результаты — использовать его, вот опыт модульного тестирования:

static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1000, 1000, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    
@Test
public void testConcurrence() throws SQLException, InterruptedException{
    long start = System.currentTimeMillis();
    HoneycombDataSource dataSource = new HoneycombDataSource();
    dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&transformedBitIsBoolean=true&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=Asia/Shanghai");
    dataSource.setUser("root");
    dataSource.setPassword("root");
    dataSource.setDriver("com.mysql.cj.jdbc.Driver");
    dataSource.setMaxPoolSize(50);
    dataSource.setInitialPoolSize(10);
    dataSource.setMinPoolSize(10);
    dataSource.setMaxWaitTime(60 * 1000);
    dataSource.setMaxIdleTime(10 * 1000);
    dataSource.addFeature(new CleanerFeature(true, 5 * 1000));
    
    test(dataSource, 10000);
    System.out.println(System.currentTimeMillis() - start + " ms");
}

public static void test(DataSource dataSource, int count) throws SQLException, InterruptedException {
    CountDownLatch cdl = new CountDownLatch(count);
    for(int i = 0; i < count; i ++) {
        tpe.execute(() -> {
            try {
                HoneycombConnection connection = (HoneycombConnection) dataSource.getConnection();
                Statement s = connection.createStatement();
                s.executeQuery("select * from test limit 1");
                connection.close();
            }catch(Exception e) {
            }finally {
                cdl.countDown();
            }
        });
    }
    cdl.await();
    tpe.shutdown();
}

Конфигурация ПК:ЦП Intel(R) Core(TM) i5-8300H с тактовой частотой 2,30 ГГц 2,30 ГГц 4-ядерный 8G 512SSD

10000 запросов, отнимающих много времени:

938 ms

Вывод: снова вызовите портал:GitHub.com/Люблю тебя, Лили/Хорошо…