Интерлюдия: разминка исходного кода Kafka --- Java NIO

Java

предисловие

Я забыл сказать несколько слов в предисловии предыдущего поста hahh

Потому что чтение исходного кода Кафки требует определенного понимания знаний Java NIO, так что, как сказать, если вы чувствуете, что знакомы с Java, эту статью, которая также является эпизодом, можно напрямую игнорировать. Поскольку в этой статье не будет никаких сложных моментов, главное — пройти основы, чтобы последующие статьи с исходным кодом можно было читать более плавно.

1. Основы НИО

Java New IO — это новый API-интерфейс ввода-вывода, представленный в Java 1.4. Он может заменить предыдущий стандартный ввод-вывод. По сравнению с исходным вводом-выводом NIO имеет ту же функцию и назначение, но способ использования совершенно другой. NIO ориентирован на буфер. Операции ввода-вывода на основе областей и каналов, которые также делают чтение и запись более эффективными, чем традиционные операции ввода-вывода.

1.1 Основное различие между IO и NIO

IO NIO
ориентированный на поток ориентированный на буфер
блокировка ввода-вывода неблокирующий ввод-вывод
никто Селектор

1.1.1 Поток традиционного ввода-вывода

Следующие диаграммы используются для краткого понимания, втрадиционный ввод-выводКогда приложение хочет читать и записывать файлы в сети и на диске, они должны установить соединение.Что за концепция поток?Мы можем сначалаДумайте об этом как о водопроводной воде.Если вы используете водопроводную воду дома, вам нужен водопровод, чтобы вода могла поступать из водопровода в дом и играть роль в транспортировке..

Поэтому, когда данные в нашем файле необходимо ввести в приложение, они создадутвходитьтрубопровод. И когда у нашего приложения есть данные, которые необходимо записать в файловую систему, оно создаствыходКонвейер, эти два конвейера — это наш входной поток и выходной поток. Эта вода никогда не поднимается вверх по течению, поэтомуВсе они односторонние трубы. Итак, это очень хорошо понять?

1.1.2 NIO

Это также та же файловая система и приложение, но в этот раз поток заменен каналом, и теперь мы можем думать, чтоЭто железная дорога, поэтому мы знаем, что сама железная дорога не может перевозить товары, поэтому нам нужно транспортное средство --- поезд (то есть буферная зона), и данные, необходимые приложению, транспортируются этим транспортным средством, называемым буфером. зона.. Этот поезд может приходить и уходить, так чтоNIO - двунаправленная передачаиз.

1.2 Buffer

Ядро NIO лежит в двух каналах: канал и буфер. Канал — это открытое соединение с устройством ввода-вывода. При его использовании необходимо получить канал для подключения устройств ввода-вывода и буфер для хранения данных, а затем обработать данные, оперируя буфером. (На самом деле речь о картинке выше, или в одном предложении один отвечает за передачу, а другой за хранение).

Буферы определяются пакетом Java.nio, и все буферы являются подклассами абстрактного класса Buffer. Buffer основан на различных типах данных, и распространенными подклассами являются xxxBuffer (IntBuffer, DoubleBuffer и т. д.), базовые типы данных которых не являются логическими. Разные классы Buffer управляются одинаково, а методы получения объектов различаются.

// 创建一个容量为capacity的xxx类型的Buffer对象
static xxxBuffer allocate(int capacity)

А буфер предоставляет два основных метода: get() и put().Метод put предназначен для сохранения данных в буфере, а метод get — для получения данных из буфера.

Теперь давайте посмотрим на код

public class BufferTest {
    @Test
    public void testBuffer(){
        // 创建缓冲区对象
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    }
}

Нажмите на ByteBuffer, и вы увидите, что эта штука наследует класс Buffer.

public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>

На этом этапе продолжайте нажимать на класс Buffer, и первое, что вы увидите, это наличие нескольких встроенных свойств.

1.2.1 Основные свойства буфера

① вместимость

Указывает максимальную емкость данных буфера.Это значение не может быть отрицательным. И его нельзя изменить после создания.

② предел

Индекс первых данных, которые не могут быть прочитаны или записаны, данные этого индекса недоступны для чтения или записи. Это значение не может быть отрицательным и не может превышать емкость.Как показано в третьем буфере на рисунке выше, блоки данных после нижнего индекса 5 не могут быть прочитаны или записаны, и предел равен 5.

③ положение

