gRPC от обучения к производству

gRPC
gRPC от обучения к производству

видео информация

grpc: From Tutorial to Production by Alan Shreve at GopherCon 2017

Woohoo.YouTube.com/watch?V=7FZ…

Сообщение блога:about.source graph.com/go/personal-PC-in-…

Как микросервисы должны взаимодействовать?

Ответ:SOAP...ну, шучу, конечно, это уже не SOAP.

Сейчас популярна практикаHTTP + JSON (REST API)

Алан сказал: «Если я никогда в жизни не напишу еще одну клиентскую REST-библиотеку, я могу умереть счастливо...😂», потому что это самое скучное занятие — делать одно и то же снова и снова.

Почему нельзя использовать REST API?

  • Реализация Stream слишком сложна
  • А двунаправленный поток просто невозможен
  • Сложные операции моделирования
  • Очень неэффективно, текстовое представление — не лучший выбор для сети.
  • Более того, на самом деле сервис вообще не RESTful, это просто конечная точка HTTP
  • Трудно получить несколько данных ресурсов в одном запросе (см. GraphQL для встречного примера).
  • Отсутствие формальных (машиночитаемых) ограничений API
    • Поэтому для написания клиента требуется человек
      • А потому что 👷 очень дорого, а я не люблю писать клиентам

Что такое gRPC

gRPC — это высокопроизводительная платформа RPC общего назначения с открытым исходным кодом.

Вместо того, чтобы объяснять определение, понятнее сделать что-то на самом деле.

Создайте службу кэширования

С такими вещами, как gRPC, мы не начинаем писать код Go, мы начинаем писать gRPC.IDLначалось.

app.proto

syntax = "proto3"
package rpc;
service Cache {
  rpc Store(StoreReq) returns (StoreResp) {}
  rpc Get(GetReq) returns (GetResp) {}
}
message StoreReq {
  string key = 1;
  bytes val = 2;
}
message StoreResp {
}
message GetReq {
  string key = 1;
}
message GetResp {
  bytes val = 1;
}

Когда этот файл записывается, мы сразу имеем9Библиотеки для клиентов на многих языках.

  • C++
  • Java(and Android)
  • Python
  • Go
  • Ruby
  • C#
  • Javascript(node.js)
  • Objective-C (iOS!)
  • PHP

В то же время у нас также есть7API Stub на стороне сервера на разных языках:

  • C++
  • Java
  • Python
  • Go
  • Ruby
  • C#
  • Javascript(node.js)

server.go

func serverMain() {
  if err := runServer(); err != nil {
    fmt.Fprintf(os.Stderr, "Failed to run cache server: %s\n", err)
    os.Exit(1)
  }
}
func runServer() error {
  srv := grpc.NewServer()
  rpc.RegisterCacheServer(srv, &CacheService{})
  l, err := net.Listen("tcp", "localhost:5051")
  if err != nil {
    return err
  }
  //  block
  return srv.Serve(l)
}

Пока не реализуйте CacheService, сначала оставьте его пустым, а реализуйте позже.

type CacheService struct {
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
  return nil, fmt.Errorf("unimplemented")
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  return nil, fmt.Errorf("unimplemented")
}

client.go

