предисловие
RPC — это аббревиатура удаленного вызова процедур (Remote Procedure Call).Посредством RPC мы можем вызывать функции, расположенные в других местах, точно так же, как и локальные методы. Более распространенными могут быть вызовы HTTP API. Для простого сравнения, RPC более совершенен, чем инкапсуляция HTTP-вызовов. Вызывающему абоненту не нужно заниматься сериализацией и десериализацией вручную, а стоимость использования ниже (хотя стоимость обучения может быть меньше). быть выше).
В учебных целях на этот раз цель состоит в том, чтобы использовать язык go для реализации собственного RPC. В реальном мире для инструмента RPC, помимо вызовов методов, люди уделяют больше внимания другим функциям, таким как обнаружение сервисов, балансировка нагрузки, переход на более раннюю версию прерывателя цепи и другие функции, которые пока не будут здесь рассматриваться, но сосредоточиться только на реализации вызова рабочего метода.
существуетпредыдущая статьяУ меня есть общее представление о инфраструктуре rpc, которая поставляется с языком go.Упоминается, что go rpc резервирует интерфейс кодека, который позволяет пользователям использовать свой собственный протокол сериализации в go rpc.На этот раз я попытаюсь реализовать кодек моего собственного RPC.
Готов к работе
протокол сериализации
Основные элементы для реализации RPC, вероятно, следующие: протокол сериализации, сетевая модель и модель потока. Кодек в go rpc в основном реализует протокол сериализации.
Изначально я хотел использовать знакомый протокол Thrift, но процесс RPC реализован с использованием самого Thrift, поэтому это не простой протокол сериализации, и его логика сериализации может быть несовместима с go. rpc хорошо подходит, плюс необходимо написать определения IDL, что увеличивает сложность. Изначально он должен был быть знаком с go, поэтому начнем с простого, поэтому в качестве протокола сериализации выбираем messagepack.
messagepackЭто относительно легкий протокол сериализации.Его логика похожа на json, но он использует бинарную форму, поэтому он быстрее, чем сериализация json, и данные, сгенерированные после сериализации, также меньше.В принципе, его можно рассматривать как бинарную версию json. .
Создайте определение класса
Чтобы реализовать собственный кодек, вам необходимо реализовать два интерфейса, предусмотренные в go rpc: ServerCodec и ClientCodec. Очевидно, что они представляют логику сервера и клиента соответственно. Определения двух интерфейсов следующие:
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
Close() error
}
type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
Как видите, go rpc абстрагирует запрос/ответ в виде заголовок+тело.При чтении данных он делится на чтение головы и чтение тела.При записи данных вам нужно только написать часть тела, и перейти к rpc добавят его для нас. в головной раздел. Затем мы определяем две структуры для представления полных данных запроса/ответа:
type MsgpackReq struct {
rpc.Request //head
Arg interface{} //body
}
type MsgpackResp struct {
rpc.Response //head
Reply interface{} //body
}
Здесь msgpackReq и msgpackResp непосредственно встраивают Запрос и Ответ, которые поставляются с go rpc.Встроенные Запрос и Ответ определяют серийный номер, имя метода и другую информацию.
Далее следует объявление пользовательского кодека:
type MessagePackServerCodec struct {
rwc io.ReadWriteCloser //用于读写数据,实际是一个网络连接
req MsgpackReq //用于缓存解析到的请求
closed bool //标识codec是否关闭
}
type MessagePackClientCodec struct {
rwc io.ReadWriteCloser
resp MsgpackResp //用于缓存解析到的请求
closed bool
}
func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
return &MessagePackServerCodec{conn, MsgpackReq{}, false}
}
func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
return &MessagePackClientCodec{conn, MsgpackResp{}, false}
}
Как упоминалось в предыдущей статье, кодек должен содержать источник данных для чтения и записи данных, и здесь напрямую передается сетевое соединение.
Реализуйте метод кодека
Реализовать идеи
Далее следует конкретная реализация метода.Для простоты два шага десериализации объединены в один шаг.Все данные анализируются и кэшируются при чтении головной части, а кэш возвращается непосредственно при чтении тела. читать.результат. Конкретная идея такова:
- Когда клиент отправляет запрос, он упаковывает данные в MsgpackReq, затем сериализует их с помощью пакета сообщений и отправляет.
- Когда сервер читает часть заголовка запроса, он десериализует полученные данные в MsgpackReq с пакетом сообщений и кэширует результат.
- Когда сервер читает тело запроса, он получает поле Arg из кэшированного MsgpackReq и возвращает его.
- Когда сервер отправляет ответ, он упаковывает данные в MsgpackResp, затем сериализует их с помощью messagepack и отправляет.
- Когда клиент читает часть заголовка ответа, он десериализует полученные данные в MsgpackResp с пакетом сообщений и кэширует результат.
- Когда клиент читает тело ответа, он получает поле Reply или Error из кэшированного MsgpackResp и возвращает его.
Реализация клиента
Код прямо здесь:
func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
//先判断codec是否已经关闭,如果是则直接返回
if c.closed {
return nil
}
//将r和arg组装成一个MsgpackReq并序列化
request := &MsgpackReq{*r, arg}
reqData, err := msgpack.Marshal(request)
if err != nil {
panic(err)
return err
}
//先发送数据长度
head := make([]byte, 4)
binary.BigEndian.PutUint32(head, uint32(len(reqData)))
_, err = c.rwc.Write(head)
//再将序列化产生的数据发送出去
_, err = c.rwc.Write(reqData)
return err
}
func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
//先判断codec是否已经关闭,如果是则直接返回
if c.closed {
return nil
}
//读取数据
data, err := readData(c.rwc)
if err != nil {
//client一旦初始化就会开始轮询数据,所以要处理连接close的情况
if strings.Contains(err.Error(), "use of closed network connection") {
return nil
}
panic(err) //简单起见,出现异常直接panic
}
//将读取到的数据反序列化成一个MsgpackResp
var response MsgpackResp
err = msgpack.Unmarshal(data, &response)
if err != nil {
panic(err) //简单起见,出现异常直接panic
}
//根据读取到的数据设置request的各个属性
r.ServiceMethod = response.ServiceMethod
r.Seq = response.Seq
//同时将读取到的数据缓存起来
c.resp = response
return nil
}
func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
//这里直接用缓存的数据返回即可
if "" != c.resp.Error {//如果返回的是异常
return errors.New(c.resp.Error)
}
if reply != nil {
//正常返回,通过反射将结果设置到reply变量,因为reply一定是指针类型,所以不必检查CanSet
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
}
return nil
}
func (c *MessagePackClientCodec) Close() error {
c.closed = true //关闭时将closed设置为true
if c.rwc != nil {
return c.rwc.Close()
}
return nil
}
Выше приведена реализация клиентской части, стоит отметить несколько моментов:
- Перед чтением и записью данных нужно проверить, не закрылся ли кодек
- При чтении и записи данных нужно разобраться с распаковкой и залипанием (через функцию readData)
Реализация сервера
Также непосредственно в коде:
func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
//先判断codec是否已经关闭,如果是则直接返回
if c.closed {
return nil
}
//将r和reply组装成一个MsgpackResp并序列化
response := &MsgpackResp{*r, reply}
respData, err := msgpack.Marshal(response)
if err != nil {
panic(err)
return err
}
head := make([]byte, 4)
binary.BigEndian.PutUint32(head, uint32(len(respData)))
_, err = c.rwc.Write(head)
//将序列化产生的数据发送出去
_, err = c.rwc.Write(respData)
return err
}
func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
//先判断codec是否已经关闭,如果是则直接返回
if c.closed {
return nil
}
//读取数据
data, err := readData(c.rwc)
if err != nil {
//这里不能直接panic,需要处理EOF和reset的情况
if err == io.EOF {
return err
}
if strings.Contains(err.Error(), "connection reset by peer") {
return err
}
panic(err) //其他异常直接panic
}
//将读取到的数据反序列化成一个MsgpackReq
var request MsgpackReq
err = msgpack.Unmarshal(data, &request)
if err != nil {
panic(err) //简单起见,出现异常直接panic
}
//根据读取到的数据设置request的各个属性
r.ServiceMethod = request.ServiceMethod
r.Seq = request.Seq
//同时将解析到的数据缓存起来
c.req = request
return nil
}
func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
if arg != nil {
//参数不为nil,通过反射将结果设置到arg变量
reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
}
return nil
}
func (c *MessagePackServerCodec) Close() error {
c.closed = true
if c.rwc != nil {
return c.rwc.Close()
}
return nil
}
По сути, реализация серверной части почти такая же, как и логика клиентской стороны, за исключением того, что роли запроса и ответа различны. Вот несколько замечаний:
- Когда серверная сторона считывает данные, ей необходимо иметь дело с ситуацией EOF и сбросом соединения.
- Сервер явно не обрабатывает ошибку, сгенерированную интерфейсом при возврате данных, а просто передает ответ обратно, так как ошибка сохраняется в rpc.Request и не требует обработки кодеком
Работа с распаковкой и наклейкой
Конкретные идеи для справкиЯзык go обрабатывает распаковку/закрепление TCP., вот реализация readData:
func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
const HeadSize = 4 //设定长度部分占4个字节
headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize))
headData := make([]byte, HeadSize)
for {
readSize, err := conn.Read(headData)
if err != nil {
returnError = err
return
}
headBuf.Write(headData[0:readSize])
if headBuf.Len() == HeadSize {
break
} else {
headData = make([]byte, HeadSize-readSize)
}
}
bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes()))
bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen))
bodyData := make([]byte, bodyLen)
for {
readSize, err := conn.Read(bodyData)
if err != nil {
returnError = err
return
}
bodyBuf.Write(bodyData[0:readSize])
if bodyBuf.Len() == bodyLen {
break
} else {
bodyData = make([]byte, bodyLen-readSize)
}
}
data = bodyBuf.Bytes()
returnError = nil
return
}
тестовый код
Затем мы тестируем наш кодек с помощью простого вызова Echo:
//声明接口类
type EchoService struct {}
//定义方法Echo
func (service *EchoService) Echo(arg string, result *string) error {
*result = arg
return nil
}
//服务端启动逻辑
func RegisterAndServeOnTcp() {
err := rpc.Register(&EchoService{})//注册并不是注册方法,而是注册EchoService的一个实例
if err != nil {
log.Fatal("error registering", err)
return
}
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
if err != nil {
log.Fatal("error resolving tcp", err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("error accepting", err)
} else {
//这里先通过NewServerCodec获得一个实例,然后调用rpc.ServeCodec来启动服务
rpc.ServeCodec(msgpk.NewServerCodec(conn))
}
}
}
//客户端调用逻辑
func Echo(arg string) (result string, err error) {
var client *rpc.Client
conn, err := net.Dial("tcp", ":1234")
client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))
defer client.Close()
if err != nil {
return "", err
}
err = client.Call("EchoService.Echo", arg, &result) //通过类型加方法名指定要调用的方法
if err != nil {
return "", err
}
return result, err
}
//main函数
func main() {
go server.RegisterAndServeOnTcp() //先启动服务端
time.Sleep(1e9)
wg := new(sync.WaitGroup) //waitGroup用于阻塞主线程防止提前退出
callTimes := 10
wg.Add(callTimes)
for i := 0; i < callTimes; i++ {
go func() {
//使用hello world加一个随机数作为参数
argString := "hello world "+strconv.Itoa(rand.Int())
resultString, err := client.Echo(argString)
if err != nil {
log.Fatal("error calling:", err)
}
if resultString != argString {
fmt.Println("error")
} else {
fmt.Printf("echo:%s\n", resultString)
}
wg.Done()
}()
}
wg.Wait()
}
В приведенном выше примере сервер сначала запускается с помощью go server.RegisterAndServeOnTcp(), а затем одновременно запускаются 10 процедур go для инициирования запроса, и клиент распечатывает соответствующий результат после получения ответа. Наконец, основная функция выполняется, и консоль выводит результат (случайное число позади может быть другим):
echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320
Эпилог
На данный момент простой пользовательский rpc для языка go завершен, хотя пользовательская часть — это только часть протокола сериализации.Например, модель потоков — это по-прежнему логика, которая поставляется с go rpc, и в предисловии об этом не упоминается. различные расширенные функции. В будущем рассмотрите возможность реализации RPC с нуля на языке go.
разное
Параллельные сценарии
Внимательные студенты могли обнаружить, что реализованная здесь логика вообще не учитывает параллелизм, а кэшированные данные также непосредственно помещаются в объект кодека. И такая простая реализация не приведет к сбою одновременных вызовов.Конкретная причина в том, что когда go rpc обрабатывает каждый объект кодека, запросы на чтение выполняются последовательно, а затем запросы обрабатываются параллельно и возвращаются результаты.