С нулевой одиночной строки используйте Netty для создания чата для обмена мгновенными сообщениями~

Java задняя часть
С нулевой одиночной строки используйте Netty для создания чата для обмена мгновенными сообщениями~

В этой статье приведен полный пример кода, см.GitHub.com/Y u — День V/SP…изlab-67содержание.

Оригинальность не так проста, дай мне немногоStarЭй, пошли утку!

пс: Количество слов в печенке слишком много, некоторые коды конвертируются в картинки, толстые друзья могут по ссылке отправить на гейхаб для проверки конкретного кода ==

1 Обзор

существует«Введение в Spring Boot WebSocket»В этой статье мы используем WebSocket для реализации простой функции обмена мгновенными сообщениями, которая поддерживает аутентификацию личности, сообщения частного чата и сообщения группового чата.

Затем есть толстые друзья, которые отправляют личные сообщения, надеясь использовать чистую Netty для достижения аналогичной функции. Совесть, конечно, красной карточки ей не даст, поэтому и есть эта статья. Там могут быть толстые друзья, которые не знают, что такое Нетти.Вот краткое введение:

Netty — это фреймворк с открытым исходным кодом для Java.

Netty предоставляет платформу асинхронных, управляемых событиями сетевых приложений и инструменты для быстрой разработки высокопроизводительных и надежных сетевых серверных и клиентских программ.

Другими словами, Netty представляет собой клиентскую и серверную среду программирования на основе NIO.Использование Netty может гарантировать, что вы сможете быстро и легко разработать сетевое приложение, такое как клиентское и серверное приложение, которое реализует определенный протокол.

Netty значительно упрощает и оптимизирует процесс программирования и разработки сетевых приложений, таких как разработка служб TCP и UDP Socket.

Далее создадим три новых проекта, как показано на следующем рисунке:

三个项目

  • lab-67-netty-demo-serverПроект: Построить сервер Netty.
  • lab-67-netty-demo-clientПроект: Создание клиента Netty.
  • lab-67-netty-demo-commonПроект: обеспечивает базовую инкапсуляцию Netty, а также предоставляет функции кодирования, декодирования и распространения сообщений.

Кроме того, мы также приведем примеры часто используемых функций Netty:

  • Механизм сердцебиения реализует обнаружение выживания сервера для клиента.
  • Отключение и повторное подключение для реализации повторного подключения клиента к серверу.

Не пищать, просто начните сушить.

Дружеское напоминание: толстые друзья могут волноваться, нельзя ли читать эту статью без фонда Netty? !

Мысли, смотри! Только посмотрите на него хард, по коду можете собрать сами.В конце статьи предоставлю волну НеттиБазаСтатья о начале работы.

2. Создайте сервер Netty и Client

В этой статье приведен полный пример кода, см.GitHub.com/Y u — День V/SP…изlab-67содержание.

Оригинальность не так проста, дай мне немногоStarЭй, пошли утку!

В этом разделе мы сначала используем Netty для сборки основного кода сервера и клиента, чтобы толстые друзья имели начальное представление о коде проекта.

2.1 Сервер сборки Netty Build

Создайтеlab-67-netty-demo-serverпроект, построить NettyСервер. Как показано ниже:

项目结构

Ниже мы рассмотрим толькоserverКод под пакетом избегает лишней информации и ломает лысину толстых друзей.

2.1.1 NettyServer

СоздайтеNettyServer类,Netty 服务端。 код показывает, как показано ниже:

@Component
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${netty.port}")
    private Integer port;

    @Autowired
    private NettyServerHandlerInitializer nettyServerHandlerInitializer;

    /**
     * boss 线程组,用于服务端接受客户端的连接
     */
    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    /**
     * worker 线程组,用于服务端接受客户端的数据读写
     */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    /**
     * Netty Server Channel
     */
    private Channel channel;

    /**
     * 启动 Netty Server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // <2.1> 创建 ServerBootstrap 对象,用于 Netty Server 启动
        ServerBootstrap bootstrap = new ServerBootstrap();
        // <2.2> 设置 ServerBootstrap 的各种属性
        bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象
                .channel(NioServerSocketChannel.class)  // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel
                .localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口
                .option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小
                .childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
                .childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 允许较小的数据包的发送,降低延迟
                .childHandler(nettyServerHandlerInitializer);
        // <2> 绑定端口,并同步等待成功,即启动服务端
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            channel = future.channel();
            logger.info("[start][Netty Server 启动在 {} 端口]", port);
        }
    }

    /**
     * 关闭 Netty Server
     */
    @PreDestroy
    public void shutdown() {
        // <3.1> 关闭 Netty Server
        if (channel != null) {
            channel.close();
        }
        // <3.2> 优雅关闭两个 EventLoopGroup 对象
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

}

🔥 ① В классе добавить@ComponentАннотация, создание NettyServer передано в управление Spring.

  • portсвойство, читатьapplication.ymlфайл конфигурацииnetty.portэлемент конфигурации.
  • #start()метод, добавить@PostConstructПримечания, запустите сервер Netty.
  • #shutdown()метод, добавить@PreDestroyОбратите внимание, выключите сервер Netty.

🔥 ② Давайте рассмотрим поближе#start()Код метода, как реализовать запуск Netty Server.

<2.1>разместить, создатьServerBootstrapкласс, предоставленный NettyсерверНачать занятия, помогите нам инициализировать сервер.

<2.2>где задайте различные свойства ServerBootstrap.

Дружеское напоминание: здесь задействовано много знаний о компонентах Netty, я сначала опишу их простым языком, а затем проследую за статьей об основах Netty в конце статьи.Дополнительное обучение.

<2.2.1>место, звоните#group(EventLoopGroup parentGroup, EventLoopGroup childGroup)метод, установленный с помощьюbossGroupиworkerGroup. в:

  • bossGroupАтрибуты:BossГруппа потоков, чтобы сервер мог принятьсоединять.
  • workerGroupАтрибуты:WorkerГруппа потоков, чтобы сервер мог принятьчтение и запись данных.

Netty использует многопоточную модель multi-Reactor, которую может принять сервер.БолееВозможность клиента читать и записывать данные. Причина в том, что:

  • Создан специально для приемаПодключение клиентаизbossGroupГруппа потоков не влияет на подключение новых клиентов из-за частого чтения и записи данных подключенных клиентов.
  • Создан для полученияКлиент прочитал и писатьизworkerGroupгруппа тем,несколькоПотоки читают и записывают данные на стороне клиента и могут поддерживать больше клиентов.

Занятия после школы: Заинтересованные толстые друзья, вы можете посмотреть позже«[Серия NIO] - Модель реактора»статья.

