Go gRPC Series 3: потоковый клиент и сервер

Go

предисловие

Привет всем, я Jianyu. В этой главе мы познакомимся с потоковой передачей gRPC, которая делится на три типа:

  • RPC потоковой передачи на стороне сервера: RPC потоковой передачи на стороне сервера
  • Потоковая передача RPC на стороне клиента: Потоковая передача RPC на стороне клиента
  • Двунаправленный потоковый RPC: Двунаправленный потоковый RPC

поток

Любая технология должна существовать из-за своих болевых точек. Если вы хотите узнать о вызовах потоковой передачи gRPC, продолжайте

рисунок

image

Потоковая передача gRPC основана на HTTP/2 и будет подробно описана в следующих главах.

Почему бы не использовать простой RPC

Почему существует потоковая передача, что не так с Simple RPC? Моделируя бизнес-сценарий, мы можем узнать, что при использовании Simple RPC возникают следующие проблемы:

  • Мгновенное давление, вызванное чрезмерным размером пакета
  • При получении пакетов данных все пакеты данных должны быть успешно и правильно приняты, прежде чем можно будет вызвать ответ и выполнить бизнес-обработку (он не может быть отправлен на стороне клиента, но обработан на стороне сервера).

Зачем использовать потоковый RPC

  • Массивный пакет
  • сцена в реальном времени

Сцена моделирования

Каждое утро в 6:00 необходимо синхронизировать пакет из миллионов наборов данных от А до Б. Во время синхронизации будет выполняться ряд операций (архивирование, анализ данных, портреты, журналы и т. д.). Объем данных, задействованных в этой одноразовой операции, действительно велик.

После завершения синхронизации некоторые люди сразу же проверят данные, чтобы подготовиться к новому дню. Также в соответствии с режимом реального времени.

По сравнению с двумя, в этом сценарии больше подходит использование Streaming RPC.

gRPC

Объясняя конкретный код потоковой передачи gRPC, я будуСосредоточьтесь на первом разделе, потому что три режима на самом деле представляют собой разные комбинации. Надеюсь, вы сможете обратить внимание на понимание и сделать выводы из одного случая, по сути, это одна и та же точка знаний 👍

Структура каталогов

$ tree go-grpc-example 
go-grpc-example
├── client
│   ├── simple_client
│   │   └── client.go
│   └── stream_client
│       └── client.go
├── proto
│   ├── search.proto
│   └── stream.proto
└── server
    ├── simple_server
    │   └── server.go
    └── stream_server
        └── server.go

Добавлены stream_server, stream_client для хранения серверных и клиентских файлов, proto/stream.proto используется для записи IDL

IDL

В файле stream.proto в папке proto напишите следующее:

syntax = "proto3";

package proto;

service StreamService {
    rpc List(StreamRequest) returns (stream StreamResponse) {};

    rpc Record(stream StreamRequest) returns (StreamResponse) {};

    rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}


message StreamPoint {
  string name = 1;
  int32 value = 2;
}

message StreamRequest {
  StreamPoint pt = 1;
}

message StreamResponse {
  StreamPoint pt = 1;
}

Обратите внимание на ключевое слово stream, которое объявляет его потоковым методом. Здесь задействованы три метода, и соответствующее отношение

  • Список: RPC для потоковой передачи на стороне сервера
  • Запись: Клиентская потоковая передача RPC
  • Маршрут: двунаправленный потоковый RPC

базовый шаблон + пустое определение

Server

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"

	pb "github.com/EDDYCJY/go-grpc-example/proto"
	
)

type StreamService struct{}

const (
	PORT = "9002"
)

func main() {
	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})

	lis, err := net.Listen("tcp", ":"+PORT)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}

	server.Serve(lis)
}

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
	return nil
}

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
	return nil
}

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
	return nil
}

Перед написанием кода рекомендуется сначала определить базовый шаблон и интерфейс gRPC Server. Если вы не уверены, обратитесь к пунктам знаний в предыдущей главе.

Client

package main

import (
    "log"
    
	"google.golang.org/grpc"

	pb "github.com/EDDYCJY/go-grpc-example/proto"
)

const (
	PORT = "9002"
)

func main() {
	conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}

	defer conn.Close()

	client := pb.NewStreamServiceClient(conn)

	err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
	if err != nil {
		log.Fatalf("printLists.err: %v", err)
	}

	err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
	if err != nil {
		log.Fatalf("printRecord.err: %v", err)
	}

	err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
	if err != nil {
		log.Fatalf("printRoute.err: %v", err)
	}
}

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	return nil
}

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	return nil
}

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	return nil
}

1. RPC для потоковой передачи на стороне сервера: RPC для потоковой передачи на стороне сервера

Потоковая передача RPC на стороне сервера, очевидно, является односторонним потоком и относится к серверу как к потоку, а к клиенту — как к обычному запросу RPC.

Проще говоря, клиент инициирует обычный запрос RPC, сервер отправляет набор данных несколько раз через потоковые ответы, а клиент Recv получает набор данных. Примерно так, как показано:

image

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
	for n := 0; n <= 6; n++ {
		err := stream.Send(&pb.StreamResponse{
			Pt: &pb.StreamPoint{
				Name:  r.Pt.Name,
				Value: r.Pt.Value + int32(n),
			},
		})
		if err != nil {
			return err
		}
	}

	return nil
}

