Netty | Анализ рабочего процесса и описание основных компонентов и практические примеры использования кода

задняя часть Netty
Netty | Анализ рабочего процесса и описание основных компонентов и практические примеры использования кода

Эта статья участвовала в "Проект «Звезда раскопок»”, чтобы выиграть творческий подарочный пакет и бросить вызов творческим поощрительным деньгам.

Преамбула:Ваше первое приложение Netty

В предыдущей статье было написано первое приложение входа Netty.В этой статье в основном анализируется рабочий процесс и основные компоненты Netty из приведенного выше кода в сочетании с блок-схемой этой статьи.

Наконец, мы приведем пример для дальнейшего понимания.

Надеюсь, вы можете получить что-то! ! 🚀

1. Рабочий процесс Netty

Давайте сначала взглянем на диаграмму принципов работы Netty, кратко поговорим о рабочем процессе, а затем проанализируем основные компоненты Netty один за другим с помощью этой диаграммы.

1.1 Схема работы сервера:

image-20210825155927290

1.2, анализ рабочего процесса сервера:

  1. Когда запустится серверная часть, привяжите локальный порт и инициализируйтеNioServerSocketChannel.

  2. поставить себяNioServerSocketChannelзарегистрироваться вBossNioEventLoopGroupна селекторе.

    • Серверная часть содержит 1Boss NioEventLoopGroupИWorker NioEventLoopGroup,
    • Boss NioEventLoopGroupВ частности, отвечает за получение соединений от клиентов,Worker NioEventLoopGroupОтвечает за чтение и запись в сети
    • NioEventLoopGroup эквивалентна группе циклов событий.Эта группа содержит несколько циклов событий NioEventLoop, и каждый NioEventLoop содержит 1 селектор и 1 поток цикла событий.
  3. BossNioEventLoopGroupЦиклические задачи:

    1. Опросить событие принятия;

    2. Процесс события приема и зарегистрируйте генерируемую ниосокархАНел с определеннымWorkNioEventLoopGroupна Селекторе.

    3. Обработайте задачи в очереди задач, запустите AllTasks. Задачи в очереди задач включают вызовы пользователейeventloop.execute或scheduleВыполненные задачи или другие потоки, отправленные в этотeventloopзадача.

  4. WorkNioEventLoopGroupЦиклические задачи:

    • голосованиеread和Writeмероприятие

    • Обработка событий ввода-вывода, когда происходят события чтения и записи NioSocketChannel, обрабатывается обратный вызов (триггер) ChannelHandler.

    • Задача, которая обрабатывает очередь задач, т.е.runAllTasks

1.3 Схема работы клиента

image-20210825231934496

Процесс не будет повторяться и описываться 😁

Во-вторых, основные компоненты модуля

Основные компоненты Netty примерно следующие:

  1. Интерфейс канала
  2. Интерфейс EventLoopGroup
  3. Интерфейс ChannelFuture
  4. Интерфейс ChannelHandler
  5. Интерфейс ChannelPipeline
  6. Интерфейс ChannelHandlerContext
  7. Абстрактный класс SimpleChannelInboundHandler
  8. Bootstrap, класс ServerBootstrap
  9. Интерфейс ChannelFuture
  10. Класс ChannelOption

2.1, интерфейс канала

Обычно мы используем базовые операции ввода-вывода (bind(), connect(), read() и write()), которые существенно зависят от примитивов, предоставляемых базовой сетевой передачей, которые в JavaSocketДобрый.

API, предоставляемый интерфейсом Netty Channel, значительно сокращает прямое использованиеSocketсложность класса. Кроме тогоChannelобеспечить асинхронную сетьI/Oопераций (таких как установление соединений, чтение и запись, привязка портов), асинхронные вызовы означают любыеI/Oвызовы будут возвращаться немедленно, и нет гарантии, что запрошенныйI/OОперация завершена.

возвращает a сразу после завершения вызоваChannelFutureнапример, зарегистрировав слушателей наChannelFutureна, поддержка наI/OНемедленный обратный вызов, чтобы уведомить вызывающую сторону об успешном завершении операции, сбое или ее отмене.

Кроме того, Channel является корнем широкой иерархии классов со многими предопределенными специализированными реализациями, такими как:

  • LocalServerChannel: ServerChannel для локального транспорта, позволяющего обмениваться данными с виртуальными машинами.
  • EmbeddedChannel: базовый класс для реализаций канала, используемых во встроенном виде.
  • NioSocketChannel: Асинхронный клиент TCP, подключение через сокет.
  • NioServerSocketChannel: Асинхронный TCP на стороне сервера, соединение через сокет.
  • NioDatagramChannel: Асинхронное соединение UDP.
  • NioSctpChannel: асинхронное соединение Sctp на стороне клиента, использующее неблокирующий режим и позволяющее читать/записывать SctpMessage в базовый SctpChannel.
  • NioSctpServerChannel: асинхронные соединения Sctp на стороне сервера, эти каналы охватывают сетевой ввод-вывод UDP и TCP и файловый ввод-вывод.

