После многих попыток научиться, я наконец понял NIO!

Java

NIO — неблокирующий ввод-вывод (новый ввод-вывод)

  1. io — это потоковое программирование, которое можно использовать только как входной или выходной поток. Оно блокируется синхронно. Каждое соединение должно создавать поток для его обработки. Накладные расходы на переключение контекста потока очень велики, вызывая большой горлышко бутылки.
  2. Таким образом, псевдоблокирующий ввод-вывод, реализованный пулом потоков, в определенной степени решает проблему создания избыточных потоков, но принципиально не решает проблему блокировки, а слишком большое количество потоков и слишком маленькие пулы потоков также вызовут большое узкое место.
  3. Поскольку корневым узким местом является количество потоков и блокировка ввода-вывода, есть ли способ обрабатывать несколько клиентских подключений только с одним потоком? Вот почему NIO появился

НИО в основном имеетТри основные части:

  • буфер буфер
  • Канальный конвейер
  • Селектор селектор

nio является блочно-ориентированным, программирование буфера буфера, нижний слой представляет собой массив, буфер обеспечивает доступ к данным, чтение и запись канала в буфер, чтение и запись буфера в канал, чтение из буфера в канал программы двунаправленный

Понимание NIO требует понимания модели программирования событий.

Ядро НИО:

NIO изменился с блокировки чтения и записи (занятия потоков) на однопоточные события опроса, находя сетевые дескрипторы, которые можно читать и записывать для чтения и записи. За исключением того, что опрос событий блокируется (нечего делать, что должно быть заблокировано), остальные операции ввода-вывода являются чисто операциями ЦП, и нет необходимости включать многопоточность.

Эффективность однопоточной обработки ввода-вывода действительно очень высока, нет переключения потоков, только отчаянное чтение, запись и выбор событий.

НИО ведет нас:

  1. Модель, управляемая событиями — асинхронное программирование неотделимо от событий.
  2. Однопоточная обработка нескольких соединений — мультиплексирование делает обработку более эффективной.
  3. Неблокирующий ввод-вывод, блокировка только для получения действенных событий
  4. Передача на основе блоков более эффективна, чем передача на основе потоков
  5. Нулевая копия — DirectBuffer

недостаток:

NIO не полностью скрывает различия платформ, он по-прежнему реализуется на основе системы ввода-вывода каждой операционной системы, и различия все еще существуют. Построение модели, управляемой событиями, с использованием NIO для сетевого программирования — непростая задача, и здесь есть много подводных камней.

Рекомендуется использовать зрелый фреймворк NIO Netty.

Buffer

Буфер — это, по сути, блок памяти, в который данные могут быть записаны, а затем прочитаны из него. Этот фрагмент памяти обернут как объект буфера NIO и предоставляет набор методов для легкого доступа к этому фрагменту памяти.

Емкость, Позиция, Ограничение

<= mark <= position <= limit <= capacity

  • capacity

Как блок памяти, Buffer имеет фиксированное значение размера, также называемое "capacity". Вы можете записывать в него только байты емкости, long, char и другие типы. Когда буфер заполнен, его необходимо очистить (прочитав данные или очистив данные), чтобы продолжить запись данных в него.

  • position

Когда вы записываете данные в буфер, position представляет текущую позицию. Начальное значение позиции равно 0. Когда байтовые, длинные и другие данные записываются в буфер, позиция перемещается вперед к следующему блоку буфера, куда могут быть вставлены данные. позиция может быть до вместимости – 1.

При чтении данных они также считываются из определенного места. При переключении буфера из режима записи в режим чтения позиция сбрасывается на 0. При чтении данных из позиции буфера позиция перемещается вперед к следующей доступной для чтения позиции.

  • limit

В режиме записи предел буфера указывает, сколько данных вы можете записать в буфер. В режиме записи ограничение равно емкости буфера.

