Серия микросервисов (1): интерпретация исходного кода Go Rpc

Микросервисы Go RPC
Серия микросервисов (1): интерпретация исходного кода Go Rpc

Фреймворк RPC — важная часть микросервисов, и необходимо знать и понимать его принципы. Исходный код на языке Go имеет собственную реализацию функции RPC, хотя официал и заявил, что она не будет обновляться, но из-за ее простой реализации и небольшого количества кода многие места стоит изучать и изучать, что является очень хорошее начало для чтения исходного кода RPC.

Адрес источника:GitHub.com/gowaves/go/he…

1. Основное использование

Давайте сначала посмотрим на официальный пример вызова:

  1. Код серверной части:
// content of server.go
package main

import(
    "net"
    "net/rpc"
    "net/http"
    "errors"
    "log"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func listenTCP(addr string) (net.Listener, string) {
    l, e := net.Listen("tcp", addr)
    if e != nil {
        log.Fatalf("net.Listen tcp :0: %v", e)
    }
    return l, l.Addr().String()
}

func main() {
    rpc.Register(new(Arith)) //注册服务
    var l net.Listener
    tcpAddr := "127.0.0.1:8080"
    l, serverAddr := listenTCP(tcpAddr) //监听TCP连接
    log.Println("RPC server listening on", serverAddr)
    go rpc.Accept(l)

    rpc.HandleHTTP() //监听HTTP连接
    httpAddr := "127.0.0.1:8081"
    l, serverAddr = listenTCP(httpAddr)
    log.Println("RPC server listening on", serverAddr)
    go http.Serve(l, nil)

    select{}
}

Функция вызова rpc заключается в том, что Arith реализует метод Multiply и Divide. Глядя на основную функцию, rpc реализует регистрациюrpc.Register(new(Arith))метод, затем начните слушатьlistenTCP(tcpAddr), это через метод Listen в сетевом пакете, объектом прослушивания может быть TCP соединениеrpc.Accept(l), вы также можете попробовать HTTP-соединениеhttp.Serve(l, nil), это запуск HTTPServer с помощью пакета net/http.

  1. Клиентская часть кода
// content of client.go
package main

import(
    "net/rpc"
    "log"
    "fmt"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

func main() {
    client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    // Synchronous call
    args := &Args{7,8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    // Asynchronous call
    clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    
    quotient := new(Quotient)
    divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
    replyCall := <-divCall.Done    // will be equal to divCall
    if replyCall.Error != nil {
        fmt.Println(replyCall.Error)
    } else {
        fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
    }

Клиентский код rpc предоставляет два методаrpc.DialHTTPиrpc.DialОбеспечивает прослушивание соединений HTTP и Tcp соответственно. затем пройтиCallилиGoЧтобы вызвать метод сервера, разница между ними является то, что один является синхронным вызовом,Goявляется асинхронным вызовом.

результат операции:

// server.go
➜  server ./serve
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081
// client.go
➜  client ./client
Arith: 7*8=56
Arith: 7/8=0...7

2. анализ исходного кода client.go

Давайте сначала взглянем на исходный код клиента, для начала разберемся с основной логикой клиентского кода на предыдущей картинке:

  1. Dial and DialHTTP
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn), nil
}

DialПостроен на net.Dial, возвращает клиентский объект,DialHTTPиDialАналогично, но с дополнительной обработкой HTTP и, наконец, возвращает NewClient(conn).

  1. NewClient
func NewClient(conn io.ReadWriteCloser) *Client {
	encBuf := bufio.NewWriter(conn)
	client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	go client.input()
	return client
}

NewClientЗдесь я сделал две вещи: во-первых, сгенерировал объект структуры клиента, включая метод сериализации, инициализировал объекты в нем и т. д. Go Rpc по умолчанию использует сериализацию gob, но также может использовать json или protobuf. Во-вторых, запустить сопрограмму goroutine, которая вызываетinputМетод, основная часть этого клиента, будет рассмотрен ниже.

  1. Call and GoВ приведенном выше примере после создания клиентского объекта он будет явно вызванCallилиGo, представляющий как синхронные, так и асинхронные вызовы. Давайте посмотрим на исходный код:
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

можно увидеть,client.Callметод на самом деле вызываетclient.Go, только черезchanблокировать.

Создайте структуру вызова, соберите метод вызова сервера, параметры, возвращаемые параметры, тег завершения вызова, а затем вызовитеclient.sendметод для отправки структуры вызова на сервер. После того, как сервер получит эти параметры, он отразит конкретный метод, а затем выполнит соответствующую функцию. НижеCallОпределение структуры:

// Call 
type Call struct {
	ServiceMethod string      // The name of the service and method to call. 服务方法名
	Args          interface{} // The argument to the function (*struct). 请求参数
	Reply         interface{} // The reply from the function (*struct). 返回参数
	Error         error       // After completion, the error status. 错误状态
	Done          chan *Call  // Strobes when call is complete. 
}
  1. client.send
func (client *Client) send(call *Call) {
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()
	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		client.mutex.Unlock()
		call.Error = ErrShutdown
		call.done()
		return
	}
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()
	// Encode and send the request.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

Метод отправки будет только сейчасcallИнформация в структуре отправляется на сервер, во-первых, данные не отправляются напрямую на сервер, а параметры запроса и методы сервера сначала назначаются структуре запроса в структуре клиента, и в процессе назначения требуются блокировки. . Затем вызовите метод WriteRequest класса Gob, чтобы сбросить данные в буфер.

  1. client.input client.sendМетод заключается в отправке данных на сервер, в то время как ввод является противоположным, и получает возвращаемый результат ответа сервера клиенту.
func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			
			err = client.codec.ReadResponseBody(nil)
			
            ....
			
            }
			call.done()
		}
	}
	
    ...
	
	}
}

