Netty + ZooKeeper реализует простую регистрацию и обнаружение сервисов

ZooKeeper Netty

换个角度看世界.jpg

I. Предыстория

Недавний проект: после того, как наша система получит задачу отправки заказа из вышестоящей системы, она передаст ее соответствующему оборудованию назначенного магазина и выполнит соответствующую бизнес-обработку.

2. Использование Нетти

После получения задачи отправить один, указываемый Netty, нажатой для хранения связанного оборудования. В нашей системе Netty Message Push для достижения длительного механизма подключения и сердцебиения.

Netty+ZK.png

2.1 Сторона Netty-сервера:

Каждый сервер Netty сохраняет clientId клиента и SocketChannel, к которому он подключен через ConcurrentHashMap.

Когда сервер отправляет сообщение клиенту, ему нужно только получить SocketChannel, соответствующий clientId, и записать соответствующее сообщение в SocketChannel.

        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline p = channel.pipeline();
                        p.addLast(new MessageEncoder());
                        p.addLast(new MessageDecoder());
                        p.addLast(new PushServerHandler());
                    }
                });

        ChannelFuture future = bootstrap.bind(host,port).sync();
        if (future.isSuccess()) {
            logger.info("server start...");
        }

2.2 Нетти-клиент:

Клиент используется для получения сообщений от сервера, а затем для выполнения бизнес-процессов. У клиента также есть механизм сердцебиения, который периодически отправляет сообщения Ping на сервер через событие IdleEvent, чтобы определить, прерван ли SocketChannel.

    public PushClientBootstrap(String host, int port) throws InterruptedException {

        this.host = host;
        this.port = port;

        start(host,port);
    }

    private void start(String host, int port) throws InterruptedException {

        bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .group(workGroup)
                .remoteAddress(host, port)
                .handler(new ChannelInitializer(){

                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline p = channel.pipeline();
                        p.addLast(new IdleStateHandler(20, 10, 0));  // IdleStateHandler 用于检测心跳
                        p.addLast(new MessageDecoder());
                        p.addLast(new MessageEncoder());
                        p.addLast(new PushClientHandler());
                    }
                });
        doConnect(port, host);
    }

    /**
     * 建立连接,并且可以实现自动重连.
     * @param port port.
     * @param host host.
     * @throws InterruptedException InterruptedException.
     */
    private void doConnect(int port, String host) throws InterruptedException {

        if (socketChannel != null && socketChannel.isActive()) {
            return;
        }

        final int portConnect = port;
        final String hostConnect = host;

        ChannelFuture future = bootstrap.connect(host, port);

        future.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (futureListener.isSuccess()) {
                    socketChannel = (SocketChannel) futureListener.channel();
                    logger.info("Connect to server successfully!");
                } else {
                    logger.info("Failed to connect to server, try connect after 10s");

                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                doConnect(portConnect, hostConnect);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        }).sync();
    }

3. Простая регистрация и обнаружение сервисов с помощью ZooKeeper

3.1 Регистрация службы

Регистрация службы в основном предназначена для разделения поставщиков услуг и потребителей услуг. Регистрация службы — это высокодоступный и строго согласованный репозиторий обнаружения служб, который в основном используется для хранения соответствия между API-интерфейсами служб и адресами. Для обеспечения высокой доступности реестр служб обычно представляет собой кластер и может обеспечить распределенную согласованность. В настоящее время обычно используются ZooKeeper, Etcd и т. д.

ZooKeeper используется в нашем проекте для реализации регистрации сервиса.

public class ServiceRegistry {

    private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private String registryAddress;

    public ServiceRegistry(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = connectServer();
            if (zk != null) {
                createNode(zk, data);
            }
        }
    }

    /**
     * 连接 zookeeper 服务器
     * @return
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            logger.error("", e);
        }
        return zk;
    }

    /**
     * 创建节点
     * @param zk
     * @param data
     */
    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes();
            String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            logger.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
    }
}

При регистрации службы после запуска сервера Netty IP-адрес и порт сервера Netty регистрируются в ZooKeeper.

        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline p = channel.pipeline();
                        p.addLast(new MessageEncoder());
                        p.addLast(new MessageDecoder());
                        p.addLast(new PushServerHandler());
                    }
                });

        ChannelFuture future = bootstrap.bind(host,port).sync();
        if (future.isSuccess()) {
            logger.info("server start...");
        }

        if (serviceRegistry != null) {
            serviceRegistry.register(host + ":" + port);
        }

3.2 Обнаружение службы

Здесь мы используем обнаружение службы на стороне клиента, то есть механизм обнаружения службы реализуется клиентом.

Клиент получает адрес сервера, запрашивая реестр до установления соединения с сервером. Если есть несколько серверов Netty, вы можете выполнить балансировку нагрузки службы. В нашем проекте для загрузки используется только простой случайный метод.

public class ServiceDiscovery {

    private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private volatile List<String> serviceAddressList = new ArrayList<>();

    private String registryAddress; // 注册中心的地址

    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;

        ZooKeeper zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    /**
     * 通过服务发现,获取服务提供方的地址
     * @return
     */
    public String discover() {
        String data = null;
        int size = serviceAddressList.size();
        if (size > 0) {
            if (size == 1) {  //只有一个服务提供方
                data = serviceAddressList.get(0);
                logger.info("unique service address : {}", data);
            } else {          //使用随机分配法。简单的负载均衡法
                data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
                logger.info("choose an address : {}", data);
            }
        }
        return data;
    }

    /**
     * 连接 zookeeper
     * @return
     */
    private ZooKeeper connectServer() {

        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            logger.error("", e);
        }
        return zk;
    }

    /**
     * 获取服务地址列表
     * @param zk
     */
    private void watchNode(final ZooKeeper zk) {

        try {
            //获取子节点列表
            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        //发生子节点变化时再次调用此方法更新服务地址
                        watchNode(zk);
                    }
                }
            });
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            logger.debug("node data: {}", dataList);
            this.serviceAddressList = dataList;
        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
    }
}

После запуска клиента Netty получите IP-адрес и порт сервера Netty с помощью службы обнаружения.

    /**
     * 支持通过服务发现来获取 Socket 服务端的 host、port
     * @param discoveryAddress
     * @throws InterruptedException
     */
    public PushClientBootstrap(String discoveryAddress) throws InterruptedException {

        serviceDiscovery = new ServiceDiscovery(discoveryAddress);
        serverAddress = serviceDiscovery.discover();

        if (serverAddress!=null) {
            String[] array = serverAddress.split(":");
            if (array!=null && array.length==2) {

                String host = array[0];
                int port = Integer.parseInt(array[1]);

                start(host,port);
            }
        }
    }

4. Резюме

Регистрация и обнаружение сервисов всегда были основными компонентами распространения. В этой статье рассказывается, как реализовать простую регистрацию и обнаружение службы с помощью ZooKeeper в качестве реестра. На самом деле существует множество вариантов реестра, таких как Etcd, Eureka и так далее. Важно выбрать тот, который отвечает потребностям нашего бизнеса.


Стек технологий Java и Android: еженедельно обновляйте и публикуйте оригинальные технические статьи, добро пожаловать, чтобы отсканировать QR-код общедоступной учетной записи ниже и подписаться, и с нетерпением ждем роста и развития вместе с вами.