При переключении Buffer в режим чтения limit указывает, сколько данных вы можете прочитать максимум. Поэтому при переключении буфера в режим чтения лимит будет установлен на значение позиции в режиме записи. Другими словами, вы можете прочитать все ранее записанные данные (ограничение установлено на количество записанных данных, которое является позицией в режиме записи)

Один и тот же буфер может хранить данные разных типов, но вам нужно указать тип, чтобы получить его, когда вы его получите

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.putInt(1);
buffer.putLong(387524875628742L);
buffer.putChar('s');
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());

Метод put может помещать только тип byte, а не int

перевернуть, очистить, перемотать, отметить

  • flip

Метод flip переключает буфер из режима записи в режим чтения. Вызов метода flip() установит позицию обратно на 0 и установит ограничение на предыдущее значение позиции.

    public final Buffer flip() {
        this.limit = this.position;
        this.position = 0;
        this.mark = -1;
        return this;
    }

  • clear

position будет установлено обратно на 0, а предел будет установлен на значение емкости. Другими словами, буфер опустошается. Данные в буфере не очищаются, но эти флаги говорят нам, с чего начать запись данных в буфер.

    public final Buffer clear() {
        this.position = 0;
        this.limit = this.capacity;
        this.mark = -1;
        return this;
    }

  • rewind

Buffer.rewind() устанавливает позицию обратно в 0, поэтому вы можете перечитать все данные в буфере. Ограничение остается прежним, по-прежнему указывает, сколько элементов можно прочитать из буфера.

    public final Buffer rewind() {
        this.position = 0;
        this.mark = -1;
        return this;
    }

  • mark

Конкретная позиция в буфере может быть отмечена. Затем эту позицию можно восстановить, вызвав метод Buffer.reset().

    public final Buffer mark() {
        this.mark = this.position;
        return this;
    }

  • фрагментация

Разделите буфер на буфер в соответствии с установленной позицией и пределом, со своей собственной позицией, пределом и емкостью, и данные совместно используют данные буфера адреса памяти.

public static void test2(){
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for(int i=0;i<buffer.capacity();i++){
            buffer.put((byte)i);
        }
        buffer.position(10);
        buffer.limit(20);
        ByteBuffer buffer1 = buffer.slice();//buffer分片
        for(int m=0;m<buffer1.capacity();m++){
            byte b = buffer1.get();
            System.out.print(b+" ");
        }
    }

输出:
10 11 12 13 14 15 16 17 18 19

ReadOnlyBuffer

Обычный буфер (доступный для чтения и записи) может быть преобразован в буфер только для чтения в любое время, но буфер только для чтения не может быть преобразован в обычный буфер.

ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

Преобразованный буфер является новым буфером только для чтения с независимой позицией, лимитом и емкостью.

DirectBuffer

堆外内存buffer,本地JNI非JVM堆内存buffer,允许直接访问

Обычный ByteBuffer управляется JVM, а память выделяется в куче JVM.

ByteBuffer buf = ByteBuffer.allocate(1024);

DirectBuffer будет размещен в локальной памяти, вне управления кучей JVM.

ByteBuffer buf = ByteBuffer.allocateDirect(1024);

почему ты хочешь сделать это?

----------Опять GC---------

Все мы знаем, что JVM возьмет GC старого поколения в кучу.标记-整理Стратегия приведет к изменению адреса объекта в куче памяти, и сборщику мусора будет сложно его организовать, когда буфер слишком велик.

так оно и оказалосьDirectBuffer, который используетunsafe.allocateMemoryВыделение памяти — это собственный метод, который используется буфером.addressВ переменную записывается адрес этой памяти для обеспечения доступа

сравнивать

  • DirectBuffer: выделение собственного метода, очевидно, не так быстро, как выделение кучи JVM, но требуетIOа также网络IOЕсли это так, DirectBuffer работает быстрее.

DirectByteBuffer наследует MappedByteBuffer.

Использование кеша может использовать DirectByteBuffer и HeapByteBuffer. Если используется DirectByteBuffer, как правило, копия из системного пространства в пространство пользователя может быть уменьшена.

