Язык go реализует собственный RPC: кодек go rpc.

Go

предисловие

RPC — это аббревиатура удаленного вызова процедур (Remote Procedure Call).Посредством RPC мы можем вызывать функции, расположенные в других местах, точно так же, как и локальные методы. Более распространенными могут быть вызовы HTTP API. Для простого сравнения, RPC более совершенен, чем инкапсуляция HTTP-вызовов. Вызывающему абоненту не нужно заниматься сериализацией и десериализацией вручную, а стоимость использования ниже (хотя стоимость обучения может быть меньше). быть выше).

В учебных целях на этот раз цель состоит в том, чтобы использовать язык go для реализации собственного RPC. В реальном мире для инструмента RPC, помимо вызовов методов, люди уделяют больше внимания другим функциям, таким как обнаружение сервисов, балансировка нагрузки, переход на более раннюю версию прерывателя цепи и другие функции, которые пока не будут здесь рассматриваться, но сосредоточиться только на реализации вызова рабочего метода.

существуетпредыдущая статьяУ меня есть общее представление о инфраструктуре rpc, которая поставляется с языком go.Упоминается, что go rpc резервирует интерфейс кодека, который позволяет пользователям использовать свой собственный протокол сериализации в go rpc.На этот раз я попытаюсь реализовать кодек моего собственного RPC.

Готов к работе

протокол сериализации

Основные элементы для реализации RPC, вероятно, следующие: протокол сериализации, сетевая модель и модель потока. Кодек в go rpc в основном реализует протокол сериализации.

Изначально я хотел использовать знакомый протокол Thrift, но процесс RPC реализован с использованием самого Thrift, поэтому это не простой протокол сериализации, и его логика сериализации может быть несовместима с go. rpc хорошо подходит, плюс необходимо написать определения IDL, что увеличивает сложность. Изначально он должен был быть знаком с go, поэтому начнем с простого, поэтому в качестве протокола сериализации выбираем messagepack.

messagepackЭто относительно легкий протокол сериализации.Его логика похожа на json, но он использует бинарную форму, поэтому он быстрее, чем сериализация json, и данные, сгенерированные после сериализации, также меньше.В принципе, его можно рассматривать как бинарную версию json. .

Создайте определение класса

Чтобы реализовать собственный кодек, вам необходимо реализовать два интерфейса, предусмотренные в go rpc: ServerCodec и ClientCodec. Очевидно, что они представляют логику сервера и клиента соответственно. Определения двух интерфейсов следующие:

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	Close() error
}
type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error
	Close() error
}

Как видите, go rpc абстрагирует запрос/ответ в виде заголовок+тело.При чтении данных он делится на чтение головы и чтение тела.При записи данных вам нужно только написать часть тела, и перейти к rpc добавят его для нас. в головной раздел. Затем мы определяем две структуры для представления полных данных запроса/ответа:

type MsgpackReq struct {
	rpc.Request  //head
	Arg interface{} //body
}

type MsgpackResp struct {
	rpc.Response  //head
	Reply interface{}  //body
}

Здесь msgpackReq и msgpackResp непосредственно встраивают Запрос и Ответ, которые поставляются с go rpc.Встроенные Запрос и Ответ определяют серийный номер, имя метода и другую информацию.

Далее следует объявление пользовательского кодека:

type MessagePackServerCodec struct {
	rwc    io.ReadWriteCloser //用于读写数据,实际是一个网络连接
	req    MsgpackReq //用于缓存解析到的请求
	closed bool  //标识codec是否关闭
}

type MessagePackClientCodec struct {
	rwc    io.ReadWriteCloser
	resp   MsgpackResp  //用于缓存解析到的请求
	closed bool
}

func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
	return &MessagePackServerCodec{conn, MsgpackReq{}, false}
}

func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
	return &MessagePackClientCodec{conn, MsgpackResp{}, false}
}

Как упоминалось в предыдущей статье, кодек должен содержать источник данных для чтения и записи данных, и здесь напрямую передается сетевое соединение.

Реализуйте метод кодека

Реализовать идеи