func clientMain() {
  if err != runClient(); err != nil {
    fmt.Fprintf(os.Stderr, "failed: %v\n", err)
    os.Exit(1)
  }
}
func runClient() error {
  //  建立连接
  conn, err := grpc.Dial("localhost:5053", grpc.WithInsecure())
  if err != nil {
    return fmt.Errorf("failed to dial server: %v", err)
  }
  cache := rpc.NewCacheClient(conn)
  //  调用 grpc 的 store() 方法存储键值对 { "gopher": "con" }
  _, err = cache.Store(context.Background(), &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
  if err != nil {
    return fmt.Errorf("failed to store: %v", err)
  }
  //  调用 grpc 的 get() 方法取回键为 `gopher` 的值
  resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
  if err != nil {
    return fmt.Errorf("failed to get: %v", err)
  }
  //  输出
  fmt.Printf("Got cached value %s\n", resp.Val)
  return nil
}

Разве это не просто WSDL?

Некоторые люди могут подумать, что этоWSDLЭто слишком похоже, и это правильно так думать, потому что gRPC извлек уроки из ошибок предыдущего SOAP/WSDL, а также впитал в себя их превосходные качества.

  • Не так тесно связан с XML (grpc подключается и может быть заменен различными базовыми представлениями)
  • Любой, кто писал XML/XSD, знает, что эти определения службы слишком громоздки, gRPC не имеет этой проблемы.
  • WSDL имеет совершенно ненужную сложность и в основном ненужные функции (двухэтапная фиксация)
  • WSDL негибкий и несовместимый вперед (в отличие отprotobuf)
  • Производительность SOAP/WSDL слишком низкая, и нельзя использовать потоки.
  • Но машинопонятное определение API в WSDL — это действительно хорошо.

Реализовать конкретный CacheService

server.go

type CacheService struct {
  store map[string][]byte
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
  val := s.store[req.Key]
  return &rpc.GetResp{Val: val}, nil
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  s.store[req.Key] = req.Val
  return &rpc.StoreResp{}, nil
}

Обратите внимание, что здесь нет блокировок, вы можете думать о них, как они будут вызываться одновременно в будущем.

обработка ошибок

Конечно, gRPC поддерживает обработку ошибок. Предположим, вышеприведенный Get() переписан, чтобы сообщать об ошибке для несуществующего ключа:

func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
  val, ok := s.store[req.Key]
  if !ok {
    return nil, status.Errorf(code.NotFound, "Key not found %s", req.Key)
  }
  return &rpc.GetResp{Val: val}, nil
}

зашифрованная передача

Если бы такой код был развернут, он был быSREПерехвачено, так как все сообщения должны передаваться в зашифрованном виде.

Добавить зашифрованный транспорт TLS в gRPC очень просто. Например, мы модифицируем runServer(), чтобы добавить зашифрованную передачу TLS.

func runServer() error {
  tlsCreds, err := credentials.NewServerTLSFromFile("tls.crt", "tls.key")
  if err != nil {
    return err
  }
  srv := grpc.NewServer(grpc.Creds(tlsCreds))
  ...
}

Опять же, нам также нужно изменить runClient().

func runClient() error {
  tlsCreds := credentials.NewTLS(&tls.Config(InsecureSkipVerify: true))
  conn, err := grpc.Dial("localhost:5051", grpc.WithTransportCredentials(tlsCreds))
  ...
}

Как использовать gRPC в продакшене

  • HTTP/2
  • protobuf serialization (pluggable)
  • Клиент откроет длинное соединение с сервером grpc
    • Будет новый поток HTTP/2 для каждого вызова RPC
    • Разрешить вызовы RPC для имитации режима полета
  • разрешить клиентамиПотоковая передача сервера

Внедрение gRPC

В настоящее время существует 3 высокопроизводительных реализации, управляемых событиями.

  • C
    • Ruby, Python, Node.js, PHP, C#, Objective-C, C++ — все это привязки к этой реализации ядра C.
    • PHP привязан к этой реализации через PECL
  • Java
    • Netty + BoringSSL через JNI
  • Go
    • Реализация Pure Go с использованием crypto/tls из стандартной библиотеки Go.

Откуда взялся gRPC?

  • Первоначально создан командой Google
  • Еще раньше был внутренний проект Google под названием stubby.
  • Этот gRPC является проектом с открытым исходным кодом следующего поколения, и теперь он используется не только Google, но и многими компаниями.
    • Конечно, Google также является основным поставщиком кода.

Пример производственной среды: мультиарендность

После выхода в онлайн для производства я обнаружил, что некоторые клиенты сгенерировали большое количество пар "ключ-значение". Я спросил и узнал, что некоторые клиенты хотят кэшировать все, что, очевидно, не очень хорошо для нашей службы кэширования.

Мы хотим ограничить такое поведение, но для текущей системы это требование не может быть выполнено, поэтому нам нужно модифицировать реализацию и выдавать токен клиента каждому клиенту, тогда мы можем ограничить максимальное количество ключей, которые может создать конкретный клиент во избежание злоупотребления системой. Это становится многопользовательской службой кэширования.