В случае небольших и средних приложений с относительно небольшим объемом данных вы можете рассмотреть возможность использования heapBuffer; в противном случае вы можете использовать directBuffer.

MappedByteBuffer

ByteBuffer сопоставляется с памятью вне кучи, DirectByteBuffer наследует этот класс для реализации выделения памяти вне кучи.

Сопоставьте буфер с памятью вне кучи следующим образом.

MappedByteBuffer mappedByteBuffer = channel.map(MapMode.READ_WRITE, 0, channel.size());

Использовать копии файлов:

RandomAccessFile in = new RandomAccessFile("nio/1.txt", "rw");
RandomAccessFile out = new RandomAccessFile("nio/2.txt", "rw");
FileChannel inChannel = in.getChannel();
FileChannel outChannel = out.getChannel();
MappedByteBuffer inputData = inChannel.map(FileChannel.MapMode.READ_ONLY,0,new File("nio/1.txt").length());
Charset charset = Charset.forName("utf-8");//编码
CharsetDecoder decoder = charset.newDecoder();
CharsetEncoder encoder = charset.newEncoder();
CharBuffer charBuffer = decoder.decode(inputData);
ByteBuffer buffer = encoder.encode(charBuffer);
outChannel.write(buffer);
in.close();out.close();

Канал — канал

FileChannel

Канал, подключенный к файлу, предоставленному NIO для чтения и записи файлов.

При использовании FileChannel вам необходимо输入输出流或者RandomAccessFileПолучить FILEChannel в

  • Если вы хотите прочитать данные из FileChannel, вам нужно применить ByteBuffer для чтения данных из FileChannel в буфер ByteBuffer,read()Возвращает, сколько байт было прочитано, если возвращает -1, файл достиг конца
  • Если вы хотите записать данные в FileChannel, вам нужно сначала записать данные в ByteBuffer, а затем записать из ByteBuffer в FileChannel, вызовитеwrite()метод

Обратите внимание, что Buffer.flip () требуется между чтением и записью;

пример:

1. Прочитайте данные файла и распечатайте

FileInputStream fileInputStream = new FileInputStream("1.log");
FileChannel channel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);;
channel.read(byteBuffer);
byteBuffer.flip();
while(byteBuffer.remaining()>0){
    byte b = byteBuffer.get();
    System.out.println((char) b);
}
fileInputStream.close();

2. Запишите данные 1.txt в 2.txt

FileInputStream inputStream = new FileInputStream("1.txt");
FileChannel in = inputStream.getChannel();
FileOutputStream outputStream = new FileOutputStream("2.txt");
FileChannel out = outputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while(true){
    byteBuffer.clear();//没有的话会一直读取
    int read = in.read(byteBuffer);
    System.out.println("read:"+read);
    if(read==-1){
        break;//为-1表示文件结束 返回
    }
    byteBuffer.flip();
    out.write(byteBuffer);
}
inputStream.close();
outputStream.close();

ServerSockerChannel

NIO предоставляет канал, который может прослушивать новые входящие соединения TCP, т.е.ServerSocketChannel, соответствующий ИОServerSocket

  • Канал открытого монитора
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    //do something with socketChannel...
}

SocketChannel

Канал, предоставляемый NIO для подключения к сокету TCP,SocketChannel, соответствующий ИОSocket

  • Откройте SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));

Канал читать и писать

  • читать
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

  • Напишите
ByteBuffer writeBuffer = ByteBuffer.allocate(48);
String msg = "hello";
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
channel.write(writeBuffer);

  • прочти и напиши
ByteBuffer buffer = ByteBuffer.allocate(1024);
int byteRead = channel.read(buffer);
if(byteRead<=0){
    channel.close();
    break;
}
buffer.flip();
channel.write(buffer);
read += byteRead;
buffer.clear();

После каждой записи в буфер, если данные буфера не нужно использовать повторно, рекомендуется очищать буфер и готовиться к следующей операции записи

Селектор — мультиплексор (селектор)

Мультиплексор, название очень яркое, использует поток для обработки нескольких каналов, тем самым управляя несколькими каналами.

