Прочтите исходный код: почему redigo небезопасен для многопоточности

Go

Redigo — это сторонняя библиотека для работы с redis в golang, причина выбора этой библиотеки в том, что ее документация очень богата, а работа с ней относительно проста. Типичное использование redigo выглядит так:

package main

import (
	"github.com/gomodule/redigo/redis"
	"log"
)

func main() {
	conn, err := redis.Dial("tcp", "192.168.1.2:6379")
	if err != nil {
		log.Fatalf("dial redis failed :%v\n", err)
	}

	result, err := redis.String(conn.Do("SET", "hello", "world"))
	if err != nil {
		log.Fatalln(err)
	}

	log.Println(result)
}

Здесь следует отметить, что по умолчанию к Redis можно получить только локальный доступ.Вы можете получить удаленный доступ, изменив привязку в /etc/redis/redis.conf.Здесь я изменил привязку к IP-адресу машины, на которой находится служба.

Хотя использование redigo очень простое, в его документации также указано, что нам нужно уделить особое внимание.godocСмотрите оригинальный текст в:

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do and Close methods.

Это переводится как:

Соединения поддерживают одновременный запуск одного агента, вызывающего Receive, и одного агента, вызывающего методы Send и Flush. Параллельные вызовы методов Do и Close не поддерживаются.

Из любопытства программистов я взглянул на исходный код реализации метода Do в redigo и примерно понял, почему функция Do небезопасна для параллелизма. Часть его исходного кода выглядит следующим образом:

func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
	return c.DoWithTimeout(c.readTimeout, cmd, args...)
}

func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
	c.mu.Lock()
	pending := c.pending
	c.pending = 0
	c.mu.Unlock()

	if cmd == "" && pending == 0 {
		return nil, nil
	}

	if c.writeTimeout != 0 {
		c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
	}

	if cmd != "" {
		if err := c.writeCommand(cmd, args); err != nil {
			return nil, c.fatal(err)
		}
	}

	if err := c.bw.Flush(); err != nil {
		return nil, c.fatal(err)
	}

	var deadline time.Time
	if readTimeout != 0 {
		deadline = time.Now().Add(readTimeout)
	}
	c.conn.SetReadDeadline(deadline)

	if cmd == "" {
		reply := make([]interface{}, pending)
		for i := range reply {
			r, e := c.readReply()
			if e != nil {
				return nil, c.fatal(e)
			}
			reply[i] = r
		}
		return reply, nil
	}

	var err error
	var reply interface{}
	for i := 0; i <= pending; i++ {
		var e error
		if reply, e = c.readReply(); e != nil {
			return nil, c.fatal(e)
		}
		if e, ok := reply.(Error); ok && err == nil {
			err = e
		}
	}
	return reply, err
}

func (c *conn) writeCommand(cmd string, args []interface{}) error {
	c.writeLen('*', 1+len(args))
	if err := c.writeString(cmd); err != nil {
		return err
	}
	for _, arg := range args {
		if err := c.writeArg(arg, true); err != nil {
			return err
		}
	}
	return nil
}

Вышеуказанные три функции реализованы в файле conn.go пакета redigo redis, в методе DoWithTimeout мы видим, что он выполняет передачу данных и соответствующий прием последовательно, и в функции нет блокировки. Хотя базовая реализация отправки TCP в golang заблокирована, она может гарантировать, что данные одной операции записи не будут вставлены в данные другой операции записи, но в этой реализации DoWithTimeout мы все еще смутно ощущаем небезопасный вкус.

Мы фиксируем фокус на методе writeCommand. Из его реализации мы можем узнать, что его функция в основном заключается в отправке команд redis на сервер redis в диапазоне for... для выполнения. В это время мы можем заметить, что эта функция не заблокирована.Если для ... диапазона нужно записать данные в глобальный буфер, то параллелизм, вероятно, вызовет пересечение данных. Для подтверждения этого предположения продолжим смотреть на реализацию writeArg:

