предисловие
Пока наш фреймворк имеет некоторые функции управления сервисом, на этот раз мы реализуем некоторые другие функции на основе предыдущих. Из-за нехватки места здесь приведена только часть реализации и полная ссылка на код:github
реестр зоопарка
Просто реализуйте интерфейс нашего предыдущего реестра.Здесь мы используем libkv docker вместо прямого использования клиента zk (изrpcxЯ узнал об этом), libkv инкапсулирует операции для нескольких служб хранения, включая Consul, Etcd, Zookeeper и BoltDB.Если вы хотите поддерживать другие типы хранилищ, вам нужно написать свой собственный клиент. Реестр на основе zk определяется следующим образом:
type ZookeeperRegistry struct {
AppKey string //一个ZookeeperRegistry实例和一个appkey关联
ServicePath string //数据存储的基本路径位置,比如/service/providers
UpdateInterval time.Duration //定时拉取数据的时间间隔
kv store.Store //封装过的zk客户端
providersMu sync.RWMutex
providers []registry.Provider //本地缓存的列表
watchersMu sync.Mutex
watchers []*Watcher //watcher列表
}
Инициализирующая часть логики выглядит следующим образом:
func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string,
updateInterval time.Duration, cfg *store.Config) registry.Registry {
zk := new(ZookeeperRegistry)
zk.AppKey = AppKey
zk.ServicePath = ServicePath
zk.UpdateInterval = updateInterval
kv, err := libkv.NewStore(store.ZK, zkAddrs, cfg)
if err != nil {
log.Fatalf("cannot create zk registry: %v", err)
}
zk.kv = kv
basePath := zk.ServicePath
if basePath[0] == '/' { //路径不能以"/"开头
basePath = basePath[1:]
zk.ServicePath = basePath
}
//先创建基本路径
err = zk.kv.Put(basePath, []byte("base path"), &store.WriteOptions{IsDir: true})
if err != nil {
log.Fatalf("cannot create zk path %s: %v", zk.ServicePath, err)
}
//显式拉取第一次数据
zk.doGetServiceList()
go func() {
t := time.NewTicker(updateInterval)
for range t.C {
//定时拉取数据
zk.doGetServiceList()
}
}()
go func() {
//后台watch数据
zk.watch()
}()
return zk
}
Когда мы инициализируем реестр, мы выполняем две фоновые задачи: регулярное извлечение и мониторинг данных, что эквивалентно комбинации push и pull. В то же время данные, полученные мониторингом, являются полным объемом данных, потому что его проще реализовать.Если список услуг будет становиться все больше и больше в будущем, возможно, потребуется добавить механизм, основанный на номере версии или передавать только инкрементные данные. Вот несколько дополнительных моментов:
- Фон регулярно извлекает данные и кэширует их
- Возврат к кешу напрямую при запросе
- Добавлять узлы в zk при регистрации, удалять узлы в zk при выходе
- При мониторинге он отслеживает не каждого поставщика услуг, а его родительский каталог.При изменении список поставщиков услуг вытягивается равномерно, что может уменьшить количество наблюдателей и упростить логику.
- Из-за пункта 4 вам необходимо изменить содержимое родительского каталога (lastUpdate), чтобы инициировать мониторинг при регистрации и отмене регистрации.
Конкретная логика регистрации и отмены регистрации здесь не приводится, см.:github
сердцебиение клиента
Если мы используем zk в качестве реестра, более простым подходом может быть прямое добавление поставщика услуг в zk в качестве временного узла, чтобы мы могли использовать характеристики временного узла для достижения динамического обнаружения службы. Однако используемая нами библиотека libkv не поддерживает функцию временных узлов, а другие службы хранения, такие как etcd, могут не поддерживать функции временных узлов, кроме zk, поэтому все, что мы регистрируем в реестре,постоянный узел.在这种情况下,可能某些由于特殊情况无法访问的服务提供者并没有及时地将自身从注册中心注销掉,所以客户端需要额外的能力来判断一个服务提供者是否可用,而不是完全依赖注册中心。
Следовательно, нам нужно добавить поддержку пульса клиента.Клиент может регулярно отправлять запросы пульса на сервер, и сервер может возвращаться непосредственно, когда он получает запрос пульса, если клиент уведомлен о том, что он все еще доступен. Клиент может понизить рейтинг поставщика услуг, пульсация которого не удалась, в соответствии с установленным пороговым значением до тех пор, пока пульсация не восстановится или пока поставщик услуг не выйдет из системы. Клиент отправляет логику пульса следующим образом:
func (c *sgClient) heartbeat() {
if c.option.HeartbeatInterval <= 0 {
return
}
//根据指定的时间间隔发送心跳
t := time.NewTicker(c.option.HeartbeatInterval)
for range t.C {
if c.shutdown {
t.Stop()
return
}
//遍历每个RPCClient进行心跳检查
c.clients.Range(func(k, v interface{}) bool {
err := v.(RPCClient).Call(context.Background(), "", "", nil)
c.mu.Lock()
if err != nil {
//心跳失败进行计数
if fail, ok := c.clientsHeartbeatFail[k.(string)]; ok {
fail++
c.clientsHeartbeatFail[k.(string)] = fail
} else {
c.clientsHeartbeatFail[k.(string)] = 1
}
} else {
//心跳成功则进行恢复
c.clientsHeartbeatFail[k.(string)] = 0
c.serversMu.Lock()
for i, p := range c.servers {
if p.ProviderKey == k {
delete(c.servers[i].Meta, protocol.ProviderDegradeKey)
}
}
c.serversMu.Unlock()
}
c.mu.Unlock()
//心跳失败次数超过阈值则进行降级
if c.clientsHeartbeatFail[k.(string)] > c.option.HeartbeatDegradeThreshold {
c.serversMu.Lock()
for i, p := range c.servers {
if p.ProviderKey == k {
c.servers[i].Meta[protocol.ProviderDegradeKey] = true
}
}
c.serversMu.Unlock()
}
return true
})
}
}
Аутентификация
Реализация аутентификации относительно проста: клиент может нести информацию, связанную с аутентификацией, в метаданных, а сервер может выполнять аутентификацию через указанную оболочку. Код серверной Wrapper выглядит следующим образом:
type AuthFunc func(key string) bool
type ServerAuthInterceptor struct {
authFunc AuthFunc
}
func NewAuthInterceptor(authFunc AuthFunc) Wrapper {
return &ServerAuthInterceptor{authFunc}
}
func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {
return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {
if auth, ok := ctx.Value(protocol.AuthKey).(string); ok {
//鉴权通过则执行业务逻辑
if sai.authFunc(auth) {
requestFunc(ctx, response, response, tr)
return
}
}
//鉴权失败则返回异常
s.writeErrorResponse(response, tr, "auth failed")
}
}
Слияние понижения
Простой автоматический выключатель на основе временного окна временно реализуется следующим образом:
type CircuitBreaker interface {
AllowRequest() bool
Success()
Fail(err error)
}
type DefaultCircuitBreaker struct {
lastFail time.Time
fails uint64
threshold uint64
window time.Duration
}
func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {
return &DefaultCircuitBreaker{
threshold: threshold,
window: window,
}
}
func (cb *DefaultCircuitBreaker) AllowRequest() bool {
if time.Since(cb.lastFail) > cb.window {
cb.reset()
return true
}
failures := atomic.LoadUint64(&cb.fails)
return failures < cb.threshold
}
func (cb *DefaultCircuitBreaker) Success() {
cb.reset()
}
func (cb *DefaultCircuitBreaker) Fail() {
atomic.AddUint64(&cb.fails, 1)
cb.lastFail = time.Now()
}
func (cb *DefaultCircuitBreaker) reset() {
atomic.StoreUint64(&cb.fails, 0)
cb.lastFail = time.Now()
}
Эпилог
Это конец содержания. Если у вас есть какие-либо замечания или предложения, вы можете их исправить.
ссылка на историю
Внедрение инфраструктуры RPC с нуля (ноль)