<2.2.2>место, звоните#channel(Class<? extends C> channelClass)метод, установленный с помощьюNioServerSocketChannelкласс, который является классом реализации TCP-сокета сервера NIO, определенным Netty.

<2.2.3>место, звоните#localAddress(SocketAddress localAddress)метод установки серверапорт.

<2.2.4>место, звонитеoption#(ChannelOption<T> option, T value)Метод, настроить сервер на прием клиентаочередь соединенийразмер. Поскольку установление TCP-соединения представляет собой трехстороннее рукопожатие, после завершения первого рукопожатия оно будет добавлено в очередь соединений сервера.

Упражнения после уроков: для получения дополнительной информации вы можете просмотреть ее позже.«О невыполненных параметрах TCP Socket»статья.

<2.2.5>место, звоните#childOption(ChannelOption<T> childOption, T value)метод TCP Keepalive, который реализует TCP-уровеньсердцебиениеФункции.

Упражнения после уроков: для получения дополнительной информации вы можете просмотреть ее позже.«Механизм TCP Keepalive доходит до корней»статья.

<2.2.6>место, звоните#childOption(ChannelOption<T> childOption, T value)метод, позволяющийменьшие пакеты, уменьшая задержку.

Упражнения после уроков: для получения дополнительной информации вы можете просмотреть ее позже."Подробное программирование сокетов --- опция TCP_NODELAY"статья.

<2.2.7>место, звоните#childHandler(ChannelHandler childHandler)Метод, устанавливающий обработчик канала, подключенного к клиенту, в NettyServerHandlerInitializer. Позже мы в«2.1.2 Неттисерверхандлеринициализатор»Раздел посмотреть.

<2.3>место, звоните#bind() + #sync()метод, привязать порт иСинхронизироватьДождаться успеха, то есть запустить сервер.

🔥 ③ Давайте рассмотрим поближе#shutdown()Код метода, как реализовать отключение сервера Netty.

<3.1>, позвоните на канал#close()метод отключает Netty Server, чтобы клиенты больше не могли подключаться.

<3.2>, вызовите EventLoopGroup#shutdownGracefully()метод для корректного закрытия EventLoopGroup. Например, пулы потоков внутри них.

2.1.2 NettyServerHandlerInitializer

Прежде чем рассматривать код NettyServerHandlerInitializer, нам нужно понятьChannelHandlerКомпонент, используемый для обработки различных событий Channel. События здесь очень широкие, такие как соединения, чтение и запись данных, исключения, преобразования данных и так далее.

ChannelHandler имеет много подклассов, один из которых особенный.ChannelInitializer, который используется для реализации пользовательской логики инициализации при создании канала. Здесь мы создаемNettyServerHandlerInitializerclass, он наследует абстрактный класс ChannelInitializer, код выглядит следующим образом:

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;

    @Autowired
    private MessageDispatcher messageDispatcher;
    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(Channel ch) {
        // <1> 获得 Channel 对应的 ChannelPipeline
        ChannelPipeline channelPipeline = ch.pipeline();
        // <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中
        channelPipeline
                // 空闲检测
                .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))
                // 编码器
                .addLast(new InvocationEncoder())
                // 解码器
                .addLast(new InvocationDecoder())
                // 消息分发器
                .addLast(messageDispatcher)
                // 服务端处理器
                .addLast(nettyServerHandler)
        ;
    }

}

Когда каждый клиент устанавливает соединение с сервером, сервер создает соответствующий ему канал. В этот момент NettyServerHandlerInitializer выполнит#initChannel(Channel c)метод пользовательской инициализации.

Дружеское напоминание: канал созданного клиента, не общаться с«2.1.1 Неттисервер»Путаница с NioServerSocketChannel в разделе не та.

существует#initChannel(Channel ch)методchПараметр — это клиентский канал, созданный в это время.

<1>, позвоните на канал#pipeline()метод для получения соответствующего канала клиентского каналаChannelPipeline. ChannelPipeline состоит из серии ChannelHandlers или ChannelHandlers.цепь. Таким образом, все события в канале будут проходить через ChannelPipeline и обрабатываться на нем ChannelHandler.

<2>, добавлятьпятьChannelHandler для ChannelPipeline, роль каждого смотрите в комментариях к нему. В частности, мы подробно объясним в следующих подразделах.

2.1.3 NettyServerHandler

СоздайтеNettyServerHandlerкласс, наследованиеChannelInboundHandlerAdapterКласс, реализующий клиентский каналУчреждатьсоединять,ОтключитьПодключение и обработка исключений. код показывает, как показано ниже:

@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NettyChannelManager channelManager;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 从管理器中添加
        channelManager.add(ctx.channel());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        // 从管理器中移除
        channelManager.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
        // 断开连接
        ctx.channel().close();
    }

}

① Добавить в класс@ChannelHandler.SharableАннотация, помечающая этот ChannelHandler, может использоваться несколькими каналами.

channelManagerсвойство, является менеджером клиентского канала, который мы реализовали.

  • #channelActive(ChannelHandlerContext ctx)методы на стороне клиента и сервераУчреждатьКогда соединение установлено, вызовите NettyChannelManager's#add(Channel channel)метод, добавьте кв.
  • #channelUnregistered(ChannelHandlerContext ctx)методы на стороне клиента и сервераОтключитьПри подключении вызовите NettyChannelManager's#add(Channel channel)метод, из которогоУдалить.

Исходный код конкретного NettyChannelManager, мы находимся в"2.1.4 NettyChannelManager"В раздел заходи~

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause)метод, когда при обработке событий канала возникает исключение, вызовите метод канала#close()метод,Отключитьподключение к клиенту.

2.1.4 NettyChannelManager

СоздайтеNettyChannelManagerкласс, обеспечивающийдваФункции.

🔥 ① Клиентский каналуправлять. код показывает, как показано ниже:

@Component
public class NettyChannelManager {

    /**
     * {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户
     */
    private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Channel 映射
     */
    private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
    /**
     * 用户与 Channel 的映射。
     *
     * 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。
     */
    private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();

    /**
     * 添加 Channel 到 {@link #channels} 中
     *
     * @param channel Channel
     */
    public void add(Channel channel) {
        channels.put(channel.id(), channel);
        logger.info("[add][一个连接({})加入]", channel.id());
    }

    /**
     * 添加指定用户到 {@link #userChannels} 中
     *
     * @param channel Channel
     * @param user 用户
     */
    public void addUser(Channel channel, String user) {
        Channel existChannel = channels.get(channel.id());
        if (existChannel == null) {
            logger.error("[addUser][连接({}) 不存在]", channel.id());
            return;
        }
        // 设置属性
        channel.attr(CHANNEL_ATTR_KEY_USER).set(user);
        // 添加到 userChannels
        userChannels.put(user, channel);
    }