Как и прежде, мы по-прежнему начинаем с IDL, нам нужно изменить интерфейс и добавить пункт account_token.

message StoreReq {
  string key = 1;
  bytes val = 2;
  string account_token = 3;
}

Точно так же нам нужна отдельная служба для службы учетной записи, чтобы получить количество ключей кеша, разрешенных учетной записью:

service Accounts {
  rpc GetByToken(GetByTokenReq) return (GetByTokenResp) {}
}
message GetByTokenReq {
  string token = 1;
}
message GetByTokenResp {
  Account account = 1;
}
message Account {
  int64 max_cache_keys = 1;
}

Здесь установлен новый сервис Accounts, и есть метод GetByToken(), который дает токен и возвращает результат типа Account, и есть ключ max_cache_keys в Account, соответствующий максимальному количеству значений ключа, которое может быть кэшировано.

Теперь мы дополнительно модифицируемclient.go

func runClient() error {
  ...
  cache := rpc.NewCacheClient(conn)
  _, err = cache.Store(context.Background(), &rpc.StoreReq{
    AccountToken: "inconshreveable",
    Key:          "gopher",
    Val:          []byte("con"),
  })
  if err != nil {
    return fmt.Errorf("failed to store: %v", err)
  }
  ...
}

Изменения на стороне сервера немного больше, но не чрезмерны.

type CacheService struct {
  accounts      rpc.AccountsClient
  store         map[string][]byte
  keysByAccount map[string]int64
}

Обратите внимание, что учетные записи здесь — это клиент grpc, потому что наша служба также является клиентом другой службы grpc. Итак, в следующей реализации Store() нам нужно вызвать другую службу через учетные записи, чтобы получить информацию об учетной записи.

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  //  调用另一个服务取得账户信息,包含其键值限制
  resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  if err != nil {
    return nil, err
  }
  //  检查是否超量使用
  if s.keysByAccount[req.AccountToken] >= resp.Account.MaxCacheKeys {
    return nil, status.Errorf(codes.FailedPrecondition, "Account %s exceeds max key limit %d", req.AccountToken, resp.Account.MaxCacheKeys)
  }
  //  如果键不存在,需要新加键值,那么我们就对计数器加一
  if _, ok := s.store[req.Key]; !ok {
    s.keysByAccount[req.AccountToken] += 1
  }
  //  保存键值
  s.store[req.Key] = req.Val
  return &rpc.StoreResp{}, nil
}

Пример производственной среды: производительность

Вышеупомянутая проблема решена, наш сервис снова работает, и не будет пользователей, создающих слишком много ключевых значений. Но вскоре мы получили новые проблемы от других пользователей.Многие люди ответили, что новая система работает медленно и не соответствует требованиям.SLAтребования.

Но мы понятия не имели, что происходит, и поняли, что у нашей программы не было никакой наблюдаемости (Observability), другими словами, у нашей программы не было никакой системы измерения для подсчета данных, связанных с производительностью.

Начнем с самого простого, добавления логов.

Давайте начнем сclient.goДля начала добавьте некоторые измерения и подсчеты, а также выходные данные журнала.

...
//  开始计时
start := time.Now()
_, err = cache.Store(context.Background(), &rpc.StoreReq{
  AccountToken: "inconshreveable",
  Key:          "gopher",
  Val:          []byte("con"),
})
//  计算 cache.Store() 调用时间
log.Printf("cache.Store duration %s", time.Since(start))
if err != nil {
  return fmt.Errorf("failed to store: %v", err)
}
//  再次开始计时
start = time.Now()
//  调用 grpc 的 get() 方法取回键为 `gopher` 的值
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
//  计算 cache.Get() 调用时间
log.Printf("cache.Get duration %s", time.Since(start))
if err != nil {
  return fmt.Errorf("failed to get: %v", err)
}

То же самое делается на стороне сервера.

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  //  开始计时
  start := time.Now()
  //  调用另一个服务取得账户信息,包含其键值限制
  resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  //  输出 account.GetByToken() 的调用时间
  log.Printf("accounts.GetByToken duration %s", time.Since(start))
  ...
}