Зачем использовать один поток для управления несколькими каналами?

Переключение контекста потока обходится дорого, а меньшее количество потоков более эффективно для обработки каналов.

Создать селектор — создать конкурс

Selector selector = Selector.open();

Зарегистрируйтесь на канале — купите входной билет

Канал передает события канала селектору, регистрируясь в селекторе, и возвращает SelectionKey.

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

  • При использовании с селектором канал должен быть в非阻塞模式Вниз. Это означает, что FileChannel нельзя использовать с Selector, потому что FileChannel нельзя переключить в неблокирующий режим.
channel.configureBlocking(false);

  • Получить канал и селектор и подготовленные события через SelectionKey
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

Селектор выполняет выборку — берет запись и вовлекается

После регистрации канала в Selector мы можем использоватьSelector.select();Метод получает готовый канал и возвращает целое число типа int, указывающее количество готовых каналов.

пройти черезselector.selectedKeys();Метод получает готовый SelectionKey, а затем получает канал и селектор через SelectionKey.Как правило, для обхода этих подготовленных каналов используются итераторы.

Каждый раз при обработке SelectionKey его надо удалять из итератора.После обработки этот SelectionKey бесполезен,так же как входной билет,через него можно зайти на арену и он имеет соответствующую информацию о входе и месте на нем,можно больше не выполнять с ним допустимых действий после окончания матча.

  • После просмотра игры организатор не соберет все билеты, с ними нужно разобраться самостоятельно, нельзя мусорить на площадке, нужно выбросить их в мусорное ведро или забрать домой
iterator.remove();

  • метод пробуждения()

Поток блокируется после вызова метода select(). Даже если ни один канал не готов, он не может вернуться. Метод wakeUp возвращается немедленно.

Разбрасывать, собирать

Scatter/gather часто используется в ситуациях, когда передаваемые данные нужно обрабатывать отдельно, например, при передаче сообщения, состоящего из заголовка сообщения и тела сообщения, вы можете разбросать тело сообщения и заголовок сообщения по разным буферам, чтобы вы можете легко обрабатывать заголовки и тела сообщений.

Scatter

Разбросанное (scatter) чтение из Канала относится к записи прочитанных данных в несколько буферов во время операции чтения. Таким образом, Канал «разбрасывает» данные, считанные из Канала, по нескольким Буферам.

Gather

Сбор в канал относится к записи данных из нескольких буферов в один и тот же канал во время операции записи, поэтому канал «собирает» данные из нескольких буферов и отправляет их в канал.

Пример: Используйте три буфера длиной 3, 4, 5 для хранения входной строки, первые 3 символа хранятся в первом буфере, 4-7 символов хранятся во втором буфере, длина 4, 8-12 Хранится в третий буфер длиной 5

ServerSocketChannel serverSocketChannel =  ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(8899);
        serverSocketChannel.socket().bind(inetSocketAddress);
        int messageLength = 3 + 4 + 5;
        ByteBuffer[] byteBuffer = new ByteBuffer[3];
        byteBuffer[0] = ByteBuffer.allocate(3);
        byteBuffer[1] = ByteBuffer.allocate(4);
        byteBuffer[2] = ByteBuffer.allocate(5);
        SocketChannel socketChannel = serverSocketChannel.accept();
        while (true){
            int byteRead = 0;
            while (byteRead<messageLength){
                long r = socketChannel.read(byteBuffer);
                byteRead += r;
                System.out.println("byteread:"+byteRead);
                Arrays.stream(byteBuffer).map(o->"position:"+o.position()+",limit:"+o.limit()).forEach(System.out::println);
            }

            Arrays.stream(byteBuffer).forEach(Buffer::flip);

            int byteWrite = 0;
            while(byteWrite<messageLength){
                long r = socketChannel.write(byteBuffer);
                byteWrite += r;
                System.out.println("bytewrite:"+byteWrite);
                Arrays.stream(byteBuffer).map(o->"position:"+o.position()+",limit:"+o.limit()).forEach(System.out::println);
            }

            Arrays.stream(byteBuffer).forEach(Buffer::clear);
        }

