Управляемое чтение
gRPC — это высокопроизводительная инфраструктура RPC общего назначения с открытым исходным кодом. Она в основном разработана Google для разработки мобильных приложений и разработана на основе стандарта протокола HTTP/2. Она разработана на основе протокола сериализации ProtoBuf (Protocol Buffers). и поддерживает множество языков разработки.
Благодаря механизму подключаемых модулей gRPC может гибко реализовывать балансировку нагрузки, цепочку вызовов, проверку работоспособности, аутентификацию полномочий и другие модули.В этой статье в основном рассказывается, как реализовать функцию балансировки нагрузки через интерфейс, определенный gRPC.
решение для балансировки нагрузки
Помимо решения проблемы межъязыкового вызова и развязки модулей, служба RPC может расширять узлы службы RPC по горизонтали за счет микрослужбы модулей.Прикладной уровень может вызывать несколько служб асинхронно, чтобы уменьшить задержку прикладного уровня. .
Поскольку службы RPC не имеют состояния, машины можно добавлять горизонтально, чтобы расширить возможности их службы. Но как воспользоваться несколькими узлами?
Обычно балансировку нагрузки (LB) можно добавить перед RPC, а соответствующий сервисный узел — после LB. Прикладной уровень напрямую обращается к LB, а узел RPC скрыт за LB и невидим для прикладного уровня. Это преимущество очевидно, поскольку адрес и порт LB предоставляются прикладному уровню, прикладному уровню не нужно заботиться об алгоритме LB, что значительно снижает сложность вызова приложения. Этот метод является лучшим решением для приложений с коротким подключением, таких как Интернет, поскольку при повторном подключении LB-соединения можно повторно выбрать серверную службу.
Однако это большая проблема для длинных сервисов.
Например, длиннолинковое приложение A подключается к LB, а LB случайным образом перенаправляется на серверную RPC-службу B. Поскольку A является длиннолинейным, все последующие запросы будут идти к B, поэтому он не может играть роли в нагрузке. балансировка. Вы можете подумать, что лучше поддерживать длительное соединение со службой RPC, вместо того, чтобы подключать и разрывать соединение каждый раз, когда он вызывается. gRPC не предоставляет такие компоненты балансировки нагрузки, но предоставляет интерфейс балансировки нагрузки, если он расширяется.NameResolverProvider
class, реализовать метод интерфейса, вы можете легко реализовать модуль балансировки нагрузки.
Основные сведения о балансировке нагрузки gRPC см.nuggets.capable/post/684490…
Ниже описано, как реализовать балансировку нагрузки NameResolver через zookeeper.
Реализация ZkNameResolverProvider
public class ZkNameResolverProvider extends NameResolverProvider {
@Override
protected boolean isAvailable() {
return true;
}
@Override
protected int priority() {
return 5;
}
@Nullable
@Override
public NameResolver newNameResolver(URI targetUri, Attributes params) {
return new ZkNameResolver(targetUri);
}
@Override
public String getDefaultScheme() {
return "zk";
}
}
Реализация ZkNameResolver
public class ZkNameResolver extends NameResolver implements Watcher {
private URI zkUri;
private ZooKeeper zoo;
private Listener listener;
private final int ZK_CONN_TIMEOUT = 3000;
private final String ZK_PATH = "/grpc_server_list";
ZkNameResolver(URI zkUri) {
this.zkUri = zkUri;
}
@Override
public String getServiceAuthority() {
return zkUri.getAuthority();
}
@Override
public void start(Listener listener) {
this.listener = listener;
final CountDownLatch latch = new CountDownLatch(1);
String zkAddr = zkUri.getHost() + ":" + zkUri.getPort();
System.out.printf("connect to zookeeper server %s", zkAddr);
try {
this.zoo = new ZooKeeper(zkAddr, ZK_CONN_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
} catch (IOException e) {
System.out.printf(e);
System.out.printf("connect to zookeeper failed, JVM exited [%s]", e.getMessage());
System.exit(1);
}
try {
latch.await();
System.out.printf("connect to zookeeper succeed");
} catch (InterruptedException e) {
System.out.printf(e);
System.out.printf("CountDownLatch interrupted, JVM exited [%s]", e.getMessage());
System.exit(1);
}
try {
Stat stat = zoo.exists(ZK_PATH, true);
if (stat == null) {
System.out.printf("%s not exists", ZK_PATH);
} else {
System.out.printf("%s exists", ZK_PATH);
}
} catch (KeeperException | InterruptedException e) {
System.out.printf(e);
}
try {
List<String> children = zoo.getChildren(ZK_PATH, this);
addServersToListener(children);
} catch (KeeperException | InterruptedException e) {
System.out.printf(e);
System.out.printf("get children of %s failed [%s], JVM exited", ZK_PATH, e.getMessage());
System.exit(1);
}
}
// 把zookeeper ZK_PATH的子节点作为rpc的节点地址,注册到gRPC负载均衡服务中
private void addServersToListener(List<String> servers) {
System.out.printf("rpc servers:%s", servers);
ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>();
for (String server : servers) {
List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
String[] address = server.split(":");
socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1])));
addressGroups.add(new EquivalentAddressGroup(socketAddresses));
}
if (addressGroups.size() > 0) {
listener.onAddresses(addressGroups, Attributes.EMPTY);
} else {
System.out.printf("No servers find, keep looking");
}
}
@Override
public void shutdown() {
try {
zoo.close();
} catch (InterruptedException e) {
System.out.printf(e);
}
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
System.out.printf("Zookeeper connection expired");
} else {
try {
List<String> children = zoo.getChildren(ZK_PATH, false);
addServersToListener(children);
zoo.getChildren(ZK_PATH, this);
} catch (Exception e) {
System.out.printf(e);
}
}
}
}
- Используйте host:port службы rpc в качестве пути к zookeeper (
/grpc_server_list/rpc_host:50010
), zookeeper прослушивает такие события, как создание и удаление путей - Когда узел rpc находится в сети или в автономном режиме, динамически добавляйте или удаляйте информацию об узле из zookeeper.
- пройти через
listener.onAddresses
Зарегистрируйте адрес службы rpc в подсистеме балансировки нагрузки gRPC.
создание канала
this.channel = ManagedChannelBuilder
// 配置zk地址
.forTarget("zk://zkhost:2181")
// 配置NameResolverProvider实现类
.nameResolverFactory(new ZkNameResolverProvider())
.enableRetry()
.maxRetryAttempts(5)
.keepAliveTime(5, TimeUnit.MINUTES)
.keepAliveWithoutCalls(true)
.keepAliveTimeout(10, TimeUnit.MINUTES)
.idleTimeout(24, TimeUnit.HOURS)
// 轮询策略
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.usePlaintext()
.build();
-
forTarget("zk://zkhost:2181")
Настройте адрес ссылки zookeeper -
nameResolverFactory(new ZkNameResolverProvider())
настроитьNameResolverProvider
Класс реализации для пропуска gRPCZkNameResolverProvider
Найти доступные адреса сервисных узлов - Вызов службы rpc в соответствии с конфигурацией
loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
Стратегия опрашивает, чтобы вызвать соответствующую серверную службу.
Суммировать
gRPC предоставляет очень гибкий интерфейс балансировки нагрузки, который позволяет легко добиться балансировки нагрузки. Благодаря настраиваемому механизму балансировки нагрузки вызывающая сторона может поддерживать длинную связь с каждым RPC, что значительно увеличивает сетевые издержки RPC, и опрашивает каждую службу RPC, чтобы повысить скорость отклика RPC.
Через механизм просмотра зоопарка можно прослушивать определенный путь (/grpc_server_list
) добавляются или удаляются дочерние узлы, а регистрация сервисов может быть реализована динамически в автономном режиме, что значительно повышает удобство горизонтального расширения серверных сервисов.