Изучение компонентов Go — рукописный пул соединений не так прост

Go

1. Предпосылки

Глядя на gorm ранее, обнаружил повторное использование базы данных пула соединений gorm / sql.

Итак, кодовая реализация пула соединений базы данных/базы данных SQL, после прочтения кода, кажется, что это не очень сложно, но я всегда чувствую, что этого недостаточно, поэтому я чувствую, что хочу написать соединение с соединением. . (Наконец, я также убедился, что у меня недостаточно понимания исходного кода, я смогу его увидеть.

2. Принцип реализации пула соединений

Что такое пул соединений

  • это бассейн
  • В пуле хранится ограниченное количество немедленно доступных соединений, что сокращает время создания и закрытия соединений.
  • Связи имеют срок жизни

Специально для пула соединений с базой данных я нарисовал блок-схему для получения соединений в соответствии с моим собственным пониманием.

Из приведенного выше рисунка видно, что в дополнение к емкости пула соединений у нас также есть ограничение на максимальное количество соединений. Соединения в пуле позволяют не создавать и часто закрывать соединения, при этом должно быть ограничение на максимальное соединение, чтобы избежать неограниченного создания соединений, что приведет к исчерпанию ресурсов сервера и недоступность услуг.

Соединения в пуле также имеют время жизни, если время жизни превышено, соединение будет уничтожено.

3. Какие проблемы необходимо учитывать при реализации пула соединений

3.1 Функциональные точки

  • присоединиться

  • освободить соединение

  • Ping

  • закрыть пул соединений

  • Установите максимальное количество подключений и емкость пула подключений (время существования подключенного соединения и т. д.).

3.2 Детали реализации

  • Какими свойствами должно обладать подключение, например максимальное количество подключений, емкость пула подключений, время создания подключения и время жизни
  • Как имитировать использование пула соединений и ожидание освобождения других соединений после превышения максимального числа соединений
  • Как обеспечить согласованность данных при работе с несколькими сопрограммами
  • Если вы реализуете мониторинг тайм-аута соединения и уведомление

4. Конкретная реализация

Реализация пула соединений здесь включает

  • Установите максимальное количество подключений и емкость пула подключений
  • присоединиться
  • освободить соединение

4.1 Определение структуры

Определите структуру Conn, которая содержит почти все информационные свойства, необходимые для соединения.

type Conn struct {
	maxConn       int                     // 最大连接数
	maxIdle       int                     // 最大可用连接数
	freeConn      int                     // 线程池空闲连接数
	connPool      []int                   // 连接池
	openCount     int                     // 已经打开的连接数
	waitConn      map[int]chan Permission // 排队等待的连接队列
	waitCount     int                     // 等待个数
	lock          sync.Mutex              // 锁
	nextConnIndex NextConnIndex						// 下一个连接的ID标识(用于区分每个ID)
	freeConns     map[int]Permission 			// 连接池的连接	
}

Это не создает реальное соединение с базой данных, а использует непустое разрешение, чтобы указать, что соединение получено. Непустое разрешение позволяет выполнять только операции, подобные добавлениям, удалениям, изменениям и проверкам.

Разрешение, соответствующее следующей структуре

type Permission struct {
	NextConnIndex								 // 对应Conn中的NextConnIndex
	Content     string					 // 通行证的具体内容,比如"PASSED"表示成功获取
	CreatedAt   time.Time				 // 创建时间,即连接的创建时间
	MaxLifeTime time.Duration    // 连接的存活时间,本次没有用到这个属性,保留
}

Структура, соответствующая NextConnindex, заключается в следующем

type NextConnIndex struct {
	Index int
}

Существует также конфигурация, используемая для установки максимального количества подключений и максимального количества подключений в пуле подключений.

type Config struct {
	MaxConn int
	MaxIdle int
}

4.2 Инициализация параметров пула соединений

func Prepare(ctx context.Context, config *Config) (conn *Conn) {
	// go func() {
		//for {
		//conn.expiredCh = make(chan string, len(conn.freeConns))
		//for _, value := range conn.freeConns {
		//	if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {
		//		conn.expiredCh <- "CLOSE"
		//	}
		//}
	// }()
	return &Conn{
		maxConn:   config.MaxConn,
		maxIdle:   config.MaxIdle,
		openCount: 0,
		connPool:  []int{},
		waitConn:  make(map[int]chan Permission),
		waitCount: 0,
		freeConns: make(map[int]Permission),
	}
}

Здесь главное инициализировать указанные выше параметры структуры Conn.

Комментируемая часть в основном хочет запустить сопрограмму прослушивания для прослушивания соединений с истекшим сроком действия и отправки их через канал. (Есть еще некоторые детали в этой части, которые я не разобрался ясно, так что давайте пока отложим это в сторону)

4.3 Установите MaxConn и MaxIdle

Добавьте код в main.go

ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}

