Реализация балансировки нагрузки GRPC

gRPC
Реализация балансировки нагрузки GRPC

Управляемое чтение

gRPC — это очень хороший фреймворк RPC с открытым исходным кодом от Google, который поддерживает такие языки программирования, как PYTHON/JAVA/PHP/GO/C/C++/C#/NODEJS/RUBY, что очень удобно для межъязыковых вызовов.

В производственной среде обычно развертывают несколько служб RPC для повышения доступности и скорости отклика. но С точки зрения балансировки нагрузки он не так богат, как компоненты dubbo, но он предоставляет интерфейс для обнаружения сервисов.Реализуя его интерфейс, можно гибко реализовать функцию балансировки нагрузки.

Следующее может быстро реализовать функцию балансировки нагрузки, зарегистрировав доступные службы при запуске через локальный файл конфигурации.

grpc:
  hosts: host1:8080,host2:8080

Создание канала GRPC

ManagedChannelBuilder
                // 设置连接的目标地址
                .forTarget("local")
                // 设置地址服务
                .nameResolverFactory(new LocalNameResolverProvider(configInterface))
                .enableRetry()
                .maxRetryAttempts(5)
                .keepAliveTime(5, TimeUnit.MINUTES)
                .keepAliveWithoutCalls(true)
                .keepAliveTimeout(10, TimeUnit.MINUTES)
                .idleTimeout(24, TimeUnit.HOURS)
                // 设置轮询策略
                .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
                .usePlaintext()
                .build();
  • forTarget задает адрес для подключения к службе RPC, например «127.0.0.1:8080».
  • Обнаружение службы nameResolverFactory
  • Поставщик услуг LocalNameResolverProvider
  • loadBalancerFactory устанавливает стратегию балансировки нагрузки
  • Стратегия опроса RoundRobinLoadBalancerFactory

Метод конфигурации nameResolverFactory

  • клиент GRPC черезloadBalancerFactoryЧтобы установить стратегию балансировки нагрузки, выберите здесьRoundRobinLoadBalancerFactory, политика опроса службы.

  • пройти черезnameResolverFactoryНастройте метод обнаружения службы адресов,NameResolverProviderДля достижения службы регистрации и обнаружения.

  • класс поставщика услуг

// 需要实现NameResolverProvider抽象类中的相关方法
public class LocalNameResolverProvider extends NameResolverProvider {
    private final ConfigInterface configInterface;

    @Inject
    public LocalNameResolverProvider(ConfigInterface configInterface) {
        this.configInterface = configInterface;
    }

    // 服务是否可用
    @Override
    protected boolean isAvailable() {
        return true;
    }

    // 优先级默认5
    @Override
    protected int priority() {
        return 5;
    }

    // 服务发现类
    @Nullable
    @Override
    public NameResolver newNameResolver(URI targetUri, Attributes params) {
        return new LocalNameResolver(configInterface);
    }

    // 服务协议
    @Override
    public String getDefaultScheme() {
        return "local";
    }
}

  • класс обнаружения службы
public class LocalNameResolver extends NameResolver {
    private final ConfigInterface configInterface;

    @Inject
    public LocalNameResolver(ConfigInterface configInterface) {
        this.configInterface = configInterface;
    }

    @Override
    public String getServiceAuthority() {
        return "none";
    }

    // 配置可用服务,RPC在调用的时候,轮询选择这里配置的可用的服务地址列表
    @Override
    public void start(Listener listener) {
        LogUtils.info("LocalNameResolver start ...");
        ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>();
        // 获取rpc地址的配置列表
        // 地址格式 host1:8080,host2:8081
        Map<String, Object> config = (Map<String, Object>) this.configInterface.getRpcConfig().get("grpc");
        String[] hosts = config.get("hosts").toString().split(",");
        for (String host : hosts) {
            if (host.trim().length() > 0) {
                String[] address = host.split(":");
                List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
                socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1])));
                addressGroups.add(new EquivalentAddressGroup(socketAddresses));
            }
        }
        listener.onAddresses(addressGroups, Attributes.EMPTY);
    }

    @Override
    public void shutdown() {

    }
}
  • В приведенной выше конфигурации при совершении вызова RPC будет опрошен и выбран зарегистрированный адрес службы.
  • Добавление нового доступного сервисного узла требует обновления файла конфигурации.

Динамическая регистрация и обнаружение

  • При запуске службы GRPC зарегистрируйте адрес службы в zookeeper.
  • выполнитьNameResolverstart, слушайте изменения в zookeeper и обновляйте список доступных адресов в режиме реального времени.