1 Обзор
Фреймворк rpc включен в исходный код go, а функция rpc реализована относительно упрощенным способом в то время.В настоящее время официальный представитель rpc в исходном коде объявил, что новые функции добавляться не будут, и использование Рекомендуется grpc. Поскольку инфраструктура rpc в стандартной библиотеке go, есть еще много мест, которые стоит изучить и изучить.Здесь мы проанализируем инфраструктуру go native rpc с точки зрения исходного кода.
2. серверная часть
Сторона сервера в основном делится на два этапа.Во-первых, метод регистрируется, и метод извлекается через обработку отражения и сохраняется на карте.Затем идет сетевой вызов, в основном прослушивание порта, чтение пакета данных, и расшифровка запроса. Вызовите метод после обработки отражения, закодируйте возвращаемое значение и верните его клиенту.
2.1 Регистрация метода
2.1.1 Register
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
return DefaultServer.RegisterName(name, rcvr)
}
Как и выше, для регистрации метода есть две функции входа, а именно: Register и RegisterName, где interface{} обычно представляет собой объект с методом. Если вы хотите настроить принимающий объект метода, вы можете использовать RegisterName.
2.1.2 Процесс отражения
type methodType struct {
sync.Mutex // protects counters
method reflect.Method //反射后的函数
ArgType reflect.Type //请求参数的反射值
ReplyType reflect.Type //返回参数的反射值
numCalls uint //调用次数
}
type service struct {
name string // 服务名,这里通常为register时的对象名或自定义对象名
rcvr reflect.Value // 服务的接收者的反射值
typ reflect.Type // 接收者的类型
method map[string]*methodType // 对象的所有方法的反射结果.
}
Процесс обработки отражения фактически заключается в создании вышеуказанной структуры посредством отражения объекта и метода объекта. Например, когда регистрируется такой объект, как ошибка Arith.Multiply(xx,xx), сгенерированная структура представляет собой карту[" Arith"]*service, service Метод map["Multiply"]*methodType.
Ниже приведены несколько ключевых кодов:
Создать сервисный объект
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
//生成service
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
....
s.name = sname
// 通过suitableMethods将对象的方法转换成map[string]*methodType结构
s.method = suitableMethods(s.typ, true)
....
//service存储为键值对
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
создать карту [строка] *methodType
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
//通过反射,遍历所有的方法
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if reportErr {
log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
}
continue
}
//取出请求参数类型
argType := mtype.In(1)
...
// 取出响应参数类型,响应参数必须为指针
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Println("method", mname, "reply type not a pointer:", replyType)
}
continue
}
...
// 去除函数的返回值,函数的返回值必须为error.
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
log.Println("method", mname, "returns", returnType.String(), "not error")
}
continue
}
//将方法存储成key-value
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
2.2 Сетевые вызовы
// Request 每次rpc调用的请求的头部分
type Request struct {
ServiceMethod string // 格式为: "Service.Method"
Seq uint64 // 客户端生成的序列号
next *Request // server端保持的链表
}
// Response 每次rpc调用的响应的头部分
type Response struct {
ServiceMethod string // 对应请求部分的 ServiceMethod
Seq uint64 // 对应请求部分的 Seq
Error string // 错误
next *Response // server端保持的链表
}
Как и выше, сетевой вызов в основном использует две вышеупомянутые структуры, которые являются параметрами запроса и возвращаемыми параметрами, и реализует взаимное преобразование из двоичного кода в структуру через кодек (gob/json), Он в основном включает следующие шаги:
Код ключа следующий: Вынуть запрос и получить параметры вызова соответствующей функции
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
//编码器读取生成请求
err = codec.ReadRequestHeader(req)
if err != nil {
//错误处理
...
return
}
keepReading = true
//取出服务名以及方法名
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
//从注册时生成的map中查询出相应的方法的结构
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
svc = svci.(*service)
//获取出方法的类型
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
//Обработка цикла, постоянно читаем поток байтов по ссылке, расшифровываем запрос, вызываем метод, кодируем ответ и записываем обратно клиенту.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
//读取请求
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
...
}
//调用
go service.call(server, sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}
вызов функции с параметрами
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// 通过反射进行函数调用
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// 返回值是不为空时,则取出错误的string
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
//发送相应,并释放请求结构
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
3. Клиентская сторона
// 异步调用
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
}
// 同步调用
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
}
// Call represents an active RPC.
type Call struct {
ServiceMethod string // 服务名及方法名 格式:服务.方法
Args interface{} // 函数的请求参数 (*struct).
Reply interface{} // 函数的响应参数 (*struct).
Error error // 方法完成后 error的状态.
Done chan *Call // 方法调用结束后的channel.
}
Клиентская часть относительно проста, в основном предоставляет методы Call и Go, которые представляют собой синхронные вызовы и асинхронные вызовы соответственно, но на самом деле базовая реализация синхронных вызовов на самом деле является асинхронными вызовами, а структура Call в основном используется в вызове. , и соответствующие пояснения приведены выше.
3.1 Основной процесс
3.2 Код ключа
Отправьте часть кода запроса, каждый раз, когда отправляется запрос, генерируется объект вызова, и последовательность используется в качестве ключа для его сохранения на карте.Когда сервер возвращается, вызов берется из карты и обрабатывается соответственно.
func (client *Client) send(call *Call) {
//请求级别的锁
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
call.Error = ErrShutdown
client.mutex.Unlock()
call.done()
return
}
//生成seq,每次调用均生成唯一的seq,在服务端相应后会通过该值进行匹配
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// 请求并发送请求
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
//发送请求错误时,将map中call对象删除.
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
Код для получения части ответа, вот цикл for, который непрерывно считывает поток по tcp и декодирует его в объект Response и объект Reply метода.
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
//通过response中的 Seq获取call对象
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
//服务端返回错误,直接将错误返回
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
//通过编码器,将Resonse的body部分解码成reply对象.
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// 客户端退出处理
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
4. Некоторые недостатки
-
Синхронные вызовы не могут истечь по времени
Поскольку собственный rpc предоставляет только два метода: синхронный вызов и асинхронный переход, синхронный сервер вызовов всегда будет блокироваться, если он не вернется. -
Утечка памяти после истечения времени ожидания асинхронного вызова
Реализация функции таймаута на основе асинхронных вызовов и каналов также будет иметь проблемы с утечкой, причина в том, что запрос клиента будет храниться в структуре карты, а выход из функции Go не очистит содержимое карты, поэтому если сервер не вернется, то запрос в карте памяти продолжит существовать, что приведет к утечке памяти. -
Базовое состояние ссылки не может быть сохранено
Поскольку механизм проверки активности отсутствует, при повторном использовании базовой ссылки ссылка будет фактически недоступна, но верхний уровень не сможет ее воспринять, что приведет к запросу, на который не удалось получить ответ.
5. Резюме
В общем, go native rpc — это базовая версия rpc с упрощенным кодом и высокой масштабируемостью, но она реализует только самые основные сетевые коммуникации rpc, такие как тайм-аут, управление ссылками (поддержание активности и повторное подключение), регистрация службы. обнаружил, что его все еще не хватает, поэтому он по-прежнему не может достичь готовой производственной среды.Относительно говоря, grpc намного более зрелый.Недавно я планирую интегрировать набор микросервисных коммуникационных фреймворков на основе grpc.Большинство компоненты с открытым исходным кодом, см. проект.grpc-wrapper.