RPC
что
RPC?
RPCполное имяRemote Procedure CallТрансляция удаленного вызова процедуры
RPCиHTTPразница
HTTPявляется соглашением,RPCв состоянии пройтиHTTPтакже может быть достигнута путемSocketРеализуйте набор протоколов самостоятельно.
По сложности,RPCРамки определенно выше простотыHTTPИнтерфейс. Но без сомнения,HTTPИнтерфейс ограниченHTTPсоглашение, необходимо принестиHTTPЗаголовок запроса, что приводит к менее эффективной или менее безопасной передачеRPC
и котказываться отмаленький,HTTPсоглашение относительноTCPПротокол сообщений, добавленный накладной расход лежит в соединении и отключении.HTTPОн поддерживает мультиплексирование пула соединений(HTTP 1.x)
Бережливая архитектура
Apache Thriftпредставляет собой межъязыковую структуру обслуживания, в основном дляRPC, с механизмами сериализации и десериализацииThriftСодержит полную структуру стека для построения клиентской и серверной частей.
Протокол передачи(TProtocol)
ThriftЭто позволяет пользователям выбирать разницу между протоколом передачи данных между клиентом и сервером.Протокол передачи обычно делится натекстовый и двоичный (binary) Протокол передачи, Чтобы сохранить полосу пропускания и повысить эффективность передачи, в большинстве случаев используется двоичный тип протокола передачи.
-
TBinaryProtocol: Двоичный формат кодирования для передачи данных -
TCompactProtocol: Эффективный формат плотного двоичного кодирования для передачи данных -
TJSONProtocol:использоватьJSONпротокол кодирования данных для передачи данных -
TDebugProtocol: Используйте читаемый текстовый формат, который легко понять.debug
Способ передачи данных (TTransport)
TTransportЭто транспортный уровень, тесно связанный с базовой передачей данных. Для каждого поддерживаемого базового транспорта существует соответствующийTTransport. На этом уровне данные обрабатываются как поток байтов, то есть транспортный уровень видит один байт за другим, и последовательно отправляет и получает эти байты.TTransportЯ не знаю, какой тип данных он передает.На самом деле, транспортному уровню все равно, какой это тип данных.Он должен только отправлять и получать данные в байтах. Парсинг типов данных находится вTProtocolЭтот слой завершен.
-
TSocket: использовать блокировкуI/Oпередача, которая является наиболее распространенным способом -
THttpTransport:использоватьHTTPпротокол передачи данных -
TFramedTransPort: отframeОн передается единицами и используется в неблокирующих сервисах; -
TFileTransPort: передать как файл -
TMemoryTransport: Память используется дляI/Oкоробка передач -
TZlibTransport:использоватьzlibСжатый для использования с другими транспортами -
TBufferedTransportкtransportВыполняется объектная манипуляция даннымиbuffer, т.е. изbufferчитать данные для передачи или записывать данные непосредственно вbuffer
Модель сети на стороне сервера (TServer)
TServerсуществуетthriftОсновная задача в рамках — получитьclientзапрос и направить его вprocessorобработка запросов. Для разных масштабов доступаthriftобеспечивает различныеTServerМодель.thriftВ настоящее время поддерживаетсяserverМодели включают:
-
TSimpleServer: Однопоточная серверная часть с использованием стандартной блокировкиI/O -
TTHreaadPoolServer: Многопоточная серверная часть использует стандартную блокировку.I/O -
TNonblockingServer: многопоточная серверная часть с использованием неблокирующегоI/O -
TThreadedServer: многопоточная сетевая модель, использующая блокировкуI/OСоздайте нить для каждого запроса
Для `Golang`, только для сервисной модели Tsimpleserver.
TProcesser
TProcessorглавная параTServerв одном запросеinputProtocolиoutputProtocolдействовать, то есть изinputProtocolчитать вclientзапросить данные, чтобыoutputProtocolНапишите возвращаемое значение пользовательской логики.TProcessorprocessявляется очень важным обработчиком, потому чтоclientвсеrpcЗвонки будут обрабатываться и переадресовываться этой функцией
ThriftClient
ThriftClientиTProcessorта же основная операцияinputProtocolиoutputProtocol, разницаthriftClientбудетrpcзвонок делится наsendиreceiveДва шага:
-
sendШаг, возьмите параметры звонка пользователя в целомstructнаписатьTProtocol, и отправил вTServer. -
sendпосле окончания,thriftClientвойти немедленноreceiveстатус ожиданияTServerответ на. заTServerОтвет, используйте класс синтаксического анализа возвращаемого значения, чтобы проанализировать возвращаемое значение, завершитьrpcперечислить.
Сервисный режим TSimpleServer
На самом деле это не типичноTSimpleServer,потому что он не блокируется после принятия сокета.
это больше похоже наTThreadedServer, может обрабатывать различныеgoroutineразличные соединения в .
еслиgolangРеализация пользователя на стороне клиентаconn-poolЧто-то вроде этого будет работать.
type TSimpleServer struct {
quit chan struct{} // 采用阻塞channel进行判断
processorFactory TProcessorFactory
serverTransport TServerTransport
inputTransportFactory TTransportFactory
outputTransportFactory TTransportFactory
inputProtocolFactory TProtocolFactory
outputProtocolFactory TProtocolFactory
}
следующий кодthrift-idlИбо следующий анализ берет это в качестве примера
namespace go echo
struct EchoReq {
1: string msg;
}
struct EchoRes {
1: string msg;
}
service Echo {
EchoRes echo(1: EchoReq req);
}
Код сервера сервера
func (p *TSimpleServer) Serve() error {
err := p.Listen()
if err != nil {
return err
}
p.AcceptLoop()
return nil
}
func (p *TSimpleServer) AcceptLoop() error {
for {
// 此处的Accept()是阻塞的,是调用listener.Accept()
client, err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
}
}
еслиserverВ это время запрос все еще обрабатывается, и сервер внезапно перезагружается.thrift 1.0Невозможно сделать изящный перезапуск,ноgo thriftПоследняя версия принятаgolang waitgroupПутьДостигнут изящный перезапуск ~
func (p *TSimpleServer) processRequests(client TTransport) error {
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
if e := recover(); e != nil {
log.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()
if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
ok, err := processor.Process(inputProtocol, outputProtocol)
if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s", err)
return err
}
if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}
Processлогика обработки
func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
// 获取传递过来的name,如果存在则处理
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
// 异常逻辑
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x3.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, x3
}
TServerполучилаrpcПосле заявки позвонитеTProcessorprocessдля обработки.TProcessorprocessпервый звонокTTransport.readMessageBeginинтерфейс, читатьrpcназвание вызова иrpcтип вызова.
еслиrpcТип вызоваrpc call, затем позвонитеTProcessor.process_fnпродолжить обработку, неизвестноrpcТип вызова, исключение брошено.TProcessor.process_fnв соответствии сrpcназывать имя, владетьprocessMapнайти соответствующийrpcфункция обработчика. Если есть соответствующийrpcЕсли функция обработки вызывается, функция обработки вызывается для продолжения ответа на запрос. Выдает исключение, если оно не существует.
func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := EchoEchoArgs{}
// 读取入参的参数
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, err
}
iprot.ReadMessageEnd()
result := EchoEchoResult{}
var retval *EchoRes
var err2 error
// 此处是thrift为什么err不能传错误,如果传业务错误会被阻塞
if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return true, err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
Серверstopкод
var once sync.Once
func (p *TSimpleServer) Stop() error {
q := func() {
p.quit <- struct{}{}
p.serverTransport.Interrupt()
}
once.Do(q)
return nil
}
stopФункция относительно простая, вы видите, что данные сразу в очередь блокировки записываются, затемserverбольше не принимаю запросы
код клиента
Clientфункция называется
func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {
if err = p.sendEcho(req); err != nil {
return
}
return p.recvEcho()
}
sendEcho()функция
func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
oprot := p.OutputProtocol
if oprot == nil {
oprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.OutputProtocol = oprot
}
// seqid + 1
p.SeqId++
if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
return
}
// 构建参数
args := EchoEchoArgs{
Req: req,
}
if err = args.Write(oprot); err != nil {
return
}
// 通知服务器发送完毕
if err = oprot.WriteMessageEnd(); err != nil {
return
}
return oprot.Flush()
}
recvEcho()функция
func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
iprot := p.InputProtocol
if iprot == nil {
iprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.InputProtocol = iprot
}
//
method, mTypeId, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return
}
if method != "echo" {
err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
return
}
if p.SeqId != seqId {
err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
return
}
if mTypeId == thrift.EXCEPTION {
error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
var error1 error
error1, err = error0.Read(iprot)
if err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
err = error1
return
}
if mTypeId != thrift.REPLY {
err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
return
}
result := EchoEchoResult{}
if err = result.Read(iprot); err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
value = result.GetSuccess()
return
}
Проблема с установкой THRIFT на компьютере Mac
- Вопрос 1:
go get git.apache.org/thrift.git/lib/go/thriftПотерпеть поражение - Проблема 2: Прямое использование
github.comПредоставленная версия сообщит о неизвестной ошибке
Вопрос 2 должен основываться на вашемthrift -versionчтобы определить, какую версию скачатьthrift, например мойthrift版本是0.10.0Тогда нужно скачатьthriftадресhttps://github.com/apache/thrift/archive/0.10.0.zip
Создать вручнуюmkdir -p git.apache.org/thrift.git/lib/go/каталог, а затем загруженныйgoПереместите файл в этот каталог~
Reference
- Woohoo.16 blog.com/series/ он…
- Woohoo. IBM.com/developer Я…
- путь AU.com/remote-Proc…
- Woohoo.slide share.net/DVI RS can/int…
- блог woo woo woo.cn на.com/LBSwhile/afraid/485…
- блог woo woo woo.cn on.com/winner-0715…
- zhuanlan.zhihu.com/p/53685973
- zhuanlan.zhihu.com/p/53687302
- сегмент fault.com/ah/119000001…