Перенесете вас в пул соединений с базой данных

база данных
[原文链接](https://mp.weixin.qq.com/s/7wT_mw4uC0GuhhsJJIV0Pg)

Обзор

Роль пула соединений заключается в повышении производительности, сохранении созданного соединения в пуле и использовании созданного соединения напрямую для доступа к серверу при поступлении запроса. При этом исключается процесс создания и разрыва соединений (трехстороннее рукопожатие при установлении TCP-соединения и четырехстороннее рукопожатие при его разрыве), что повышает производительность.
Основной принцип построения пула соединений заключается в следующем:
(1) Установите объект пула соединений (запуск службы).
(2) Создайте начальное количество подключений (т. Е. IDLE) в соответствии с предварительно указанными параметрами.
(3) Для запроса доступа получите соединение непосредственно из пула соединений. Если в объекте пула соединений нет свободных соединений, и количество соединений не достигает максимума (т.е. максимальное количество активных соединений), создается новое соединение, если оно достигает максимума, устанавливается определенный тайм-аут для получения связи.
(4) Используйте соединение для доступа к услуге.
(5) Когда служба доступа завершена, соединение освобождается (освобожденное соединение в это время на самом деле не закрывается, а помещается в очередь бездействия. Если фактическое количество бездействующих соединений больше, чем начальное количество бездействующих соединений , соединение будет разорвано).
(6) Освободите объект пула соединений (во время остановки службы и технического обслуживания освободите объект пула соединений и отключите все соединения).

Проще говоря, под пулом соединений можно понимать конвейер один за другим, при простое трубопровода его можно вынимать и использовать, при этом можно и прокладывать новые трубопроводы (разумеется, предел максимальное количество подключений не может быть превышено). После использования труба становится свободной.

Наиболее часто используемые пулы соединений — это пулы соединений с базой данных и пулы соединений HTTP-клиента.Я также сам написал пулы соединений, такие как пулы соединений Thrift и пулы соединений, вставленные в очереди Rabbitmq.

Далее анализируется структура трех типичных пулов соединений.

пул соединений с базой данных

Во-первых, давайте проанализируем принципы проектирования и реализации пула соединений с базой данных. DBUtils относится к модулю реализации пула соединений с базой данных, который используется для подключения модуля DB-API 2, потокового соединения с базой данных и обеспечения безопасного и эффективного доступа модуля к базе данных. В этой статье в основном анализируется процесс PooledDB.

DBUtils.PooledDB реализует надежное, потокобезопасное, кэшированное, многократно используемое соединение с базой данных с использованием модуля DB-API 2.

На следующем рисунке показан рабочий процесс при использовании PooledDB:

В этой статье в основном рассматриваются выделенные соединения, то есть выделенные соединения с базой данных.При инициализации пула соединений необходимо указать такие параметры, как mincached, maxcached и maxconnections, которые представляют собой минимальное количество соединений в пуле соединений, максимальное количество соединений в пуле соединений и максимальное количество соединений, доступных системе, при этом параметр blocking указывает, следует ли блокировать ожидание получения соединения или возвращать исключение, когда соединение не может быть получено:

if not blocking:
    def wait():
        raise TooManyConnections
    self._condition.wait = wait

При инициализации пула соединений устанавливаются минкэшированные соединения, код выглядит следующим образом:

# Establish an initial number of idle database connections:
idle = [self.dedicated_connection() for i in range(mincached)]
while idle:
    idle.pop().close()

В нем есть метод close, взгляните на реализацию метода закрытия соединения:

def close(self):
    """Close the pooled dedicated connection."""
    # Instead of actually closing the connection,
    # return it to the pool for future reuse.
    if self._con:
        self._pool.cache(self._con)
        self._con = None

В основном, чтобы реализовать метод кэширования, посмотрите на конкретный код:

def cache(self, con):
    """Put a dedicated connection back into the idle cache."""
    self._condition.acquire()
    try:
        if not self._maxcached or len(self._idle_cache) < self._maxcached:
            con._reset(force=self._reset) # rollback possible transaction
            # the idle cache is not full, so put it there
            self._idle_cache.append(con) # append it to the idle cache
        else: # if the idle cache is already full,
            con.close() # then close the connection
        self._connections -= 1
        self._condition.notify()
    finally:
        self._condition.release()

Из приведенного выше кода видно, что close не закрывает соединение, а помещает соединение обратно в пул соединений, когда количество пулов соединений меньше maxcached, и закрывает соединение, когда оно больше этого значения. В то же время можно заметить, что транзакцию необходимо откатить, прежде чем поместить ее обратно в пул соединений, чтобы избежать выживших транзакций, которые не были зафиксированы при использовании пула соединений. Это гарантирует, что все соединения, входящие в пул соединений, доступны.

Процесс получения соединения аналогичен описанному выше: сначала устанавливается соединение из пула соединений, а в случае сбоя соединения устанавливается новое соединение:

# try to get a dedicated connection
    self._condition.acquire()
    try:
        while (self._maxconnections
                and self._connections >= self._maxconnections):
            self._condition.wait()
        # connection limit not reached, get a dedicated connection
        try: # first try to get it from the idle cache
            con = self._idle_cache.pop(0)
        except IndexError: # else get a fresh connection
            con = self.steady_connection()
        else:
            con._ping_check() # check connection
        con = PooledDedicatedDBConnection(self, con)
        self._connections += 1
    finally:
        self._condition.release()

Закрытие соединения аналогично процессу закрытия соединения после того, как mincached-соединения только что созданы.Когда количество пулов соединений меньше, чем maxcached, соединение помещается обратно в пул соединений, а когда оно больше этого значения, соединение закрыто.

Очередь RabbitMQ вставляется в пул соединений сообщений.

Асинхронная передача сообщений — это метод, обычно используемый в системах с высокой степенью параллелизма. И без очереди сообщений не обойтись. Частая вставка сообщений в очередь сообщений, установление соединения и разрыв соединения будут относительно большими накладными расходами. Таким образом, пул соединений можно использовать для повышения производительности системы.

Схема и реализация пула соединений выглядит следующим образом:

При получении соединения сначала получить соединение из очереди, если не получится, будет установлено новое соединение, если не удастся установить новое соединение, оно будет заблокировано и будет ждать получения ссылки из очереди по таймауту период. В случае неудачи сделайте последнюю попытку восстановить соединение. Код реализован следующим образом:

def get_connection_pipe(self):
        """
        获取连接
        :return:
        """
        try:
            connection_pipe = self._queue.get(False)
        except Queue.Empty:
            try:
                connection_pipe = self.get_new_connection_pipe()
            except GetConnectionException:
                timeout = self.timeout
                try:
                    connection_pipe = self._queue.get(timeout=timeout)
                except Queue.Empty:
                    try:
                        connection_pipe = self.get_new_connection_pipe()
                    except GetConnectionException:
                        logging.error("Too much connections, Get Connection Timeout!")
        if (time.time() - connection_pipe.use_time) > self.disable_time:
            self.close(connection_pipe)
            return self.get_connection_pipe()
        return connection_pipe

Полный дизайн пула соединений для RabbitMQ, вставленного в очередь сообщений, выглядит следующим образом:

# coding:utf-8
import logging
import threading
import Queue
from kombu import Connection
import time

class InsertQueue():
    def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None,
                 logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20):
        """
        :param str host: Hostname or IP Address to connect to
        :param int port: TCP port to connect to
        :param str virtual_host: RabbitMQ virtual host to use
        :param int heartbeat_interval:  How often to send heartbeats
        :param str name: auth credentials name
        :param str password: auth credentials password
        """
        self.logger = logging if logger is None else logger
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.heartbeat_interval = heartbeat_interval
        self.name = name
        self.password = password
        self.mutex = threading.RLock()
        self.maxIdle = maxIdle
        self.maxActive = maxActive
        self.available = self.maxActive
        self.timeout = timeout
        self._queue = Queue.Queue(maxsize=self.maxIdle)
        self.disable_time = disable_time

    def get_new_connection_pipe(self):
        """
        产生新的队列连接
        :return:
        """

        with self.mutex:
            if self.available <= 0:
                raise GetConnectionException
            self.available -= 1
        try:

            conn = Connection(hostname=self.host,
                              port=self.port,
                              virtual_host=self.virtual_host,
                              heartbeat=self.heartbeat_interval,
                              userid=self.name,
                              password=self.password)
            producer = conn.Producer()

            return ConnectionPipe(conn, producer)
        except:
            with self.mutex:
                self.available += 1
            raise GetConnectionException

    def get_connection_pipe(self):
        """
        获取连接
        :return:
        """
        try:
            connection_pipe = self._queue.get(False)
        except Queue.Empty:
            try:
                connection_pipe = self.get_new_connection_pipe()
            except GetConnectionException:
                timeout = self.timeout
                try:
                    connection_pipe = self._queue.get(timeout=timeout)
                except Queue.Empty:
                    try:
                        connection_pipe = self.get_new_connection_pipe()
                    except GetConnectionException:
                        logging.error("Too much connections, Get Connection Timeout!")
        if (time.time() - connection_pipe.use_time) > self.disable_time:
            self.close(connection_pipe)
            return self.get_connection_pipe()
        return connection_pipe

    def close(self, connection_pipe):
        """
        close the connection and the correlative channel
        :param connection_pipe:
        :return:
        """
        with self.mutex:
            self.available += 1
            connection_pipe.close()
        return

    def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True):
        """
        insert message to queue
        :param str exchange: exchange name
        :param str body: message
        :param str routing_key: routing key
        :param bool mandatory: is confirm: True means confirm, False means not confirm
        :return:
        """

        put_into_queue_flag = True
        insert_result = False
        connection_pipe = None
        try:

            connection_pipe = self.get_connection_pipe()
            producer = connection_pipe.channel
            use_time = time.time()
            producer.publish(exchange=exchange,
                                             body=body,
                                             delivery_mode=2,
                                             routing_key=routing_key,
                                             mandatory=mandatory
                                             )
            insert_result = True

        except Exception:
            insert_result = False
            put_into_queue_flag = False
        finally:

            if put_into_queue_flag is True:
                try:
                    connection_pipe.use_time = use_time
                    self._queue.put_nowait(connection_pipe)
                except Queue.Full:
                    self.close(connection_pipe)
            else:
                if connection_pipe is not None:
                    self.close(connection_pipe)

        return insert_result