func (c *conn) writeArg(arg interface{}, argumentTypeOK bool) (err error) {
	switch arg := arg.(type) {
	case string:
		return c.writeString(arg)
	case []byte:
		return c.writeBytes(arg)
	case int:
		return c.writeInt64(int64(arg))
	case int64:
		return c.writeInt64(arg)
	case float64:
		return c.writeFloat64(arg)
	case bool:
		if arg {
			return c.writeString("1")
		} else {
			return c.writeString("0")
		}
	case nil:
		return c.writeString("")
	case Argument:
		if argumentTypeOK {
			return c.writeArg(arg.RedisArg(), false)
		}
		// See comment in default clause below.
		var buf bytes.Buffer
		fmt.Fprint(&buf, arg)
		return c.writeBytes(buf.Bytes())
	default:
		// This default clause is intended to handle builtin numeric types.
		// The function should return an error for other types, but this is not
		// done for compatibility with previous versions of the package.
		var buf bytes.Buffer
		fmt.Fprint(&buf, arg)
		return c.writeBytes(buf.Bytes())
	}
}

func (c *conn) writeString(s string) error {
	c.writeLen('$', len(s))
	c.bw.WriteString(s)
	_, err := c.bw.WriteString("\r\n")
	return err
}

Метод writeArg вызывает разные методы для записи данных, оценивая разницу во входящих параметрах, но нижний уровень этих методов фактически вызывает метод writeString. В реализации метода writeString мы видим, что redigo записывает все данные в bw. bw является записывающим устройством net.Conn в conn, то есть, если метод Do выполняется одновременно, все эти тела параллельного выполнения записывают данные в один и тот же записывающий элемент net.Conn, что в основном подтверждает то, что я сказал выше.

Давайте вернемся к методу Flush из bw, который вызывается после того, как функция DoWithTimeout выполнит команду writeCommand. Этот метод отправляет все данные в буфер. Давайте посмотрим на его реализацию:

// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
	if b.err != nil {
		return b.err
	}
	if b.n == 0 {
		return nil
	}
	n, err := b.wr.Write(b.buf[0:b.n])
	if n < b.n && err == nil {
		err = io.ErrShortWrite
	}
	if err != nil {
		if n > 0 && n < b.n {
			copy(b.buf[0:b.n-n], b.buf[n:b.n])
		}
		b.n -= n
		b.err = err
		return err
	}
	b.n = 0
	return nil
}

Из кода видно, что после вызова метода b.wr.Write выполняется операция определения того, равна ли длина записываемых данных длине данных буфера. Из вышеприведенного анализа мы можем узнать, что redigo не блокируется в течение всего процесса вызова Do. Тогда, во время параллельного процесса Flush исполнительного органа, очень вероятно, что другие исполнительные органы будут буферизоваться для записи. в буферной зоне после вызова b.wr.Write длина записываемых данных меньше длины данных буфера, что приводит к короткой ошибке записи.

Мы можем написать программу для проверки:

package main

import (
	"github.com/gomodule/redigo/redis"
	"log"
	"sync"
)

func main() {
	conn, err := redis.Dial("tcp", "192.168.1.2:6379")
	if err != nil {
		log.Fatalf("dial redis failed :%v\n", err)
	}

	wg := sync.WaitGroup{}
	wg.Add(2)

	go func() {
		defer wg.Done()
		result, err := redis.String(conn.Do("SET", "hello", "world"))
		if err != nil {
			log.Fatalln(err)
		}
		log.Println(result)
	}()

	go func() {
		defer wg.Done()
		result, err := redis.String(conn.Do("SET", "hello", "world"))
		if err != nil {
			log.Fatalln(err)
		}
		log.Println(result)
	}()

	wg.Wait()
}

После выполнения возникает короткая ошибка записи:

Автор redigo рекомендует использовать пул соединений для обеспечения безопасности во время параллелизма Реализация пула соединений redigo будет рассмотрена вместе в следующий раз.

Читая исходный код, вы можете понять идеи авторов с открытым исходным кодом для реализации работ с открытым исходным кодом, а также вы можете расширить свой кругозор и распознать некоторые лучшие навыки программирования.Эта привычка должна соблюдаться.