    /**
     * 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除
     *
     * @param channel Channel
     */
    public void remove(Channel channel) {
        // 移除 channels
        channels.remove(channel.id());
        // 移除 userChannels
        if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {
            userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());
        }
        logger.info("[remove][一个连接({})离开]", channel.id());
    }
}

🔥 ② Отправить на канал клиентаИнформация. код показывает, как показано ниже:

@Component
public class NettyChannelManager {

    /**
     * 向指定用户发送消息
     *
     * @param user 用户
     * @param invocation 消息体
     */
    public void send(String user, Invocation invocation) {
        // 获得用户对应的 Channel
        Channel channel = userChannels.get(user);
        if (channel == null) {
            logger.error("[send][连接不存在]");
            return;
        }
        if (!channel.isActive()) {
            logger.error("[send][连接({})未激活]", channel.id());
            return;
        }
        // 发送消息
        channel.writeAndFlush(invocation);
    }

    /**
     * 向所有用户发送消息
     *
     * @param invocation 消息体
     */
    public void sendAll(Invocation invocation) {
        for (Channel channel : channels.values()) {
            if (!channel.isActive()) {
                logger.error("[send][连接({})未激活]", channel.id());
                return;
            }
            // 发送消息
            channel.writeAndFlush(invocation);
        }
    }

}

2.1.5 Введение зависимостей

Создайтеpom.xmlфайл, который вводит зависимости Netty.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-server</artifactId>

    <properties>
        <!-- 依赖相关配置 -->
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Spring Boot 基础依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- 引入 netty-demo-common 封装 -->
        <dependency>
            <groupId>cn.iocoder.springboot.labs</groupId>
            <artifactId>lab-67-netty-demo-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.1.6 NettyServerApplication

СоздайтеNettyServerApplicationкласс, класс запуска Netty Server. код показывает, как показано ниже:

@SpringBootApplication
public class NettyServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyServerApplication.class, args);
    }

}

2.1.7 Простой тест

Выполните класс NettyServerApplication, чтобы запустить сервер Netty Server. Журнал выглядит следующим образом:

... // 省略其他日志

2020-06-21 00:16:38.801  INFO 41948 --- [           main] c.i.s.l.n.server.NettyServer             : [start][Netty Server 启动在 8888 端口]
2020-06-21 00:16:38.893  INFO 41948 --- [           main] c.i.s.l.n.NettyServerApplication         : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)

Netty Server запускается в8888порт.

2.2 Создание клиента Netty

Создайтеlab-67-netty-demo-clientпроект, построить Nettyклиент. Как показано ниже:

项目结构

Ниже мы рассмотрим толькоclientКод под пакетом избегает лишней информации и ломает лысину толстых друзей.

2.2.1 NettyClient

СоздайтеNettyClientкласс, клиент Netty. код показывает, как показано ниже:

image.png

Дружеское напоминание: общий код, да и«2.1.1 Неттисервер»Эквивалентны и в основном одинаковы.

🔥 ① В классе добавить@ComponentАннотация, создание NettyClient передано в управление Spring.

  • serverHostиserverPortсвойство, читатьapplication.ymlфайл конфигурацииnetty.server.hostиnetty.server.portэлемент конфигурации.
  • #start()метод, добавить@PostConstructАннотировать, запустить клиент Netty.
  • #shutdown()метод, добавить@PreDestroyОбратите внимание, закройте клиент Netty.

🔥 ② Давайте рассмотрим поближе#start()Код метода, как реализовать запуск Netty Client и установить соединение с сервером.

<2.1>разместить, создатьBootstrapкласс, предоставленный NettyклиентКласс Startup удобен для нас для инициализации клиента.

<2.2>, Набор различных свойств Bootstrap.

<2.2.1>место, звоните#group(EventLoopGroup group)метод, установленный с помощьюeventGroupГруппа потоков, чтобы клиент подключался к серверу, данные считывались и записывались.

<2.2.2>место, звоните#channel(Class<? extends C> channelClass)метод, установленный с помощьюNioSocketChannelкласс, который является классом реализации TCP-клиента сервера NIO, определенным Netty.

<2.2.3>место, звоните#remoteAddress(SocketAddress localAddress)способ установить соединение с серверомадрес.

<2.2.4>место, звоните#option(ChannelOption<T> childOption, T value)метод TCP Keepalive, который реализует TCP-уровеньсердцебиениеФункции.

<2.2.5>место, звоните#childOption(ChannelOption<T> childOption, T value)метод, позволяющийменьшие пакеты, уменьшая задержку.

<2.2.7>место, звоните#handler(ChannelHandler childHandler)метод, установкаСвояОбработчик канала — NettyClientHandlerInitializer. позже мы2.2.2 NettyClientHanderInitializer "Раздел посмотреть.

<2.3>место, звоните#connect()метод, подключитесь к серверу иасинхронныйДождаться успеха, то есть запустить клиент. Заодно добавить прослушиватель обратного вызова ChannelFutureListener, при сбое подключения к серверу вызывать#reconnect()способ достижения синхронизированного переподключения. 😈 конкретный#reconnect()Код метода, его мы рассмотрим позже.

③ Давайте посмотрим поближе#shutdown()Код метода, как реализовать закрытие Netty Client.

<3.1>, позвоните на канал#close()метод, закройте Netty Client, чтобы клиент отключился от сервера.

<3.2>, вызовите EventLoopGroup#shutdownGracefully()метод для корректного закрытия EventLoopGroup. Например, пулы потоков внутри них.

#send(Invocation invocation)Метод, реализующий сообщение серверу.

Поскольку NettyClient является клиентом, нет необходимости использовать его как NettyServer."2.1.4 NettyChannelManager"Поддерживает коллекцию каналов.

2.2.2 NettyClientHandlerInitializer

созданныйNettyClientHandlerInitializerкласс наследует абстрактный класс ChannelInitializer.После установления соединения с сервером добавляется соответствующий обработчик ChannelHandler. код показывает, как показано ниже:

@Component
public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 60;

    @Autowired
    private MessageDispatcher messageDispatcher;

    @Autowired
    private NettyClientHandler nettyClientHandler;

    @Override
    protected void initChannel(Channel ch) {
        ch.pipeline()
                // 空闲检测
                .addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0))
                .addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS))
                // 编码器
                .addLast(new InvocationEncoder())
                // 解码器
                .addLast(new InvocationDecoder())
                // 消息分发器
                .addLast(messageDispatcher)
                // 客户端处理器
                .addLast(nettyClientHandler)
        ;
    }

}

