В последнее время онлайн-модуль, отвечающий за коллегу, всегда время от времени возвращает 504. После проверки было обнаружено, что использование памяти этой службой ненормально велико.После анализа pprof было обнаружено, что существуют десятки тысяч горутин. , После расследования и анализа стандартное использование пакета gorm отсутствует.Вызвано, то какова конкретная причина, будет ли это также похоже на«Анализ пакетов Go Http: зачем вам response.Body.Close ()»То же, что и в тексте, потому что связь не освобождается?
проблемное явление
demo
Сначала давайте посмотрим на пример, а затем угадаем напечатанный результат.
package main
import (
"log"
"net/http"
_ "net/http/pprof"
"time"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/mysql"
)
var (
db *gorm.DB
)
type User struct {
ID int64 `gorm:"column:id;primary_key" json:"id"`
Name string `gorm:"column:name" json:"name"`
}
func (user *User) TableName() string {
return "ranger_user"
}
func main() {
go func() {
log.Println(http.ListenAndServe(":6060", nil))
}()
for true {
GetUserList()
time.Sleep(time.Second)
}
}
func GetUserList() ([]*User, error) {
users := make([]*User, 0)
db := open()
rows, err := db.Model(&User{}).Where("id > ?", 1).Rows()
if err != nil {
panic(err)
}
// 为了试验而写的特殊逻辑
for rows.Next() {
user := &User{}
err = db.ScanRows(rows, user)
return nil, err
}
return users, nil
}
func open() *gorm.DB {
if db != nil {
return db
}
var err error
db, err = gorm.Open("mysql",
"user:pass@(ip:port)/db?charset=utf8&parseTime=True&loc=Local")
if err != nil {
panic(err)
}
return db
}
анализировать
Давайте взглянем на демо выше, вроде проблем нет, давайте запустим его на некоторое время и посмотрим
Немного неловко, я просто возвращаю простой запрос, откуда столько горутин?
Продолжайте видеть, какие функции генерируют горутины
startWatcher.func1
какого черта
func (mc *mysqlConn) startWatcher() {
watcher := make(chan mysqlContext, 1)
mc.watcher = watcher
finished := make(chan struct{})
mc.finished = finished
go func() {
for {
var ctx mysqlContext
select {
case ctx = <-watcher:
case <-mc.closech:
return
}
select {
case <-ctx.Done():
mc.cancel(ctx.Err())
case <-finished:
case <-mc.closech:
return
}
}
}()
}
Угадай, проверка
startWatcher
Вызывающий эту функцию, толькоMySQLDriver.Open
Он будет вызываться, то есть при создании нового подключения будет создаваться горутина монитора.
согласно с«Анализ пакетов Go Http: зачем вам response.Body.Close ()»Результат анализа , вы можете сделать смелое предположение.Возможно, что mysql получает соединение каждый раз, когда он запрашивает.Если нет свободного соединения, он создает новое.После завершения запроса соединение освобождается для пул соединений для использования в следующем запросе. А поскольку rows.Close() не вызывался, после того, как соединение было занято, оно не возвращалось в пул соединений для повторного использования, в результате чего для каждого запроса создавался новый запрос, что приводило к в большом количестве горутин для запуска.startWatcher.func1
Отслеживайте вновь созданные подключения. Таким образом, мы похожи на response.Close, если можно выполнить rows.Close(), то проверьте это.
Добавьте строку rows.Close() в приведенный выше тестовый код.
defer rows.Close()
for rows.Next() {
user := &User{}
err = db.ScanRows(rows, user)
return nil, err
}
Продолжайте наблюдать за изменениями горутины
Горутина больше не поднимается, похоже проблема решена
сомневаться
- Когда мы обычно пишем код, мы не вызываем
rows.Close()
Да, во многих случаях взрыв горутин не происходит, вот почему
структура
Как обычно, структуры, которые могут быть использованы, лучше опубликовать заранее, чтобы они были знакомы.
rows
// Rows is the result of a query. Its cursor starts before the first row
// of the result set. Use Next to advance from row to row.
type Rows struct {
dc *driverConn // owned; must call releaseConn when closed to release
releaseConn func(error) // driverConn.releaseConn, 在query的时候,会传递过来
rowsi driver.Rows
cancel func() // called when Rows is closed, may be nil.
closeStmt *driverStmt // if non-nil, statement to Close on close
// closemu prevents Rows from closing while there
// is an active streaming result. It is held for read during non-close operations
// and exclusively during close.
//
// closemu guards lasterr and closed.
closemu sync.RWMutex
closed bool
lasterr error // non-nil only if closed is true
// lastcols is only used in Scan, Next, and NextResultSet which are expected
// not to be called concurrently.
lastcols []driver.Value
}s
Запрос
Логика установления связей, структур областей видимости, методов Model и Where повторяться не будем.Предыдущая статья"Пит-рекорд ErrRecordNotFound GORM"Я уже говорил об этом в общих чертах, перейдем непосредственно кRows
Анализ функций
Rows
// Rows return `*sql.Rows` with given conditions
func (s *DB) Rows() (*sql.Rows, error) {
return s.NewScope(s.Value).rows()
}
func (scope *Scope) rows() (*sql.Rows, error) {
defer scope.trace(scope.db.nowFunc())
result := &RowsQueryResult{}
// 设置 row_query_result,供 callback 函数使用
scope.InstanceSet("row_query_result", result)
scope.callCallbacks(scope.db.parent.callbacks.rowQueries)
return result.Rows, result.Error
}
Я чувствую, что скоро приду сюдаcallback
Перезвоните
Исходя из опыта предыдущей статьи,rowQueries
Зарегистрированную функцию обратного вызова можно найти в функции init() в callback_row_query.go.
func init() {
DefaultCallback.RowQuery().Register("gorm:row_query", rowQueryCallback)
}
// queryCallback used to query data from database
func rowQueryCallback(scope *Scope) {
// 对应 上面函数里面的 scope.InstanceSet("row_query_result", result)
if result, ok := scope.InstanceGet("row_query_result"); ok {
// 组装出来对应的sql语句,eg: SELECT * FROM `ranger_user` WHERE (id > ?)
scope.prepareQuerySQL()
if str, ok := scope.Get("gorm:query_option"); ok {
scope.SQL += addExtraSpaceIfExist(fmt.Sprint(str))
}
if rowResult, ok := result.(*RowQueryResult); ok {
rowResult.Row = scope.SQLDB().QueryRow(scope.SQL, scope.SQLVars...)
} else if rowsResult, ok := result.(*RowsQueryResult); ok {
// result 对应的结构体是 RowsQueryResult,所以执行到这里,继续跟进这个函数
rowsResult.Rows, rowsResult.Error = scope.SQLDB().Query(scope.SQL, scope.SQLVars...)
}
}
}
Как видно выше,rowQueryCallback
Просто соберите sql, а затем вызовите пакет sql, предоставленный go to query
sql.Query
// Query executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// query是sql语句,args则是sql中? 所代表的值
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
return db.QueryContext(context.Background(), query, args...)
}
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
var rows *Rows
var err error
// maxBadConnRetries = 2
for i := 0; i < maxBadConnRetries; i++ {
// cachedOrNewConn 则是告诉query 去使用缓存的连接或者创建一个新的连接
rows, err = db.query(ctx, query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
// 如果尝试了maxBadConnRetries次后,连接还是有问题的,则创建一个新的连接去执行sql
if err == driver.ErrBadConn {
return db.query(ctx, query, args, alwaysNewConn)
}
return rows, err
}
func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
// 根据上面定的获取连接的策略,来获取一个有效的连接
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
// 使用获取的连接,进行查询
return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
Приведенную выше логику понять не сложно, тут две переменные, объясните
cachedOrNewConn: тип connReuseStrategy, по существу тип uint8, значение равно 1, этот флаг будет передан следующемуdb.conn
функция, скажите этой функции вернуть подключенную политику
1. 如果连接池中有空闲连接,返回一个空闲的
2. 如果连接池中没有空的连接,且没有超过最大创建的连接数,则创建一个新的返回
3. 如果连接池中没有空的连接,且超过最大创建的连接数,则等待连接释放后,返回这个空闲连接
alwaysNewConn:
- каждый раз возвращает новое соединение
присоединиться
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
// 校验一下ctx是否过期了
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// Prefer a free connection, if possible.
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
// 如果选择连接的策略是 cachedOrNewConn,并且有空闲的连接,则尝试获取连接池中的第一个连接
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
// 判断当前连接的空闲时间是否超过了设定的最大空闲时间
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
// 判断连接的lastErr,确保连接是被重置过的
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// Out of free connections or we were asked not to use one. If we're not
// allowed to open any more connections, make a request and wait.
// 走到这里说明没有获取到空闲连接,判断创建的连接数量是否超过最大允许的连接数量
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
// 创建一个chan,用于接收释放的空闲连接
req := make(chan connRequest, 1)
// 创建一个key
reqKey := db.nextRequestKeyLocked()
// 将key 和chan绑定,便于根据key 定位所对应的chan
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
case <-ctx.Done():
// Remove the connection request and ensure no value has been sent
// on it after removing.
// 如果ctx失效了,则这个空闲连接也不需要了,删除刚刚创建的key,防止这个连接被移除后再次为这个key获取连接
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req:
// 如果获取到了空闲连接,则放回连接池里面
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
// 此时拿到了空闲连接,且ctx没有过期,则判断连接是否有效
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
// 判断连接是否过期
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
// 判断连接的lastErr,确保连接是被重置过的
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
// 上面两个都不满足,则创建一个新的连接,也就是 获取连接的策略是 alwaysNewConn 的时候
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
// 如果连接创建失败,则再尝试创建一次
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
// 关闭连接时会用到
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
В приведенной выше логике вы можете видеть, что стратегия получения соединений такая же, как когда мы объясняли cachedOrNewConn и alwaysNewConn выше, но здесь есть две проблемы.
- Количество созданных подключений превышает максимально допустимое количество подключений, далее ждите простоя подключения.В это время в карту db.connRequests добавляется новый ключ.Этот ключ соответствует чану, а затем непосредственно ждет чан чтобы выплюнуть соединение.так как оно ждет разблокировать неработающее соединение, то соединение, вставленное в этот чан, должно быть в функции freeconn.Какова логика freeconn?
- После неудачной попытки создать новое соединение будет вызвана функция db.maybeOpenNewConnections, и эта функция не возвращает соединение, так что же она делает?
освободить соединение
Освобождение соединения в основном зависит отputconn
завершить, вconn
В коде ниже функция
case ret, ok := <-req:
// 如果获取到了空闲连接,则放回连接池里面
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
Также называется, поместите полученное, но больше не нужное соединение обратно в пул, давайте посмотрим на процесс освобождения соединения.
putConn
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
db.mu.Lock()
// 释放一个正在用的连接,panic
if !dc.inUse {
panic("sql: connection returned that was never out")
}
dc.inUse = false
// 省略部分无关代码...
if err == driver.ErrBadConn {
// Don't reuse bad connections.
// Since the conn is considered bad and is being discarded, treat it
// as closed. Don't decrement the open count here, finalClose will
// take care of that.
// maybeOpenNewConnections 这个函数又见到了,它到底干了什么
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}
...
if db.closed {
// Connections do not need to be reset if they will be closed.
// Prevents writing to resetterCh after the DB has closed.
resetSession = false
}
if resetSession {
if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
// Lock the driverConn here so it isn't released until
// the connection is reset.
// The lock must be taken before the connection is put into
// the pool to prevent it from being taken out before it is reset.
dc.Lock()
}
}
// 把连接放回连接池中,也是这个函数的核心逻辑
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
// 如果释放连接失败,则关闭连接
if !added {
if resetSession {
dc.Unlock()
}
dc.Close()
return
}
if !resetSession {
return
}
// 尝试将连接放回resetterCh chan里面,如果失败,则标识连接异常
select {
default:
// If the resetterCh is blocking then mark the connection
// as bad and continue on.
dc.lastErr = driver.ErrBadConn
dc.Unlock()
case db.resetterCh <- dc:
}
}
putConnDBLocked
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
// 已经超出最大的连接数量了,不需要再放回了
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 如果有其他等待获取空闲连接的协程,则
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
// connRequests 获取一个 chan,并把这个连接返回到这个 chan里面
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
// 如果没有超出最大数量限制,则把这个连接放到 freeConn 这个slice里面
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
Разобравшись с логикой освобождения соединения, мы можем увидеть общий процесс повторного использования соединения
- Приходит новый запрос, и нужно получить новое соединение
- Сначала определите, есть ли незанятое соединение, если нет и не превышает максимальное количество разрешенных для создания соединений, создайте одно
- После нескольких запросов количество подключений превысило установленное максимальное количество подключений, затем дождитесь освобождения незанятого подключения.
- На этом первый запрос завершен, готов разорвать соединение, посмотреть, есть ли запрос, ожидающий незанятого соединения, если есть, пропустить соединение напрямую через chan, в противном случае поставить соединение на незанятое соединение внутри бассейн
- На этом этапе дождитесь запроса на незанятое соединение позже, получите соединение, переданное первым запросом, и продолжите обработку запроса.
- выше, повторяю
###maybeOpenNewConnections
Эта функция встречалась в приведенном выше анализе дважды, давайте проанализируем, что она делает.
func (db *DB) maybeOpenNewConnections() {
// 计算需要创建的连接数,总共创建的有效连接数不能超过设置的最大连接数
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
for numRequests > 0 {
db.numOpen++ // optimistically
numRequests--
if db.closed {
return
}
// 往 openerCh 这个chan里面插入一条数据
db.openerCh <- struct{}{}
}
}
В предыдущем анализе, если количество созданных подключений >= максимально допустимому количеству подключений при получении подключений, в карте db.connRequests создается уникальное значение ключа для получения освобожденных простаивающих подключений, но если в процессе разрыва соединения обнаруживается, что соединение недействительно, и соединение нельзя использовать повторно.В это время он перейдет к этой функции и попытается создать новое соединение для других ожидающих запросов.
Вот вопрос: почемуdb.openerCh <- struct{}{}
Такой простой командой можно создать соединение, следующим шагом будет анализ получателя db.openerCh
###connectionOpener
Эта функция начнет выполняться, когда будет создана структура db, резидентная горутина
// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
// 这边接收到数据后,就开始创建一个新的连接
db.openNewConnection(ctx)
}
}
}
openNewConnection
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
// on db.openerCh. This function must execute db.numOpen-- if the
// connection fails or is closed before returning.
// 调用 sql driver 库来创建一个连接
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
// 如果db已经关闭,则关闭连接并返回
if db.closed {
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
// 创建连接失败了,重新调用 maybeOpenNewConnections 再创建一次
db.numOpen--
db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
// 走到 putConnDBLocked,把连接交给等待的请求方或者连接池中
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}
Connect
Вот основная логика подключения к базе данных
func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
return t.driver.Open(t.dsn)
}
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
var err error
// New mysqlConn
mc := &mysqlConn{
maxAllowedPacket: maxPacketSize,
maxWriteSize: maxPacketSize - 1,
closech: make(chan struct{}),
}
// 解析dsn
mc.cfg, err = ParseDSN(dsn)
if err != nil {
return nil, err
}
mc.parseTime = mc.cfg.ParseTime
// Connect to Server
// 找到对应网络连接类型(tcp...) 的连接函数,并创建连接
dialsLock.RLock()
dial, ok := dials[mc.cfg.Net]
dialsLock.RUnlock()
if ok {
mc.netConn, err = dial(mc.cfg.Addr)
} else {
nd := net.Dialer{Timeout: mc.cfg.Timeout}
mc.netConn, err = nd.Dial(mc.cfg.Net, mc.cfg.Addr)
}
if err != nil {
return nil, err
}
// Enable TCP Keepalives on TCP connections
// 开启Keepalives
if tc, ok := mc.netConn.(*net.TCPConn); ok {
if err := tc.SetKeepAlive(true); err != nil {
// Don't send COM_QUIT before handshake.
mc.netConn.Close()
mc.netConn = nil
return nil, err
}
}
// Call startWatcher for context support (From Go 1.8)
// 这里调用startWatcher,开始对连接进行监控,及时释放连接
if s, ok := interface{}(mc).(watcher); ok {
s.startWatcher()
}
// 下面一些设置与分析无关,忽略...
return mc, nil
}
startWatcher
Эта функция в основном предназначена для контроля соединения
func (mc *mysqlConn) startWatcher() {
watcher := make(chan mysqlContext, 1)
mc.watcher = watcher
finished := make(chan struct{})
mc.finished = finished
go func() {
for {
var ctx mysqlContext
select {
case ctx = <-watcher:
case <-mc.closech:
return
}
select {
// ctx 过期的时候,关闭连接,这时候会关闭mc.closech
case <-ctx.Done():
mc.cancel(ctx.Err())
case <-finished:
// 关闭连接
case <-mc.closech:
return
}
}
}()
}
Логика создания соединения
- Сначала попытайтесь создать соединение, если это не удается, снова вызовите функцию MaybeOpenNewConnections, попробуйте снова создать новое соединение, пока создание не будет успешным или не будет запрашивающей стороны, ожидающей местоположения соединения.
- Когда создается новое соединение, вызывается функция startWatcher, резидентная горутина, которая отслеживает соединение и вовремя закрывает его.
- После того, как соединение будет успешно создано, передайте putConnDBLocked, чтобы передать соединение запрашивающей стороне, ожидающей соединения, или поместить его в пул соединений.
На данный момент процесс создания и повторного использования соединения в основном ясен.На данный момент у нас также есть четкое объяснение проблем, с которыми мы столкнулись в начале:
- При вызове функции Rows() для запроса вам необходимо получить соединение
- В настоящее время нет новых или незанятых соединений, поэтому необходимо создать новое соединение.
- Создание соединения означает создание горутины startWatcher для мониторинга.
- Поскольку rows.Close() не вызывается для освобождения соединения вовремя после завершения запроса, соединение не было возвращено в пул соединений или повторно использовано, поэтому новое соединение будет создаваться каждый раз, когда делается запрос.
- После нескольких запросов будет создано много горутин startWatcher, и, наконец, обнаруженное явление
Rows.Close
func (rs *Rows) Close() error {
return rs.close(nil)
}
func (rs *Rows) close(err error) error {
rs.closemu.Lock()
defer rs.closemu.Unlock()
// ...
rs.closed = true
// 相关字段的一些设置, 忽略 ....
rs.releaseConn(err)
return err
}
// 通过putConn 把连接释放
func (dc *driverConn) releaseConn(err error) {
dc.db.putConn(dc, err, true)
}
Функцию, соответствующую rs.releaseConn, можно найти в методе queryDC, который указан прямо здесь.
Как видите, rows.Close() наконец проходитputConn
Освободить текущее соединение для повторного использования
Rows.Next
Next подготавливает следующую запись для метода сканирования, чтобы метод сканирования мог ее прочитать.Если следующей строки нет или при подготовке следующей записи возникает ошибка, возвращается false
func (rs *Rows) Next() bool {
var doClose, ok bool
withLock(rs.closemu.RLocker(), func() {
// 准备下一条记录
doClose, ok = rs.nextLocked()
})
if doClose {
// 如果 doClose 为true,说明没有记录了,或者准备下一条记录的时候,出错了,此时关闭连接
rs.Close()
}
return ok
}
func (rs *Rows) nextLocked() (doClose, ok bool) {
// 如果 已经关闭了,就不要读取下一条了
if rs.closed {
return false, false
}
// Lock the driver connection before calling the driver interface
// rowsi to prevent a Tx from rolling back the connection at the same time.
rs.dc.Lock()
defer rs.dc.Unlock()
if rs.lastcols == nil {
rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
}
// 获取下一条记录,并放到lastcols里面
rs.lasterr = rs.rowsi.Next(rs.lastcols)
if rs.lasterr != nil {
// Close the connection if there is a driver error.
// 读取出错,返回true,以便后面关闭连接
if rs.lasterr != io.EOF {
return true, false
}
nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
if !ok {
// 没有获取到记录了,返回true,以便后面关闭连接
return true, false
}
// The driver is at the end of the current result set.
// Test to see if there is another result set after the current one.
// Only close Rows if there is no further result sets to read.
if !nextResultSet.HasNextResultSet() {
doClose = true
}
return doClose, false
}
return false, true
}
Логика Next():
- При вызове Next() подготовить следующую запись для сканирования для чтения
- Возвращает false, если произошла ошибка при подготовке данных или нет следующей записи
- Если Next() получает значение false при подготовке данных, вызовите rows.Close(), чтобы поместить соединение обратно в пул или передать его другим запросам для ожидания, чтобы соединение можно было использовать повторно.
Итак, вот почему следующая демонстрация не имеет такой же проблемы.
for rows.Next() {
user := &User{}
err = db.ScanRows(rows, user)
if err != nil {
continue
}
}
Суммировать
На этом этапе вопрос, поднятый в начале, должен иметь четкий ответ: rows.Next() После получения последней записи он вызовет rows.Close(), чтобы поместить соединение обратно в пул соединений или передать его другим ожидающим запросы квадратные, поэтому нет необходимости вручную вызывать rows.Close(),
В демо с проблемой, поскольку rows.Next() не выполняется до последней записи и rows.Close() не вызывается, он не был возвращен для повторного использования после установления соединения, в результате чего приходит каждый. Запрос на создание нового соединения, создание нового монитораstartWatcher.func1
, что в итоге привело к взрыву памяти 💥