Основная логика заключается в непрерывном чтении потока по TCP в цикле, анализе заголовка в объект Response и анализе тела в объекте call.Reply, а также вызове функции done в вызове после анализа. Таким образом, клиент может получить объект Reply, который является результатом, возвращаемым сервером, и может распечатать и получить значение.

Суммировать:

После описания этих методов вернитесь к началуclient.goБлок-схема понятна, можно сказать, что она разделена на две строки, одна строка показывает вызов для отправки данных запроса, а другая строка запускает сопрограмму для получения возвращаемых данных сервера.

3. Анализ исходного кода Server.go

Без дальнейших церемоний, давайте сделаем снимок, чтобы получить общее представление:

Все разделено на три части, первая часть регистрирует метод, определенный сервером, вторая часть слушает запрос клиента и анализирует параметры запроса, полученные от клиента. Третья часть получает параметры запроса и выполняет вызывающую функцию сервера, а результат возвращает клиенту.

Весь процесс фактически можно сравнить с процессом вызова сокета.

  1. registerСначала посмотрите на структуру сервера:
type methodType struct {
	sync.Mutex // protects counters
	method     reflect.Method
	ArgType    reflect.Type
	ReplyType  reflect.Type
	numCalls   uint
}

type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods
}

type Server struct {
	serviceMap sync.Map   // map[string]*service
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response
}

Глядя на английские комментарии, становится понятнее, чем то, что он делает. Сервер хранит службу сервера, а также запрос и ответ, которые он запрашивает. Эти два протокола согласованы с клиентом, а именно:

type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}

type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}

Служба хранит метод, который сервер должен зарегистрировать, а methodType — это атрибут конкретного метода.

Следовательно, для того, чтобы клиент мог удаленно позвонить методу сервера, предпосылка заключается в том, что методы сервера были загружены в структуру сервера до вызова, поэтому сервер должен отобразить вызов в метод регистра. Давайте посмотрим на Основной код внутри:

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
	...
	s.name = sname

	// Install the methods
	s.method = suitableMethods(s.typ, true)

	...
    
	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		...   
	}
    ...
}


func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
	methods := make(map[string]*methodType)
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		mname := method.Name
		
		argType := mtype.In(1)
		
        ...
	
		replyType := mtype.In(2)
		
        ...
        
		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
	}
	return methods
}