После этих модификаций мы обнаружили, что одно и то же делается снова и снова, так есть ли способ изменить эту скучную практику? Изучив документацию grpc, я увидел, что естьClient Interceptorс вещами.

Это эквивалентно промежуточному программному обеспечению, но на стороне клиента. Когда клиент делает вызов rpc, это ПО промежуточного слоя будет вызываться первым, поэтому это ПО промежуточного слоя может обернуть вызов в слое перед вызовом.

Чтобы реализовать эту функциональность, мы создаем новый файл с именем interceptor.go:

func WithClientInterceptor() grpc.DialOption {
  return grpc.WithUnaryInterceptor(clientInterceptor)
}
func clientInterceptor(
  ctx context.Context,
  method string,
  req interface{},
  reply interface{},
  cc *grpc.ClientConn,
  invoker grpc.UnaryInvoker,
  opts ...grpc.CallOption,
) error {
  start := time.Now()
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Printf("invoke remote method=%s duration=%s error=%v", method, time.Since(start), err)
  return err
}

После того, как у нас есть этот WithClientInterceptor(), мы можем зарегистрировать его в grpc.Dial().client.go

func runClient() error {
  ...
  conn, err := grpc.Dial("localhost:5051",
    grpc.WithTransportCredentials(tlsCreds),
    WithClientInterceptor())
  ...
}

После регистрации все вызовы grpc будут проходить через наш зарегистрированный clientInterceptor(), поэтому будет подсчитываться все время, вместо повторного добавления времени, измерения и вывода внутри каждой функции.

После добавления измерения клиента, естественно думать, что сервер может делать то же самое? После проверки документации, да, есть файл с именемServer Interceptorс вещами.

Таким же образом добавляем на сервер interceptor.go и добавляем функцию ServerInterceptor().

func ServerInterceptor() grpc.ServerOption {
  return grpc.UnaryInterceptor(serverInterceptor)
}
func serverInterceptor(
  ctx context.Context,
  req interface{},
  info *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (interface{}, error) {
  start := time.Now()
  resp, err := handler(ctx, req)
  log.Printf("invoke server method=%s duration=%s error=%v",
    info.FullMethod,
    time.Since(start),
    err)
  return resp, err
}

Как и в случае с клиентом, нам нужно зарегистрировать промежуточное ПО, которое мы определили в runServer().

func runServer() error {
  ...
  srv := grpc.NewServer(grpc.Creds(tlsCreds), ServerInterceptor())
  ...
}

Случай производственной среды: тайм-аут

После добавления логов мы, наконец, обнаружили в логах, что /rpc.Accounts/GetByToken/ заняло много времени. Нам нужно установить тайм-аут для этой операции.server.go

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  accountsCtx, _ := context.WithTimeout(context.Background(), 2 * time.Second)
  resp, err := s.accounts.GetByToken(accountsCtx, &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  ...
}

Операция здесь очень проста, просто используйте context.WithTimeout() напрямую в стандартной библиотеке.

Пример производственной среды: передача контекста

После вышеописанной модификации заказчик все равно жалуется на несоблюдение SLA, и правильно над этим хорошо подумать. Даже если ограничение составляет 2 секунды, клиентскому вызову все равно требуется время, а другие коды также имеют накладные расходы по времени в середине. А некоторые заказчики говорят, что здесь нужна 1 секунда, а не 2 секунды.

Что ж, давайте отправим эту настройку времени вызывающему абоненту.

Во-первых, мы требуем установки ограничений времени вызова на стороне клиента:client.go

func runClient() error {
  ...
  ctx, _ := context.WithTimeout(context.Background(), time.Second)
  _, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
  ...
  ctx, _ = context.WithTimeout(context.Background(), 50*time.Millisecond)
  resp, err := cache.Get(ctx, &rpc.GetReq{Key: "gopher"})
  ...
}

Затем на стороне сервера мы передаем контекст. Получает ctx вызывающего абонента напрямую.

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  ...
}

Пример производственной среды: метаданные GRPC

