Фреймворк RPC — важная часть микросервисов, и необходимо знать и понимать его принципы. Исходный код на языке Go имеет собственную реализацию функции RPC, хотя официал и заявил, что она не будет обновляться, но из-за ее простой реализации и небольшого количества кода многие места стоит изучать и изучать, что является очень хорошее начало для чтения исходного кода RPC.
Адрес источника:GitHub.com/gowaves/go/he…
1. Основное использование
Давайте сначала посмотрим на официальный пример вызова:
- Код серверной части:
// content of server.go
package main
import(
"net"
"net/rpc"
"net/http"
"errors"
"log"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func listenTCP(addr string) (net.Listener, string) {
l, e := net.Listen("tcp", addr)
if e != nil {
log.Fatalf("net.Listen tcp :0: %v", e)
}
return l, l.Addr().String()
}
func main() {
rpc.Register(new(Arith)) //注册服务
var l net.Listener
tcpAddr := "127.0.0.1:8080"
l, serverAddr := listenTCP(tcpAddr) //监听TCP连接
log.Println("RPC server listening on", serverAddr)
go rpc.Accept(l)
rpc.HandleHTTP() //监听HTTP连接
httpAddr := "127.0.0.1:8081"
l, serverAddr = listenTCP(httpAddr)
log.Println("RPC server listening on", serverAddr)
go http.Serve(l, nil)
select{}
}
Функция вызова rpc заключается в том, что Arith реализует метод Multiply и Divide.
Глядя на основную функцию, rpc реализует регистрациюrpc.Register(new(Arith))
метод, затем начните слушатьlistenTCP(tcpAddr)
, это через метод Listen в сетевом пакете, объектом прослушивания может быть TCP соединениеrpc.Accept(l)
, вы также можете попробовать HTTP-соединениеhttp.Serve(l, nil)
, это запуск HTTPServer с помощью пакета net/http.
- Клиентская часть кода
// content of client.go
package main
import(
"net/rpc"
"log"
"fmt"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := &Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)
// Asynchronous call
clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080")
if err != nil {
log.Fatal("dialing:", err)
}
quotient := new(Quotient)
divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
if replyCall.Error != nil {
fmt.Println(replyCall.Error)
} else {
fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
}
Клиентский код rpc предоставляет два методаrpc.DialHTTP
иrpc.Dial
Обеспечивает прослушивание соединений HTTP и Tcp соответственно. затем пройтиCall
илиGo
Чтобы вызвать метод сервера, разница между ними является то, что один является синхронным вызовом,Go
является асинхронным вызовом.
результат операции:
// server.go
➜ server ./serve
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081
// client.go
➜ client ./client
Arith: 7*8=56
Arith: 7/8=0...7
2. анализ исходного кода client.go
Давайте сначала взглянем на исходный код клиента, для начала разберемся с основной логикой клиентского кода на предыдущей картинке:
-
Dial
andDialHTTP
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
Dial
Построен на net.Dial, возвращает клиентский объект,DialHTTP
иDial
Аналогично, но с дополнительной обработкой HTTP и, наконец, возвращает NewClient(conn).
NewClient
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
NewClient
Здесь я сделал две вещи: во-первых, сгенерировал объект структуры клиента, включая метод сериализации, инициализировал объекты в нем и т. д. Go Rpc по умолчанию использует сериализацию gob, но также может использовать json или protobuf. Во-вторых, запустить сопрограмму goroutine, которая вызываетinput
Метод, основная часть этого клиента, будет рассмотрен ниже.
-
Call
andGo
В приведенном выше примере после создания клиентского объекта он будет явно вызванCall
илиGo
, представляющий как синхронные, так и асинхронные вызовы. Давайте посмотрим на исходный код:
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
можно увидеть,client.Call
метод на самом деле вызываетclient.Go
, только черезchan
блокировать.
Создайте структуру вызова, соберите метод вызова сервера, параметры, возвращаемые параметры, тег завершения вызова, а затем вызовитеclient.send
метод для отправки структуры вызова на сервер. После того, как сервер получит эти параметры, он отразит конкретный метод, а затем выполнит соответствующую функцию.
НижеCall
Определение структуры:
// Call
type Call struct {
ServiceMethod string // The name of the service and method to call. 服务方法名
Args interface{} // The argument to the function (*struct). 请求参数
Reply interface{} // The reply from the function (*struct). 返回参数
Error error // After completion, the error status. 错误状态
Done chan *Call // Strobes when call is complete.
}
client.send
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
Метод отправки будет только сейчасcall
Информация в структуре отправляется на сервер, во-первых, данные не отправляются напрямую на сервер, а параметры запроса и методы сервера сначала назначаются структуре запроса в структуре клиента, и в процессе назначения требуются блокировки. . Затем вызовите метод WriteRequest класса Gob, чтобы сбросить данные в буфер.
-
client.input
client.send
Метод заключается в отправке данных на сервер, в то время как ввод является противоположным, и получает возвращаемый результат ответа сервера клиенту.
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
err = client.codec.ReadResponseBody(nil)
....
}
call.done()
}
}
...
}
}
Основная логика заключается в непрерывном чтении потока по TCP в цикле, анализе заголовка в объект Response и анализе тела в объекте call.Reply, а также вызове функции done в вызове после анализа. Таким образом, клиент может получить объект Reply, который является результатом, возвращаемым сервером, и может распечатать и получить значение.
Суммировать:
После описания этих методов вернитесь к началуclient.go
Блок-схема понятна, можно сказать, что она разделена на две строки, одна строка показывает вызов для отправки данных запроса, а другая строка запускает сопрограмму для получения возвращаемых данных сервера.
3. Анализ исходного кода Server.go
Без дальнейших церемоний, давайте сделаем снимок, чтобы получить общее представление:
Все разделено на три части, первая часть регистрирует метод, определенный сервером, вторая часть слушает запрос клиента и анализирует параметры запроса, полученные от клиента. Третья часть получает параметры запроса и выполняет вызывающую функцию сервера, а результат возвращает клиенту.
Весь процесс фактически можно сравнить с процессом вызова сокета.
-
register
Сначала посмотрите на структуру сервера:
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
Глядя на английские комментарии, становится понятнее, чем то, что он делает. Сервер хранит службу сервера, а также запрос и ответ, которые он запрашивает. Эти два протокола согласованы с клиентом, а именно:
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
}
Служба хранит метод, который сервер должен зарегистрировать, а methodType — это атрибут конкретного метода.
Следовательно, для того, чтобы клиент мог удаленно позвонить методу сервера, предпосылка заключается в том, что методы сервера были загружены в структуру сервера до вызова, поэтому сервер должен отобразить вызов в метод регистра. Давайте посмотрим на Основной код внутри:
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
...
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true)
...
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
...
}
...
}
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
argType := mtype.In(1)
...
replyType := mtype.In(2)
...
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
Этот код получает некоторые атрибуты метода, реализованного структурой, посредством отражения, включая исполняемый объект метода, имя, параметры запроса и возвращаемые параметры.
Наконец, он сохраняется в serviceMap сервера. Структура метода, используемого клиентом для вызова сервера, представляет собой struct.method, поэтому его нужно разделить только на .. После получения имени структуры и имени метода метод можно получить через serviceMap, а результат можно полученный при его выполнении.
После регистрации метода следующим шагом будет прослушивание клиентских запросов.
Accept
Первый взглядAccept
код:
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
Через прослушиваемый порт tcp в пакете net, а затем запустить сопрограмму, посмотрим, что делается в этой сопрограмме?
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
...
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
...
}
Ye Hao понимать это, Serveconn The Gob of Sequence и методы для сохранения структуры Conn GOBSERVERCODEC, а затем вызовите метод Server.serveCodec, этот способ выполнения вещей - пройти через клиентский анализ пакетов для пакетов, проанализируют, как возвращаются параметры запроса, которые будут возвращены, И способ регулировки сервера которых, которые обрабатываются в вышеуказанном методе Server.ReadRequest.
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
...
}
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
...
dot := strings.LastIndex(req.ServiceMethod, ".")
...
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
// Look up the request.
svci, ok := server.serviceMap.Load(serviceName)
...
svc = svci.(*service)
mtype = svc.method[methodName]
...
}
return
}
Основная функция находится в readRequestHeader, во-первых, разделить struct.method, переданный клиентом, в соответствии с ., затем получить serviceName и methodName, а затем перейти к server.serviceMap, чтобы получить конкретную службу и объект выполнения метода.
Получив его, он запустит сопрограмму и вызовет метод service.call, который здесь выполняет метод службы сервера, получает возвращаемый результат, а затем вызывает WriteReponse для обратной записи данных. Затем клиентский метод ввода зацикливается для получения результатов. Это образует замкнутый цикл.
Давайте посмотрим на метод service.call:
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
...
function := mtype.method.Func
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
...
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
...
}
Реализованная функция аналогична приведенному выше анализу: получить объект функции через mtype, затем вызвать отраженный метод Call для выполнения результата и, наконец, вызвать server.sendResponse для отправки результата.
Это очень ясно, когда вы видите здесь и оглядываетесь назад на блок-схему кода сервера, нарисованную выше.
Исходный код Go RPC здесь.
4. Резюме
Исходный код Go RPC в настоящее время официально не поддерживается.Официально рекомендуется использовать grpc.В следующей статье планируется анализ исходного кода grpc.
Вот краткое изложение преимуществ и недостатков:
- преимущество:
- Код упрощен, а масштабируемость высока.
- недостаток:
- При синхронном вызове асинхронный метод Go блокируется chan, а тайм-аут не обрабатывается, так что в случае тайм-аута нельзя выпустить большое количество сопрограмм.
- Там может быть утечка памяти, потому что данные запроса клиента находится в структуре сервера. Если сервер не возвращается, данные в нем не будут очищены. Функция запускает функцию клиента и не будет убирать контент, поэтому Структура всегда будет сохранена., В результате чего утечка памяти.
Текущая среда RPC с открытым исходным кодом больше не является простым сетевым вызовом, подобным этому, но также включает множество функций управления службами, таких как регистрация и обнаружение служб, ограничение тока и предохранитель, мониторинг и т. д. В будущем это будет передано новым rpc, и, наконец, вы достигнете цели самостоятельно написать полную инфраструктуру rpc.
Для получения дополнительных статей и обсуждений, связанных с RPC, обратите внимание на общедоступный аккаунт: «Tiancheng Technology Talk».