и«2.1.2 Неттисерверхандлеринициализатор»Код в принципе тот же, разница в том, что дополнительно добавлено обнаружение простояIdleStateHandler, клиентский процессор заменяется наNettyClientHandler.

2.2.3 NettyClientHandler

СоздайтеNettyClientHandlerКласс, реализующий клиентский каналОтключитьПодключение и обработка исключений. код показывает, как показано ниже:

@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NettyClient nettyClient;

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 发起重连
        nettyClient.reconnect();
        // 继续触发事件
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
        // 断开连接
        ctx.channel().close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        // 空闲时,向服务端发起一次心跳
        if (event instanceof IdleStateEvent) {
            logger.info("[userEventTriggered][发起一次心跳]");
            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
            ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest))
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            super.userEventTriggered(ctx, event);
        }
    }

}

① Добавить в класс@ChannelHandler.SharableАннотация, помечающая этот ChannelHandler, может использоваться несколькими каналами.

#channelInactive(ChannelHandlerContext ctx)метод, реализованный на стороне сервераОтключитьПри подключении вызовите NettyClient's#reconnect()метод, который реализует клиентвыбор времении серверПовторное подключение.

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause)метод, когда при обработке событий канала возникает исключение, вызовите метод канала#close()метод,Отключитьподключение к клиенту.

#userEventTriggered(ChannelHandlerContext ctx, Object event)метод, когда клиент бездействует, отправляет пульс на сервер, то естьМеханизм сердцебиения. Мы обсудим это подробно позже.

2.2.4 Введение зависимостей

Создайтеpom.xmlфайл, который вводит зависимости Netty.

image.png

image.png

2.2.5 NettyClientApplication

СоздайтеNettyClientApplicationКласс, Netty Client запускает класс. код показывает, как показано ниже:

@SpringBootApplication
public class NettyClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyClientApplication.class, args);
    }

}

2.2.6 Простой тест

Выполните класс NettyClientApplication, чтобы запустить клиент Netty Client. Журнал выглядит следующим образом:

... // 省略其他日志

2020-06-21 09:06:12.205  INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient             : [start][Netty Client 连接服务器(127.0.0.1:8888) 成功]

В то же время сервер Netty Server обнаруживает, что клиент подключен, и печатает следующий журнал:

2020-06-21 09:06:12.268  INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager     : [add][一个连接(db652822)加入]

2.3 Резюме

На данный момент мы завершили создание сервера и клиента Netty. Поскольку API, предоставляемый Netty, очень удобен, нам не нужно иметь дело с большим количеством низкоуровневого и подробного кода, как при непосредственном использовании NIO.

Тем не менее, вышеприведенный контент — это только закуска к этой статье, основной фильм вот-вот начнется! Красиво, продолжай смотреть вниз, Олли!

3. Протокол связи

существует«2. Создайте сервер и клиент Netty»В этом разделе мы реализовали клиентскую и серверную части.соединятьФункции. В этом разделе мы хотим заставить их двоих поговорить, то есть провестичтение и запись данных.

При разработке ежедневных проектов HTTP используется в качестве протокола связи между передней частью и задней частью, используятекстовое содержаниеДля взаимодействия формат данных обычноJSON. Но в мире TCP нам нужно основывать нашибинарныйСоздавайте, создавайте протоколы связи на стороне клиента и на стороне сервера.

В качестве примера возьмем клиент, отправляющий сообщение на сервер Предположим, что клиент хочет отправить запрос на вход в систему, и соответствующий класс выглядит следующим образом:

public class AuthRequest {

    /** 用户名 **/
    private String username;
    /** 密码 **/
    private String password;
    
}
  • Очевидно, что мы не можем кинуть Java-объект напрямую в TCP Socket, а должны преобразовать его вБайтовый байтовый массив, можно записать в TCP-сокет. То есть объект сообщения нужно передать черезСериализация, преобразованный в байтовый массив байтов.
  • При этом, когда сервер получает массив байтов, его нужно преобразовать в Java-объект, т. е.десериализовать. В противном случае сервер обрабатывает поток для строки байтов? !

Дружеское напоминание: тот же процесс происходит, когда сервер отправляет сообщение клиенту!

Существует множество инструментов для сериализации, например, предоставляемых Google.Protobuf, высокая производительность, а размер сериализованных двоичных данных небольшой. Netty интегрирует Protobuf и предоставляет соответствующиекодек. Как показано ниже:

Netty protobuf 包

Но учитывая, что многие толстые друзья не знают Protobuf, потому что он реализует сериализацию и увеличивает дополнительные затраты на обучение толстых друзей. Поэтому мы должны проявить осторожность и выяснить, или использоватьJSONспособ сериализации. Может быть, толстые друзья будут сбиты с толку, JSON необъектпреобразовать внить? Эй, давайте преобразуем строку вбайтовый массив байтовВот и все~

Далее создаем новыйlab-67-netty-demo-commonпроект, и вcodecПод пакетом реализуем наш пользовательский протокол связи. Как показано ниже:

项目结构

3.1 Invocation

СоздайтеInvocationКласс, тело сообщения протокола связи. код показывает, как показано ниже:

/**
 * 通信协议的消息体
 */
public class Invocation {

    /**
     * 类型
     */
    private String type;
    /**
     * 消息,JSON 格式
     */
    private String message;

    // 空构造方法
    public Invocation() {
    }

    public Invocation(String type, String message) {
        this.type = type;
        this.message = message;
    }

    public Invocation(String type, Message message) {
        this.type = type;
        this.message = JSON.toJSONString(message);
    }
    
    // ... 省略 setter、getter、toString 方法
}

typeАтрибуты,тип, используемый для сопоставления с соответствующим обработчиком сообщений. Если аналогия с протоколом HTTP,typeАтрибут эквивалентен адресу запроса.

messageСвойства, содержимое сообщения в формате JSON.

Кроме того,Message— это интерфейс сообщений, который мы определили. код показывает, как показано ниже:

public interface Message {

    // ... 空,作为标记接口

}

3.2 Наклеивание и распаковка

Прежде чем мы начнем рассматривать процессоры кодеков Invocation, давайте посмотримлипкий мешокиРаспаковатьКонцепция чего-либо.

Если содержание, цитируйте«Четыре решения для Netty, чтобы решить проблему склеивания и распаковки»Содержание статьи подлежит вторичному редактированию.

3.2.1 Причины