class ConnectionPipe(object):
    """
    connection和channel对象的封装
    """

    def __init__(self, connection, channel):
        self.connection = connection
        self.channel = channel
        self.use_time = time.time()

    def close(self):
        try:
            self.connection.close()
        except Exception as ex:
            pass

class GetConnectionException():
    """
    获取连接异常
    """
    pass

Бережливый пул подключений

Что такое бережливость? Короче говоря, Thrift определяет простой файл, содержащий типы данных и интерфейсы служб, в качестве входных файлов, а компилятор генерирует код для удобного создания способа взаимодействия клиентов и серверов RPC. По сути, это метод удаленного вызова.Поскольку стек протоколов представляет собой уровень TCP, он более эффективен, чем уровень HTTP.

Структура пула соединений Thrift аналогична структуре пула соединений базы данных.

Идея по-прежнему заключается в том, что при получении соединения сначала получить соединение из пула соединений.Если в пуле нет соединения, решается, можно ли создать новое соединение.Если новое соединение не может быть создано, оно будет заблокировано. и дождитесь соединения.

Метод обработки, когда очередь не может быть получена из пула, метод обработки этой конструкции: когда соединение не может быть получено, поместить эту часть запроса в очередь ожидания, ожидая получения соединения; и когда соединение закрывается и помещается обратно в пул соединений, сначала определите, есть ли запросы, ожидающие получения соединений в этой очереди, и если да, назначьте приоритет этим запросам.