测试:使用linux nc localhost 8899测试
输入:helloworld回车 
输出:
byteread:11
position:3,limit:3
position:4,limit:4
position:4,limit:5
解释:
回车算一个字符一共11个字符,前三个存储到第一个buffer了,存满了;中间四个存储到第二个buffer,存满了;剩下多余的存储到第三个buffer,没有存满

клиент NIO-сервера

Эта программа демонстрирует использование NIO для создания чата, сервер соединяется с несколькими клиентами, и клиенты могут отправлять сообщения друг другу.

  • сервер сервер
/**
 * 可以直接使用 linux nc命令当做客户端
 * nc localhost 端口
 */
public class Server {
    private static Map<SocketChannel,String> clientMap = new HashMap<>();
    public static void main(String[] args) throws IOException {
        //打开服务器channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置非阻塞 即将使用selector
        serverSocketChannel.configureBlocking(false);
        //获取服务器的socket
        ServerSocket serverSocket = serverSocketChannel.socket();
        //绑定端口
        serverSocket.bind(new InetSocketAddress(8089));
        //打开一个多路复用器,使用一条线程处理客户端channel
        Selector selector = Selector.open();
        //注册服务器channel到
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true){
            //阻塞获取channel事件
            //一旦调用了 select() 方法,并且返回值表明有一个或更多个通道就绪了
            int num = selector.select();
            /**
             * 获取到后 拿到多路复用器的SelectionKey 核心方法channel获取注册在起上的channel
             * SelectionKey 每次注册一个channel都会创建一个SelectionKey 其中常量定义channel状态
            **/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //对其中每一个SelectionKey进行操作
            selectionKeys.forEach(selectionKey->{
                    try {
                        //如果该服务器SelectionKey被接收
                        if(selectionKey.isAcceptable()){
                            //拿到服务器channel
                            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                            SocketChannel client = null;
                            //拿到本次连接上服务器的客户端
                            client = server.accept();
                            client.configureBlocking(false);
                            //把客户端注册到多路复用器,监听客户端的可读事件
                            client.register(selector,SelectionKey.OP_READ);
                            //为每个客户端分配id
                            String key = "["+ UUID.randomUUID()+"]";
                            clientMap.put(client,key);
                            //如果SelectionKey读就绪,执行读操作
                        }else if(selectionKey.isReadable()){
                            //拿到channel
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            //创建读buffer
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            //读取channel中数据到读buffer
                            int read = channel.read(readBuffer);
                            String reMsg = "";
                            //如果有数据
                            if(read>0){
                                //翻转进行写操作
                                readBuffer.flip();
                                //制定解码集utf-8,对读buffer解码打印
                                Charset charset = Charset.forName("utf-8");
                                reMsg = String.valueOf(charset.decode(readBuffer).array());
                                System.out.println(clientMap.get(channel)+" receive: "+reMsg);
                            }else if(read==-1) channel.close();//如果客户端关闭就关闭客户端channel
                            //群发:发送数据到其他客户端channel
                            for(SocketChannel ch:clientMap.keySet()){
                                if(ch!=channel) {
                                    String key = clientMap.get(ch);
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    writeBuffer.put(("来自"+key + ":" + reMsg).getBytes());
                                    writeBuffer.flip();
                                    ch.write(writeBuffer);
                                }
                            }
                        }

                    } catch (IOException e) {
                        e.printStackTrace();

                }
            });
            selectionKeys.clear();//每次处理完一个SelectionKey的事件,把该SelectionKey删除
        }
    }
}

  • клиент