Основной причиной залипания пакета и проблемы с распаковкой является операционная система, в момент передачи TCP данных будет буфер подслоя, например, размером 1024 байта.

  • Если количество данных, отправленных в один запросменьше, прежде чем размер буфера будет достигнут, TCP объединит несколько запросов в один и тот же запрос на отправку, который формируетлипкий мешокпроблема.

    Например, в"Подробное программирование сокетов --- опция TCP_NODELAY"В статье мы видим, что когда алгоритм Nagle выключен, запрос не будет дождаться, чтобы соответствовать размеру буфера, но как можно скорее уменьшите задержку.

  • Если количество данных, отправляемых в одном запросебольше, превышающий размер буфера, TCP разделит его на несколько передач, чтоРаспаковка, то есть разделение большого пакета на несколько маленьких пакетов для отправки.

На следующем рисунке показана схема наклеивания и распаковки, демонстрирующая три ситуации наклеивания и распаковки:

示例图

  • Оба пакета A и B как раз соответствуют размеру буфера TCP, или их время ожидания достигло времени ожидания TCP, поэтому для отправки по-прежнему используются два отдельных пакета.
  • Интервал между двумя запросами A и B короткий, и пакеты данных невелики, поэтому они объединяются в тот же пакет и отправляются на сервер.
  • Пакет B относительно велик, поэтому для передачи он разбивается на два пакета B_1 и B_2.Здесь, поскольку разделенный пакет B_2 относительно мал, он объединяется с пакетом A и отправляется.

3.2.2 Решения

Для проблем с наклейкой и распаковкой есть три распространенных решения:

🔥 ① Когда клиент отправляет пакеты данных, каждый пакетФиксированная длина. Например, 1024 байта, если данные, передаваемые клиентом, представляют собой менее 1024 байта, указанная длина завершена путем дополнения пространства.

Таким образом, мы не обнаружили ни одного случая принятия этого способа.

🔥 ② Клиент использует фикс в конце каждого пакетаразделитель. Например\r\n, если пакет разделен, дождитесь отправки следующего пакета и найдите\r\n, а затем объедините его разделенный заголовок с остальной частью предыдущего пакета, в результате чего получится полный пакет.

Конкретные случаи включают HTTP, WebSocket и Redis.

🔥 ③ Разделите сообщение на заголовок и тело сообщения и сохраните текущее сообщение целиком в заголовке.длина сообщения, полное сообщение читается только после того, как будет прочитано сообщение достаточной длины.

Дружеское напоминание: Схема ③ является обновленной версией ①.Динамическая длина.

В этой статье мы будем использовать этот метод для записи в него длины массива байтов перед тем, как каждый вызов будет сериализован в массив байтов и записан в сокет TCP. Как показано ниже:

Invocation 序列化

3.3 InvocationEncoder

СоздайтеInvocationEncoder类,实现将 Invocation 序列化,并写入到 TCP Socket 中。 код показывает, как показано ниже:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
        // <2.1> 将 Invocation 转换成 byte[] 数组
        byte[] content = JSON.toJSONBytes(invocation);
        // <2.2> 写入 length
        out.writeInt(content.length);
        // <2.3> 写入内容
        out.writeBytes(content);
        logger.info("[encode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());
    }

}

MessageToByteEncoderопределяется НеттикодированиеАбстрактный класс ChannelHandler, универсальный<I>Сообщение преобразуется в массив байтов.

#encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out)метод, логика кодирования.

<2.1>, вызвать JSON#toJSONBytes(Object object, SerializerFeature... features)метод для преобразования вызова в массив байтов.

<2.2>, преобразовать массив байтовдлина, записанный в сокет TCP. Таким образом, последующие«3.4 Декодер вызова»По этой длине сообщение может быть проанализировано,Решить проблему склеивания и распаковки.

Дружеское напоминание: MessageToByteEncoder в конечном итогеByteBuf outЗапись в TCP-сокет.

<2.3>В точке массив байтов записывается в розетку TCP.

3.4 InvocationDecoder

СоздайтеInvocationDecoderКласс, реализующий чтение массивов байтов из TCP-сокета и их десериализацию в Invocation. код показывает, как показано ниже:

image.png

ByteToMessageDecoderопределяется НеттирасшифровкаАбстрактный класс ChannelHandler, читаемый в TCP Socketновые данные, запустить декодирование.

② в<2.1>,<2.2>,<2.3>, чтение из TCP-сокетадлина.

③ в<3.1>,<3.2>,<3.3>, чтение из TCP-сокетабайтовый массиви десериализован в объект Invocation.

Наконец, добавьтеList<Object> outи передать его следующему обработчику каналов для обработки. Позже мы«4. Распространение сообщений»В маленьком узле ты увидишьMessageDispatcherРаспространить вызов на соответствующийMessageHandlerв, продолжитьбизнесвыполнение логики.

3.5 Знакомство с зависимостями

Создайтеpom.xmlфайл, импортируя зависимости, такие как Netty, FastJSON и т. д.

image.png

3.6 Резюме

Пока что мы завершили определение протокола связи и логики кодирования и декодирования, разве это не интересно? !

Так же в коде инициализации NettyServerHandlerInitializer и NettyClientHandlerInitializer добавляем к нему кодек. Как показано ниже:

编解码器的初始化

4. Распространение сообщений

существуетSpringMVCсередина,DispatcherServletВ соответствии с адресом запроса, методом и т. д. запрос будет распределен по методу Method соответствующего контроллера.

существуетlab-67-netty-demo-clientПроектdispatcherпакет, мы создалиMessageDispatcherКласс, который реализует функции, аналогичные DispatcherServlet, и распределяет Invocation на соответствующий емуMessageHandlerв, продолжитьбизнесвыполнение логики.

dispatcher 包

Далее, давайте взглянем на конкретную реализацию кода.

4.1 Message

СоздайтеMessageinterface, который определяет интерфейс маркера для сообщения. код показывает, как показано ниже:

public interface Message {
}

На следующем рисунке показан класс реализации Message, в котором мы участвуем. Как показано ниже:

Message 实现类

4.2 MessageHandler

СоздайтеMessageHandlerинтерфейс, интерфейс обработчика сообщений. код показывает, как показано ниже:

public interface MessageHandler<T extends Message> {

    /**
     * 执行处理消息
     *
     * @param channel 通道
     * @param message 消息
     */
    void execute(Channel channel, T message);

    /**
     * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
     */
    String getType();

}
  • дженерики определены<T>, который должен быть классом реализации Message.
  • Два метода интерфейса определены, толстые друзья, посмотрите на сами комментарии.

На следующем рисунке показан класс реализации MessageHandler, в котором мы участвуем. Как показано ниже:

MessageHandler 实现类

4.3 MessageHandlerContainer

СоздайтеMessageHandlerContainerКласс, который действует как контейнер для MessageHandler s. код показывает, как показано ниже:

image.png