Когда соединение не может быть получено, код обработки выглядит следующим образом, и запрос помещается в очередь для блокировки и ожидания получения соединения:

async_result = AsyncResult()
self.no_client_queue.appendleft(async_result)
client = async_result.get()  # blocking

Когда есть освобождение соединения, которое необходимо вернуть в пул соединений, эта часть запроса должна иметь приоритет.Код выглядит следующим образом:

def put_back_connections(self, client):
    """
    线程安全
    将连接放回连接池,逻辑如下:
    1、如果有请求尚未获取到连接,请求优先
    2、如果连接池中的连接的数目小于maxIdle,则将该连接放回连接池
    3、关闭连接
    :param client:
    :return:
    """
    with self.lock:
        if self.no_client_queue.__len__() > 0:
            task = self.no_client_queue.pop()
            task.set(client)
        elif self.connections.__len__() < self.maxIdle:
            self.connections.add(client)
        else:
            client.close()
            self.pool_size -= 1

Наконец, на основе экономичного пула подключений представлена ​​реализация простой структуры обслуживания.

Платформа службы разделена на две части: RPC и реестр.
1. RPC: удаленный вызов, существует множество протоколов передачи для удаленного вызова, таких как http, веб-сервис, TCP и т. д. Thrift также является основной инфраструктурой RPC в мире. Акцент делается на том, чтобы быть безопасным, быстрым и предпочтительно кросс-языковым.
2. Центр регистрации: используется для хранения, обслуживания IP-адреса и информации о порте и т. д. Лучшими решениями для хранения служебной информации являются: Zookeeper, Redis и т. д. Основное внимание уделяется избеганию проблем с отдельными точками и простоте обслуживания.