2.2, интерфейс EventLoopGroup

EventLoopОпределяет базовую абстракцию Netty для обработки событий, происходящих в течение жизненного цикла соединения.

Netty абстрагирует Selector от приложения, запуская события, устраняя весь код диспетчеризации, который в противном случае нужно было бы писать вручную. Внутри каждому каналу будет назначен EventLoop для обработки всех событий, включая:

  • вопрос регистрации;
  • Отправка событий в ChannelHandler;
  • Запланируйте дальнейшие действия.

Однако мы не будем здесь углубляться в это, а сделаем краткое описание связи между Channel, EventLoop, Thread и EventLoopGroup.

  • ОдинEventLoopGroupсодержит один или несколькоEventLoop;
  • каждыйEventLoopПоддерживает аSelectorНапример, EventLoop — только один в своем жизненном цикле.Thread связывать;
  • Поэтому все поEventLoopВсе обрабатываемые события ввода-вывода будутThreadобрабатывается выше, практически исключая необходимость синхронизации;
  • ОдинChannelзарегистрированы только в одном во время его жизниEventLoop;
  • ОдинEventLoopможет быть назначен один или несколькоChannel.
  • Обычно служебный порт представляет собойServerSocketChannelсоответствует одномуSelectorс однимEventLoopнить.BossEventLoopОтвечает за прием клиентских подключений иSocketChannelсдаватьWorkerEventLoopGroupЧтобы выполнить обработку ввода-вывода, как показано на блок-схеме выше.

2.3, интерфейс ChannelFuture

Все операции ввода/вывода в Netty являются асинхронными. Поскольку операция может вернуться не сразу, нам нужен способ определить ее результат в более поздний момент времени. Конкретная реализация осуществляется черезFutureа такжеChannelFutures,ЭтоaddListener()Метод зарегистрировалChannelFutureListener, чтобы зарегистрированные события прослушивателя запускались автоматически после завершения операции (независимо от того, успешно она или нет).

Общие методы

  • Channel channel(), возвращает текущий прогрессIOрабочий канал
  • ChannelFuture sync(), дождитесь завершения асинхронной операции

2.4, интерфейс ChannelHandler

Из предыдущей вводной процедуры мы можем видетьChannelHandlerВажность в Netty заключается в том, что он действует как контейнер для всей логики приложения, которая обрабатывает входящие и исходящие данные. Большая часть нашей бизнес-логики также написана в реализованном классе слов, вдобавокChannelHandlerМетод автоматически запускается событием, и его не нужно отправлять нам.

ChannelHandlerСуществует множество классов реализации или подинтерфейсов. Обычно мы просто идем к наследованию или подинтерфейсу, а потом переписываем методы внутри.

image-20210825214929693

Наиболее распространенные типы обработчиков:

  • ChannelInboundHandler : получать входящие события и данные
  • ChannelOutboundHandler: Для обработки исходящих событий и данных.

Общие адаптеры:

  • ChannelInboundHandlerAdapter: для обработки входящих событий ввода-вывода

    ChannelInboundHandlerАбстрактный базовый класс для реализаций, предоставляющий реализации для всех методов. Эта реализация просто перенаправляет операциюChannelPipelineследующийChannelHandler . Подклассы могут переопределить реализацию метода, чтобы изменить это.

  • ChannelOutboundHandlerAdapter: используется для обработки исходящих событий ввода-вывода

Нам часто нужно настроить одинHandlerкласс для наследованияChannelInboundHandlerAdapter, а затем реализовать бизнес-логику, переопределив соответствующие методы, посмотрим, какие методы можно переопределить:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
	//注册事件
    public void channelRegistered(ChannelHandlerContext ctx) ;
	// 
    public void channelUnregistered(ChannelHandlerContext ctx);
	//通道已经就绪
    public void channelActive(ChannelHandlerContext ctx);
	
    public void channelInactive(ChannelHandlerContext ctx) ;
    //通道读取数据事件
    public void channelRead(ChannelHandlerContext ctx, Object msg) ;
	//通道读取数据事件完毕
    public void channelReadComplete(ChannelHandlerContext ctx) ;

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt);
	//通道可写性已更改
    public void channelWritabilityChanged(ChannelHandlerContext ctx);
	//异常处理
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
}

2.5, интерфейс ChannelPipeline

ChannelPipelineПредоставляет контейнер для цепочки ChannelHandlers и определяет API для распространения потока входящих и исходящих событий в цепочке. При создании канала он автоматически назначается своему собственномуChannelPipeline. Соотношение их состава следующее:

image-20210825223614701

ОдинChannelсодержитChannelPipeline,а такжеChannelPipelineв другом, поддерживаемомChannelHandlerContextсостоит из двусвязных списков, и каждыйChanneHandlerContextсвязанный с другимChannelHandler.

ChannelHandlerустановить наChannelPipelineВ процессе:

  1. ОдинChannelInitializerРеализация зарегистрирована наServerBootstrapсередина ;
  2. когдаChannelInitializer.initChannel()При вызове методаChannelInitializerбудетChannelPipelineПри монтаже комплекта нестандартныхChannelHandler;
  3. ChannelInitializerудалить себя изChannelPipelineудалено в.

С точки зрения клиентского приложения события называются исходящими, если их движение направлено от клиента к серверу, и входящими, если это не так. На стороне сервера все наоборот.

Если сообщение или любое другое входящее событие прочитано, оно будетChannelPipelineЗаголовок начинает течь и передается первомуChannelInboundHandler. После обработки этого обработчика данные будут переданы следующему в цепочкеChannelInboundHandler. В конце концов, данные придутChannelPipeline На этом вся обработка закончена.

Исходящие события доставляются из хвоста к первому обработчику исходящих сообщений. Исходящие и входящие обработчики не мешают друг другу.

2.6, интерфейс ChannelHandlerContext

эффект должен сделатьChannelHandlerспособенChannelPipelineИ другие процессы взаимодействуют. Поскольку ChannelHandlerContext сохраняетchannelВся соответствующая контекстная информация, а также корреляцияChannelHandlerобъект, кроме того,ChannelHandlerContext может уведомитьChannelPipelineследующийChannelHandlerа также динамическое изменениеChannelPipeline .

2.7, абстрактный класс SimpleChannelInboundHandler

Мы часто можем столкнуться с приложениями, которые воспользуютсяChannelHandlerЧтобы получить расшифрованное сообщение и реализовать бизнес-логику в этом обработчике, напишите такойChannelHandler, нам просто нужно расширить абстрактный класс SimpleChannelInboundHandler< T >То есть, где тип T — это тип Java сообщения, которое мы хотим обработать.

существуетSimpleChannelInboundHandler Наиболее важным методом являетсяvoid channelRead0(ChannelHandlerContext ctx, T msg),

После того, как мы сами реализуем этот метод, полученное сообщение было декодировано.

Например:

image-20210825230526952

2.8, Bootstrap, класс ServerBootstrap

Bootstrapзначит направлять, т.NettyПриложения обычно состоят изBootstrapВ начале основная роль заключается в настройке всегоNettyпрограмма, объединяющая отдельные компоненты.

категория Bootstrap ServerBootstrap
гид для загрузки клиента Используется для управления сервером
роль в сетевом программировании Используется для подключения к удаленным хостам и портам Используется для привязки к локальному порту
Количество групп EventLoopGroups 1 2

Думаю у всех могут возникнуть сомнения по поводу последнего пункта, почему у одного 1, а у другого 2?

Поскольку серверу нужны два разных набораChannel.

Первая группа будет содержать только одинServerChannel, который представляет собственный прослушивающий сокет сервера, привязанный к некоторому локальному порту.

А вторая группа будет содержать все созданные для обработки входящих клиентских подключений (по одному на каждое принятое сервером подключение)Channel.

Это можно увидеть на блок-схеме выше.

2.9, интерфейс ChannelFuture

Результат асинхронной операции ввода/вывода канала. Все операции ввода/вывода в Netty являются асинхронными. Это означает, что любой вызов ввода-вывода будет возвращен немедленно, но нет гарантии, что запрошенная операция ввода-вывода будет завершена к моменту завершения вызова. Вместо этого вы возвращаете экземпляр ChannelFuture, который предоставляет вам информацию о результате или состоянии операции ввода-вывода. ChannelFuture либо не завершен, либо завершен. Когда начинается операция ввода-вывода, создается новый объект будущего. Новое будущее изначально незавершенное — оно ни успешно, ни неудачно, ни отменено, потому что операция ввода-вывода еще не завершена. Если операция ввода-вывода завершена успешно, неудачно или отменена, будущая операция помечается как завершенная с более конкретной информацией, такой как причина сбоя. Обратите внимание, что даже отказы и отмены считаются состояниями завершения.

Nettyвсе вIOВсе операции асинхронны, и невозможно сразу узнать, правильно ли было обработано сообщение. Но вы можете подождать, пока он завершит выполнение через некоторое время, или зарегистрировать слушателя напрямую.Futureа такжеChannelFutures, они могут зарегистрировать прослушиватель, и прослушиватель автоматически вызовет зарегистрированное событие прослушивателя, когда операция завершится успешно или неудачно.

