Внедрение инфраструктуры RPC с нуля (2)

Go

предисловие

предыдущий постЗдесь мы реализовали базовый клиент и сервер RPC, на этот раз мы приступили к реализации функций более высокого уровня. Из-за нехватки места, пожалуйста, обратитесь к конкретной реализации кода:кодовый адрес

базовая опорная часть

Обновленные версии Клиента и Сервера

клиентская реализация

реализация сервера

Давайте сначала переопределим Клиента и Сервера: SGClient и SGServer.SGClientОн инкапсулирует работу RPCClient, определенную в предыдущем разделе, и предоставляет связанные функции управления службами;SGServerОн обновлен по сравнению с RPCServer, определенным в предыдущем разделе, для поддержки связанных функций управления службами. SG (управление услугами) здесь означает управление услугами. Соответствующие определения прямо вставлены сюда:

type SGClient interface {
	Go(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) (*Call, error)
	Call(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
}
type sgClient struct {
	shutdown  bool
	option    SGOption
	clients   sync.Map //map[string]RPCClient
	serversMu sync.RWMutex
	servers   []registry.Provider
}
type RPCServer interface {
	Register(rcvr interface{}, metaData map[string]string) error
	Serve(network string, addr string) error
	Services() []ServiceInfo
	Close() error
}
type SGServer struct { //原来的RPCServer
	codec      codec.Codec
	serviceMap sync.Map
	tr         transport.ServerTransport
	mutex      sync.Mutex
	shutdown   bool
	Option Option
}

перехватчик

Как упоминалось в предыдущей статье, нам нужно обеспечить одинаковый способ использования фильтров для достижения цели открытия для расширений и закрытия для модификаций. Здесь мы используем функции высшего порядка для определения квадратного аспекта и перехватчика метода.Сначала определим несколько аспектов:

//客户端切面
type CallFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
type GoFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call
//服务端切面
type ServeFunc func(network string, addr string) error
type ServeTransportFunc func(tr transport.Transport)
type HandleRequestFunc func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport)

Выше приведены несколько функций, через которые вызовы RPC будут проходить на стороне клиента и сервера.Мы определяем их как аспекты, а затем определяем соответствующие перехватчики:

//客户端拦截器
packege client
type Wrapper interface {
	WrapCall(option *SGOption, callFunc CallFunc) CallFunc
	WrapGo(option *SGOption, goFunc GoFunc) GoFunc
}
//f服务端拦截器
package server
type Wrapper interface {
	WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc
	WrapServeTransport(s *SGServer, transportFunc ServeTransportFunc) ServeTransportFunc
	WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc
}

Таким образом, пользователи могут реализоватьWapperИнтерфейс для улучшения поведения клиента или сервера, например параметров запроса и результатов, записываемых в журнал, или в ответ на динамические изменения параметров и тому подобное. Наши собственные функции, связанные с фреймворком, также могутWrapperвыполнить. В настоящее время клиент реализует метод инкапсуляции метаданных.MetaDataWrapperи регистрировать запросы и ответыLogWrapper; сервер в данный момент находится вDefaultWrapperРеализована логика регистрации службы, прослушивания сигналов выхода и подсчета запросов.

Поскольку go не предоставляет метод абстрактного класса, может не потребоваться перехватывать все аспекты для некоторых классов реализации (например, только перехватывающий Call не хочет перехватывать Go). В этом случае вы можете напрямую вернуть объект функции в параметр.

Реализация клиентского перехватчика

Реализация перехватчика на стороне сервера

Секция управления услугами

Регистрация и обнаружение службы

До этого вызовы нашей службы RPC вызывались путем указания ip и порта сервера на стороне клиента.Этот метод очень прост, но также имеет очень ограниченные сценарии.По оценкам, его можно использовать только в тестах или демонстрациях. Поэтому нам необходимо предоставить функции, связанные с регистрацией и обнаружением сервисов, чтобы конфигурация клиента больше не была привязана к фактическому IP, а получала список серверов через независимый реестр и могла получать обновления в реальном времени, когда изменения узла сервера.

Сначала определите соответствующий интерфейс (кодовый адрес):

//Registry包含两部分功能:服务注册(用于服务端)和服务发现(用于客户端)
type Registry interface {
	Register(option RegisterOption, provider ...Provider) //注册
	Unregister(option RegisterOption, provider ...Provider) //注销
	GetServiceList() []Provider //获取服务列表
	Watch() Watcher //监听服务列表的变化
	Unwatch(watcher Watcher) //取消监听
}
type RegisterOption struct {
	AppKey string //AppKey用于唯一标识某个应用
}
type Watcher interface {
	Next() (*Event, error) //获取下一次服务列表的更新
	Close()
}
type EventAction byte
const (
	Create EventAction = iota
	Update
	Delete
)
type Event struct { //Event表示一次更新
	Action    EventAction
	AppKey    string
	Providers []Provider //具体变化的服务提供者(增量而不是全量)
}
type Provider struct { //某个具体的服务提供者
	ProviderKey string // Network+"@"+Addr
	Network     string
	Addr        string
	Meta        map[string]string
}

AppKey

Мы используем концепцию AppKey для идентификации службы, напримерcom.meituan.demo.rpc.server. Сервер регистрирует свою собственную связанную информацию (включая AppKey, ip, порт, список методов и т. д.) в реестре при запуске; клиенту нужно только выполнить поиск в реестре в соответствии с AppKey сервера, когда ему нужно вызвать.

