бережливый парсинг голанга

Apache Thrift

RPC

чтоRPC?

RPCполное имяRemote Procedure CallТрансляция удаленного вызова процедуры

RPCиHTTPразница

HTTPявляется соглашением,RPCв состоянии пройтиHTTPтакже может быть достигнута путемSocketРеализуйте набор протоколов самостоятельно.

По сложности,RPCРамки определенно выше простотыHTTPИнтерфейс. Но без сомнения,HTTPИнтерфейс ограниченHTTPсоглашение, необходимо принестиHTTPЗаголовок запроса, что приводит к менее эффективной или менее безопасной передачеRPC

и котказываться отмаленький,HTTPсоглашение относительноTCPПротокол сообщений, добавленный накладной расход лежит в соединении и отключении.HTTPОн поддерживает мультиплексирование пула соединений(HTTP 1.x)

Бережливая архитектура

Apache Thriftпредставляет собой межъязыковую структуру обслуживания, в основном дляRPC, с механизмами сериализации и десериализацииThriftСодержит полную структуру стека для построения клиентской и серверной частей.

Протокол передачи(TProtocol)

ThriftЭто позволяет пользователям выбирать разницу между протоколом передачи данных между клиентом и сервером.Протокол передачи обычно делится натекстовый и двоичный (binary) Протокол передачи, Чтобы сохранить полосу пропускания и повысить эффективность передачи, в большинстве случаев используется двоичный тип протокола передачи.

  • TBinaryProtocol: Двоичный формат кодирования для передачи данных
  • TCompactProtocol: Эффективный формат плотного двоичного кодирования для передачи данных
  • TJSONProtocol:использоватьJSONпротокол кодирования данных для передачи данных
  • TDebugProtocol: Используйте читаемый текстовый формат, который легко понять.debug

Способ передачи данных (TTransport)

TTransportЭто транспортный уровень, тесно связанный с базовой передачей данных. Для каждого поддерживаемого базового транспорта существует соответствующийTTransport. На этом уровне данные обрабатываются как поток байтов, то есть транспортный уровень видит один байт за другим, и последовательно отправляет и получает эти байты.TTransportЯ не знаю, какой тип данных он передает.На самом деле, транспортному уровню все равно, какой это тип данных.Он должен только отправлять и получать данные в байтах. Парсинг типов данных находится вTProtocolЭтот слой завершен.

  • TSocket: использовать блокировкуI/Oпередача, которая является наиболее распространенным способом
  • THttpTransport:использоватьHTTPпротокол передачи данных
  • TFramedTransPort: отframeОн передается единицами и используется в неблокирующих сервисах;
  • TFileTransPort: передать как файл
  • TMemoryTransport: Память используется дляI/Oкоробка передач
  • TZlibTransport:использоватьzlibСжатый для использования с другими транспортами
  • TBufferedTransportкtransportВыполняется объектная манипуляция даннымиbuffer, т.е. изbufferчитать данные для передачи или записывать данные непосредственно вbuffer

Модель сети на стороне сервера (TServer)

TServerсуществуетthriftОсновная задача в рамках — получитьclientзапрос и направить его вprocessorобработка запросов. Для разных масштабов доступаthriftобеспечивает различныеTServerМодель.thriftВ настоящее время поддерживаетсяserverМодели включают:

  • TSimpleServer: Однопоточная серверная часть с использованием стандартной блокировкиI/O
  • TTHreaadPoolServer: Многопоточная серверная часть использует стандартную блокировку.I/O
  • TNonblockingServer: многопоточная серверная часть с использованием неблокирующегоI/O
  • TThreadedServer: многопоточная сетевая модель, использующая блокировкуI/OСоздайте нить для каждого запроса

Для `Golang`, только для сервисной модели Tsimpleserver.

TProcesser