Общие методы

  • Channel channel(), возвращает текущий прогрессIOрабочий канал
  • ChannelFuture sync(), дождитесь завершения асинхронной операции

2.10, класс ChannelOption

  1. Nettyпри созданииChannelПосле экземпляра обычно необходимо установитьChannelOptionпараметр.
  2. ChannelOptionПараметры следующие:
    • ChannelOption.SO_KEEPALIVE: Всегда оставайтесь на связи
    • ChannelOption.SO_BACKLOG: Соответствует параметру невыполненной работы в функции прослушивания протокола TCP/IP, который используется для инициализации размера очереди, доступной для подключения к серверу. Сторона сервера обрабатывает запросы на подключение клиента в последовательной обработке, поэтому запрос помещается в очередь для ожидания обработки.Когда параметр backilog указывает, что терминал прибывает, сторона сервера помещает запрос на подключение клиента, который не может быть обработан, в очередь для обработки, отставание Параметр указывает размер очереди.

3. Примеры применения

【Случай】:

Напишите сервер, два или более клиентов, клиент может общаться друг с другом.

3.1. Обработчик сервера

ChannelHandlerСуществует множество классов реализации или подинтерфейсов. Обычно мы просто идем к наследованию или подинтерфейсу, а потом переписываем методы внутри.

Здесь мы просто унаследовали SimpleChannelInboundHandler , многие из методов здесь в основном до тех пор, пока мы переписываем бизнес-логику, и большинство триггеров вызываются автоматически при возникновении события, без необходимости вызывать их вручную.

package com.crush.atguigu.group_chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author crush
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 定义一个channle 组,管理所有的channel
     * GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    /**
     * handlerAdded 表示连接建立,一旦连接,第一个被执行
     *              将当前channel 加入到  channelGroup
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户加入聊天的信息推送给其它在线的客户端
        /*
        该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,
        我们不需要自己遍历
         */
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);

    }

    /**
     * 断开连接, 将xx客户离开信息推送给当前在线的客户
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
        System.out.println("channelGroup size" + channelGroup.size());

    }

    /**
     * 表示channel 处于活动状态, 既刚出生 提示 xx上线
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 上线了~");
    }

    /**
     * 表示channel 处于不活动状态, 既死亡状态 提示 xx离线了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 离线了~");
    }

    //读取数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

        //获取到当前channel
        Channel channel = ctx.channel();
        //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息

        channelGroup.forEach(ch -> {
            if (channel != ch) { //不是当前的channel,转发消息
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
            } else {//回显自己发送的消息给自己
                ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //关闭通道
        ctx.close();
    }
}

3.2 Запуск сервера сервера

package com.crush.atguigu.group_chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author crush
 */
public class GroupChatServer {

    /**
     * //监听端口
     */
    private int port;

    public GroupChatServer(int port) {
        this.port = port;
    }

    /**
     * 编写run方法 处理请求
     * @throws Exception
     */
    public void run() throws Exception {

        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //8个NioEventLoop
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //获取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });

            System.out.println("netty 服务器启动");

            ChannelFuture channelFuture = b.bind(port).sync();

            //监听关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatServer(7000).run();
    }
}

3.3. Обработчик клиента

package com.crush.atguigu.group_chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @author crush
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    
    //当前Channel 已从对方读取消息时调用。
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

3.4. Клиент-сервер

package com.crush.atguigu.group_chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


/**
 * @author crush
 */
public class GroupChatClient {
    
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自定义的handler
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress() + "--------");
            //客户端需要输入信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通过channel 发送到服务器端
                channel.writeAndFlush(msg + "\r\n");
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

Несколько клиентов, просто cv один раз.

3.5 Тест:

Процесс тестирования состоит в том, чтобы сначала запустить сервер сервера, а затем запустить клиент.

image-20211010092350555

image-20211010092356763

image-20211010092402794

4. Поговорите с собой

Эту статью следует считать рукописью, раньше я был занят другими вещами.

Это все для сегодняшней статьи.

привет я блоггер宁在春:Дома

Если у вас есть какие-либо сомнения в статье, оставьте сообщение или личное сообщение или добавьте контактную информацию на главную страницу, и мы ответим как можно скорее.

Если вы обнаружите какие-либо проблемы в статье, надеюсь, вы меня поправите, большое спасибо.

Если вы считаете это полезным, пожалуйста, ставьте палец вверх и уходите!

Приглашаем всех обсудить в дискуссионной зоне, увеличить свою удачу и взглянуть на периферийные устройства, официально присланные Nuggets! ! !

Чем больше вы оставите комментарий, тем больше шансов выиграть приз! ! !

Подробности 👉Проект «Звезда раскопок»