Это означает, что пул соединений может кэшировать только одно соединение, а максимальное количество новых соединений равно 2. Если оно превышается, оно будет добавлено в очередь ожидания.

4.4 Получить соединение

// 创建连接
func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {
	/**
	1、如果当前连接池已满,即len(freeConns)=0
	2、判定openConn是否大于maxConn,如果大于,则丢弃获取加入队列进行等待
	3、如果小于,则考虑创建新连接
	*/
	conn.lock.Lock()

	select {
	default:
	case <-ctx.Done():	// context取消或超时,则退出
		conn.lock.Unlock()

		return Permission{}, errors.New("new conn failed, context cancelled!")
	}

  // 连接池不为空,从连接池获取连接
	if len(conn.freeConns) > 0 {
		var (
			popPermission Permission
			popReqKey     int
		)
    
    // 获取其中一个连接
		for popReqKey, popPermission = range conn.freeConns {
			break
		}
    // 从连接池删除
		delete(conn.freeConns, popReqKey)
		fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			conn.lock.Unlock()
		return popPermission, nil
	}

	if conn.openCount >= conn.maxConn { // 当前连接数大于上限,则加入等待队列
		nextConnIndex := getNextConnIndex(conn)

		req := make(chan Permission, 1)
		conn.waitConn[nextConnIndex] = req
		conn.waitCount++
		conn.lock.Unlock()

		select {
      // 如果在等待指定超时时间后,仍然无法获取释放连接,则放弃获取连接,这里如果不在超时时间后退出会一直阻塞
		case <-time.After(time.Second * time.Duration(3)):
			fmt.Println("超时,通知主线程退出")
			return
		case ret, ok := <-req: // 有放回的连接, 直接拿来用
			if !ok {
				return Permission{}, errors.New("new conn failed, no available conn release")
			}
			fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			return ret, nil
		}
		return Permission{}, errors.New("new conn failed")
	}

	// 新建连接
	conn.openCount++
	conn.lock.Unlock()
	permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},
		Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
	fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
	return permission, nil
}

В основном делится на три части

  • Если пул соединений не пуст, вы можете напрямую получить соединение из пула и использовать его.

  • Если пул соединений пуст и текущее количество соединений превысило максимальное число соединений maxConn, текущая задача будет добавлена ​​в очередь ожидания, и в то же время будет контролировать, есть ли свободные соединения. соединение все еще не получено, оно выйдет из блокировки и вернется.

  • Если пул соединений пуст и максимальное количество соединений maxConn не достигнуто, будет создано новое соединение.

функция getNextConnIndex

func getNextConnIndex(conn *Conn) int {
	currentIndex := conn.nextConnIndex.Index
	conn.nextConnIndex.Index = currentIndex + 1
	return conn.nextConnIndex.Index
}

4.5 Отключить соединение

// 释放连接
func (conn *Conn) Release(ctx context.Context) (result bool, err error) {
	conn.lock.Lock()
  // 如果等待队列有等待任务,则通知正在阻塞等待获取连接的进程(即New方法中"<-req"逻辑)
  // 这里没有做指定连接的释放,只是保证释放的连接会被利用起来
	if len(conn.waitConn) > 0 {
		var req chan Permission
		var reqKey int
		for reqKey, req = range conn.waitConn {
			break
		}
		// 假定释放的连接就是下面新建的连接
		permission := Permission{NextConnIndex: NextConnIndex{reqKey},
			Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
		req <- permission
		conn.waitCount--
		delete(conn.waitConn, reqKey)
		conn.lock.Unlock()
	} else {
		if conn.openCount > 0 {
			conn.openCount--
      
			if len(conn.freeConns) < conn.maxIdle {	// 确保连接池大小不会超过maxIdle
				nextConnIndex := getNextConnIndex(conn)
				permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},
					Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
				conn.freeConns[nextConnIndex] = permission
			}
		}
		conn.lock.Unlock()
	}
	return
}