Вышеуказанные проблемы были решены, и наконец-то можно вздохнуть с облегчением. Но заказчик выдвинул новые требования... 😅, если мы сможем добавить флаг пробного прогона, значит, я хочу, чтобы вы сделали все, что нужно, кроме собственно модификации библиотеки ключ-значение.

Метаданные GRPC, также известные как заголовок GRPC. Как и в заголовках HTTP, может передаваться некоторая информация о метаданных. Использование метаданных может сделать нашу реализацию Dry Run более лаконичной, и нам не нужно реализовывать логику проверки флага Dry Run в каждом методе RPC, мы можем быть независимыми.

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
  resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
    Token: req.AccountToken,
  })
  if !dryRun(ctx) {
    if _, ok := s.store[req.Key]; !ok {
      s.keysByAccount[req.AccountToke] += 1
    }
    s.store[req.Key] = req.Val
  }
  return &rpc.StoreResp{}, nil
}
func dryRun(ctx context.Context) bool {
  md, ok := metadata.FromContext(ctx)
  if !ok {
    return false
  }
  val, ok := md["dry-run"]
  if !ok {
    return false
  }
  if len(val) < 1 {
    return false
  }
  return val[0] == "1"
}

Конечно, при этом есть компромисс, потому что обобщение лишает возможности проверки типов.

Когда клиент звонит, вам нужно добавить пробные параметры в метаданные в зависимости от ситуации.

func runClient() error {
  ...
  ctx, _ := context.WithTimeout(context.Background(), time.Second)
  ctx = metadata.NewContext(ctx, metadata.Pairs("dry-run", "1"))
  _, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
  ...
}

Пример производственной среды: повторите попытку

После внедрения Dry Run я подумал, что пришло время отдохнуть.Клиенты, которые жаловались на медлительность раньше, жаловались снова.Хотя есть контроль тайм-аута для соблюдения SLA, сервис по-прежнему медленный, и общий тайм-аут не работает. Проверил и обнаружил, что дело в сети, мы мало что могли сделать. Чтобы решить проблему клиента, добавим механизм повторных попыток.

Мы можем добавить механизм повторной попытки к каждому вызову gRPC, а также можем использовать Interceptor, как и предыдущую статистику времени, верно?

func clientInterceptor(...) error {
  var (
    start     = time.Now()
    attempts  = 0
    err       error
    backoff   retryBackOff
  )
  for {
    attempts += 1
    select {
    case <-ctx.Done():
      err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
    case <-backoff.Next():
      startAttempt := time.Now()
      err = invoker(ctx, method, req, reply, cc, opts...)
      if err != nil {
        log.Printf(...)
        continue
      }
    }
    break
  }
  log.Printf(...)
  return err
}

Выглядит хорошо, а потом я выложу код. Когда результат был отправлен на рассмотрение, мне перезвонили, сказав, что код необоснованный, потому что если оннеидемпотентныйоперация, которая приведет к многократному выполнению и изменит ожидаемый результат.

Кажется, что мы должны относиться к идемпотентным и неидемпотентным операциям по-разному.

silo.FireZeMissiles(NotIdempotent(ctx), req)

Ну, конечно, нет такого. Поэтому нам нужно самим создать маркер через контекст, чтобы указать, является ли операция идемпотентной.

func NotIdempotent(ctx context.Context) context.Context {
  return context.WithValue(ctx, "idempotent", false)
}
func isIdempotent(ctx context.Context) bool {
  val, ok := ctx.Value("idempotent").(bool)
  if !ok {
    return true
  }
  return val
}

Затем добавьте оценку isIdempotent() в нашу реализацию clientInterceptor():

func clientInterceptor(...) error {
  var (
    start     = time.Now()
    attempts  = 0
    err       error
    backoff   retryBackOff
  )
  for {
    attempts += 1
    select {
    case <-ctx.Done():
      err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
    case <-backoff.Next():
      startAttempt := time.Now()
      err = invoker(ctx, method, req, reply, cc, opts...)
      if err != nil && isIdempotent(ctx) {
        log.Printf(...)
        continue
      }
    }
    break
  }
  log.Printf(...)
  return err
}