Далее следует конкретная реализация метода.Для простоты два шага десериализации объединены в один шаг.Все данные анализируются и кэшируются при чтении головной части, а кэш возвращается непосредственно при чтении тела. читать.результат. Конкретная идея такова:

  1. Когда клиент отправляет запрос, он упаковывает данные в MsgpackReq, затем сериализует их с помощью пакета сообщений и отправляет.
  2. Когда сервер читает часть заголовка запроса, он десериализует полученные данные в MsgpackReq с пакетом сообщений и кэширует результат.
  3. Когда сервер читает тело запроса, он получает поле Arg из кэшированного MsgpackReq и возвращает его.
  4. Когда сервер отправляет ответ, он упаковывает данные в MsgpackResp, затем сериализует их с помощью messagepack и отправляет.
  5. Когда клиент читает часть заголовка ответа, он десериализует полученные данные в MsgpackResp с пакетом сообщений и кэширует результат.
  6. Когда клиент читает тело ответа, он получает поле Reply или Error из кэшированного MsgpackResp и возвращает его.

Реализация клиента

Код прямо здесь:

func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//将r和arg组装成一个MsgpackReq并序列化
	request := &MsgpackReq{*r, arg}
	reqData, err := msgpack.Marshal(request)
	if err != nil {
		panic(err)
		return err
	}
	//先发送数据长度
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(reqData)))
	_, err = c.rwc.Write(head)
	//再将序列化产生的数据发送出去
	_, err = c.rwc.Write(reqData)
	return err
}

func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//读取数据
	data, err := readData(c.rwc)
	if err != nil {
		//client一旦初始化就会开始轮询数据,所以要处理连接close的情况
		if strings.Contains(err.Error(), "use of closed network connection") {
			return nil
		}
		panic(err) //简单起见,出现异常直接panic
	}

	//将读取到的数据反序列化成一个MsgpackResp
	var response MsgpackResp
	err = msgpack.Unmarshal(data, &response)

	if err != nil {
		panic(err) //简单起见,出现异常直接panic
	}

	//根据读取到的数据设置request的各个属性
	r.ServiceMethod = response.ServiceMethod
	r.Seq = response.Seq
	//同时将读取到的数据缓存起来
	c.resp = response

	return nil
}

func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
	//这里直接用缓存的数据返回即可

	if "" != c.resp.Error {//如果返回的是异常
		return errors.New(c.resp.Error)
	}
	if reply != nil {
		//正常返回,通过反射将结果设置到reply变量,因为reply一定是指针类型,所以不必检查CanSet
		reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
	}
	return nil
}


func (c *MessagePackClientCodec) Close() error {
	c.closed = true //关闭时将closed设置为true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}

Выше приведена реализация клиентской части, стоит отметить несколько моментов:

  1. Перед чтением и записью данных нужно проверить, не закрылся ли кодек
  2. При чтении и записи данных нужно разобраться с распаковкой и залипанием (через функцию readData)

Реализация сервера

Также непосредственно в коде:

func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//将r和reply组装成一个MsgpackResp并序列化
	response := &MsgpackResp{*r, reply}

	respData, err := msgpack.Marshal(response)
	if err != nil {
		panic(err)
		return err
	}
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(respData)))
	_, err = c.rwc.Write(head)
	//将序列化产生的数据发送出去
	_, err = c.rwc.Write(respData)
	return err
}

func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//读取数据
	data, err := readData(c.rwc)
	if err != nil {
		//这里不能直接panic,需要处理EOF和reset的情况
		if err == io.EOF {
			return err
		}
		if strings.Contains(err.Error(), "connection reset by peer") {
			return err
		}
		panic(err) //其他异常直接panic
	}
	//将读取到的数据反序列化成一个MsgpackReq
	var request MsgpackReq
	err = msgpack.Unmarshal(data, &request)

	if err != nil {
		panic(err) //简单起见,出现异常直接panic
	}

	//根据读取到的数据设置request的各个属性
	r.ServiceMethod = request.ServiceMethod
	r.Seq = request.Seq
	//同时将解析到的数据缓存起来
	c.req = request

	return nil
}

func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
	if arg != nil {
		//参数不为nil,通过反射将结果设置到arg变量
		reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
	}
	return nil
}

func (c *MessagePackServerCodec) Close() error {
	c.closed = true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}