public class Client {
    public static void main(String[] args) throws IOException {
        //打开客户端channel
        SocketChannel socketChannel = SocketChannel.open();
        //设置为非阻塞模式,可以配合selector使用
        socketChannel.configureBlocking(false);
        //打开selector
        Selector selector = Selector.open();
        //注册客户端channel到多路复用器,监听连接事件
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        //连接到指定地址
        socketChannel.connect(new InetSocketAddress("localhost",8089));
        while (true){
            try{
                    //执行selector方法,阻塞获取channel事件的触发
                    int num = selector.select();
                    //获取注册到多路复用器上的SelectionKey
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    //通过迭代器遍历SelectionKey
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        //如果SelectionKey触发的事件为连接就绪
                        if(selectionKey.isConnectable()){
                            //拿到SelectionKey的客户端channel
                            SocketChannel client = (SocketChannel) selectionKey.channel();
                            if(client.isConnectionPending()){
                                //完成连接
                                client.finishConnect();
                                //新建一个写buffer
                                ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                //写入客户端连接成功消息
                                writeBuffer.put((client.toString()+":连接成功!").getBytes());
                                //翻转读写操作 执行写操作
                                writeBuffer.flip();
                                //写入buffer数据刅客户端
                                client.write(writeBuffer);
                                //开辟一个线程写,因为标准输入是阻塞的,当前线程不能阻塞写
                                ExecutorService executorService = Executors.newSingleThreadExecutor();
                                executorService.submit(()->{
                                    while (true){
                                        writeBuffer.clear();
                                        InputStreamReader reader = new InputStreamReader(System.in);
                                        BufferedReader br = new BufferedReader(reader);
                                        String msg = br.readLine();
                                        //每次读入一行,写入数据到buffer并且写入客户端channel
                                        writeBuffer.put(msg.getBytes());
                                        writeBuffer.flip();
                                        client.write(writeBuffer);
                                    }
                                });
                            }
                            //注册客户端可读事件到多路复用器
                            client.register(selector,SelectionKey.OP_READ);
                            //如果多路复用器上的SelectionKey处于读就绪状态
                        }else if(selectionKey.isReadable()){
                            //拿到SelectionKey触发相应事件对应的客户端channel,执行读操作
                            SocketChannel client = (SocketChannel) selectionKey.channel();
                            //创建一个新的读buffer,
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            //从准备好读操作的channel中读取数据
                            int count = client.read(readBuffer);
                            if (count>0){
                                //转码并数据使用String保存且打印
                                String reMsg = new String(readBuffer.array(),0,count);
                                System.out.println(reMsg);
                            }else if(count==-1) client.close();//关闭客户端
                        }
                    }
                    selectionKeys.clear();//每次处理完一个SelectionKey的事件,把该SelectionKey删除
                }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

  • контрольная работа

1. Создайте сервер и три клиента

2. Клиенты 1, 2 и 3 отправляют данные соответственно

Сервер получает информацию о соединении, а три клиента отправляют информацию.

Клиент 1 создается первым, получает 2,3 информацию о соединении и 2,3 информацию об отправке

Клиент 2 создается до 3, получает 3 информации о соединении и 1,3 информацию об отправке

Клиент 3 создается последним и может получить только 1,2 для отправки информации

3. Теперь используйте команду nc для создания клиента

Отправить информацию, клиент может получить

Клиент 2 отправляет информацию, терминальный клиент также может получать

Пример NIO — передача данных через порты — мультисервер

Цель достижения: сервер прослушивает два порта, один 8089, один 8090, 8089 имеет только одно основное соединение клиента A, 8090 имеет несколько соединений клиента B, а клиент A получает сообщения, отправленные несколькими соединениями клиента B, для достижения межпортового сообщения. пересылка

  • Сервер

Давайте сначала посмотрим на сервер.Сначала серверу нужно слушать два порта.Мы создаем два канала сервера.После того как сервер получит соединение,он прослушивает событие данных,отправляемое клиентом Б(т.е. ); принять После сообщения клиенту B отправить его клиенту A

Как сервер отправляет данные клиенту А?

Сохраните набор клиентских каналов, назначьте разные конечные части идентификатора для разных клиентов порта, клиент A назначается как wxq], клиент B назначается как gzh] и сохраняет его в HashMap при создании их канала, канал как ключ, идентификатор сохраняется как значение