Таким образом, при сбое вызова клиент проверяет и обнаруживает, что он является идемпотентным, перед повторной попыткой, в противном случае он не повторяет попытку. Повторные операции неидемпотентных операций избегаются.

Производственный пример: структурные ошибки

Я чувствую, что проблем нет, поэтому развертывание происходит онлайн. Но побегав некоторое время, я обнаружил, что что-то не так. Все успешные вызовы RPC, то есть сама операция выполнена правильно, и повторные попытки тайм-аута являются нормальными. Но все неудачные вызовы RPC ошибочны, все неудачные вызовы RPC возвращают тайм-аут, а не саму ошибку. Упомянутый здесь сбой означает не тайм-аут, вызванный сетевой проблемой, а сбой самого запроса.Например, как упоминалось ранее, Get() должна возвращать ошибку, если ключ не существует, или если Store() превышает квоты, он должен вернуть Error, такой ошибки в логе не видно, но соответствует тайм-ауту.

После анализа выясняется, что сервер должен сообщить об ошибке, и проблемы нет, но клиент ошибается, и он должен вернуть ошибку вызывающей стороне, вместо этого клиентский код начинает повторять операцию. Похоже, что все еще есть проблема с кодом, который был повторен ранее.

err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
  log.Printf(...)
  continue
}

Если вы внимательно посмотрите на эту часть кода, то увидите, что какой бы ни была ошибка, пока она не равна нулю, мы пытаемся снова. На самом деле это неправильно, мы должны повторять попытки только для определенных ошибок, таких как проблемы с сетью и тому подобное, и не должны повторять попытки для ошибок, которые мы хотим вернуть вызывающей стороне, что не имеет смысла.

Тогда возникает вопрос, как мы должны судить об ошибке, чтобы решить, стоит ли повторять попытку?

  • Можно использовать разные коды ошибок. Повторная попытка требуется для определенных кодов, но не для других, поэтому вам необходимо настроить код ошибки gRPC;
  • Мы также можем определить данные типа Error, которые содержат какой-то флаг, указывающий, стоит ли повторять попытку.
  • Или просто поместите код ошибки в ответное сообщение и убедитесь, что каждое сообщение имеет код ошибки, который мы определяем, чтобы указать, требуется ли повторная попытка.

Итак, нам нужно полное структурированное сообщение об ошибке, а не простой код ошибки и строка. Конечно, этот путь нелегкий, но мы так много сделали и сможем его преодолеть, если будем упорствовать.

Здесь мы снова начинаем с IDL:

message Error {
  int64 code = 1;
  string messsage = 2;
  bool temporary = 3;
  int64 userErrorCode = 4;
}

Затем мы реализуем этот тип ошибки.rpc/error.go

func (e *Error) Error() string {
  return e.Message
}
func Errorf(code codes.Code, temporary bool, msg string, args ..interface{}) error {
  return &Error{
    Code:      int64(code),
    Message:   fmt.Sprintf(msg, args...),
    Temporary: temporary,
  }
}

С помощью этих двух функций мы можем отображать и создавать эту переменную типа Error, но как передать сообщение об ошибке обратно клиенту? Затем проблема начинает усложняться:rpc/error.go

func MarshalError (err error, ctx context.Context) error {
  rerr, ok := err.(*Error)
  if !ok {
    return err
  }
  pberr, marshalerr := pb.Marshal(rerr)
  if marshalerr == nil {
    md := metadata.Pairs("rpc-error", base64.StdEncoding.EncodeToString(pberr))
    _ = grpc.SetTrailer(ctx, md)
  }
  return status.Errorf(codes.Code(rerr.Code), rerr.Message)
}
func UnmarshalError(err error, md metadata.MD) *Error {
  vals, ok := md["rpc-error"]
  if !ok {
    return nil
  }
  buf, err := base64.StdEncoding.DecodeString(vals[0])
  if err != nil {
    return nil
  }
  var rerr Error
  if err := pb.Unmarshal(buf, &rerr); err != nil {
    return nil
  }
  return &rerr
}

interceptor.go