В Сервере основное вниманиеstream.Sendметод. Похоже, его можно отправить N раз? Есть ли ограничение по размеру?

type StreamService_ListServer interface {
	Send(*StreamResponse) error
	grpc.ServerStream
}

func (x *streamServiceListServer) Send(m *StreamResponse) error {
	return x.ServerStream.SendMsg(m)
}

Читая исходный код, мы можем знать, что protoc генерирует различные стандартные методы интерфейса в соответствии с определением, когда он генерируется. Наконец, единое расписание внутреннихSendMsgметод, который включает в себя следующий процесс:

  • Сериализация тела сообщения (объекта)
  • Сжатие сериализованного тела сообщения
  • Добавить 5-байтовый заголовок в тело передаваемого сообщения
  • Определите, превышает ли общая длина тела сжатого и сериализованного сообщения в байтах предустановленное значение maxSendMessageSize (значение по умолчанию равноmath.MaxInt32), если он превышает, это вызовет ошибку
  • Набор данных, записанный в поток

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.List(context.Background(), r)
	if err != nil {
		return err
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
	}

	return nil
}

В Клиенте основное вниманиеstream.Recv()метод. при каких обстоятельствахio.EOF? При каких обстоятельствах появляется сообщение об ошибке?

type StreamService_ListClient interface {
	Recv() (*StreamResponse, error)
	grpc.ClientStream
}

func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
	m := new(StreamResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

RecvMsg прочитает из потока полное тело сообщения gRPC, и вы можете это узнать, прочитав исходный код:

(1) RecvMsg блокируется и ожидает

(2) RecvMsg вернется, когда поток завершится/успешно завершится (вызывается Close)io.EOF

(3) RecvMsg При возникновении какой-либо ошибки в потоке поток будет прерван, а сообщение об ошибке будет содержать код ошибки RPC. В RecvMsg могут возникать следующие ошибки:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

При этом следует отметить, что значение MaxReceiveMessageSize по умолчанию равно 1024*1024*4, превышать его не рекомендуется.

проверять

Запустите stream_server/server.go:

$ go run server.go

Запустите stream_client/client.go:

$ go run client.go 
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024

2. RPC для потоковой передачи на стороне клиента: RPC для потоковой передачи на стороне клиента

Потоковая передача RPC на стороне клиента, односторонняя потоковая передача, клиент инициирует потоковую передачунеоднократноЗапрос RPC к серверу, сервер инициируетоднаждыОтвет клиенту, примерно как показано:

image

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
		}
		if err != nil {
			return err
		}

		log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
	}

	return nil
}

Еще один невиданный ранее методstream.SendAndClose, для чего это используется?

В этой программе мы обрабатываем каждый Recv, когда находимio.EOFПосле (поток закрывается) окончательный результат ответа нужно отправить клиенту, а Recv, ожидающий на другой стороне, одновременно закрывается

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.Record(context.Background())
	if err != nil {
		return err
	}

	for n := 0; n < 6; n++ {
		err := stream.Send(r)
		if err != nil {
			return err
		}
	}

	resp, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}

	log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)

	return nil
}

stream.CloseAndRecvиstream.SendAndCloseЭто метод сопоставления потоков, я думаю, вы поняли его функцию за считанные секунды.

проверять

Перезапустите stream_server/server.go и снова запустите stream_client/client.go:

поток_клиент:
$ go run client.go
2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
потоковый_сервер:
$ go run server.go
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018

3. Двунаправленный потоковый RPC: Двунаправленный потоковый RPC

Двунаправленная потоковая передача RPC, как следует из названия, является двунаправленной потоковой передачей. Клиент инициирует запрос в потоковом режиме, и сервер также отвечает на запрос в потоковом режиме.

Первый запрос должен быть инициирован клиентом, но конкретный способ взаимодействия (кто приходит первым, сколько отправляется за раз, сколько ответов и когда закрыть) определяется в зависимости от того, как написана программа (может быть в сочетании с сопрограммами)

Предположим, что двунаправленный потокотправить по порядкуЕсли да, то примерно как показано:

image

Опять же, важно подчеркнуть, что двунаправленные потоки сильно различаются от программы к программе.Двунаправленные блок-схемы нельзя использовать в разных сценариях.

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
	n := 0
	for {
		err := stream.Send(&pb.StreamResponse{
			Pt: &pb.StreamPoint{
				Name:  "gPRC Stream Client: Route",
				Value: int32(n),
			},
		})
		if err != nil {
			return err
		}

		r, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		n++

		log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
	}

	return nil
}

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.Route(context.Background())
	if err != nil {
		return err
	}

	for n := 0; n <= 6; n++ {
		err = stream.Send(r)
		if err != nil {
			return err
		}

		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
	}

	stream.CloseSend()

	return nil
}

проверять

Перезапустите stream_server/server.go и снова запустите stream_client/client.go:

stream_server
$ go run server.go
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
stream_client
$ go run client.go
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6

Суммировать

В этой статье представлены три типа методов взаимодействия потоков, и соответствующий метод может быть выбран в соответствии с реальным бизнес-сценарием. Он сделает больше с меньшими затратами.

?

Если у вас есть какие-либо вопросы или ошибки, добро пожаловать вissuesЗадавайте вопросы или вносите исправления, если вам нравится или помогаете, добро пожаловатьStar, является своеобразным поощрением и продвижением автора.

мой публичный аккаунт

image

Ссылаться на

Пример кода для этой серии