TProcessorглавная параTServerв одном запросеinputProtocolиoutputProtocolдействовать, то есть изinputProtocolчитать вclientзапросить данные, чтобыoutputProtocolНапишите возвращаемое значение пользовательской логики.TProcessorprocessявляется очень важным обработчиком, потому чтоclientвсеrpcЗвонки будут обрабатываться и переадресовываться этой функцией

ThriftClient

ThriftClientиTProcessorта же основная операцияinputProtocolиoutputProtocol, разницаthriftClientбудетrpcзвонок делится наsendиreceiveДва шага:

  • sendШаг, возьмите параметры звонка пользователя в целомstructнаписатьTProtocol, и отправил вTServer.
  • sendпосле окончания,thriftClientвойти немедленноreceiveстатус ожиданияTServerответ на. заTServerОтвет, используйте класс синтаксического анализа возвращаемого значения, чтобы проанализировать возвращаемое значение, завершитьrpcперечислить.

Сервисный режим TSimpleServer

На самом деле это не типичноTSimpleServer,потому что он не блокируется после принятия сокета. это больше похоже наTThreadedServer, может обрабатывать различныеgoroutineразличные соединения в . еслиgolangРеализация пользователя на стороне клиентаconn-poolЧто-то вроде этого будет работать.

type TSimpleServer struct {
	quit chan struct{}     // 采用阻塞channel进行判断

	processorFactory       TProcessorFactory
	serverTransport        TServerTransport
	inputTransportFactory  TTransportFactory
	outputTransportFactory TTransportFactory
	inputProtocolFactory   TProtocolFactory
	outputProtocolFactory  TProtocolFactory
}

следующий кодthrift-idlИбо следующий анализ берет это в качестве примера

namespace go echo

struct EchoReq {
    1: string msg;
}

struct EchoRes {
    1: string msg;
}

service Echo {
    EchoRes echo(1: EchoReq req);
}

Код сервера сервера

func (p *TSimpleServer) Serve() error {
	err := p.Listen()
	if err != nil {
		return err
	}
	p.AcceptLoop()
	return nil
}

func (p *TSimpleServer) AcceptLoop() error {
	for {
	    // 此处的Accept()是阻塞的,是调用listener.Accept()
		client, err := p.serverTransport.Accept()
		if err != nil {
			select {
			case <-p.quit:
				return nil
			default:
			}
			return err
		}
		if client != nil {
			go func() {
				if err := p.processRequests(client); err != nil {
					log.Println("error processing request:", err)
				}
			}()
		}
	}
}

еслиserverВ это время запрос все еще обрабатывается, и сервер внезапно перезагружается.thrift 1.0Невозможно сделать изящный перезапуск,ноgo thriftПоследняя версия принятаgolang waitgroupПутьДостигнут изящный перезапуск ~

func (p *TSimpleServer) processRequests(client TTransport) error {
	processor := p.processorFactory.GetProcessor(client)

	inputTransport := p.inputTransportFactory.GetTransport(client)
	outputTransport := p.outputTransportFactory.GetTransport(client)

	inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
	outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
	defer func() {
		if e := recover(); e != nil {
			log.Printf("panic in processor: %s: %s", e, debug.Stack())
		}
	}()
	if inputTransport != nil {
		defer inputTransport.Close()
	}
	if outputTransport != nil {
		defer outputTransport.Close()
	}
	for {
		ok, err := processor.Process(inputProtocol, outputProtocol)

		if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
			return nil
		} else if err != nil {
			log.Printf("error processing request: %s", err)
			return err
		}
		if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
			continue
		}
 		if !ok {
			break
		}
	}
	return nil
}

Processлогика обработки

func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	name, _, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return false, err
	}
	// 获取传递过来的name,如果存在则处理
	if processor, ok := p.GetProcessorFunction(name); ok {
		return processor.Process(seqId, iprot, oprot)
	}
	// 异常逻辑
	iprot.Skip(thrift.STRUCT)
	iprot.ReadMessageEnd()
	x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
	oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
	x3.Write(oprot)
	oprot.WriteMessageEnd()
	oprot.Flush()
	return false, x3

}