Обычная схема архитектуры:

С помощью пула соединений Thrift в качестве клиента и Zookeeper в качестве реестра создается структура службы. В частности, сервер регистрируется в Zookeeper при запуске службы, а клиент обнаруживает IP-адрес и порт сервера через Zookeeper при его запуске и устанавливает соединение для доступа к службе сервера посредством опроса пула соединений Thrift.

Конкретный код дизайна выглядит следующим образом, код немного длинный, и вы получите кое-что от тщательного изучения:

# coding: utf-8

import threading
from collections import deque
import logging
import socket
import time
from kazoo.client import KazooClient
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import (
    TBufferedTransportFactory,
    TSocket,
)
from gevent.event import AsyncResult
from gevent import Timeout

from error import CTECThriftClientError
from thriftpy.thrift import TClient
from thriftpy.transport import TTransportException

class ClientPool:
    def __init__(self, service, server_hosts=None, zk_path=None, zk_hosts=None, logger=None, max_renew_times=3, maxActive=20,
                 maxIdle=10, get_connection_timeout=30, socket_timeout=30, disable_time=3):
        """
        :param service: Thrift的Service名称
        :param server_hosts: 服务提供者地址,数组类型,['ip:port','ip:port']
        :param zk_path: 服务提供者在zookeeper中的路径
        :param zk_hosts: zookeeper的host地址,多个请用逗号隔开
        :param max_renew_times: 最大重连次数
        :param maxActive: 最大连接数
        :param maxIdle: 最大空闲连接数
        :param get_connection_timeout:获取连接的超时时间
        :param socket_timeout: 读取数据的超时时间
        :param disable_time: 连接失效时间
        """
        # 负载均衡队列
        self.load_balance_queue = deque()
        self.service = service
        self.lock = threading.RLock()
        self.max_renew_times = max_renew_times
        self.maxActive = maxActive
        self.maxIdle = maxIdle
        self.connections = set()
        self.pool_size = 0
        self.get_connection_timeout = get_connection_timeout
        self.no_client_queue = deque()
        self.socket_timeout = socket_timeout
        self.disable_time = disable_time
        self.logger = logging if logger is None else logger

        if zk_hosts:
            self.kazoo_client = KazooClient(hosts=zk_hosts)
            self.kazoo_client.start()
            self.zk_path = zk_path
            self.zk_hosts = zk_hosts
            # 定义Watcher
            self.kazoo_client.ChildrenWatch(path=self.zk_path,
                                            func=self.watcher)
            # 刷新连接池中的连接对象
            self.__refresh_thrift_connections(self.kazoo_client.get_children(self.zk_path))
        elif server_hosts:
            self.server_hosts = server_hosts
            # 复制新的IP地址到负载均衡队列中
            self.load_balance_queue.extendleft(self.server_hosts)
        else:
            raise CTECThriftClientError('没有指定服务器获取方式!')

    def get_new_client(self):
        """
        轮询在每个ip:port的连接池中获取连接(线程安全)
        从当前队列右侧取出ip:port信息,获取client
        将连接池对象放回到当前队列的左侧
        请求或连接超时时间,默认30秒
        :return:
        """
        with self.lock:
            if self.pool_size < self.maxActive:
                try:
                    ip = self.load_balance_queue.pop()
                except IndexError:
                    raise CTECThriftClientError('没有可用的服务提供者列表!')
                if ip:
                    self.load_balance_queue.appendleft(ip)
                    # 创建新的thrift client
                    t_socket = TSocket(ip.split(':')[0], int(ip.split(':')[1]),
                                       socket_timeout=1000 * self.socket_timeout)
                    proto_factory = TBinaryProtocolFactory()
                    trans_factory = TBufferedTransportFactory()
                    transport = trans_factory.get_transport(t_socket)
                    protocol = proto_factory.get_protocol(transport)
                    transport.open()
                    client = TClient(self.service, protocol)
                    self.pool_size += 1
                return client
            else:
                return None

    def close(self):
        """
        关闭所有连接池和zk客户端
        :return:
        """
        if getattr(self, 'kazoo_client', None):
            self.kazoo_client.stop()

    def watcher(self, children):
        """
        zk的watcher方法,负责检测zk的变化,刷新当前双端队列中的连接池
        :param children: 子节点,即服务提供方的列表
        :return:
        """
        self.__refresh_thrift_connections(children)

    def __refresh_thrift_connections(self, children):
        """
        刷新服务提供者在当前队列中的连接池信息(线程安全),主要用于zk刷新
        :param children:
        :return:
        """
        with self.lock:
            # 清空负载均衡队列
            self.load_balance_queue.clear()
            # 清空连接池
            self.connections.clear()
            # 复制新的IP地址到负载均衡队列中
            self.load_balance_queue.extendleft(children)

    def __getattr__(self, name):
        """
        函数调用,最大重试次数为max_renew_times
        :param name:
        :return:
        """

        def method(*args, **kwds):

            # 从连接池获取连接
            client = self.get_client_from_pool()

            # 连接池中无连接
            if client is None:
                # 设置获取连接的超时时间
                time_out = Timeout(self.get_connection_timeout)
                time_out.start()
                try:
                    async_result = AsyncResult()
                    self.no_client_queue.appendleft(async_result)
                    client = async_result.get()  # blocking
                except:
                    with self.lock:
                        if client is None:
                            self.no_client_queue.remove(async_result)
                            self.logger.error("Get Connection Timeout!")
                finally:
                    time_out.cancel()

            if client is not None:

                for i in xrange(self.max_renew_times):

                    try:
                        put_back_flag = True
                        client.last_use_time = time.time()
                        fun = getattr(client, name, None)
                        return fun(*args, **kwds)
                    except socket.timeout:
                        self.logger.error("Socket Timeout!")
                        # 关闭连接,不关闭会导致乱序
                        put_back_flag = False
                        self.close_one_client(client)
                        break

                    except TTransportException, e:
                        put_back_flag = False

                        if e.type == TTransportException.END_OF_FILE:
                            self.logger.warning("Socket Connection Reset Error,%s", e)
                            with self.lock:
                                client.close()
                                self.pool_size -= 1
                                client = self.get_new_client()
                        else:
                            self.logger.error("Socket Error,%s", e)
                            self.close_one_client(client)
                            break

                    except socket.error, e:
                        put_back_flag = False
                        if e.errno == socket.errno.ECONNABORTED:
                            self.logger.warning("Socket Connection aborted Error,%s", e)
                            with self.lock:
                                client.close()
                                self.pool_size -= 1
                                client = self.get_new_client()
                        else:
                            self.logger.error("Socket Error, %s", e)
                            self.close_one_client(client)
                            break

                    except Exception as e:
                        put_back_flag = False

                        self.logger.error("Thrift Error, %s", e)
                        self.close_one_client(client)
                        break

                    finally:
                        # 将连接放回连接池
                        if put_back_flag is True:
                            self.put_back_connections(client)
            return None

        return method

    def close_one_client(self, client):
        """
        线程安全
        关闭连接
        :param client:
        :return:
        """
        with self.lock:
            client.close()
            self.pool_size -= 1

    def put_back_connections(self, client):
        """
        线程安全
        将连接放回连接池,逻辑如下:
        1、如果有请求尚未获取到连接,请求优先
        2、如果连接池中的连接的数目小于maxIdle,则将该连接放回连接池
        3、关闭连接
        :param client:
        :return:
        """
        with self.lock:
            if self.no_client_queue.__len__() > 0:
                task = self.no_client_queue.pop()
                task.set(client)
            elif self.connections.__len__() < self.maxIdle:
                self.connections.add(client)
            else:
                client.close()
                self.pool_size -= 1

    def get_client_from_pool(self):
        """
        线程安全
        从连接池中获取连接,若连接池中有连接,直接取出,否则,
        新建一个连接,若一直无法获取连接,则返回None
        :return:
        """
        client = self.get_one_client_from_pool()

        if client is not None and (time.time() - client.last_use_time) < self.disable_time:
            return client
        else:
            if client is not None:
                self.close_one_client(client)

        client = self.get_new_client()
        if client is not None:
            return client

        return None

    def get_one_client_from_pool(self):
        """
        线程安全
        从连接池中获取一个连接,若取不到连接,则返回None
        :return:
        """
        with self.lock:
            if self.connections:
                try:
                    return self.connections.pop()
                except KeyError:
                    return None
            return None