1. Предпосылки
Глядя на gorm ранее, обнаружил повторное использование базы данных пула соединений gorm / sql.
Итак, кодовая реализация пула соединений базы данных/базы данных SQL, после прочтения кода, кажется, что это не очень сложно, но я всегда чувствую, что этого недостаточно, поэтому я чувствую, что хочу написать соединение с соединением. . (Наконец, я также убедился, что у меня недостаточно понимания исходного кода, я смогу его увидеть.
2. Принцип реализации пула соединений
Что такое пул соединений
- это бассейн
- В пуле хранится ограниченное количество немедленно доступных соединений, что сокращает время создания и закрытия соединений.
- Связи имеют срок жизни
Специально для пула соединений с базой данных я нарисовал блок-схему для получения соединений в соответствии с моим собственным пониманием.
Из приведенного выше рисунка видно, что в дополнение к емкости пула соединений у нас также есть ограничение на максимальное количество соединений. Соединения в пуле позволяют не создавать и часто закрывать соединения, при этом должно быть ограничение на максимальное соединение, чтобы избежать неограниченного создания соединений, что приведет к исчерпанию ресурсов сервера и недоступность услуг.
Соединения в пуле также имеют время жизни, если время жизни превышено, соединение будет уничтожено.
3. Какие проблемы необходимо учитывать при реализации пула соединений
3.1 Функциональные точки
-
присоединиться
-
освободить соединение
-
Ping
-
закрыть пул соединений
-
Установите максимальное количество подключений и емкость пула подключений (время существования подключенного соединения и т. д.).
3.2 Детали реализации
- Какими свойствами должно обладать подключение, например максимальное количество подключений, емкость пула подключений, время создания подключения и время жизни
- Как имитировать использование пула соединений и ожидание освобождения других соединений после превышения максимального числа соединений
- Как обеспечить согласованность данных при работе с несколькими сопрограммами
- Если вы реализуете мониторинг тайм-аута соединения и уведомление
4. Конкретная реализация
Реализация пула соединений здесь включает
- Установите максимальное количество подключений и емкость пула подключений
- присоединиться
- освободить соединение
4.1 Определение структуры
Определите структуру Conn, которая содержит почти все информационные свойства, необходимые для соединения.
type Conn struct {
maxConn int // 最大连接数
maxIdle int // 最大可用连接数
freeConn int // 线程池空闲连接数
connPool []int // 连接池
openCount int // 已经打开的连接数
waitConn map[int]chan Permission // 排队等待的连接队列
waitCount int // 等待个数
lock sync.Mutex // 锁
nextConnIndex NextConnIndex // 下一个连接的ID标识(用于区分每个ID)
freeConns map[int]Permission // 连接池的连接
}
Это не создает реальное соединение с базой данных, а использует непустое разрешение, чтобы указать, что соединение получено. Непустое разрешение позволяет выполнять только операции, подобные добавлениям, удалениям, изменениям и проверкам.
Разрешение, соответствующее следующей структуре
type Permission struct {
NextConnIndex // 对应Conn中的NextConnIndex
Content string // 通行证的具体内容,比如"PASSED"表示成功获取
CreatedAt time.Time // 创建时间,即连接的创建时间
MaxLifeTime time.Duration // 连接的存活时间,本次没有用到这个属性,保留
}
Структура, соответствующая NextConnindex, заключается в следующем
type NextConnIndex struct {
Index int
}
Существует также конфигурация, используемая для установки максимального количества подключений и максимального количества подключений в пуле подключений.
type Config struct {
MaxConn int
MaxIdle int
}
4.2 Инициализация параметров пула соединений
func Prepare(ctx context.Context, config *Config) (conn *Conn) {
// go func() {
//for {
//conn.expiredCh = make(chan string, len(conn.freeConns))
//for _, value := range conn.freeConns {
// if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {
// conn.expiredCh <- "CLOSE"
// }
//}
// }()
return &Conn{
maxConn: config.MaxConn,
maxIdle: config.MaxIdle,
openCount: 0,
connPool: []int{},
waitConn: make(map[int]chan Permission),
waitCount: 0,
freeConns: make(map[int]Permission),
}
}
Здесь главное инициализировать указанные выше параметры структуры Conn.
Комментируемая часть в основном хочет запустить сопрограмму прослушивания для прослушивания соединений с истекшим сроком действия и отправки их через канал. (Есть еще некоторые детали в этой части, которые я не разобрался ясно, так что давайте пока отложим это в сторону)
4.3 Установите MaxConn и MaxIdle
Добавьте код в main.go
ctx := context.Background()
config := &custom_pool.Config{
MaxConn: 2,
MaxIdle: 1,
}
Это означает, что пул соединений может кэшировать только одно соединение, а максимальное количество новых соединений равно 2. Если оно превышается, оно будет добавлено в очередь ожидания.
4.4 Получить соединение
// 创建连接
func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {
/**
1、如果当前连接池已满,即len(freeConns)=0
2、判定openConn是否大于maxConn,如果大于,则丢弃获取加入队列进行等待
3、如果小于,则考虑创建新连接
*/
conn.lock.Lock()
select {
default:
case <-ctx.Done(): // context取消或超时,则退出
conn.lock.Unlock()
return Permission{}, errors.New("new conn failed, context cancelled!")
}
// 连接池不为空,从连接池获取连接
if len(conn.freeConns) > 0 {
var (
popPermission Permission
popReqKey int
)
// 获取其中一个连接
for popReqKey, popPermission = range conn.freeConns {
break
}
// 从连接池删除
delete(conn.freeConns, popReqKey)
fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
conn.lock.Unlock()
return popPermission, nil
}
if conn.openCount >= conn.maxConn { // 当前连接数大于上限,则加入等待队列
nextConnIndex := getNextConnIndex(conn)
req := make(chan Permission, 1)
conn.waitConn[nextConnIndex] = req
conn.waitCount++
conn.lock.Unlock()
select {
// 如果在等待指定超时时间后,仍然无法获取释放连接,则放弃获取连接,这里如果不在超时时间后退出会一直阻塞
case <-time.After(time.Second * time.Duration(3)):
fmt.Println("超时,通知主线程退出")
return
case ret, ok := <-req: // 有放回的连接, 直接拿来用
if !ok {
return Permission{}, errors.New("new conn failed, no available conn release")
}
fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
return ret, nil
}
return Permission{}, errors.New("new conn failed")
}
// 新建连接
conn.openCount++
conn.lock.Unlock()
permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},
Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
return permission, nil
}
В основном делится на три части
-
Если пул соединений не пуст, вы можете напрямую получить соединение из пула и использовать его.
-
Если пул соединений пуст и текущее количество соединений превысило максимальное число соединений maxConn, текущая задача будет добавлена в очередь ожидания, и в то же время будет контролировать, есть ли свободные соединения. соединение все еще не получено, оно выйдет из блокировки и вернется.
-
Если пул соединений пуст и максимальное количество соединений maxConn не достигнуто, будет создано новое соединение.
функция getNextConnIndex
func getNextConnIndex(conn *Conn) int {
currentIndex := conn.nextConnIndex.Index
conn.nextConnIndex.Index = currentIndex + 1
return conn.nextConnIndex.Index
}
4.5 Отключить соединение
// 释放连接
func (conn *Conn) Release(ctx context.Context) (result bool, err error) {
conn.lock.Lock()
// 如果等待队列有等待任务,则通知正在阻塞等待获取连接的进程(即New方法中"<-req"逻辑)
// 这里没有做指定连接的释放,只是保证释放的连接会被利用起来
if len(conn.waitConn) > 0 {
var req chan Permission
var reqKey int
for reqKey, req = range conn.waitConn {
break
}
// 假定释放的连接就是下面新建的连接
permission := Permission{NextConnIndex: NextConnIndex{reqKey},
Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
req <- permission
conn.waitCount--
delete(conn.waitConn, reqKey)
conn.lock.Unlock()
} else {
if conn.openCount > 0 {
conn.openCount--
if len(conn.freeConns) < conn.maxIdle { // 确保连接池大小不会超过maxIdle
nextConnIndex := getNextConnIndex(conn)
permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},
Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
conn.freeConns[nextConnIndex] = permission
}
}
conn.lock.Unlock()
}
return
}
В основном делится на две части
- Если обнаружится, что в очереди ожидания есть задачи, когда соединение будет освобождено, освобожденное соединение будет отправлено через канал для использования блокирующей задачей, ожидающей освобождения соединения, и задача будет удалена из очереди. очереди ожидания в то же время.
- Если в настоящее время нет ожидающих задач, поместите соединение в пул соединений.
Вот теперьFunc
var nowFunc = time.Now
5. Моделирование случая
5.1 Создание соединения без разблокировки
То есть создается только соединение, и соединение не будет разорвано при получении соединения.
package main
import (
"context"
custom_pool "go-demo/main/src/custom-pool"
)
func main() {
ctx := context.Background()
config := &custom_pool.Config{
MaxConn: 2,
MaxIdle: 1,
}
conn := custom_pool.Prepare(ctx, config)
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
}
Результат выполнения следующий
Обратите внимание, что приведенный выше код всегда получает соединение, и соединение не разрывается после того, как соединение установлено.
В первый раз, если пул соединений пуст, создайте новое соединение
Второе приобретение, пул соединения пуст, продолжайте создавать новые соединения
Третье привлечение, пул соединения пуст и имеет соединения> = maxconn, он будет блокировать ожидание выпуска соединения, но поскольку нет выпуска соединения, он ждал, чтобы выйти после тайм-аута до 3 секунд.
Таким образом, 3-й, 4-й и 5-й раз - все выходы по тайм-ауту.
5.2 Отключить соединение
Что, если мы разорвем соединение, мы можем разорвать соединение, запустив новую сопрограмму следующим образом:
package main
import (
"context"
custom_pool "go-demo/main/src/custom-pool"
)
func main() {
ctx := context.Background()
config := &custom_pool.Config{
MaxConn: 2,
MaxIdle: 1,
}
conn := custom_pool.Prepare(ctx, config)
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
go conn.Release(ctx)
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
if _, err := conn.New(ctx); err != nil {
return
}
}
Результат выполнения следующий
log create conn!!!!! openCount: 1 freeConns: map[]
log create conn!!!!! openCount: 2 freeConns: map[]
log received released conn!!!!! openCount: 2 freeConns: map[]
超时,通知主线程退出
超时,通知主线程退出
Первые два раза такие же, как и выше, но при получении третьего раза будет получено освобожденное соединение, поэтому освобожденное соединение можно будет повторно использовать и вернуть.
Но четвертое и пятое творение, потому что нет выпущенного соединения, выйдет после ожидания тайм-аута.
5.3 пул соединений
Вышеуказанные два случая выполняются, когда MaxConn=2, MaxIdle=1.
Давайте посмотрим, сможем ли мы смоделировать ситуацию использования пула соединений на основе двух указанных выше настроек параметров.
package main
import (
"context"
custom_pool "go-demo/main/src/custom-pool"
)
func main() {
ctx := context.Background()
config := &custom_pool.Config{
MaxConn: 2,
MaxIdle: 1,
}
conn := custom_pool.Prepare(ctx, config)
if _, err := conn.New(ctx); err != nil {
return
}
go conn.Release(ctx)
if _, err := conn.New(ctx); err != nil {
return
}
go conn.Release(ctx)
if _, err := conn.New(ctx); err != nil {
return
}
go conn.Release(ctx)
if _, err := conn.New(ctx); err != nil {
return
}
go conn.Release(ctx)
if _, err := conn.New(ctx); err != nil {
return
}
}
То есть, кроме первого раза, позже будет релиз подключения.
Результат выполнения может быть следующим
log create conn!!!!! openCount: 1 freeConns: map[]
log create conn!!!!! openCount: 2 freeConns: map[]
log use free conn!!!!! openCount: 1 freeConns: map[]
log use free conn!!!!! openCount: 0 freeConns: map[]
log create conn!!!!! openCount: 1 freeConns: map[]
Как видно из результатов выполнения, соединение в пуле соединений используется дважды.
Примечание: Поскольку выпуск представляет собой только что начатое выполнение сопрограммы, порядок выполнения не может быть гарантирован, поскольку разные порядок выполнения будут иметь разные результаты выполнения. Вышеприведенное — лишь один из результатов выполнения.
Полный код выше см. https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool.
6. Резюме и перспективы
6.1 Резюме
- Углубить понимание реализации пула соединений с помощью рукописного пула соединений.
- Научитесь использовать каналы и сопрограммы
- Узнайте, как выйти после заблокированного канала в течение указанного времени (установите время ожидания)
- Общество для блокировки общих ресурсов, таких как доступ и обновление nextConnIndex, необходимо заблокировать.
6.2 Перспективы
- Close и Ping не пишутся (реализация не сложная)
- Соединение пула соединений должно иметь время жизни и удаляется из пула соединений по истечении срока действия соединения.
- Реализация использует обычную коллекцию карт, и можно рассматривать syncMap, безопасную для параллелизма.
- Реализация кода относительно проста и не элегантна, и ее можно продолжать улучшать, чтобы обеспечить единую ответственность.
Личный публичный аккаунт JackieZheng, добро пожаловать на внимание~