предисловие
Привет всем, я Jianyu. В этой главе мы познакомимся с потоковой передачей gRPC, которая делится на три типа:
- RPC потоковой передачи на стороне сервера: RPC потоковой передачи на стороне сервера
- Потоковая передача RPC на стороне клиента: Потоковая передача RPC на стороне клиента
- Двунаправленный потоковый RPC: Двунаправленный потоковый RPC
поток
Любая технология должна существовать из-за своих болевых точек. Если вы хотите узнать о вызовах потоковой передачи gRPC, продолжайте
рисунок
Потоковая передача gRPC основана на HTTP/2 и будет подробно описана в следующих главах.
Почему бы не использовать простой RPC
Почему существует потоковая передача, что не так с Simple RPC? Моделируя бизнес-сценарий, мы можем узнать, что при использовании Simple RPC возникают следующие проблемы:
- Мгновенное давление, вызванное чрезмерным размером пакета
- При получении пакетов данных все пакеты данных должны быть успешно и правильно приняты, прежде чем можно будет вызвать ответ и выполнить бизнес-обработку (он не может быть отправлен на стороне клиента, но обработан на стороне сервера).
Зачем использовать потоковый RPC
- Массивный пакет
- сцена в реальном времени
Сцена моделирования
Каждое утро в 6:00 необходимо синхронизировать пакет из миллионов наборов данных от А до Б. Во время синхронизации будет выполняться ряд операций (архивирование, анализ данных, портреты, журналы и т. д.). Объем данных, задействованных в этой одноразовой операции, действительно велик.
После завершения синхронизации некоторые люди сразу же проверят данные, чтобы подготовиться к новому дню. Также в соответствии с режимом реального времени.
По сравнению с двумя, в этом сценарии больше подходит использование Streaming RPC.
gRPC
Объясняя конкретный код потоковой передачи gRPC, я будуСосредоточьтесь на первом разделе, потому что три режима на самом деле представляют собой разные комбинации. Надеюсь, вы сможете обратить внимание на понимание и сделать выводы из одного случая, по сути, это одна и та же точка знаний 👍
Структура каталогов
$ tree go-grpc-example
go-grpc-example
├── client
│ ├── simple_client
│ │ └── client.go
│ └── stream_client
│ └── client.go
├── proto
│ ├── search.proto
│ └── stream.proto
└── server
├── simple_server
│ └── server.go
└── stream_server
└── server.go
Добавлены stream_server, stream_client для хранения серверных и клиентских файлов, proto/stream.proto используется для записи IDL
IDL
В файле stream.proto в папке proto напишите следующее:
syntax = "proto3";
package proto;
service StreamService {
rpc List(StreamRequest) returns (stream StreamResponse) {};
rpc Record(stream StreamRequest) returns (StreamResponse) {};
rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}
message StreamPoint {
string name = 1;
int32 value = 2;
}
message StreamRequest {
StreamPoint pt = 1;
}
message StreamResponse {
StreamPoint pt = 1;
}
Обратите внимание на ключевое слово stream, которое объявляет его потоковым методом. Здесь задействованы три метода, и соответствующее отношение
- Список: RPC для потоковой передачи на стороне сервера
- Запись: Клиентская потоковая передача RPC
- Маршрут: двунаправленный потоковый RPC
базовый шаблон + пустое определение
Server
package main
import (
"log"
"net"
"google.golang.org/grpc"
pb "github.com/EDDYCJY/go-grpc-example/proto"
)
type StreamService struct{}
const (
PORT = "9002"
)
func main() {
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server, &StreamService{})
lis, err := net.Listen("tcp", ":"+PORT)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
return nil
}
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
return nil
}
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
return nil
}
Перед написанием кода рекомендуется сначала определить базовый шаблон и интерфейс gRPC Server. Если вы не уверены, обратитесь к пунктам знаний в предыдущей главе.
Client
package main
import (
"log"
"google.golang.org/grpc"
pb "github.com/EDDYCJY/go-grpc-example/proto"
)
const (
PORT = "9002"
)
func main() {
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
if err != nil {
log.Fatalf("printLists.err: %v", err)
}
err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
if err != nil {
log.Fatalf("printRecord.err: %v", err)
}
err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
if err != nil {
log.Fatalf("printRoute.err: %v", err)
}
}
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
1. RPC для потоковой передачи на стороне сервера: RPC для потоковой передачи на стороне сервера
Потоковая передача RPC на стороне сервера, очевидно, является односторонним потоком и относится к серверу как к потоку, а к клиенту — как к обычному запросу RPC.
Проще говоря, клиент инициирует обычный запрос RPC, сервер отправляет набор данных несколько раз через потоковые ответы, а клиент Recv получает набор данных. Примерно так, как показано:
Server
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
for n := 0; n <= 6; n++ {
err := stream.Send(&pb.StreamResponse{
Pt: &pb.StreamPoint{
Name: r.Pt.Name,
Value: r.Pt.Value + int32(n),
},
})
if err != nil {
return err
}
}
return nil
}
В Сервере основное вниманиеstream.Send
метод. Похоже, его можно отправить N раз? Есть ли ограничение по размеру?
type StreamService_ListServer interface {
Send(*StreamResponse) error
grpc.ServerStream
}
func (x *streamServiceListServer) Send(m *StreamResponse) error {
return x.ServerStream.SendMsg(m)
}
Читая исходный код, мы можем знать, что protoc генерирует различные стандартные методы интерфейса в соответствии с определением, когда он генерируется. Наконец, единое расписание внутреннихSendMsg
метод, который включает в себя следующий процесс:
- Сериализация тела сообщения (объекта)
- Сжатие сериализованного тела сообщения
- Добавить 5-байтовый заголовок в тело передаваемого сообщения
- Определите, превышает ли общая длина тела сжатого и сериализованного сообщения в байтах предустановленное значение maxSendMessageSize (значение по умолчанию равно
math.MaxInt32
), если он превышает, это вызовет ошибку - Набор данных, записанный в поток
Client
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.List(context.Background(), r)
if err != nil {
return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
}
return nil
}
В Клиенте основное вниманиеstream.Recv()
метод. при каких обстоятельствахio.EOF
? При каких обстоятельствах появляется сообщение об ошибке?
type StreamService_ListClient interface {
Recv() (*StreamResponse, error)
grpc.ClientStream
}
func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
m := new(StreamResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
RecvMsg прочитает из потока полное тело сообщения gRPC, и вы можете это узнать, прочитав исходный код:
(1) RecvMsg блокируется и ожидает
(2) RecvMsg вернется, когда поток завершится/успешно завершится (вызывается Close)io.EOF
(3) RecvMsg При возникновении какой-либо ошибки в потоке поток будет прерван, а сообщение об ошибке будет содержать код ошибки RPC. В RecvMsg могут возникать следующие ошибки:
- io.EOF
- io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes
При этом следует отметить, что значение MaxReceiveMessageSize по умолчанию равно 1024*1024*4, превышать его не рекомендуется.
проверять
Запустите stream_server/server.go:
$ go run server.go
Запустите stream_client/client.go:
$ go run client.go
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024
2. RPC для потоковой передачи на стороне клиента: RPC для потоковой передачи на стороне клиента
Потоковая передача RPC на стороне клиента, односторонняя потоковая передача, клиент инициирует потоковую передачунеоднократноЗапрос RPC к серверу, сервер инициируетоднаждыОтвет клиенту, примерно как показано:
Server
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
for {
r, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
}
if err != nil {
return err
}
log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
}
return nil
}
Еще один невиданный ранее методstream.SendAndClose
, для чего это используется?
В этой программе мы обрабатываем каждый Recv, когда находимio.EOF
После (поток закрывается) окончательный результат ответа нужно отправить клиенту, а Recv, ожидающий на другой стороне, одновременно закрывается
Client
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.Record(context.Background())
if err != nil {
return err
}
for n := 0; n < 6; n++ {
err := stream.Send(r)
if err != nil {
return err
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
return nil
}
stream.CloseAndRecv
иstream.SendAndClose
Это метод сопоставления потоков, я думаю, вы поняли его функцию за считанные секунды.
проверять
Перезапустите stream_server/server.go и снова запустите stream_client/client.go:
поток_клиент:
$ go run client.go
2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
потоковый_сервер:
$ go run server.go
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
3. Двунаправленный потоковый RPC: Двунаправленный потоковый RPC
Двунаправленная потоковая передача RPC, как следует из названия, является двунаправленной потоковой передачей. Клиент инициирует запрос в потоковом режиме, и сервер также отвечает на запрос в потоковом режиме.
Первый запрос должен быть инициирован клиентом, но конкретный способ взаимодействия (кто приходит первым, сколько отправляется за раз, сколько ответов и когда закрыть) определяется в зависимости от того, как написана программа (может быть в сочетании с сопрограммами)
Предположим, что двунаправленный потокотправить по порядкуЕсли да, то примерно как показано:
Опять же, важно подчеркнуть, что двунаправленные потоки сильно различаются от программы к программе.Двунаправленные блок-схемы нельзя использовать в разных сценариях.
Server
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
n := 0
for {
err := stream.Send(&pb.StreamResponse{
Pt: &pb.StreamPoint{
Name: "gPRC Stream Client: Route",
Value: int32(n),
},
})
if err != nil {
return err
}
r, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
n++
log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
}
return nil
}
Client
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.Route(context.Background())
if err != nil {
return err
}
for n := 0; n <= 6; n++ {
err = stream.Send(r)
if err != nil {
return err
}
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
}
stream.CloseSend()
return nil
}
проверять
Перезапустите stream_server/server.go и снова запустите stream_client/client.go:
stream_server
$ go run server.go
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
stream_client
$ go run client.go
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6
Суммировать
В этой статье представлены три типа методов взаимодействия потоков, и соответствующий метод может быть выбран в соответствии с реальным бизнес-сценарием. Он сделает больше с меньшими затратами.
?
Если у вас есть какие-либо вопросы или ошибки, добро пожаловать вissuesЗадавайте вопросы или вносите исправления, если вам нравится или помогаете, добро пожаловатьStar, является своеобразным поощрением и продвижением автора.