Из-за трехэтапного рукопожатия TCP и других причин установление соединения является относительно дорогостоящим поведением. Таким образом, в программе, которая должна взаимодействовать с определенным объектом несколько раз, необходимо поддерживать пул соединений с многократно используемыми соединениями для повторного использования.
Чтобы поддерживать пул соединений, самым основным требованием является выполнение:потокобезопасный, особенно в таком языке, как Golang, где эта функция — горутина.
Реализовать простой пул соединений
type Pool struct {
m sync.Mutex // 保证多个goroutine访问时候,closed的线程安全
res chan io.Closer //连接存储的chan
factory func() (io.Closer,error) //新建连接的工厂方法
closed bool //连接池关闭标志
}
В этом простом пуле соединений мы используем chan для хранения соединений в пуле. Метод создания новой структуры также относительно прост:
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size的值太小了。")
}
return &Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil
}
Просто укажите соответствующую фабричную функцию и размер пула соединений.
присоединиться
Итак, как мы получаем ресурсы от него? Поскольку наша внутренняя структура для хранения соединений — это chan, нам нужны только простыеselectВы можете гарантировать потокобезопасность:
//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
select {
case r,ok := <-p.res:
log.Println("Acquire:共享资源")
if !ok {
return nil,ErrPoolClosed
}
return r,nil
default:
log.Println("Acquire:新生成资源")
return p.factory()
}
}
Сначала мы получаем его из res chan пула соединений, если нет, то мы используем уже подготовленную фабричную функцию для построения соединения. В то же время мы используем его при получении соединения из resokСначала определите, был ли закрыт пул соединений. Если он уже закрыт, мы возвращаем уже подготовленную ошибку закрытия соединения.
закрыть пул соединений
Итак, поскольку мы упомянули о закрытии пула соединений, как нам закрыть пул соединений?
//关闭资源池,释放资源
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
//关闭通道,不让写入了
close(p.res)
//关闭通道里的资源
for r:=range p.res {
r.Close()
}
}
Здесь нам нужно сделатьp.m.Lock()Операция блокировки, это делается потому, что нам нужноclosedчитать и писать. Нужно сначала установить этот флаг, а потом закрыть чан рес, чтобыAcquireМетод больше не может получать новые подключения. были правыresСоединение в этом канале выполняет операцию закрытия.
освободить соединение
Существует предпосылка сначала освободить соединение, то есть пул соединений не был закрыт. Если пул соединений был закрыт, перейдите кresЕсли вы отправите соединение внутрь, это вызовет панику.
func (p *Pool) Release(r io.Closer){
//保证该操作和Close方法的操作是安全的
p.m.Lock()
defer p.m.Unlock()
//资源池都关闭了,就省这一个没有释放的资源了,释放即可
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
log.Println("资源释放到池子里了")
default:
log.Println("资源池满了,释放这个资源吧")
r.Close()
}
}
Выше приведена простая и потокобезопасная реализация пула соединений. Что мы видим, так это то, что, хотя пул соединений уже реализован, все еще есть несколько небольших недостатков:
- У нас нет ограничений на максимальное количество соединений, если пул потоков пуст, мы создадим новое соединение и вернем его по умолчанию. Как только параллелизм становится высоким, новые соединения будут создаваться, что легко (особенно MySQL) вызвать
too many connections
возникает ошибка.- Поскольку нам нужно гарантировать максимальное количество доступных подключений, мы не хотим, чтобы это число было слишком мертвым. Есть надежда, что определенное количество незанятых соединений может поддерживаться при бездействии, но мы также надеемся, что сможем ограничить максимальное количество доступных соединений maxNum.
- Первый случай — слишком много параллелизма, что делать, если количество параллелизма слишком мало? Теперь после того, как мы создадим новое соединение и вернем его, мы не используем это соединение в течение длительного времени. Тогда соединение, скорее всего, было установлено несколько часов или более назад. Нет никакого способа гарантировать доступность соединения, которое простаивает в течение длительного времени. Возможно, что соединение, которое мы получим в следующий раз, является соединением с истекшим сроком действия.
Затем мы можем увидеть, как они решают эти проблемы, из зрелой библиотеки пула соединений MySQL и библиотеки пула соединений Redis.
Пул соединений Sql для стандартной библиотеки Golang
Пул соединений Golang реализован в стандартной библиотеке.database/sql/sql.go
Вниз. Когда мы запускаем:
db, err := sql.Open("mysql", "xxxx")
, будет открыт пул соединений. Мы можем посмотреть на возвращенныйdbструктура:
type DB struct {
waitDuration int64 // Total time waited for new connections.
mu sync.Mutex // protects following fields
freeConn []*driverConn
connRequests map[uint64]chan connRequest
nextRequest uint64 // Next key to use in connRequests.
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
closed bool
maxIdle int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
maxLifetime time.Duration // maximum amount of time a connection may be reused
cleanerCh chan struct{}
waitCount int64 // Total number of connections waited for.
maxIdleClosed int64 // Total number of connections closed due to idle.
maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}
Вышеупомянутое опускает некоторые поля, которые пока не требуют внимания. Как мы видим, структура подключения к хранилищу пула БДfreeConn, не тот chan, который мы использовали раньше, а **[]driverConn**, фрагмент соединений. В то же время мы также можем видеть, что естьmaxIdleи другие связанные переменные для контроля количества незанятых соединений. Примечательно,DBфункция инициализацииOpenФункция не создает новое соединение с базой данных. И в какой функции находится новое подключение? мы можемQueryМетод идет полностью назад, и мы можем видеть эту функцию:func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)
. И способ, которым мы получаем соединения из пула соединений, начинается здесь:
присоединиться
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
// 先判断db是否已经关闭。
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// 注意检测context是否已经被超时等原因被取消。
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// 下面的动作相当于往connRequests这个map插入自己的号码牌。
// 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
case <-ctx.Done():
// context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req:
// 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
// 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
// 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
Проще говоря,DBВ дополнение к использованию слайсов для хранения соединений, структура также добавляет механизм, подобный очереди.connRequestsдля решения процесса получения ожидающего соединения. В то же время он имеет хороший баланс в оценке работоспособности соединения. Итак, теперь, когда есть механизм очередей, что вы делаете, когда возвращаете соединение?
освободить соединение
мы можем напрямую найтиfunc (db *DB) putConnDBLocked(dc *driverConn, err error) bool
Сюда. Как говорится в комментариях, основная цель этого метода заключается в следующем:
Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.
Давайте в основном посмотрим на ключевые строки в нем:
...
// 如果已经超过最大打开数量了,就不需要在回归pool了
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // 删除这个在排队的请求。
if err == nil {
dc.inUse = true
}
// 把连接给这个正在排队的连接。
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
// 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
...
Мы видим, что при возврате соединения, если в очереди есть запрос, он не будет возвращен в пул и отправлен человеку в очереди.
Теперь в основном решите небольшие проблемы, упомянутые выше. Не будет слишком много соединений, чтобы контролировать слишком много соединений. Также полезно поддерживать минимальное количество пулов соединений. В то же время он также проверяет работоспособность соединения.
Стоит отметить, что как код стандартной библиотеки, так и соответствующие комментарии и коды — все идеально, и вы действительно можете выглядеть обновленным.
redisКлиент Redis, реализованный на Golang
Как этот реализованный на Golang клиент Redis реализует пул соединений. Идеи здесь очень странные, и я все еще могу узнать много хороших идей. Конечно, из-за относительно небольшого количества комментариев к коду это все еще немного сбивает с толку на первый взгляд. Соответствующий адрес кода можно увидеть по адресу https://github.com/go-redis/redis/blob/master/internal/pool/pool.go.
И его структура пула соединений выглядит следующим образом
type ConnPool struct {
...
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
poolSize int
idleConnsLen int
stats Stats
_closed uint32 // atomic
closedCh chan struct{}
}
Мы видим, что структура, в которой хранится соединение, по-прежнему является срезом. Но мы можем сосредоточиться наqueue
,conns
,idleConns
Эти переменные будут упомянуты позже. Но стоит отметить! Мы видим, что есть две структуры **[]Conn**:conns
,idleConns
, то возникает вопрос:
Где существует связь?
Создайте новое соединение с пулом соединений
Начнем с нового подключения к пулу соединений:
func NewConnPool(opt *Options) *ConnPool {
....
p.checkMinIdleConns()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
....
}
Функция инициализации пула соединений отличается от двух предыдущих.
-
checkMinIdleConns
метод, когда пул соединений инициализируется, пул соединений будет заполнен бездействующими соединениями. -
go p.reaper(opt.IdleCheckFrequency)
Когда пул соединений инициализирован, будет запущен процесс go для периодического исключения соединений, подлежащих устранению в пуле соединений.
присоединиться
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
//这边和前面sql获取连接函数的流程先不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
//前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
for {
p.connsMu.Lock()
//从空闲连接里面先获取一个空闲连接。
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
// 没有空闲连接时候直接跳出循环。
break
}
// 判断是否已经过时,是的话close掉了然后继续取出。
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
// 如果没有空闲连接的话,这边就直接新建连接了。
newcn, err := p.newConn(ctx, true)
if err != nil {
// 归还令牌。
p.freeTurn()
return nil, err
}
return newcn, nil
}
Мы можем попытаться ответить на вопрос в самом начале: где на самом деле существует связь? Ответ отcn := p.popIdle()
Из этого предложения видно, что действие по получению соединения производно отidleConns
Получается внутри, и функция внутри тоже это доказывает. Но так ли это на самом деле? Посмотрим позже.
В то же время я понимаю:
- Очередь sql означает, что после того, как я подаю заявку на подключение к пулу соединений, я сообщаю пулу соединений свой номер. Как только сторона связи увидит свободное время, она позвонит на мой номер. Я согласился, и тогда пул соединений дал мне соединение напрямую. Если я не верну его, пул соединений никогда не вызовет следующий номер.
- Что означает сторона redis, так это то, что я подаю заявку с пулом соединений, это не соединение, а токен. Я стоял в очереди, пул соединений дал мне токен, и я пошел на склад, чтобы найти свободное соединение или создать новое соединение самостоятельно. Когда у вас заканчиваются соединения, помимо возврата соединения, вы должны вернуть токен. Конечно, если я ошибусь при создании нового подключения самостоятельно, даже если я не могу получить подключение домой, я должен вернуть токен в пул подключений, иначе количество токенов в пуле подключений будет меньше , и максимальное количество подключений будет меньше.
и:
func (p *ConnPool) freeTurn() {
<-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
case p.queue <- struct{}{}:
return nil
...
}
Он полагается на очередь очереди для поддержания количества токенов.
Такconns
Какова роль? Мы можем взглянуть на новую функцию соединения:
новое соединение
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
cn, err := p.dialConn(ctx, pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
if pooled {
// 如果连接池满了,会在后面移除。
if p.poolSize >= p.opt.PoolSize {
cn.pooled = false
} else {
p.poolSize++
}
}
p.connsMu.Unlock()
return cn, nil
}
Основная логика отсутствует. То есть, если я создам новое подключение, я не буду ставить его напрямуюidleConns
внутри, но поставить сначалаconns
в. При этом смотрите, не полон ли бассейн. Если он заполнен, он будет помечен при возврате позже и удален позже. Так это будет удалено позже, когда это значит? Вот когда пришло время вернуть соединение, как описано ниже.
обратное соединение
func (p *ConnPool) Put(cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf("Conn has unread data")
p.Remove(cn, BadConnError{})
return
}
//这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
if !cn.pooled {
// 这个方法就是前面的标志位,判断里面可以知道,前面标志不要池化的,这里会将它删除。
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
//我们可以看到很明显的这个归还号码牌的动作。
p.freeTurn()
}
Ответ заключается в том, что все соединения на самом деле хранятся в слайсе conns. Если соединение находится в состоянии ожидания бездействия, добавьте на него указатель в idleConns!
На самом деле, процесс его возврата заключается в проверке того, является ли соединение, которое я собираюсь вернуть, перепроданным продуктом.Если это так, нет необходимости объединять его, и его можно удалить напрямую. Если нет, просто добавьте само соединение (указатель) в idleConns.
Подождите, логика выше кажется немного неправильной? Рассмотрим процесс подключения:
- Первый
waitTurn
, получить токен. Количество токенов зависит от пулаqueue
решил. - Получил жетон, иди на склад
idleConns
Возьмите неработающие соединения внутрь. Если нет, будь собойnewConn
один, и записал его какconns
в. - Когда закончите, позвоните
put
Возврат: то есть изconns
Добавьте указатель на это соединение вidleConns
. Проверяйте при возвратеnewConn
Не пора ли сделать отметку перепроданности? Если да, не переводитеidleConns
.
Давно интересовался, так как для подключения всегда нужно получить токен, количество токенов фиксировано. Почему он все еще перепродан? Просматривая исходный код, мой ответ:
Несмотря на то чтоGet
Способ получения соединенияnewConn
Этот частный метод, подлежащий токен-контролю, не будет перепродан. Но этот метод принимает параметры:pooled bool
. Поэтому я думаю, что меня беспокоит, что, когда другие люди вызывают этот метод, они передадут true независимо от того, является ли это 3721, в результате чего poolSize становится все больше и больше.
В общем, управление номером соединения пула соединений Redis все еще находится в
queue
Этот чан, который я называю токеном, действует.
Суммировать
Как вы можете видеть выше, основной гарантией пула соединений является потокобезопасность при получении соединений. Однако при реализации многих дополнительных возможностей это реализуется с разных сторон. Все еще очень интересно. Но независимо от того, использует ли структура хранения chan или slice, этого можно добиться очень хорошо. Если вы используете срезы для хранения соединений, таких как sql или redis, вы должны поддерживать структуру для представления эффекта очереди.