func serverInterceptor (
  ctx context.Context,
  req interface{},
  info *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (interface{}, error) {
  start := time.Now()
  resp, err := handler(ctx, req)
  err = rpc.MarshalError(err, ctx)
  log.Print(...)
  return resp, err
}

это некрасиво, но работает.

Вот как можно обойти эту проблему, когда gRPC не поддерживает расширенную ошибку, и справиться с этим. Теперь это позволяет ошибкам распространяться за пределы хоста.

Пример производственной среды: Дамп

Другой клиент пришел, чтобы попросить его.Некоторые клиенты сказали, что мы можем сохранить или получить его, но как мы можем получить все данные в нем? Так что спрос есть, и я надеюсь реализовать операцию Dump(), которая сможет извлечь все данные.

Теперь, когда мы знакомы с дорогой, давайте изменим IDL и добавим функцию Dump().

service Cache {
  rpc Store(StoreReq) returns (StoreResp) {}
  rpc Get(GetReq) returns (GetResp) {}
  rpc Dump(DumpReq) returns (DumpResp) {}
}
message DumpReq{
}
message DumpResp {
  repeated DumpItem items = 1;
}
message DumpItem {
  string key = 1;
  bytes val = 2;
}

Здесь DumpResp использует повтор, потому что я не знаю, почему он не называется массивом в protobuf.

Пример производственной среды: управление потоком

Запустили новую функцию Дамп, и оказалось, что Дамп всем очень нравится, Дампом занимается много людей, и в результате память сервера заканчивается. Поэтому нам нужны некоторые ограничительные средства для управления потоком.

Изучив документацию, я обнаружил, что мы можем контролировать, сколько одновременных доступов может быть сделано одновременно, и как часто можно получить доступ к службе.server.go

func runServer() error {
  ...
  srv := grpc.NewServer(grpc.Creds(tlsCreds),
    ServerInterceptor(),
    grpc.MaxConcurrentStreams(64),
    grpc.InTapHandle(NewTap().Handler))
  rpc.RegisterCacheServer(srv, NewCacheService(accounts))
  l, err := net.Listen("tcp", "localhost:5051")
  if err != nil {
    return err
  }
  l = netutil.LimitListener(l, 1024)
  return srv.Serve(l)
}

Здесь мы используем netutil.LimitListener(l, 1024) для управления общим количеством соединений, а затем используем grpc.MaxConcurrentStreams(64), чтобы указать, сколько одновременных потоков может иметь каждое соединение grpc. Эти два вместе в основном контролируют общее количество параллелизма.

Но в gRPC нет возможности ограничить частоту доступа к нему. Поэтому здесь используется grpc.InTapHandle(NewTap().Handler)) для настройки, которая выполняется в более расширенном положении.

tap.go

type Tap struct {
  lim *rate.Limiter
}
func NewTap() *Tap {
  return &Tap(rate.NewLimiter(150, 5))
}
func (t *Tap) Handler(ctx context.Context, info *tap.Info) (context.Context, error) {
  if !t.lim.Allow() {
    return nil, status.Errorf(codes.ResourceExhausted, "service is over rate limit")
  }
  return ctx, nil
}

Пример производственной среды: потоковая передача

После того, как было развернуто предыдущее решение, память окончательно уменьшилась, но перед остальными я обнаружил, что людям все больше нравится пользоваться этим сервисом кэширования, а памяти не хватает. В это время мы начали думать о том, можно ли подкорректировать дизайн, вместо того, чтобы генерировать полный возвращаемый массив в памяти каждый раз при выполнении Dump, он будет отправляться обратно по запросу в виде потока.app.proto

syntax = "proto3";
package rpc;
service Cache {
  rpc Store(StoreReq) returns (StoreResp) {}
  rpc Get(GetReq) returns (GetResp) {}
  rpc Dump(DumpReq) returns (stream DumpItem) {}
}
message DumpReq{
}
message DumpItem {
  string key = 1;
  bytes val = 2;
}

Здесь мы больше не используем повторяющиеся массивы, а используем поток, после того как клиент запрашивает Dump(), результат отправляется обратно в виде потока.server.go