① ОсознатьInitializingBeanинтерфейс, в#afterPropertiesSet()сканируйте все компоненты MessageHandler Beans и добавляйте их в коллекцию MessageHandler.

② в#getMessageHandler(String type)метод, получите объект MessageHandler, соответствующий типу. Позже мы вызовем этот метод в MessageDispatcher.

③ в#getMessageClass(MessageHandler handler)В этом методе класс Class, соответствующий типу сообщения, получается путем разбора универсальных шаблонов в его классе в MessageHandler. Вот ссылкаrocketmq-springПроектDefaultRocketMQListenerContainer#getMessageType()метод с небольшими изменениями.

Дружеское напоминание: если вы немного не понимаете общий механизм Java, вы можете быть немного хардкорным. Вы можете временно пропустить его, пока не узнаете намерение.

4.4 MessageDispatcher

СоздайтеMessageDispatcherкласс, который распространяет Invocation на соответствующий ему MessageHandler для выполнения бизнес-логики. код показывает, как показано ниже:

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

    @Autowired
    private MessageHandlerContainer messageHandlerContainer;

    private final ExecutorService executor =  Executors.newFixedThreadPool(200);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
        // <3.1> 获得 type 对应的 MessageHandler 处理器
        MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
        // 获得  MessageHandler 处理器的消息类
        Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
        // <3.2> 解析消息
        Message message = JSON.parseObject(invocation.getMessage(), messageClass);
        // <3.3> 执行逻辑
        executor.submit(new Runnable() {

            @Override
            public void run() {
                // noinspection unchecked
                messageHandler.execute(ctx.channel(), message);
            }

        });
    }

}

① Добавить в класс@ChannelHandler.SharableАннотация, помечающая этот ChannelHandler, может использоваться несколькими каналами.

SimpleChannelInboundHandlerопределяется Неттиобработка сообщенийАбстрактный класс ChannelHandler, тип сообщения обработки<I>Дженерики Вин.

#channelRead0(ChannelHandlerContext ctx, Invocation invocation)метод, обработайте сообщение и распространите его.

消息分发

<3.1>, вызовите MessageHandlerContainer#getMessageHandler(String type)способ получить вызовtypeСоответствующий обработчик сообщенийпроцессор.

Затем вызовите MessageHandlerContainer#getMessageClass(messageHandler)Метод, получить процессор MessageHandlerкласс сообщения.

<3.2>, вызвать JSON## parseObject(String text, Class<T> clazz)метод, который будет вызыватьmessageРазобрать в MessageHandler соответствующийОбъект сообщения.

<3.3>, бросьте его в пул потоков, а затем вызовите MessageHandler's#execute(Channel channel, T message)метод, выполнитьБизнес-логика.

УведомлениеПочему ты хочешь проиграть?executorНить пул это? Мы начинаем понимать модуль Threading EventGroup.

Дружеское напоминание: когда мы запускаем сервер или клиент Netty, будет установлена ​​его группа событий.

EventGroup можно просто понимать какПул потокови размер пула потоковТолькоколичество ЦП * 2. каждый каналТолькобудет закреплен за одним изнить, для чтения и записи данных. Кроме того, несколько каналов будутобщийПоток, то есть использующий один и тот же поток для чтения и записи данных.

Итак, толстые друзья пытаются подумать о конкретном логическом виде MessageHandler, который часто включает в себяобработка ввода-вывода, например, для чтения из базы данных. Таким образом, это заставит канал выполнять MessageHandler в процессе,блокироватьчитать данные из других каналов, которые совместно используют текущий поток.

Итак, здесь мы создаемexecutorПул потоков, выполнять логическое выполнение MessageHandler, избегатьблокироватьДанные канала прочитаны.

Может быть, толстый друг скажет, что мы можем установить большую часть пула потоков EventGroup, например 200? Для сервера NETTY с длительным подключением часто будет 1000 ~ 100 000 клиентских подключений NETTY, поэтому независимо от того, сколько установлено пула потоков, он появится.блокироватьСостояние чтения данных.

дружеское напоминание:executorПул потоков, который мы обычно называем пулом бизнес-потоков или логическим пулом потоков, как следует из названия, выполняет бизнес-логику.

Этот метод проектирования в настоящее время используется фреймворками RPC, такими как Dubbo.

Последующие действия, толстые друзья могут внимательно читать«[Серия NIO] - Модель реактора»статью для дальнейшего понимания.

4.5 NettyServerConfig

СоздайтеNettyServerConfigКласс конфигурации для создания bean-компонентов MessageDispatcher и MessageHandlerContainer. код показывает, как показано ниже:

@Configuration
public class NettyServerConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

4.6 NettyClientConfig

Дружеское напоминание: и«4.5 Неттисерверконфиг»Заключение одинаково.

СоздайтеNettyClientConfigКласс конфигурации для создания bean-компонентов MessageDispatcher и MessageHandlerContainer. код показывает, как показано ниже:

@Configuration
public class NettyClientConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

4.7 Резюме

Позже мы продемонстрируем использование рассылки сообщений в следующих подразделах:

5. Отключить и снова подключить

Netty клиент должен реализоватьОтключить и снова подключитьМеханизмы устранения отключения в различных ситуациях. Например:

  • При запуске клиента Netty сервер Netty зависает, что приводит к невозможности подключения.
  • Во время работы сервер Netty зависает, вызывая отключение соединения.
  • Сеть при любом конце развевается, вызывая отключение соединения ненормально.

Конкретная реализация кода относительно проста, нужно увеличить только в двух местах.Повторное подключениемеханизм.

  • Нетти-клиентзапускатьКогда сервер Netty не может быть подключен, инициируется повторное подключение.
  • Нетти-клиентбегатьПри отключении от Netty инициируйте повторное подключение.

Учитывая, что переподключение будет существоватьПотерпеть поражениеслучай, мы используемвыбор времениДаже тяжелый путь, чтобы избежать потребления слишком много ресурсов.

5.1 Специальный код

① вNettyClientв, обеспечить#reconnect()способ реализации логики временного переподключения. код показывает, как показано ниже:

// NettyClient.java

public void reconnect() {
    eventGroup.schedule(new Runnable() {
        @Override
        public void run() {
            logger.info("[reconnect][开始重连]");
            try {
                start();
            } catch (InterruptedException e) {
                logger.error("[reconnect][重连失败]", e);
            }
        }
    }, RECONNECT_SECONDS, TimeUnit.SECONDS);
    logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);
}

предоставляется вызовом EventLoop#schedule(Runnable command, long delay, TimeUnit unit)метод, реализациявыбор временилогика. И во внутренней конкретной логике вызовите NettyClient's#start()способ инициировать соединение с сервером Netty.