По сути, реализация серверной части почти такая же, как и логика клиентской стороны, за исключением того, что роли запроса и ответа различны. Вот несколько замечаний:

  1. Когда серверная сторона считывает данные, ей необходимо иметь дело с ситуацией EOF и сбросом соединения.
  2. Сервер явно не обрабатывает ошибку, сгенерированную интерфейсом при возврате данных, а просто передает ответ обратно, так как ошибка сохраняется в rpc.Request и не требует обработки кодеком

Работа с распаковкой и наклейкой

Конкретные идеи для справкиЯзык go обрабатывает распаковку/закрепление TCP., вот реализация readData:

func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
	const HeadSize = 4 //设定长度部分占4个字节
	headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize))
	headData := make([]byte, HeadSize)
	for {
		readSize, err := conn.Read(headData)
		if err != nil {
			returnError = err
			return
		}
		headBuf.Write(headData[0:readSize])
		if headBuf.Len() == HeadSize {
			break
		} else {
			headData = make([]byte, HeadSize-readSize)
		}
	}
	bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes()))
	bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen))
	bodyData := make([]byte, bodyLen)
	for {
		readSize, err := conn.Read(bodyData)
		if err != nil {
			returnError = err
			return
		}
		bodyBuf.Write(bodyData[0:readSize])
		if bodyBuf.Len() == bodyLen {
			break
		} else {
			bodyData = make([]byte, bodyLen-readSize)
		}
	}
	data = bodyBuf.Bytes()
	returnError = nil
	return
}

тестовый код

Затем мы тестируем наш кодек с помощью простого вызова Echo:

//声明接口类
type EchoService struct {}
//定义方法Echo
func (service *EchoService) Echo(arg string, result *string) error {
	*result = arg
	return nil
}
//服务端启动逻辑
func RegisterAndServeOnTcp() {
	err := rpc.Register(&EchoService{})//注册并不是注册方法,而是注册EchoService的一个实例
	if err != nil {
		log.Fatal("error registering", err)
		return
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
	if err != nil {
		log.Fatal("error resolving tcp", err)
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("error accepting", err)
		} else {
			//这里先通过NewServerCodec获得一个实例,然后调用rpc.ServeCodec来启动服务
			rpc.ServeCodec(msgpk.NewServerCodec(conn))
		}
	}
}
//客户端调用逻辑
func Echo(arg string) (result string, err error) {
	var client *rpc.Client
	conn, err := net.Dial("tcp", ":1234")
	client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))

	defer client.Close()

	if err != nil {
		return "", err
	}
	err = client.Call("EchoService.Echo", arg, &result) //通过类型加方法名指定要调用的方法
	if err != nil {
		return "", err
	}
	return result, err
}
//main函数
func main() {
	go server.RegisterAndServeOnTcp() //先启动服务端
	time.Sleep(1e9)
	wg := new(sync.WaitGroup) //waitGroup用于阻塞主线程防止提前退出
	callTimes := 10
	wg.Add(callTimes)
	for i := 0; i < callTimes; i++ {
		go func() {
		        //使用hello world加一个随机数作为参数
			argString := "hello world "+strconv.Itoa(rand.Int())
			resultString, err := client.Echo(argString)
			if err != nil {
				log.Fatal("error calling:", err)
			}
			if resultString != argString {
				fmt.Println("error")
			} else {
				fmt.Printf("echo:%s\n", resultString)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

В приведенном выше примере сервер сначала запускается с помощью go server.RegisterAndServeOnTcp(), а затем одновременно запускаются 10 процедур go для инициирования запроса, и клиент распечатывает соответствующий результат после получения ответа. Наконец, основная функция выполняется, и консоль выводит результат (случайное число позади может быть другим):

echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320

Эпилог

На данный момент простой пользовательский rpc для языка go завершен, хотя пользовательская часть — это только часть протокола сериализации.Например, модель потоков — это по-прежнему логика, которая поставляется с go rpc, и в предисловии об этом не упоминается. различные расширенные функции. В будущем рассмотрите возможность реализации RPC с нуля на языке go.

разное

Параллельные сценарии

Внимательные студенты могли обнаружить, что реализованная здесь логика вообще не учитывает параллелизм, а кэшированные данные также непосредственно помещаются в объект кодека. И такая простая реализация не приведет к сбою одновременных вызовов.Конкретная причина в том, что когда go rpc обрабатывает каждый объект кодека, запросы на чтение выполняются последовательно, а затем запросы обрабатываются параллельно и возвращаются результаты.