Сегодня поговорим об обнаружении сервисов gRPC и принципах балансировки нагрузки, которые отличаются отNginx
,Lvs
илиF5
Для этих стратегий балансировки нагрузки на стороне сервера gRPC использует балансировку нагрузки, реализованную клиентом. Что это означает?Для системы, использующей балансировку нагрузки на стороне сервера, клиент сначала получает доступ к доменному имени/IP-адресу балансировки нагрузки, а затем балансировка нагрузки распределяет запрос на определенный сервисный узел на бэкэнде в соответствии с политика. Для балансировки нагрузки на стороне клиента клиент выбирает узел из списка доступных внутренних сервисных узлов для прямого подключения к внутреннему серверу в соответствии со своей собственной стратегией балансировки нагрузки.
Etcd
пакетnaming
Компонент предоставляет комбинацию преобразователя именgRPC
свой собственныйRoundRobin
Балансировщик нагрузки с циклическим планированием позволяет пользователям легко создавать систему регистрации/обнаружения служб и балансировки нагрузки. Если круговое планирование не может удовлетворить требования к планированию или вы не хотите использоватьEtcd
В качестве реестра служб и преобразователя имен его можно реализовать, написав кодgRPC
ОпределенныйResolver
а такжеBalancer
интерфейс для удовлетворения пользовательских требований системы.
Соответствующие версии исходного кода, указанные в этой статье: gRPC v1.2.x, Etcd v3.3.
Если вы не знаете про gRPC и Etcd, то можете прочитать то, что я писал давным-давноНачало работы с gRPCа такжеНачало работы с etcdсерия статей.
Обнаружение регистрации службы gRPC
Кратко объясним, как использоватьEtcd
Реализует принципы регистрации и обнаружения служб. Процесс регистрации и обнаружения сервисов можно кратко описать следующей схемой:
Служба A на приведенном выше рисунке содержит два узла. После запуска службы на узле она будет использовать уникальный идентификатор, содержащий имя службы и IP-адрес узла в качестве ключа (например, /service/a/114.128.45.117), и информация об IP-адресе и порте сервисного узла в качестве ключа. Значение хранится вEtcd
начальство. Эти ключи являются ключами с арендой, и наша служба должна регулярно продлевать аренду.Если сам сервисный узел не работает, например, сервис на узле2 не работает, и продление не может быть завершено, тогда его соответствующий ключ: / service/a /114.128.45.117 истечет, и клиент больше не сможет получать информацию об этом сервисном узле от Etcd.
При этом клиент также будет использоватьEtcd
изWatch
функциональный монитор с/servive/a
Изменения всех ключей с префиксом, если есть событие добавления или удаления ключа узлаEtcd
пройдешьWatchChan
отправлено клиенту,WatchChan
Реализация на языке программирования естьGo
изChannel
.
регистрация службы
оEtcd
регистрация услуги, в официальном программном комплексе не предусмотрена единая функция регистрации для звонков. Так как же сохранить информацию об узле в новом сервисном узле после его добавления?Etcd
и уведомить преобразователь имен? В файле naming/grpc.go исходного пакета Etcd можно обнаружить, чтоUpdate
метод, этоUpdate
Может выполнять как операции добавления, так и удаления:
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
switch nm.Op {
case naming.Add:
var v []byte
if v, err = json.Marshal(nm); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
case naming.Delete:
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
default:
return status.Error(codes.InvalidArgument, "naming: bad naming op")
}
return err
}
После запуска службы к ней можно получить доступ черезUpdate
способ указать собственный адрес службы и портPut
В ключе с префиксом пользовательской цели, например, на рисунке выше, переменная target должна быть именем службы /service/a, определенным нами. Как правило, в конкретной практике он упаковывается сам по себе в соответствии с потребностями системы.Update
Метод завершается регистрацией услуги и регулярным продлением сервисной ноды Key on Etcd.Практика у каждой компании разная.Конкретный код ставить не буду.Вообще, продление аренды делается черезEtcd
в арендеKeepAlive
Метод реализован (Lease.KeepAlive).
обнаружение службы
Как клиент узнает, что новый узел зарегистрирован или исходный узел остановлен? Для этого требуется помощь распознавателя имен Resolver Роль Resolver можно понимать как сопоставление строки с набором IP-портов и другой информацией.
Определение интерфейса gRPC для Resolver выглядит следующим образом:
type Resolver interface {
// Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error)
}
Метод Resolve синтаксического анализатора именования вернет Watcher, который может отслеживать изменение информации об адресе внутреннего сервера, соответствующего цели, отправленной синтаксическим анализатором именования (аналогично ключу, соответствующему имени службы, упомянутому выше). пример) и уведомить балансировщик о своем собственном. Поддерживаемые адреса динамически добавляются или удаляются.
Интерфейс Watcher определяется следующим образом:
//源码地址 https://github.com/grpc/grpc-go/blob/v1.2.x/naming/naming.go
type Watcher interface {
Next() ([]*Update, error)
// Close closes the Watcher.
Close()
}
Etcd предоставляет реализации для обоих интерфейсов:
// 源码地址:https://github.com/etcd-io/etcd/blob/release-3.3/clientv3/naming/grpc.go
// GRPCResolver 实现了grpc的naming.Resolver接口
type GRPCResolver struct {
// Client is an initialized etcd client.
Client *etcd.Client
}
func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
return w, nil
}
// 实现了grpc的naming.Watcher接口
type gRPCWatcher struct {
c *etcd.Client
target string
ctx context.Context
cancel context.CancelFunc
wch etcd.WatchChan
err error
}
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
if gw.wch == nil {
// first Next() returns all addresses
return gw.firstNext()
}
// process new events on target/*
wr, ok := <-gw.wch
if !ok {
...
updates := make([]*naming.Update, 0, len(wr.Events))
for _, e := range wr.Events {
var jupdate naming.Update
var err error
switch e.Type {
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &jupdate)
jupdate.Op = naming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
jupdate.Op = naming.Delete
default:
continue
}
if err == nil {
updates = append(updates, &jupdate)
}
}
return updates, nil
}
func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// 获取前缀为gw.target的所有Key的值,放到现有数组里
resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
if gw.err = err; err != nil {
return nil, err
}
updates := make([]*naming.Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var jupdate naming.Update
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
continue
}
updates = append(updates, &jupdate)
}
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
// watch 监听这些Key的变化,包括前缀相同的新Key的加入
gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
return updates, nil
}
func (gw *gRPCWatcher) Close() { gw.cancel() }
эта частьGRPCResolver
а такжеgRPCWatcher
Функция и роль каждого метода типа такие же, как иRoundRobin
Этот балансировщик gRPC относительно тесно интегрирован, и я собираюсь поместить его ниже и объяснить вместе с реализацией балансировки нагрузки в исходном коде.
балансировки нагрузки
Во-первых, давайте взглянем на определение интерфейса gRPC для балансировки нагрузки:
type Balancer interface {
Start(target string, config BalancerConfig) error
Up(addr Address) (down func(error))
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
Notify() <-chan []Address
// Close shuts down the balancer.
Close() error
}
Вызывается, когда устанавливается соединение между клиентом gRPC и сервером.Dail
можно использовать методWithBalancer
метод вDiaplOption
Укажите компонент балансировки нагрузки в:
client, err := etcd.Client()
...
resolver := &naming.GRPCResolver{Client: client}
b := grpc.RoundRobin(resolver)
opt0 := grpc.WithBalancer(b)
grpc.Dial(target, opt0 , opt1, ...) // 后面省略了
В приведенном выше примере для реализации RoundRobin используется Balancer, поставляемый с gRPC.Помимо реализации интерфейса Balancer, RoundRobin имеет собственный встроенный Resolver для получения связанной с ним IP-информации из имени и событий обновления службы (добавление и удаление сервисных узлов). В приведенном выше примере указан RoundRobinEtcd
который предоставилname.GRPCResolver
В качестве синтаксического анализатора имен этот синтаксический анализатор имен упоминается в предыдущем разделе.Etcd
gRPC в комплектеnaming.Resolver
реализация интерфейса.
RoundRobin
Давайте посмотрим, что предусмотрено в пакете gRPC.RoundRobin
Реализация кода, в основном сосредоточенная на принципе реализации кода балансировки нагрузки и использовании Resolver для обнаружения служб и обновления узлов.
RoundRobin
Структура определяется следующим образом:
// 源码在:https://github.com/grpc/grpc-go/blob/v1.2.x/balancer.go
type roundRobin struct {
r naming.Resolver
w naming.Watcher
addrs []*addrInfo // 客户端可以尝试连接的所有地址
mu sync.Mutex
addrCh chan []Address // 用于通知gRPC内部的,客户端可连接地址的信道
next int // index of the next address to return for Get()
waitCh chan struct{} // the channel to block when there is no connected address available
done bool // The Balancer is closed.
}
- r — это анализатор имен, вы можете определить свой собственный анализатор имен, например, анализатор имен Etcd. Если r равно nil, параметр target в Dial будет напрямую добавлен к адресам как запрашиваемый адрес.
- w — это наблюдатель, возвращаемый методом Resolve преобразователя имен. Наблюдатель может отслеживать изменения адресной информации, отправляемой преобразователем имен, и уведомлять roundRobin о необходимости динамического добавления или удаления адресов в адресах.
- addrs — это массив адресной информации, полученный от именования.Каждый адрес в массиве имеет не только адресную информацию, но и флаг, указывающий, было ли создано соединение между gRPC и адресом в состоянии готовности.
- addrCh - Канал адресного массива, Канал будет вКаждыйПосле того, как преобразователь имен отправит изменения адресной информации, онвсеОб обновлении адреса сообщается lbWatcher внутри gRPC.lbWatcher — это сопрограмма, которая унифицированным образом управляет состоянием подключения адреса и отвечает за подключение нового адреса и операцию закрытия удаленного адреса.
- Следующим идет индекс циклического перебора, то есть место, где в массиве адресов было пройдено циклическое планирование.
- waitCh — когда адрес в addrs пуст, вызовы grpc
Get()
Метод надеется получить соединение с целью. Если для отказоустойчивости gRPC установлено значение false, тоGet()
Метод будет блокироваться на этом канале до тех пор, пока не будет готово соединение.
Начать круговой алгоритм
Запуск RoundRobin заключается в реализации метода Start интерфейса Balancer, который передается с самого началаgrpc.WithBalancer
балансировщик нагрузки, назначенныйBalancerWrapperBuilder
при созданииBalancerWrapper
Запускается, когда:
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
// 这里触发Balancer的Start方法
bwb.b.Start(opts.Target.Endpoint, BalancerConfig{
DialCreds: opts.DialCreds,
Dialer: opts.Dialer,
})
_, pickfirst := bwb.b.(*pickFirst)
bw := &balancerWrapper{
......
}
cc.UpdateBalancerState(connectivity.Idle, bw)
go bw.lbWatcher() // 监听Balancer 通知过来的地址变化
return bw
}
Основная функция метода Start — передать синтаксический анализатор имен RoundRobin.Resolve
метод для получения внутренних изменений, которые отслеживают анализатор именWatcher
. В то же время новыйaddrChan
использовал кgRPC
ВнутреннийlbWatcher
толкатьWatcher
Отслеживание изменений адреса.
func (rr *roundRobin) Start(target string, config BalancerConfig) error {
rr.mu.Lock()
defer rr.mu.Unlock()
if rr.done {
return ErrClientConnClosing
}
if rr.r == nil {
// 如果没有解析器,那么直接将target加入addrs地址数组
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
// Resolve接口会返回一个watcher,watcher可以监听解析器的地址变化
w, err := rr.r.Resolve(target)
if err != nil {
return err
}
rr.w = w
// 创建一个channel,当watcher监听到地址变化时,通知grpc内部lbWatcher去连接该地址
rr.addrCh = make(chan []Address, 1)
// go 创建新协程监听watcher,监听地址变化。
go func() {
for {
if err := rr.watchAddrUpdates(); err != nil {
return
}
}
}()
return nil
}
После создания addrCh горутина будет запущена в конце метода Start, и эта горутина будет вызываться в цикле.watchAddrUpdates
Запрос, если есть именованный преобразовательWatcher
Прошли обновления.
Мониторинг обновлений адресов серверов
существуетwatchAddrUpdates
Этот метод заключается в прослушивании через метод Next Resolver Watcher, созданный в методе Start выше.Etcd
Обновление внутреннего сервисного узла, реализация этого Watcher предоставляется в пакете Etcd, упомянутом в главе об обнаружении сервисов выше.gRPCWatcher
Типа, его метод Next прослушает изменение ключа, состоящего из имени службы на Etcd, и затем передаст эту информацию вышеприведенному.Start
созданный в методеaddrChan
ряд.
func (rr *roundRobin) watchAddrUpdates() error {
// watcher的next方法会阻塞,直至有地址变化信息过来,updates即为变化信息
updates, err := rr.w.Next()
if err != nil {
return err
}
// 对于addrs地址数组的操作,显然是要加锁的,因为有多个goroutine在同时操作
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
Addr: update.Addr,
Metadata: update.Metadata,
}
switch update.Op {
case naming.Add:
//对于新增类型的地址,注意这里不会重复添加。
var exist bool
for _, v := range rr.addrs {
if addr == v.addr {
exist = true
break
}
}
if exist {
continue
}
rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
//对于删除的地址,直接在addrs中删除就行了
for i, v := range rr.addrs {
if addr == v.addr {
copy(rr.addrs[i:], rr.addrs[i+1:])
rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
default:
grpclog.Errorln("Unknown update.Op ", update.Op)
}
}
// 这里复制了整个addrs地址数组,然后丢到addrCh channel中通知grpc内部lbWatcher,
// lbWatcher会关闭删除的地址,连接新增的地址。
// 连接ready后会有专门的goroutine调用Up方法修改addrs中地址的状态。
open := make([]Address, len(rr.addrs))
for i, v := range rr.addrs {
open[i] = v.addr
}
if rr.done {
return ErrClientConnClosing
}
select {
case <-rr.addrCh:
default:
}
rr.addrCh <- open
return nil
}
установить соединение
Метод Up внутренне балансирует нагрузку с помощью gRPC.watcher
называетсяwatcher
Он прочитает глобальную очередь состояния соединения и изменит состояние соединения в списке соединений, поддерживаемом RoundRobin (отдельная горутина инициирует попытку соединения с целевой службой, и состояние соединения объекта соединения будет изменено на соединение после попытки Соединение в состоянии «подключено» вызовет метод Up, чтобы изменить состояние адреса в массиве адресов addrs насвязанный.
func (rr *roundRobin) Up(addr Address) func(error) {
rr.mu.Lock()
defer rr.mu.Unlock()
var cnt int
//将地址数组中的addr置为已连接状态,这样这个地址就可以被client使用了。
for _, a := range rr.addrs {
if a.addr == addr {
if a.connected {
return nil
}
a.connected = true
}
if a.connected {
cnt++
}
}
// 当有一个可用地址时,之前可能是0个,可能要很多client阻塞在获取连接地址上,这里通知所有的client有可用连接啦。
// 为什么只等于1时通知?因为可用地址数量>1时,client是不会阻塞的。
if cnt == 1 && rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
//返回禁用该地址的方法
return func(err error) {
rr.down(addr, err)
}
}
закрыть соединение
Метод Down используется для закрытия соединения.Этот метод прост, просто найдите адрес и установите его как недоступный.
func (rr *roundRobin) down(addr Address, err error) {
rr.mu.Lock()
defer rr.mu.Unlock()
for _, a := range rr.addrs {
if addr == a.addr {
a.connected = false
break
}
}
}
Клиент получает соединение
клиент звонитgRPC
конкретныйMethod
изInvoke
метод, пойдетRoundRobin
Если адрес пуст или адрес в адресе недоступен,Get()
Метод возвращает ошибку. Но если установитьfailfast = false
,Get()
метод заблокируетwaitCh
на этом канале доUp
Метод уведомляется, а затем опрашивает доступные адреса для планирования.
func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
rr.mu.Lock()
if rr.done {
rr.mu.Unlock()
err = ErrClientConnClosing
return
}
if len(rr.addrs) > 0 {
// addrs的长度可能变化,如果next值超出了,就置为0,从头开始调度。
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
//遍历整个addrs数组,直到选出一个可用的地址
for {
a := rr.addrs[next]
// next值加一,当然是循环的,到len(addrs)后,变为0
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// 遍历完一圈了,还没找到,走下面逻辑
break
}
}
}
if !opts.BlockingWait { //如果是非阻塞模式,如果没有可用地址,那么报错
if len(rr.addrs) == 0 {
rr.mu.Unlock()
err = status.Errorf(codes.Unavailable, "there is no address available")
return
}
// Returns the next addr on rr.addrs for failfast RPCs.
addr = rr.addrs[rr.next].addr
rr.next++
rr.mu.Unlock()
return
}
// Wait on rr.waitCh for non-failfast RPCs.
// 如果是阻塞模式,那么需要阻塞在waitCh上,直到Up方法给通知
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
}
rr.mu.Unlock()
for {
select {
case <-ctx.Done():
err = ctx.Err()
return
case <-ch:
rr.mu.Lock()
if rr.done {
rr.mu.Unlock()
err = ErrClientConnClosing
return
}
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// 遍历完一圈了,还没找到,可能刚Up的地址被down掉了,重新等待。
break
}
}
}
// The newly added addr got removed by Down() again.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
}
rr.mu.Unlock()
}
}
}
Суммировать
весьgRPC
на основеEtcd
Процесс реализации регистрации/обнаружения сервисов и балансировки нагрузки, а также реализация ключевого исходного кода были разобраны.На самом деле, детали реализации исходного кода намного сложнее, чем перечисленные здесь.Цель этой статьи – зафиксировать нагрузка на обучение и практику gRPC Некоторые критические пути во время балансировки и разрешения услуг. Кроме того, следует отметить, что в этой статье используется код gRPC v1.2.x.После версии 1.3 в официальном пакете была изменена директория и имя пакета, которое немного отличается от исходного кода, указанного в эта статья и использование Balancer, но принцип остается примерно таким же, за исключением того, что каждое издание развивается на этой основе.
Смотрите здесь, если вам понравилась моя статья, вы можете поставить мне лайк, я буду делиться своими знаниями и практическим опытом из первых рук в технических статьях каждую неделю, спасибо за вашу поддержку. Ищите в WeChat и следите за официальной учетной записью «Управление сетью Наоби Нао», чтобы получать сообщения о моих статьях как можно скорее.