Go создает инфраструктуру микросервисов на основе grpc — регистрация и обнаружение сервисов.

задняя часть Микросервисы Go gRPC
Go создает инфраструктуру микросервисов на основе grpc — регистрация и обнаружение сервисов.

Обзор

图
grpc — это инфраструктура RPC с открытым исходным кодом от Google. Она реализована на основе http2 и поддерживает кросс-языковость. В настоящее время она в основном охватывает основные языки. язык генерируется инструментом protobuf для использования.

Для нового языка, такого как go, экологическая цепочка все еще находится в стадии разработки, как и среда микросервисов.Следующее будет создавать структуру связи микросервисов на основе версии grpc-go.

1. Механизм регистрации и публикации сервиса

1.1 Проблема решена

Регистрация и публикация службы в основном решают проблему зависимости службы. В общем, если служба A вызывает службу B, наиболее прямым способом является настройка IP-адреса и порта. Однако по мере увеличения числа зависимостей службы конфигурация будет очень сложной. , а при переносе службы необходимо изменить конфигурацию всех связанных служб, что очень сложно поддерживать и чревато проблемами. Поэтому, чтобы решить эту зависимость от службы, возникла регистрация и публикация службы.

1.2 Механизм

图
Регистрация и обнаружение службы в основном делятся на следующие пункты.

  • Выпуск служебной информации
    В основном это имя службы, информация об IP-адресе и некоторые метаданные вложения службы.Зарегистрируйтесь в центре публикации службы регистрации через интерфейс регистрации.
  • Обнаружение выживания
    Когда служба неожиданно останавливается, клиент должен воспринять остановку службы и исключить IP-адрес службы из списка доступных IP-адресов, что может быть достигнуто с помощью синхронизированного пульса.
  • Балансировка клиентской нагрузки
    За счет регистрации и публикации службы можно развернуть несколько экземпляров службы, и клиент может добиться прямой балансировки нагрузки в экземпляре, реализуя тем самым горизонтальное расширение службы.

Таким образом, регистрацию и выпуск службы можно резюмировать следующим образом: служба сообщает информацию, клиент извлекает информацию о службе и выполняет вызов через имя службы, когда служба не работает, клиент отключает неисправную службу и клиент автоматически добавляется в список вызовов при запуске службы.

2. Осознайте

Вся реализация grpc-go использует многие функции интерфейса go, поэтому, расширив интерфейс, можно легко реализовать регистрацию и обнаружение сервисов.Здесь реестр сервисов учитывает доступность и согласованность и обычно использует etcd или zookeeper для его реализации Версия etcd.
Полный код и примеры использования см.grpc-wrapper

2.1 Клиент

В частности, необходимо реализовать несколько интерфейсов.Для клиента в самой простой реализации нужно реализовать только два метода интерфейса Resolve() и Next(), а затем использовать метод балансировки нагрузки опроса.
В основном это реализовано через интерфейс Get и интерфейс Watch в etcd.

  • Интерфейс разрешения()
//用于生成Watcher,监听注册中心中的服务信息变化
func (er *etcdRegistry) Resolve(target string) (naming.Watcher, error) {
	ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
	w := &etcdWatcher{
		cli:    er.cli,
		target: target + "/",
		ctx:    ctx,
		cancel: cancel,
	}
	return w, nil
}

  • Интерфейс Далее()
//Next接口主要用于获取注册的服务信息,通过channel以及watch,当服务信息发生
//变化时,Next接口会将变化返回给grpc框架从而实现服务信息变更.
func (ew *etcdWatcher) Next() ([]*naming.Update, error) {
	var updates []*naming.Update
    //初次获取时,创建监听channel,并返回获取到的服务信息
	if ew.watchChan == nil {
		//create new chan
		resp, err := ew.cli.Get(ew.ctx, ew.target, etcd.WithPrefix(), etcd.WithSerializable())
		if err != nil {
			return nil, err
		}
		for _, kv := range resp.Kvs {
			var upt naming.Update
			if err := json.Unmarshal(kv.Value, &upt); err != nil {
				continue
			}
			updates = append(updates, &upt)
		}
        //创建etcd的watcher监听target(服务名)的信息.
		opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
		ew.watchChan = ew.cli.Watch(context.TODO(), ew.target, opts...)
		return updates, nil
	}

    //阻塞监听,服务发生变化时才返回给上层
	wrsp, ok := <-ew.watchChan
	if !ok {
		err := status.Error(codes.Unavailable, "etcd watch closed")
		return nil, err
	}
	if wrsp.Err() != nil {
		return nil, wrsp.Err()
	}
	for _, e := range wrsp.Events {
		var upt naming.Update
		var err error
		switch e.Type {
		case etcd.EventTypePut:
			err = json.Unmarshal(e.Kv.Value, &upt)
			upt.Op = naming.Add
		case etcd.EventTypeDelete:
			err = json.Unmarshal(e.PrevKv.Value, &upt)
			upt.Op = naming.Delete
		}

		if err != nil {
			continue
		}
		updates = append(updates, &upt)
	}
	return updates, nil
}

2.2 Сервер

Серверу нужно только сообщать служебную информацию и регулярно поддерживать пульс, что реализовано через интерфейс Put в etcd и интерфейс KeepAlive. детали следующим образом:

func (er *etcdRegistry) Register(ctx context.Context, target string, update naming.Update, opts ...wrapper.RegistryOptions) (err error) {
	//将服务信息序列化成json格式
    var upBytes []byte
	if upBytes, err = json.Marshal(update); err != nil {
		return status.Error(codes.InvalidArgument, err.Error())
	}

	ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
	er.cancal = cancel
	rgOpt := wrapper.RegistryOption{TTL: wrapper.DefaultRegInfTTL}
	for _, opt := range opts {
		opt(&rgOpt)
	}

	switch update.Op {
	case naming.Add:
		lsRsp, err := er.lsCli.Grant(ctx, int64(rgOpt.TTL/time.Second))
		if err != nil {
			return err
		}

        //Put服务信息到etcd,并设置key的值TTL,通过后面的KeepAlive接口
        //对TTL进行续期,超过TTL的时间未收到续期请求,则说明服务可能挂了,从而清除服务信息
		etcdOpts := []etcd.OpOption{etcd.WithLease(lsRsp.ID)}
		key := target + "/" + update.Addr
		_, err = er.cli.KV.Put(ctx, key, string(upBytes), etcdOpts...)
		if err != nil {
			return err
		}

        //保持心跳
		lsRspChan, err := er.lsCli.KeepAlive(context.TODO(), lsRsp.ID)
		if err != nil {
			return err
		}
		go func() {
			for {
				_, ok := <-lsRspChan
				if !ok {
					grpclog.Fatalf("%v keepalive channel is closing", key)
					break
				}
			}
		}()
	case naming.Delete:
		_, err = er.cli.Delete(ctx, target+"/"+update.Addr)
	default:
		return status.Error(codes.InvalidArgument, "unsupported op")
	}
	return nil
}

3. Ссылка

grpc
etcd
grpc-wrapper