И поскольку NettyClient находится в#start()Метод заключается в подключении к серверу Netty.Потерпеть поражение, он позвонит снова#reconnect()метод, таким образом, повторно инициацияПовторное подключение по времени. Этот цикл повторяется до тех пор, пока клиент Netty не подключится к серверу Netty. Как показано ниже:

NettyClient 重连

② вNettyClientHandlerв, осознать#channelInactive(ChannelHandlerContext ctx)метод, на стороне сервера обнаружения и NettyОтключитьПри вызове Netty Client#reconnect()方法,发起重连。 код показывает, как показано ниже:

// NettyClientHandler.java

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // 发起重连
    nettyClient.reconnect();
    // 继续触发事件
    super.channelInactive(ctx);
}

5.2 Простой тест

① Запустите Netty Client, не запускайте Netty Server, консоль распечатает журнал, как показано ниже:

重连失败

Видно, что Netty Client постоянно инициирует регулярное переподключение при сбое соединения.

② Запустите Netty Server, и консоль распечатает, как показано ниже:

重连成功

Вы можете видеть, что клиент Netty успешно повторно подключается к серверу Netty.

6. Механизм сердцебиения и обнаружение простоя

В вышеизложенном рекомендую толстым друзьям прочитать«Механизм TCP Keepalive доходит до корней»статья, мы можем узнать о TCPПринести свой собственныймеханизм обнаружения холостого хода,дефолтсоставляет 2 часа. Такой механизм обнаружения, отсистемный ресурсуровень приемлемый.

Но когдабизнесНа уровне, если на обнаружение связи между клиентом и сервером уходит 2 часадействительныйОн был отключен, что приведет к потере большого количества сообщений в середине, что повлияет на опыт использования клиентом.

Поэтому нам нужнобизнесНа уровне внедряйте выявление простоя самостоятельно, чтобы убедиться, чтокак можно скорееОткройте для себя клиент и сервердействительныйотключен. Логика реализации следующая:

  • СерверНе найдено со 180 секундыклиентпрочитай сообщение,инициативаОтключить.
  • клиентНайдено 180 секунд не изСерверпрочитай сообщение,инициативаОтключить.

Учитывая, что между клиентом и сервером не всегда происходит обмен сообщениями, нам необходимо увеличитьМеханизм сердцебиения:

  • клиенткаждые 60 секунд доСерверИнициируйте контрольное сообщение, чтобы убедиться, чтоСерверСообщение можно прочитать.
  • СерверКогда получено сообщение пульса, ответьтеклиентПодтверждающее сообщение, гарантирующееклиентСообщение можно прочитать.

дружеское напоминание:

  • Почему 180 секунд? Вы можете увеличить или уменьшить его, в зависимости от того, насколько быстро вы хотите обнаруживать аномалии соединения. Если время слишком короткое, сердцебиение будет слишком частым и потребует слишком много ресурсов.
  • Почему 60 секунд? Три попытки проверить, истекло ли время сердцебиения.

Хотя это звучит немного сложно, это не сложно реализовать.

6.1 Обнаружение бездействия на стороне сервера

существуетNettyServerHandlerInitializer, мы добавилиReadTimeoutHandlerПроцессор, которому не удается прочитать данные от однорангового узла в течение указанного времени, выдает исключение ReadTimeoutException. Как показано ниже:

ReadTimeoutHandler

Таким способом добитьсяСерверНайдено 180 секунд не изклиентпрочитай сообщение,инициативаОтключить.

6.2 Обнаружение бездействия клиента

Дружеское напоминание: и"Обнаружение свободного сервера 6.1"Последовательный.

существуетNettyClientHandlerInitializer, мы добавилиReadTimeoutHandlerПроцессор, которому не удается прочитать данные от однорангового узла в течение указанного времени, выдает исключение ReadTimeoutException. Как показано ниже:

ReadTimeoutHandler

Таким способом добитьсяклиентНайдено 180 секунд не изСерверпрочитай сообщение,инициативаОтключить.

6.3 Механизм сердцебиения

Нетти предоставляетIdleStateHandlerПроцессор, обеспечивающий функцию обнаружения простоя, в каналечитать или писатьКогда время простоя слишком велико,IdleStateEventсобытие.

Таким образом, нам нужно толькоNettyClientHandlerПроцессор, при получении события IdleStateEvent, клиент отправляет клиенту пульсирующее сообщение. Как показано ниже:

客户端心跳

В то же время мыСерверпроект, создалHeartbeatRequestHandlerПроцессор сообщений при получении запроса пульса от клиента отвечает клиенту подтверждающим сообщением. код показывает, как показано ниже:

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, HeartbeatRequest message) {
        logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());
        // 响应心跳
        HeartbeatResponse response = new HeartbeatResponse();
        channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));
    }

    @Override
    public String getType() {
        return HeartbeatRequest.TYPE;
    }

}

6.4 Простой тест

Запустите сервер Netty Server, затем запустите клиент Netty Client, терпеливо подождите 60 секунд, вы можете увидетьсердцебиениеЖурнал выглядит следующим образом:

image.png

7. Логика аутентификации

Дружеское напоминание: начиная с этого раздела, давайте рассмотрим пример обработки бизнес-логики.

Процесс аутентификации показан на следующем рисунке:

认证流程

7.1 AuthRequest

СоздайтеAuthRequestкласс, определяющий пользователяСертификацияпросить. код показывает, как показано ниже:

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

    /**
     * 认证 Token
     */
    private String accessToken;
    
    // ... 省略 setter、getter、toString 方法
}

Здесь мы используемaccessTokenМаркер аутентификации для аутентификации.

Потому что обычно мы используем HTTP для входа в систему, а затем используем вошедший в систему идентификатор (например, скажем,accessTokenМаркер аутентификации), который связывает клиента и текущего пользователя для аутентификации.

7.2 AuthResponse

СоздайтеAuthResponseкласс, определяющий пользователяСертификацияотклик. код показывает, как показано ниже:

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    
    // ... 省略 setter、getter、toString 方法
}

7.3 AuthRequestHandler

Сервер...

СоздайтеAuthRequestHandlerкласс, дляСервериметь дело склиентзапрос аутентификации. код показывает, как показано ниже:

image.png

Код относительно прост, толстые друзья взгляните<1>,<2>,<3>,<4>Комментировать.

7.4 AuthResponseHandler

Клиент...

СоздайтеAuthResponseHandlerкласс, дляклиентиметь дело сСерверответ аутентификации. код показывает, как показано ниже:

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, AuthResponse message) {
        logger.info("[execute][认证结果:{}]", message);
    }

    @Override
    public String getType() {
        return AuthResponse.TYPE;
    }

}