Давайте поговорим о процессе на стороне сервера:

  1. Создайте два канала сервера и привяжите разные порты
  2. Создайте селектор мультиплексора, зарегистрируйте два сервера с помощью селектора и прослушайте приемлемое событие.
  3. Выполните метод selector.select(), получите коллекцию SelectionKey и продолжайте обрабатывать различные события.
    1. Если событие接收就绪,пройти черезSelectionKey.channel()Метод получает канал сервера и регистрирует разные события прослушивания в соответствии с разными портами.Если это 8090, это означает, что соединение клиента Б завершено, получаем канал клиента Б и слушаем его.可读事件, а к gzh присвойте суффикс id и сохраните его; если это серверный канал порта 8089, то это означает, что соединение клиента А завершено, и канал клиента клиент А слушает его可写事件, и назначьте суффикс идентификатора для wxq], сохраните его в хэш-карте
    2. если событие读就绪, указывающий, что клиент B завершил операцию записи данных и может прочитать данные клиента B и выполнить чтение; сначала прочитать и записать данные вreadBuffer,использоватьnew String(readBuffer.array()Создайте сообщение для отправки, пройдитесь по ключу клиентского канала, если суффикс wxq], это означает клиента A, затем запишите данные в writeBuffer и запишите данные в канал клиента A.
  4. Каждый раз, когда выполняется событие SelectionKey, удаляйте SelectionKey

Код:

public class Server {
    private static int CAPACITY = 1024;
    private static ByteBuffer readBuffer = ByteBuffer.allocate(CAPACITY);
    private static ByteBuffer writeBuffer = ByteBuffer.allocate(CAPACITY);
    private static Map<SocketChannel,String> clientMap = new HashMap<>();

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannelWxq = ServerSocketChannel.open();
        ServerSocketChannel serverSocketChannelGzh = ServerSocketChannel.open();
        serverSocketChannelGzh.configureBlocking(false);
        serverSocketChannelWxq.configureBlocking(false);
        ServerSocket serverSocketWxq = serverSocketChannelWxq.socket();
        ServerSocket serverSocketGzh = serverSocketChannelGzh.socket();
        serverSocketWxq.bind(new InetSocketAddress(8089));
        System.out.println("监听8089:微信墙服务端口");
        serverSocketGzh.bind(new InetSocketAddress(8090));
        System.out.println("监听8090:公众号服务端口");
        Selector selector = Selector.open();
        serverSocketChannelWxq.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChannelGzh.register(selector, SelectionKey.OP_ACCEPT);
        while (true){
            int num = selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.forEach(selectionKey->{
                try {
                    if(selectionKey.isAcceptable()){
                        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel client = null;
                        client = server.accept();
                        client.configureBlocking(false);
                        String key = "";
                        if(server==serverSocketChannelGzh) {//如果是公众号server,注册其客户端的可读事件
                            client.register(selector, SelectionKey.OP_READ);
                            key = "["+ UUID.randomUUID()+":gzh]";
                        }else if(server==serverSocketChannelWxq){//如果是
                            client.register(selector,SelectionKey.OP_WRITE);
                            key = "["+ UUID.randomUUID()+":wxq]";
                        }
                        System.out.println(key+":连接成功!");
                        clientMap.put(client,key);
                    }else if(selectionKey.isReadable()){
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        readBuffer.clear();
                        int read = 0;
                        while(true){
                            int byteRead = channel.read(readBuffer);
                            if(byteRead<=0){
                                break;
                            }
                            readBuffer.flip();
                            channel.write(readBuffer);
                            read += byteRead;
                            readBuffer.clear();
                        }
                        String reMsg = new String(readBuffer.array(),0,read);
                        System.out.println(clientMap.get(channel)+" send to wxq: "+reMsg);
                        //写入微信墙服务
                        for(SocketChannel ch:clientMap.keySet()){
                            if(ch!=channel) {
                                String key = clientMap.get(ch);
                                if(key.endsWith("wxq]")) {
                                    writeBuffer.clear();
                                    writeBuffer.put(("来自" + clientMap.get(channel) + ":" + reMsg).getBytes(StandardCharsets.UTF_8));
                                    writeBuffer.flip();
                                    ch.write(writeBuffer);
                                }
                            }
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            selectionKeys.clear();//每次处理完一个SelectionKey的事件,把该SelectionKey删除
        }
    }
}

На этом сервер готов, можно использовать linux или winncКоманда подключается к серверу, имитируя клиента A и клиента B для отправки сообщений.

После того, как клиент отправит сообщение, он напишет его, потому что я записываю сообщение в буфер клиента B после получения сообщения.

  • Клиент Б — отправить сообщение

Клиент B отвечает за отправку сообщений, а главное событие отвечает за запись данных

процесс:

  1. Создайте клиентский каналSocketChannel, открыть селектор мультиплексора, привязать可连接事件, подключитесь к порту 8090 прослушивания на сервере
  2. воплощать в жизньselector.select()метод, обработка连接就绪а также写就绪两个事件
    1. Если событие连接就绪, просто получить канал, выполнитьfinishConnectМетод завершает соединение, и регистрируется событие прослушивателя.可写事件
    2. Если событие写就绪, выполнить операцию записи, прочитать ввод с консоли, используя стандартный ввод, и записать в буфер записи черезchannel.write()способ записи данных клиенту
  3. SelectionKey события очистки

Код:

public class GzhClient {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8090));
        while (true){
            try{
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isConnectable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        if(client.isConnectionPending()){
                            client.finishConnect();
                        }
                        client.register(selector,SelectionKey.OP_WRITE);
                    }else if(selectionKey.isWritable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        writeBuffer.clear();
                        InputStreamReader reader = new InputStreamReader(System.in);
                        BufferedReader br = new BufferedReader(reader);
                        String msg = br.readLine();
                        //每次读入一行,写入数据到buffer并且写入客户端channel
                        writeBuffer.put(msg.getBytes());
                        writeBuffer.flip();
                        client.write(writeBuffer);
                    }
                }
                selectionKeys.clear();//每次处理完一个SelectionKey的事件,把该SelectionKey删除
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

  • Клиент A — получает сообщение, перенаправленное сервером.

Клиент А отвечает за отправку сообщений, а главное событие отвечает за чтение данных

процесс:

  1. Создайте клиентский каналSocketChannel, открыть селектор мультиплексора, привязать可连接事件, подключитесь к порту 8089, где сервер прослушивает
  2. воплощать в жизньselector.select()метод, обработка连接就绪а также读就绪两个事件
    1. Если событие连接就绪, просто получить канал, выполнитьfinishConnectМетод завершает соединение, и регистрируется событие прослушивателя.可写事件
    2. Если событие读就绪, выполнить операцию чтения и использовать данные в каналеread()Метод считывает в readBuffer черезnew String(readBuffer.array()Метод получает данные типа String и выводит их на консоль.
  3. SelectionKey события очистки

Код:

public class WxQClient {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8089));
        while (true){
            try{
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isConnectable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        if(client.isConnectionPending()){
                            client.finishConnect();
                        }
                        client.register(selector,SelectionKey.OP_READ);
                    }else if(selectionKey.isReadable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int count = client.read(readBuffer);
                        if (count>0){
                            String reMsg = new String(readBuffer.array(),0,count);
                            System.out.println(reMsg);
                        }else if(count==-1) client.close();//关闭客户端
                    }
                }
                selectionKeys.clear();//每次处理完一个SelectionKey的事件,把该SelectionKey删除
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

На данный момент наш сервер и клиент AB завершены, теперь давайте тестировать

  1. Запустите сервер, запустите один WxQClient, то есть ClientA, и запустите два GzhClients, то есть ClientB

Сервер показывает, что соединение успешно

  1. Клиент Б отправляет сообщение

Сервер получает сообщение, распечатывает его и пересылает клиенту А, а клиент А распечатывает сообщение.