В основном делится на две части

  • Если обнаружится, что в очереди ожидания есть задачи, когда соединение будет освобождено, освобожденное соединение будет отправлено через канал для использования блокирующей задачей, ожидающей освобождения соединения, и задача будет удалена из очереди. очереди ожидания в то же время.
  • Если в настоящее время нет ожидающих задач, поместите соединение в пул соединений.

Вот теперьFunc

var nowFunc = time.Now

5. Моделирование случая

5.1 Создание соединения без разблокировки

То есть создается только соединение, и соединение не будет разорвано при получении соединения.

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

Результат выполнения следующий

Обратите внимание, что приведенный выше код всегда получает соединение, и соединение не разрывается после того, как соединение установлено.

В первый раз, если пул соединений пуст, создайте новое соединение

Второе приобретение, пул соединения пуст, продолжайте создавать новые соединения

Третье привлечение, пул соединения пуст и имеет соединения> = maxconn, он будет блокировать ожидание выпуска соединения, но поскольку нет выпуска соединения, он ждал, чтобы выйти после тайм-аута до 3 секунд.

Таким образом, 3-й, 4-й и 5-й раз - все выходы по тайм-ауту.

5.2 Отключить соединение

Что, если мы разорвем соединение, мы можем разорвать соединение, запустив новую сопрограмму следующим образом:

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

Результат выполнения следующий

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log received released conn!!!!! openCount:  2  freeConns:  map[]
超时,通知主线程退出
超时,通知主线程退出

Первые два раза такие же, как и выше, но при получении третьего раза будет получено освобожденное соединение, поэтому освобожденное соединение можно будет повторно использовать и вернуть.

Но четвертое и пятое творение, потому что нет выпущенного соединения, выйдет после ожидания тайм-аута.

5.3 пул соединений

Вышеуказанные два случая выполняются, когда MaxConn=2, MaxIdle=1.

Давайте посмотрим, сможем ли мы смоделировать ситуацию использования пула соединений на основе двух указанных выше настроек параметров.

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

То есть, кроме первого раза, позже будет релиз подключения.

Результат выполнения может быть следующим

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log use free conn!!!!! openCount:  1  freeConns:  map[]
log use free conn!!!!! openCount:  0  freeConns:  map[]
log create conn!!!!! openCount:  1  freeConns:  map[]

Как видно из результатов выполнения, соединение в пуле соединений используется дважды.

Примечание: Поскольку выпуск представляет собой только что начатое выполнение сопрограммы, порядок выполнения не может быть гарантирован, поскольку разные порядок выполнения будут иметь разные результаты выполнения. Вышеприведенное — лишь один из результатов выполнения.

Полный код выше см. https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool.

6. Резюме и перспективы

6.1 Резюме

  • Углубить понимание реализации пула соединений с помощью рукописного пула соединений.
  • Научитесь использовать каналы и сопрограммы
  • Узнайте, как выйти после заблокированного канала в течение указанного времени (установите время ожидания)
  • Общество для блокировки общих ресурсов, таких как доступ и обновление nextConnIndex, необходимо заблокировать.

6.2 Перспективы

  • Close и Ping не пишутся (реализация не сложная)
  • Соединение пула соединений должно иметь время жизни и удаляется из пула соединений по истечении срока действия соединения.
  • Реализация использует обычную коллекцию карт, и можно рассматривать syncMap, безопасную для параллелизма.
  • Реализация кода относительно проста и не элегантна, и ее можно продолжать улучшать, чтобы обеспечить единую ответственность.

Личный публичный аккаунт JackieZheng, добро пожаловать на внимание~