TServerполучилаrpcПосле заявки позвонитеTProcessorprocessдля обработки.TProcessorprocessпервый звонокTTransport.readMessageBeginинтерфейс, читатьrpcназвание вызова иrpcтип вызова.

еслиrpcТип вызоваrpc call, затем позвонитеTProcessor.process_fnпродолжить обработку, неизвестноrpcТип вызова, исключение брошено.TProcessor.process_fnв соответствии сrpcназывать имя, владетьprocessMapнайти соответствующийrpcфункция обработчика. Если есть соответствующийrpcЕсли функция обработки вызывается, функция обработки вызывается для продолжения ответа на запрос. Выдает исключение, если оно не существует.

func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	args := EchoEchoArgs{}
	// 读取入参的参数
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return false, err
	}

	iprot.ReadMessageEnd()

	result := EchoEchoResult{}
	var retval *EchoRes
	var err2 error
	// 此处是thrift为什么err不能传错误,如果传业务错误会被阻塞
	if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
		x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return true, err2
	} else {
		result.Success = retval
	}

	if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
		err = err2
	}

	if err2 = result.Write(oprot); err == nil && err2 != nil {
		err = err2
	}
	if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
		err = err2
	}

	if err2 = oprot.Flush(); err == nil && err2 != nil {
		err = err2
	}
	if err != nil {
		return
	}
	return true, err
}

Серверstopкод

var once sync.Once
func (p *TSimpleServer) Stop() error {
	q := func() {
		p.quit <- struct{}{}
		p.serverTransport.Interrupt()
	}
	once.Do(q)
	return nil
}

stopФункция относительно простая, вы видите, что данные сразу в очередь блокировки записываются, затемserverбольше не принимаю запросы

код клиента

Clientфункция называется

func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {

	if err = p.sendEcho(req); err != nil {
		return
	}

	return p.recvEcho()
}

sendEcho()функция

func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
	oprot := p.OutputProtocol
	if oprot == nil {
		oprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.OutputProtocol = oprot
	}
	// seqid + 1
	p.SeqId++

	if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
		return
	}

	// 构建参数
	args := EchoEchoArgs{
		Req: req,
	}

	if err = args.Write(oprot); err != nil {
		return
	}
	// 通知服务器发送完毕
	if err = oprot.WriteMessageEnd(); err != nil {
		return
	}
	return oprot.Flush()
}

recvEcho()функция

func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
	iprot := p.InputProtocol
	if iprot == nil {
		iprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.InputProtocol = iprot
	}
	//
	method, mTypeId, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return
	}
	if method != "echo" {
		err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
		return
	}
	if p.SeqId != seqId {
		err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
		return
	}
	if mTypeId == thrift.EXCEPTION {
		error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
		var error1 error
		error1, err = error0.Read(iprot)
		if err != nil {
			return
		}
		if err = iprot.ReadMessageEnd(); err != nil {
			return
		}
		err = error1
		return
	}
	if mTypeId != thrift.REPLY {
		err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
		return
	}
	result := EchoEchoResult{}
	if err = result.Read(iprot); err != nil {
		return
	}
	if err = iprot.ReadMessageEnd(); err != nil {
		return
	}
	value = result.GetSuccess()
	return
}

Проблема с установкой THRIFT на компьютере Mac

  • Вопрос 1:go get git.apache.org/thrift.git/lib/go/thriftПотерпеть поражение
  • Проблема 2: Прямое использованиеgithub.comПредоставленная версия сообщит о неизвестной ошибке

Вопрос 2 должен основываться на вашемthrift -versionчтобы определить, какую версию скачатьthrift, например мойthrift版本是0.10.0Тогда нужно скачатьthriftадресhttps://github.com/apache/thrift/archive/0.10.0.zip

Создать вручнуюmkdir -p git.apache.org/thrift.git/lib/go/каталог, а затем загруженныйgoПереместите файл в этот каталог~

Reference