func (s *CacheService) Dump(req *rpc.DumpReq, stream rpc.Cache_DumpServer) error {
  for k, v := range s.store {
    stream.Send(&rpc.DumpItem{
      Key: k,
      Val: v,
    })
  }
  return nil
}

Мы модифицируем реализацию Dump() для отправки в поток с помощью stream.Send() для каждой записи.

Обратите внимание, что здесь у нас нет контекста, только поток.client.go

func runClient() error {
  ...
  stream, err := cache.Dump(context.Background(), &rpc.DumpReq{})
  if err != nil {
    return fmt.Errorf("failed to dump: %v", err)
  }
  for {
    item, err := stream.Recv()
    if err == io.EOF {
      break
    }
    if err != nil {
      return fmt.Errorf("failed to stream item: %v", err)
    }
  }
  return nil
}

Пример производственной среды: горизонтальное расширение, балансировка нагрузки

После использования потоковой передачи производительность сервера значительно улучшилась, однако наш сервис настолько привлекателен, у нас становится все больше и больше пользователей, и в результате у нас заканчивается память. В это время мы просмотрели код и почувствовали, что все можно сделать.Может быть, пришло время масштабироваться с одного сервера на несколько серверов и использовать балансировку нагрузки между ними.

gRPC — это связь с длительным подключением, поэтому, если клиент подключается к конечной точке gRPC, он всегда будет подключен к фиксированному серверу, поэтому балансировка нагрузки нескольких серверов не имеет смысла для одного и того же клиента, поскольку у клиента большое количество запросов. , это приведет к разрозненным запросам к разным серверам.

Если мы хотим, чтобы клиент воспользовался преимуществами многосерверного механизма, нам нужен более интеллектуальный клиент, который информирует клиента о наличии нескольких копий сервера, поэтому клиент устанавливает несколько соединений с разными серверами, чтобы один клиент мог Сторона использует горизонтальную масштабируемость балансировки нагрузки.

Пример производственной среды: многоязычное сотрудничество

В сложных средах наши клиенты gRPC (или даже серверы) могут работать на разных языковых платформах. На самом деле это преимущество gRPC, которое может легко реализовать связь между языковыми платформами.

Например, мы можем сделать клиент Python:

import grpc
import rpc_pb2 as rpc
channel = grpc.insecure_channel('localhost:5051')
cache_svc = rpc.CacheStub(channel)
resp = cache_svc.Get(rpc.GetReq(
  key="gopher",
))
print resp.val

Одна неприятная вещь заключается в том, что, хотя межъязыковая коммуникация gRPC очень удобна, реализация каждого языка относительно произвольна, например, CacheClient() в Go и CacheStub() в Python. Нет особой причины для разных названий, это потому, что разные авторы называли их в соответствии со своими собственными идеями, когда они их реализовывали.

Где gRPC еще не идеален

  • балансировки нагрузки
  • структурированные сообщения об ошибках
  • Пока нет поддержки JS в браузере (в каком-то смысле это наиболее часто используемый клиент)
  • Есть также частые изменения API (даже 1.0)
  • Очень плохая документация для некоторых языковых реализаций
  • Нет стандартизированной практики для разных языков

Варианты использования gRPC в производственной среде

  • ngrok, все 20+ внутренних коммуникаций проходят через gRPC
  • Square, заменившая все внутренние коммуникации на gRPC, стала первым пользователем и участником, использовавшим gRPC.
  • CoreOS, etcd v3 полностью gRPC
  • Google, Google Cloud Service (PubSub, Speech Rec) используют gRPC
  • Нетфликс, Йик Як, VSCO, Таракан, …

Будущие изменения для gRPC

  • Для будущих изменений, проверьте:
  • Поддержка нового языка (SwiftиHaskellв стадии тестирования)
  • Стабильность, надежность, улучшение производительности
  • Добавьте более детализированные API для поддержки настраиваемого поведения (управление соединениями, отслеживание каналов).
  • Браузер JS

Эта статья воспроизведена из:blog.horn99.org/post/go wave…

Личный публичный аккаунт WeChat:

Персональный гитхаб:

github.com/jiankunking

личный блог:

jiankunking.com