предисловие
предыдущий постЗдесь мы реализовали базовый клиент и сервер RPC, на этот раз мы приступили к реализации функций более высокого уровня. Из-за нехватки места, пожалуйста, обратитесь к конкретной реализации кода:кодовый адрес
базовая опорная часть
Обновленные версии Клиента и Сервера
Давайте сначала переопределим Клиента и Сервера: SGClient и SGServer.SGClient
Он инкапсулирует работу RPCClient, определенную в предыдущем разделе, и предоставляет связанные функции управления службами;SGServer
Он обновлен по сравнению с RPCServer, определенным в предыдущем разделе, для поддержки связанных функций управления службами. SG (управление услугами) здесь означает управление услугами.
Соответствующие определения прямо вставлены сюда:
type SGClient interface {
Go(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) (*Call, error)
Call(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
}
type sgClient struct {
shutdown bool
option SGOption
clients sync.Map //map[string]RPCClient
serversMu sync.RWMutex
servers []registry.Provider
}
type RPCServer interface {
Register(rcvr interface{}, metaData map[string]string) error
Serve(network string, addr string) error
Services() []ServiceInfo
Close() error
}
type SGServer struct { //原来的RPCServer
codec codec.Codec
serviceMap sync.Map
tr transport.ServerTransport
mutex sync.Mutex
shutdown bool
Option Option
}
перехватчик
Как упоминалось в предыдущей статье, нам нужно обеспечить одинаковый способ использования фильтров для достижения цели открытия для расширений и закрытия для модификаций. Здесь мы используем функции высшего порядка для определения квадратного аспекта и перехватчика метода.Сначала определим несколько аспектов:
//客户端切面
type CallFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
type GoFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call
//服务端切面
type ServeFunc func(network string, addr string) error
type ServeTransportFunc func(tr transport.Transport)
type HandleRequestFunc func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport)
Выше приведены несколько функций, через которые вызовы RPC будут проходить на стороне клиента и сервера.Мы определяем их как аспекты, а затем определяем соответствующие перехватчики:
//客户端拦截器
packege client
type Wrapper interface {
WrapCall(option *SGOption, callFunc CallFunc) CallFunc
WrapGo(option *SGOption, goFunc GoFunc) GoFunc
}
//f服务端拦截器
package server
type Wrapper interface {
WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc
WrapServeTransport(s *SGServer, transportFunc ServeTransportFunc) ServeTransportFunc
WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc
}
Таким образом, пользователи могут реализоватьWapper
Интерфейс для улучшения поведения клиента или сервера, например параметров запроса и результатов, записываемых в журнал, или в ответ на динамические изменения параметров и тому подобное. Наши собственные функции, связанные с фреймворком, также могутWrapper
выполнить. В настоящее время клиент реализует метод инкапсуляции метаданных.MetaDataWrapper
и регистрировать запросы и ответыLogWrapper
; сервер в данный момент находится вDefaultWrapper
Реализована логика регистрации службы, прослушивания сигналов выхода и подсчета запросов.
Поскольку go не предоставляет метод абстрактного класса, может не потребоваться перехватывать все аспекты для некоторых классов реализации (например, только перехватывающий Call не хочет перехватывать Go). В этом случае вы можете напрямую вернуть объект функции в параметр.
Реализация клиентского перехватчика
Реализация перехватчика на стороне сервера
Секция управления услугами
Регистрация и обнаружение службы
До этого вызовы нашей службы RPC вызывались путем указания ip и порта сервера на стороне клиента.Этот метод очень прост, но также имеет очень ограниченные сценарии.По оценкам, его можно использовать только в тестах или демонстрациях. Поэтому нам необходимо предоставить функции, связанные с регистрацией и обнаружением сервисов, чтобы конфигурация клиента больше не была привязана к фактическому IP, а получала список серверов через независимый реестр и могла получать обновления в реальном времени, когда изменения узла сервера.
Сначала определите соответствующий интерфейс (кодовый адрес):
//Registry包含两部分功能:服务注册(用于服务端)和服务发现(用于客户端)
type Registry interface {
Register(option RegisterOption, provider ...Provider) //注册
Unregister(option RegisterOption, provider ...Provider) //注销
GetServiceList() []Provider //获取服务列表
Watch() Watcher //监听服务列表的变化
Unwatch(watcher Watcher) //取消监听
}
type RegisterOption struct {
AppKey string //AppKey用于唯一标识某个应用
}
type Watcher interface {
Next() (*Event, error) //获取下一次服务列表的更新
Close()
}
type EventAction byte
const (
Create EventAction = iota
Update
Delete
)
type Event struct { //Event表示一次更新
Action EventAction
AppKey string
Providers []Provider //具体变化的服务提供者(增量而不是全量)
}
type Provider struct { //某个具体的服务提供者
ProviderKey string // Network+"@"+Addr
Network string
Addr string
Meta map[string]string
}
AppKey
Мы используем концепцию AppKey для идентификации службы, напримерcom.meituan.demo.rpc.server
. Сервер регистрирует свою собственную связанную информацию (включая AppKey, ip, порт, список методов и т. д.) в реестре при запуске; клиенту нужно только выполнить поиск в реестре в соответствии с AppKey сервера, когда ему нужно вызвать.
В настоящее время пока реализовано только прямое подключение (peer2peer) и регистрация сервисов в памяти (InMemory), а другие независимые компоненты, такие как etcd или zookeeper, будут подключены позже.
Адрес реализации кода InMemory
Балансировка нагрузки
При регистрации и обнаружении службы клиент может столкнуться с более чем одним сервером. Клиенту необходимо выбрать один из нескольких серверов для фактической связи, прежде чем инициировать вызов. Конкретная стратегия выбора заключается в следующем: Многие, такие как случайный выбор, циклический выбор , выбор на основе веса, на основе нагрузки на сервер или настраиваемые правила и т. д.
Вот определение интерфейса:
//Filter用于自定义规则过滤某个节点
type Filter func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool
type SelectOption struct {
Filters []Filter
}
type Selector interface {
Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error)
}
В настоящее время реализована только случайная балансировка нагрузки, а другие стратегии, такие как круглый робин или последовательное перемешивание, будут реализованы в будущем. Пользователи также могут выбрать реализовать свои собственные стратегии балансировки нагрузки.
Отказоустойчивость
Длительное соединение и переподключение к сети
Для того, чтобы уменьшить накладные расходы на частое создание и разрыв сетевых соединений, мы поддерживаем длительное соединение от клиента к серверу, и кэшируем созданное соединение (объект RPCClient) с картой, а ключом является идентификатор соответствующего сервера. Перед вызовом клиент извлекает кэшированный RPCClient в соответствии с результатом балансировки нагрузки, а затем инициирует вызов. Когда мы не можем получить соответствующий клиент или обнаруживаем, что срок действия кэшированного клиента истек, нам необходимо повторно установить соединение (повторно создать объект RPCClient).
func (c *sgClient) selectClient(ctx context.Context, ServiceMethod string, arg interface{}) (provider registry.Provider, client RPCClient, err error) {
//根据负载均衡决定要调用的服务端
provider, err = c.option.Selector.Next(c.providers(), ctx, ServiceMethod, arg, c.option.SelectOption)
if err != nil {
return
}
client, err = c.getClient(provider)
return
}
func (c *sgClient) getClient(provider registry.Provider) (client RPCClient, err error) {
key := provider.ProviderKey
rc, ok := c.clients.Load(key)
if ok {
client := rc.(RPCClient)
if client.IsShutDown() {
//如果已经失效则清除掉
c.clients.Delete(key)
}
}
//再次检索
rc, ok = c.clients.Load(key)
if ok {
//已经有缓存了,返回缓存的RPCClient
client = rc.(RPCClient)
} else {
//没有缓存,新建一个然后更新到缓存后返回
client, err = NewRPCClient(provider.Network, provider.Addr, c.option.Option)
if err != nil {
return
}
c.clients.Store(key, client)
}
return
}
В текущей реализации каждому сервис-провайдеру соответствует только один RPCClient.В будущем можно рассматривать реализации, аналогичные пулам соединений, то есть каждому сервис-провайдеру соответствует несколько RPCClient, и перед каждым вызовом из пула соединений берется один RPCClient .
Отказоустойчивость кластера
В распределенной системе исключения неизбежны. В случае сбоя вызова мы можем выбрать метод обработки, который следует использовать. Вот некоторые распространенные:
type FailMode byte
const (
FailFast FailMode = iota //快速失败
FailOver //重试其他服务器
FailRetry //重试同一个服务器
FailSafe //忽略失败,直接返回
)
Конкретная реализация относительно проста, то есть в соответствии с настроенными параметрами отказоустойчивости и количеством попыток принять решение о повторной попытке; другие включают FailBack (повторная передача после задержки), Fork и Broadcast и т. д., которые не были реализованы для время.
изящно выйти
При получении сигнала выхода из программы серверная сторона попытается сначала обработать незавершенный в данный момент запрос, а затем выйти после обработки запроса.Когда указанное время (по умолчанию 12 с) не будет обработано, серверная сторона сразу завершит работу.
func (s *SGServer) Close() error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.shutdown = true
//等待当前请求处理完或者直到指定的时间
ticker := time.NewTicker(s.Option.ShutDownWait)
defer ticker.Stop()
for {
if s.requestInProcess <= 0 { //requestInProcess表示当前正在处理的请求数,在wrapper里计数
break
}
select {
case <-ticker.C:
break
}
}
return s.tr.Close()
}
Эпилог
Вот и весь контент этого времени, вообще говоря, он инкапсулируется на основе предыдущего, а последующие точки расширения резервируются, а затем реализуются простые функции, связанные с управлением услугами. Подводя итог, на этот раз мыпредыдущий постНа основании следующих изменений:
- Переопределить интерфейс клиента и сервера
- Предоставляет перехватчик (интерфейс Wrapper)
- Предоставляет интерфейсы регистрации и обнаружения служб и балансировки нагрузки, а также простые реализации.
- Простая отказоустойчивая обработка
- Простой изящный выход
- Добавлена поддержка сериализации гобов (сравнительно простая, в статье не упоминается)