Этот код получает некоторые атрибуты метода, реализованного структурой, посредством отражения, включая исполняемый объект метода, имя, параметры запроса и возвращаемые параметры.

Наконец, он сохраняется в serviceMap сервера. Структура метода, используемого клиентом для вызова сервера, представляет собой struct.method, поэтому его нужно разделить только на .. После получения имени структуры и имени метода метод можно получить через serviceMap, а результат можно полученный при его выполнении.

После регистрации метода следующим шагом будет прослушивание клиентских запросов.

  1. Accept

Первый взглядAcceptкод:

func (server *Server) Accept(lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Print("rpc.Serve: accept:", err.Error())
			return
		}
		go server.ServeConn(conn)

Через прослушиваемый порт tcp в пакете net, а затем запустить сопрограмму, посмотрим, что делается в этой сопрограмме?

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	buf := bufio.NewWriter(conn)
	srv := &gobServerCodec{
		rwc:    conn,
		dec:    gob.NewDecoder(conn),
		enc:    gob.NewEncoder(buf),
		encBuf: buf,
	}
	server.ServeCodec(srv)
}

func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		
       ...
       
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
  ...
}

Ye Hao понимать это, Serveconn The Gob of Sequence и методы для сохранения структуры Conn GOBSERVERCODEC, а затем вызовите метод Server.serveCodec, этот способ выполнения вещей - пройти через клиентский анализ пакетов для пакетов, проанализируют, как возвращаются параметры запроса, которые будут возвращены, И способ регулировки сервера которых, которые обрабатываются в вышеуказанном методе Server.ReadRequest.

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {

	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	...
}

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
	...
	dot := strings.LastIndex(req.ServiceMethod, ".")
	...
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	// Look up the request.
	svci, ok := server.serviceMap.Load(serviceName)
	...
	svc = svci.(*service)
	mtype = svc.method[methodName]
	...
	}
	return
}

Основная функция находится в readRequestHeader, во-первых, разделить struct.method, переданный клиентом, в соответствии с ., затем получить serviceName и methodName, а затем перейти к server.serviceMap, чтобы получить конкретную службу и объект выполнения метода.

Получив его, он запустит сопрограмму и вызовет метод service.call, который здесь выполняет метод службы сервера, получает возвращаемый результат, а затем вызывает WriteReponse для обратной записи данных. Затем клиентский метод ввода зацикливается для получения результатов. Это образует замкнутый цикл.

Давайте посмотрим на метод service.call:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	...
	function := mtype.method.Func
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	...
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	...
}

Реализованная функция аналогична приведенному выше анализу: получить объект функции через mtype, затем вызвать отраженный метод Call для выполнения результата и, наконец, вызвать server.sendResponse для отправки результата.

Это очень ясно, когда вы видите здесь и оглядываетесь назад на блок-схему кода сервера, нарисованную выше.

Исходный код Go RPC здесь.

4. Резюме

Исходный код Go RPC в настоящее время официально не поддерживается.Официально рекомендуется использовать grpc.В следующей статье планируется анализ исходного кода grpc.

Вот краткое изложение преимуществ и недостатков:

  • преимущество:
    • Код упрощен, а масштабируемость высока.
  • недостаток:
    • При синхронном вызове асинхронный метод Go блокируется chan, а тайм-аут не обрабатывается, так что в случае тайм-аута нельзя выпустить большое количество сопрограмм.
    • Там может быть утечка памяти, потому что данные запроса клиента находится в структуре сервера. Если сервер не возвращается, данные в нем не будут очищены. Функция запускает функцию клиента и не будет убирать контент, поэтому Структура всегда будет сохранена., В результате чего утечка памяти.

Текущая среда RPC с открытым исходным кодом больше не является простым сетевым вызовом, подобным этому, но также включает множество функций управления службами, таких как регистрация и обнаружение служб, ограничение тока и предохранитель, мониторинг и т. д. В будущем это будет передано новым rpc, и, наконец, вы достигнете цели самостоятельно написать полную инфраструктуру rpc.

Для получения дополнительных статей и обсуждений, связанных с RPC, обратите внимание на общедоступный аккаунт: «Tiancheng Technology Talk».