индекс следующих данных для чтения или записи, это значение не может быть отрицательным и не может превышать емкость, как показано во втором буфере на рисунке, первые 5 блоков были записаны, а индекс шестого блока данных равен 5, поэтому позиция равна 5

④ отметка отметка/сброс сброс

mark — это индекс, после указания конкретной позиции в Buffer через метод mark() Buffer, его можно сбросить в эту позицию через метод reset(), лучше объяснить это через код.

1.2.2 часть кода (очень простая)

1. Сначала мы создаем буферный объект, а затем печатаем его свойства

ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());

运行结果:0,10,10
2. Выполните метод put(), чтобы поместить в него символ

String str = "abcde";
byteBuffer.put(str.getBytes());
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());

运行结果:5,10,10
"abcde"长度为5,position已经变化,其它不变
3. Используйте flip() для переключения в режим чтения

byteBuffer.flip();
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());

运行结果:0,10,5

В это время позиция становится 0, потому что 5 в начале - это потому, что блок данных с индексом 5 должен быть записан в это время, а после перехода в режим чтения первым читается, очевидно, индекс 0. данные блокировать. Значение предела также стало равным 5, потому что данные, которые можно прочитать в настоящее время, были потеряны, поскольку индекс равен 5, поэтому предел равен 5.

4. Просто получить данные в буфер
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array);
System.out.println(new String(array,0,array.length));

运行结果:abcde
5.mark() & reset()
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array,0,2);
System.out.println(new String(array,0,2));
System.out.println(byteBuffer.position());

byteBuffer.mark();
byteBuffer.get(array,2,2);
System.out.println(new String(array,2,2));
System.out.println(byteBuffer.position());

byteBuffer.reset();
System.out.println(byteBuffer.position());

运行结果:ab,2,cd,4,2

На самом деле очень просто, то есть при чтении в первый раз читаются только первые два символа, а потом результат позиции 2, а потом читаются последние два символа, а позиция 4, но потому что я читаю спереди Когда я выполнял операцию пометки в 2, он автоматически возвращался в положение чтения перед моей пометкой Это так просто.

6. Некоторые другие методы

метод rewind(), повторяемое чтение,clear() очищает буфер, но очистка буфера в этом методе является забытым состоянием, то есть данные все еще существуют в буфере, но автоматически игнорируются. На данный момент данные могут быть прочитаны снова, и она все еще может быть получена путем Get ().Метод hasRemaining() указывает количество оставшихся работоспособных данных.Сколько там?Например в примере марк только что,после того как я его сбросил,осталось работоспособных данных 3,потому что я читал только ab и cde.

1.2.3 Прямые и косвенные буферы

Косвенный буфер: буфер выделяется методом allocate(). Создайте буфер в памяти JVM.

Прямой буфер: буфер выделяется методом allocateDirect() и устанавливается в физической памяти. более высокая эффективность.

① Непрямой буфер

Когда приложение хочет прочитать данные с диска, оно сначала делает запрос, чтобыФизический диск сначала считывает свои данные в адресное пространство ядра, а затем пространство ядра копирует данные в адресное пространство пользователя. Затем данные можно вернуть в приложение с помощью метода read().. И приложению нужно записать в него данные, тоже самое.Сначала запишите в адресное пространство пользователя, затем скопируйте в адресное пространство ядра, а затем запишите на диск. На данный момент нетрудно обнаружить, что эта операция копирования очень избыточна, поэтому эффективность непрямых буферов будет относительно низкой.

② Прямой буфер

Прямой буфер действительно очень прямой, как следует из названия.При записи он записывается в отображаемый файл физической памяти, а затем записывается на физический диск.При чтении диск считывает данные в этот файл, а затем читает его к приложению.в программе. Промежуточного процесса копирования нет.

1.3 channel

1.3.1 Вытяните концептуальную основу

Определяется пакетом java.nio.channels, представляет собой ссылку, открытую источником ввода-вывода и целью, не имеет возможности прямого доступа к самим данным и может взаимодействовать только с Buffer.

Традиционный ввод-вывод полностью отвечает за ЦП.В настоящее время, когда в этом проекте выполняется большое количество операций чтения файлов, загрузка ЦП будет очень низкой, поскольку операция ввода-вывода вытесняет ресурсы ЦП.

В этом контексте были сделаны некоторые оптимизации, чтобы отменить соединение с процессором и преобразовать его вDMA (прямой доступ к памяти)Путь. Конечно, сама операция прямого доступа к памяти также должна планироваться процессором. Однако эта потеря, естественно, будет намного меньше, чем большое количество IO.