Результат аутентификации печати, чтобы облегчить отладку.

7.5 TestController

Клиент...

СоздайтеTestControllerкласс, обеспечивающий/test/mockИнтерфейс имитирует клиента для отправки запроса на сервер. код показывает, как показано ниже:

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/mock")
    public String mock(String type, String message) {
        // 创建 Invocation 对象
        Invocation invocation = new Invocation(type, message);
        // 发送消息
        nettyClient.send(invocation);
        return "success";
    }

}

7.6 Простой тест

Запустите сервер Netty Server, запустите клиент Netty Client и используйте Postman для имитации запроса аутентификации. Как показано ниже:

Postman 模拟认证请求

При этом вы можете увидеть журнал успешной аутентификации следующим образом:

image.png

8. Логика единого чата

Процесс приватного чата показан на следующем рисунке:

私聊流程

Сервер отвечает за пересылку сообщения приватного чата, отправленного клиентом А, клиенту Б.

8.1 ChatSendToOneRequest

СоздайтеChatSendToOneRequestКласс, запрос на отправку сообщения приватного чата указанному человеку. код показывает, как показано ниже:

public class ChatSendToOneRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST";

    /**
     * 发送给的用户
     */
    private String toUser;
    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

8.2 ChatSendResponse

СоздайтеChatSendResponseкласс, ответ чата на отправку сообщения результат. код показывает, как показано ниже:

public class ChatSendResponse implements Message {

    public static final String TYPE = "CHAT_SEND_RESPONSE";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    
    // ... 省略 setter、getter、toString 方法
}

8.3 ChatRedirectToUserRequest

СоздайтеChatRedirectToUserRequestКласс, который пересылает сообщение на запрос пользователя. код показывает, как показано ниже:

public class ChatRedirectToUserRequest implements Message {

    public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

Дружеское напоминание: написав, я вдруг обнаружил, что один пропалfromUserПоле, указывающее, от кого пришло сообщение.

8.4 ChatSendToOneHandler

Сервер...

СоздайтеChatSendToOneHandlerкласс, дляСервериметь дело склиентзапрос приватного чата. код показывает, как показано ниже:

image.png

Код относительно прост, толстые друзья взгляните<1>,<2>Примечания к .

8.5 ChatSendResponseHandler

Клиент...

СоздайтеChatSendResponseHandlerкласс, дляклиентиметь дело сСерверответ в чате. код показывает, как показано ниже:

@Component
public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, ChatSendResponse message) {
        logger.info("[execute][发送结果:{}]", message);
    }

    @Override
    public String getType() {
        return ChatSendResponse.TYPE;
    }

}

распечатать чатОтправитьВ результате облегчается отладка.

8.6 ChatRedirectToUserRequestHandler

клиент

СоздайтеChatRedirectToUserRequestHandlerкласс, дляклиентиметь дело сСерверзапрос на пересылку сообщения. код показывает, как показано ниже:

@Component
public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, ChatRedirectToUserRequest message) {
        logger.info("[execute][收到消息:{}]", message);
    }

    @Override
    public String getType() {
        return ChatRedirectToUserRequest.TYPE;
    }

}

распечатать чатполучить сообщение, что удобно для отладки.

8.7 Простой тест

① Запустите сервер Netty Server.

② Запустите клиент NettyКлиент А. Затем используйте Postman для имитации запроса аутентификации (пользовательyunai). Как показано ниже:

Postman 模拟认证请求

③ Запустите клиент NettyКлиент Б. Обратите внимание, что вам нужно установить--server.portПорт 8081 во избежание конфликтов. Как показано ниже:

IDEA 设置

Затем используйте Postman для имитации запроса аутентификации (пользовательtutou). Как показано ниже:

Postman 模拟认证请求

④ Наконец, используйте Postman для имитации один разyunaiТаро даютtutouPotato отправляет личное сообщение в чат. Как показано ниже:

Postman 模拟私聊请求

В то же время вы можете увидеть журнал отправки приватных сообщений клиента А клиенту Б следующим образом:

image.png

9. Логика группового чата

Процесс группового чата показан на следующем рисунке:

群聊流程

Сервер отвечает за пересылку сообщения группового чата, отправленного клиентом А, клиентам А, В и С.

Дружеское напоминание: учитывая простоту логики, примеры в этом подразделе, предоставленные Xiaoyu, относятся не к одной группе, а к каждому в большом групповом чате~

9.1 ChatSendToAllRequest

СоздайтеChatSendToOneRequestкласс, всем разослалГрупповой чатзапрос сообщения. код показывает, как показано ниже:

public class ChatSendToAllRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

Дружеское напоминание: если это серьезный групповой чат,groupIdполе, указывающее номер группы.

9.2 ChatSendResponse

и«8.2 Ответ на отправку чата»Разделы согласуются.

9.3 ChatRedirectToUserRequest

и«8.3 ЧатРедиректтоусеррекуест»Разделы согласуются.

9.4 ChatSendToAllHandler

Сервер...

СоздайтеChatSendToAllHandlerкласс, дляСервериметь дело склиентзапрос в групповой чат. код показывает, как показано ниже:

image.png

Код относительно прост, толстые друзья взгляните<1>,<2>Примечания к .

9.5 ChatSendResponseHandler

и«8.5 ChatsendResponseHandler»Разделы согласуются.

9.6 ChatRedirectToUserRequestHandler

и«8.6 ChatRedirectToUserRequestHandler»Разделы согласуются.

9.7 Простой тест

① Запустите сервер Netty Server.

② Запустите клиент NettyКлиент А. Затем используйте Postman для имитации запроса аутентификации (пользовательyunai). Как показано ниже:

Postman 模拟认证请求

③ Запустите клиент NettyКлиент Б. Обратите внимание, что вам нужно установить--server.portПорт 8081 во избежание конфликтов.

IDEA 设置

④ Запустите клиент NettyКлиент С. Обратите внимание, что вам нужно установить--server.portПорт 8082 во избежание конфликтов.

IDEA 设置

⑤ Наконец, используйте Postman для имитации отправки сообщения группового чата. Как показано ниже:

Postman 模拟群聊请求

В то же время видно, что клиент АГруппаЛоги для всех клиентов следующие:

image.png

666. Пасхальное яйцо

До сих пор мы реализовали простую функцию обмена мгновенными сообщениями через Netty, это большой выигрыш, хе-хе.

Теперь на совесть порекомендую очередную волну статей, хе-хе.

И другое продолжение, Сьюзан Чан НайGitHub.com/YuNaiV/onemei…В проекте с открытым исходным кодом реализована относительно полная функция обслуживания клиентов, хахаха~