В настоящее время пока реализовано только прямое подключение (peer2peer) и регистрация сервисов в памяти (InMemory), а другие независимые компоненты, такие как etcd или zookeeper, будут подключены позже.

Адрес реализации кода InMemory

Балансировка нагрузки

При регистрации и обнаружении службы клиент может столкнуться с более чем одним сервером. Клиенту необходимо выбрать один из нескольких серверов для фактической связи, прежде чем инициировать вызов. Конкретная стратегия выбора заключается в следующем: Многие, такие как случайный выбор, циклический выбор , выбор на основе веса, на основе нагрузки на сервер или настраиваемые правила и т. д.

Вот определение интерфейса:

//Filter用于自定义规则过滤某个节点
type Filter func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool
type SelectOption struct {
	Filters []Filter
}
type Selector interface {
	Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error)
}

В настоящее время реализована только случайная балансировка нагрузки, а другие стратегии, такие как круглый робин или последовательное перемешивание, будут реализованы в будущем. Пользователи также могут выбрать реализовать свои собственные стратегии балансировки нагрузки.

Отказоустойчивость

Длительное соединение и переподключение к сети

Для того, чтобы уменьшить накладные расходы на частое создание и разрыв сетевых соединений, мы поддерживаем длительное соединение от клиента к серверу, и кэшируем созданное соединение (объект RPCClient) с картой, а ключом является идентификатор соответствующего сервера. Перед вызовом клиент извлекает кэшированный RPCClient в соответствии с результатом балансировки нагрузки, а затем инициирует вызов. Когда мы не можем получить соответствующий клиент или обнаруживаем, что срок действия кэшированного клиента истек, нам необходимо повторно установить соединение (повторно создать объект RPCClient).

func (c *sgClient) selectClient(ctx context.Context, ServiceMethod string, arg interface{}) (provider registry.Provider, client RPCClient, err error) {
        //根据负载均衡决定要调用的服务端
	provider, err = c.option.Selector.Next(c.providers(), ctx, ServiceMethod, arg, c.option.SelectOption)
	if err != nil {
		return
	}
	client, err = c.getClient(provider)
	return
}

func (c *sgClient) getClient(provider registry.Provider) (client RPCClient, err error) {
	key := provider.ProviderKey
	rc, ok := c.clients.Load(key)
	if ok {
		client := rc.(RPCClient)
		if client.IsShutDown() {
		    //如果已经失效则清除掉
			c.clients.Delete(key)
		}
	}
        //再次检索
	rc, ok = c.clients.Load(key)
	if ok {
	        //已经有缓存了,返回缓存的RPCClient
		client = rc.(RPCClient)
	} else {
	        //没有缓存,新建一个然后更新到缓存后返回
		client, err = NewRPCClient(provider.Network, provider.Addr, c.option.Option)
		if err != nil {
			return
		}
		c.clients.Store(key, client)
	}
	return
}

В текущей реализации каждому сервис-провайдеру соответствует только один RPCClient.В будущем можно рассматривать реализации, аналогичные пулам соединений, то есть каждому сервис-провайдеру соответствует несколько RPCClient, и перед каждым вызовом из пула соединений берется один RPCClient .

Отказоустойчивость кластера

В распределенной системе исключения неизбежны. В случае сбоя вызова мы можем выбрать метод обработки, который следует использовать. Вот некоторые распространенные:

type FailMode byte
const (
	FailFast FailMode = iota //快速失败
	FailOver //重试其他服务器
	FailRetry //重试同一个服务器
	FailSafe //忽略失败,直接返回
)

Конкретная реализация относительно проста, то есть в соответствии с настроенными параметрами отказоустойчивости и количеством попыток принять решение о повторной попытке; другие включают FailBack (повторная передача после задержки), Fork и Broadcast и т. д., которые не были реализованы для время.

изящно выйти

При получении сигнала выхода из программы серверная сторона попытается сначала обработать незавершенный в данный момент запрос, а затем выйти после обработки запроса.Когда указанное время (по умолчанию 12 с) не будет обработано, серверная сторона сразу завершит работу.

func (s *SGServer) Close() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.shutdown = true
	//等待当前请求处理完或者直到指定的时间
	ticker := time.NewTicker(s.Option.ShutDownWait)
	defer ticker.Stop()
	for {
		if s.requestInProcess <= 0 { //requestInProcess表示当前正在处理的请求数,在wrapper里计数
			break
		}
		select {
		case <-ticker.C:
			break
		}
	}
	return s.tr.Close()
}

Эпилог

Вот и весь контент этого времени, вообще говоря, он инкапсулируется на основе предыдущего, а последующие точки расширения резервируются, а затем реализуются простые функции, связанные с управлением услугами. Подводя итог, на этот раз мыпредыдущий постНа основании следующих изменений:

  1. Переопределить интерфейс клиента и сервера
  2. Предоставляет перехватчик (интерфейс Wrapper)
  3. Предоставляет интерфейсы регистрации и обнаружения служб и балансировки нагрузки, а также простые реализации.
  4. Простая отказоустойчивая обработка
  5. Простой изящный выход
  6. Добавлена ​​поддержка сериализации гобов (сравнительно простая, в статье не упоминается)

ссылка на историю

Внедрение инфраструктуры RPC с нуля (ноль)

Внедрение инфраструктуры RPC с нуля (1)