В этот момент появляется понятие канала, который представляет собой полностью независимый процессор. Он специально используется для операций ввода-вывода файлов.

1.3.2 Общие каналы

Основные классы реализации, предоставляемые Java для интерфейса Channel:

FileChannel:用于读取,写入,映射和操作文件的通道
DatagramChannel:通过UDP读写网络中的数据通道
SocketChannel:通过TCP读写网络中的数据通道
ServerSocketChannel:可以监听新进来的TCP连接,对每一个新进来的连接
    都会创建一个SocketChannel

Один из способов получить канал — вызвать метод getChannel() для объекта, который поддерживает канал.Класс поддержки выглядит следующим образом.

FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket

Другой способ получить его — использовать статический метод newByteChannel() класса Files для получения байтового канала. Или открыть и вернуть указанный канал через статический метод open() канала.

1.3.3 Общие методы и простота использования

① Используйте непрямой буфер для завершения копирования файла
// 创建输入输出流对象
FileInputStream fileInputStream = new FileInputStream("testPic.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("testPic2.jpg");

// 通过流对象获取通道channel
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();

// 创建指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 将通道中的数据写入到缓冲区中
while (inChannel.read(byteBuffer) != -1){

    // 切换成读取模式
    byteBuffer.flip();
    // 将缓冲区中的数据写到输出通道
    outChannel.write(byteBuffer);

    // 清空缓冲区
    byteBuffer.clear();

}
//回收资源(这里为了省时间直接抛出去了,反正这段不太重要)
outChannel.close();
inChannel.close();
fileInputStream.close();
fileOutputStream.close();

运行结果:就自然是复制了一个testPic2出来啦

Поскольку сам код не сложный, комментарии написаны более подробно, поэтому расширяться не будут.


② Используйте прямой буфер для завершения копирования файла

Обратите внимание, что StandardOpenOption здесь представляет собой перечисление, представляющее режим, очевидно, что здесь должен быть выбран режим чтения READ.

FileChannel inChannel = FileChannel.open(Paths.get("testPic.jpg",StandardOpenOption.READ));
FileChannel outChannel = FileChannel.
        open(Paths.get("testPic2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 进行内存映射
MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMapBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

// 对缓冲区进行数据的读写操作
byte[] array = new byte[inMappedBuffer.limit()];
inMappedBuffer.get(array);
outMapBuffer.put(array);

// 回收资源
inChannel.close();
outChannel.close();

Если вам нужно посмотреть на разницу во времени между ними, просто используйте для этого самое обычное системное время, и я не буду добавлять его сюда.

2. Неблокирующая сетевая связь NIO

Традиционные потоки ввода-вывода блокируются. Когда поток вызывает чтение или запись, поток блокируется до тех пор, пока данные не будут прочитаны или записаны. В течение этого периода поток не может выполнять другие задачи. Поэтому вКогда сетевое взаимодействие завершено для операций ввода-вывода, поток блокируется, поэтому сервер должен предоставить каждому клиенту независимый поток для обработки., когда серверной части необходимо обработать большое количество клиентов, производительность резко упадет.

NIO не блокируется, когда поток читает и записывает данные из канала,Если данные недоступны, поток может выполнять другие задачи.. Потоки обычно тратят время неблокирующего ввода-вывода на выполнение операций ввода-вывода на других каналах, поэтому один поток может управлять несколькими входными и выходными каналами. Таким образом, NIO может позволить стороне сервера использовать один или ограниченное количество потоков для одновременной обработки всех клиентов, подключенных к стороне сервера.

2.1 Selector

Этот селектор фактически является регистратором, который вводит канал между клиентом и сервером.Например, сейчас мой клиент хочет передавать данные как сервер.Клиент отправит запрос на регистрацию канала в селектор, и регистрация будет завершена.После что Selector будет контролировать статус ввода-вывода (чтение, запись, подключение) этого канала. Только когда данные в канале будут полностью готовы, Селектор назначит данные потоку на стороне сервера для обработки.

Этот неблокирующий процесс может лучше использовать ресурсы ЦП. Повысить эффективность работы процессора. Это можно объяснить экспресс-доставкой. Если бы вы сказали мне в начале прийти и забрать экспресс через полчаса, а я уже прибыл в пункт назначения в это время, я мог бы просто стоять там и ждать полчаса. В этот период я ​​не могу никуда уехать, но после того, как вы приедете, вы мне позвоните и скажете, чтобы я приехал и забрал его, поэтому у меня больше свободного времени.

2.2 код (сетевая связь для блокировки IO)

Теперь давайте продемонстрируем сетевое взаимодействие блокирующего ввода-вывода.

2.2.1 клиент (блокирующий ввод-вывод)

Вы можете попробовать удалить sChannel.shutdownOutput() для этого кода.В это время вы обнаружите, что при запуске сервера и запуске клиентской программы программа также будет блокироваться, потому что сервер не может определить, отправили ли вы data. , поэтому клиентская сторона также заблокирована, а обе стороны заблокированы.

Другой способ — разблокировать, о чем будет рассказано позже.

// 1.获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("你的IP地址",9898));
// 2.创建文件通道
FileChannel inChannel = FileChannel.open(Paths.get("C:/Users/Administrator/Desktop/testPic.jpg"),StandardOpenOption.READ);
// 3.分配指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 4.发送数据,需要读取文件
while (inChannel.read(byteBuffer) != -1){
    byteBuffer.flip();
    // 将buffer的数据写入到通道中
    sChannel.write(byteBuffer);
    byteBuffer.clear();
}

// 主动告诉服务端,数据已经发送完毕
sChannel.shutdownOutput();

while (sChannel.read(byteBuffer) != -1){
        byteBuffer.flip();
        System.out.println("接收服务端数据成功···");
        byteBuffer.clear();
    }

// 5.关闭通道
inChannel.close();
sChannel.close();
2.2.2 сервер (блокирующий ввод-вывод)
// 1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 创建一个输出通道,将读取到的数据写入到输出通道中,保存为testPic2
FileChannel outChannel = FileChannel.open(Paths.get("testPic2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
// 2.绑定端口
ssChannel.bind(new InetSocketAddress(9898));
// 3.等待客户端连接,连接成功时会得到一个通道
SocketChannel sChannel = ssChannel.accept();
// 4.创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 5.接收客户端的数据存储到本地
while (sChannel.read(byteBuffer) != -1){
    byteBuffer.flip();
    outChannel.write(byteBuffer);
    byteBuffer.clear();
}

// 发送反馈给客户端
    // 向缓冲区中写入应答信息
    byteBuffer.put("服务端接收数据成功".getBytes());
    byteBuffer.flip();
    sChannel.write(byteBuffer);

// 关闭通道
sChannel.close();
outChannel.close();
byteBuffer.clear();

Затем, когда наш клиент будет запущен, будет выполнена операция копирования.

2.3 Селектор завершает неблокирующий ввод-вывод

Использование NIO для завершения сетевого взаимодействия требует трех основных объектов:

канал: интерфейс java.nio.channels.Channel,Сокетчаннел, серверсокетчаннел, Дейтаграммный канал

Связанные с трубой: Pipe.SinkChannel, Pine.SourceChannel

buffer: отвечает за хранение данных

Selector: Selector — это мультиплексор SelectableChannel, в основном используемый для мониторинга состояния ввода-вывода SelectableChannel.

2.3.1 клиент (неблокирующий)
// 1.获取通道,默认是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1",9898));

// 1.1 将阻塞的套接字变成非阻塞
sChannel.configureBlocking(false);

// 2.创建指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 3.发送数据给服务端,直接将数据存储到缓冲区
byteBuffer.put(new Date().toString().getBytes());
// 4.将缓冲区的数据写入到sChannel
byteBuffer.flip();
sChannel.write(byteBuffer);
byteBuffer.clear();

// 关闭
sChannel.close();
2.3.2 сервер (неблокирующий)

Весь процесс был объяснен в комментариях к коду, поэтому я не буду его здесь подробно описывать.

// 1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2.将阻塞的套接字设置为非阻塞的
ssChannel.configureBlocking(false);
// 3.绑定端口号
ssChannel.bind(new InetSocketAddress(9898));

// 4.创建选择器对象
Selector selector = Selector.open();

// 5.将通道注册到选择器上(这里的第二个参数为selectionKey),下面有解释
// 此时选择器就开始监听这个通道的接收时间,此时接收工作准备就绪,才开始下一步的操作
ssChannel.register(selector,SelectionKey.OP_ACCEPT);

// 6.通过轮询的方式获取选择器上准备就绪的事件
// 如果大于0,至少有一个SelectionKey准备就绪
while (selector.select() > 0){
    // 7.获取当前选择器中所有注册的selectionKey(已经准备就绪的监听事件)
    Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
    // 迭代获取已经准备就绪的选择键
    while (selectionKeyIterator.hasNext()){

        // 8.获取已经准备就绪的事件
        SelectionKey selectionKey = selectionKeyIterator.next();
        if (selectionKey.isAcceptable()){
            // 9.调用accept方法
            SocketChannel sChannel = ssChannel.accept();
            // 将sChannel设置为非阻塞
            // 再次强调,整个过程不能有任何一条阻塞通道
            sChannel.configureBlocking(false);

            // 进行数据接收工作,而且把sChannel也注册上选择器让选择器来监听
            sChannel.register(selector,SelectionKey.OP_READ);
        }else if (selectionKey.isReadable()){
            // 如果读状态已经准备就绪,就开始读取数据
            // 10.获取当前选择器上读状态准备就绪的通道
            SocketChannel sChannel = (SocketChannel) selectionKey.channel();
            // 11.读取客户端发送的数据,需要先创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            // 12.读取缓冲区的数据
            while (sChannel.read(byteBuffer) > 0){
                byteBuffer.flip();
                // 这里sChannel.read(byteBuffer)就是这个字节数组的长度
                System.out.println(new String(byteBuffer.array(),0,sChannel.read(byteBuffer)));

                // 清空缓冲区
                byteBuffer.clear();
            }
        }
        // 当selectionKey使用完毕需要移除,否则会一直优先
        selectionKeyIterator.remove();
    }

}

При вызове метода register для регистрации канала в селекторе событие прослушивания селектора для канала должно определяться вторым параметром ops.

读:SelectionKey.OP_READ(1)
写:SelectionKey.OP_WRITE(4)
连接:SelectionKey.OP_CONNECT(8)
接收:SelectionKey.OP_ACCEPT(16)

подобноПри регистрации есть более одного события прослушивателя, вам нужно использовать оператор бит-ИЛИ для подключения

int selectionKeySet = SelectionKey.OP_READ|SelectionKey.OP_WRITE

Что касается этого selectionKey, он представляет отношение регистрации между SelectableChannel и Selectr. Он также имеет набор соответствующих методов

2.3.3 Трансформация клиента

Введите Scanner для получения входной информации, но обратите внимание, что ввод IDEA в тестовом коде требует некоторых настроек.Конкретный метод заключается в добавлении строки в Help-Edit Custom VM Option.

-Deditable.java.test.console=true

Это позволит вам войти.

// 1.获取通道,默认是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1",9898));

// 1.1 将阻塞的套接字变成非阻塞
sChannel.configureBlocking(false);

// 2.创建指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
    String str = scanner.next();
    // 3.发送数据给服务端,直接将数据存储到缓冲区
    byteBuffer.put((new Date().toString()+str).getBytes());
    // 4.将缓冲区的数据写入到sChannel
    byteBuffer.flip();
    sChannel.write(byteBuffer);
    byteBuffer.clear();
}
// 关闭
sChannel.close();

Это завершает режим вопросов и ответов сетевого общения.

2.4 Трубопровод

Канал в Java NIO — это одностороннее соединение данных между двумя потоками.Канал имеет исходный канал и канал-приемник.Данные будут записываться в приемник и получаться из источника.

// 1.获取管道
Pipe pipe = Pipe.open();

// 2.创建缓冲区对象
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 3.获取sink通道
Pipe.SinkChannel sinkChannel = pipe.sink();
byteBuffer.put("通过单向管道传输数据".getBytes());

// 4.将数据写入sinkChannel
byteBuffer.flip();
sinkChannel.write(byteBuffer);
// 5.读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
// 6.读取sourceChannel中的数据放入到缓冲区
byteBuffer.flip();
sourceChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(),0,sourceChannel.read(byteBuffer)));

sourceChannel.close();
sinkChannel.close();
    
运行结果就是打印了我们的那串字符"通过单向管道传输数据",没啥

finally

Примерно перечислил некоторые базовые знания NIO, контента вроде бы много, но он не предполагает слишком сложных пунктов знаний, он просто реализуется шаг за шагом. На самом деле, если вы хотите углубиться, есть еще много других точек знаний, таких как Путь, Пути и Файлы NIO2. Никаких дальнейших объяснений здесь не дается. Заинтересованные друзья могут узнать сами.