предисловие
Эта статья подробно объяснит постепенное обогащение функций NIO, прокладывая путь к объяснению библиотеки Reactor-Netty.
О методологии программирования на Java: совместное использование видео Reactor и Webflux, Rxjava и Reactor завершены, адрес станции b следующий:
Интерпретация исходного кода Rxjava и совместное использование:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
Интерпретация и совместное использование исходного кода Reactor:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
замена сцены
Продолжение предыдущей статьиНекоторые сведения об исходном коде BIO to NIO BIO, давайте узнаем кое-что о NIO.
В предыдущей статье мы видели, что нам нужно добиться асинхронности и неблокировки.То, что мы делаем сами, — это создание пула потоков и изменение времени ожидания некоторого кода для подключения к клиенту, но недостатки также очень очевидны. Давайте изменим наше мышление. Вот пример сценария, класс A и класс B должны выполнить задание один на один, каждая пара людей получает разные задания, и время, затраченное на них, различается. Поскольку задание имеет вознаграждение, учащиеся возьмут, в традиционном режиме учащиеся в классе A и классе B должны идти вместе, даже если это просто задача обнаружения сердцебиения без управления, В этом случае у клиента вообще нет данных для отправки, он просто хочет сказать самому серверу Он еще жив.В таком случае,если есть еще одноклассник в классе Б сделать стыковку будет очень проблематично.Каждого одноклассника в классе Б можно рассматривать как нить на стороне сервера. Итак, нам нужен менеджер, поэтомуSelector
Оказалось, что нам, как менеджерам, здесь часто нужно управлять статусом учеников, ждут ли они заданий, получают ли информацию, выводят ли информацию и т. д.,Selector
Он больше фокусируется на действиях, и для этих меток состояний достаточно делать вещи, на самом деле этими метками состояний тоже нужно управлять, поэтомуSelectionKey
Оно тоже возникло. Затем нам нужно улучшить упаковку этих студентов, чтобы носить такие ярлыки. Точно так же для одноклассников мы должны еще больше развязать руки, например, дать им компьютер, чтобы ученики могли делать больше вещей, тогда этот компьютер является существованием буфера здесь.
Итак, в NIO в основном три роли,Buffer
буфер,Channel
ряд,Selector
С селектором мы разобрались, теперь мы будем шаг за шагом анализировать и интерпретировать его исходный код.
Интерпретация канала
Предоставьте каналу возможность быть асинхронным и прерываемым
Из приведенного выше видно, что студенты фактически представляют один за другим.Socket
существования, то здесьChannel
Это усиленная упаковка для него, котораяChannel
Конкретная реализация должна иметьSocket
С этим полем все в порядке, и тогда конкретный класс реализации также тесно окружен им.Socket
Имеет функцию сделать статью. Итак, давайте сначала посмотрим наjava.nio.channels.Channel
Настройка интерфейса:
public interface Channel extends Closeable {
/**
* Tells whether or not this channel is open.
*
* @return {@code true} if, and only if, this channel is open
*/
public boolean isOpen();
/**
* Closes this channel.
*
* <p> After a channel is closed, any further attempt to invoke I/O
* operations upon it will cause a {@link ClosedChannelException} to be
* thrown.
*
* <p> If this channel is already closed then invoking this method has no
* effect.
*
* <p> This method may be invoked at any time. If some other thread has
* already invoked it, however, then another invocation will block until
* the first invocation is complete, after which it will return without
* effect. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;
}
Здесь очень прямая настройка, судя по тому, находится ли Канал в открытом состоянии, и действие по закрытию Канала, о нем мы поговорим далее.ClosedChannelException
как это происходит конкретно в коде.
Иногда Канал может быть закрыт и прерван асинхронно, что нам и нужно. Итак, чтобы добиться этого эффекта, мы должны установить интерфейс, который может выполнять этот эффект. Достигаемый конкретный эффект должен заключаться в том, что если поток выполняет операции ввода-вывода в канале, который реализует этот интерфейс, другой поток может вызвать метод закрытия канала. В результате блокирующий поток, выполняющий операцию ввода-вывода, получитAsynchronousCloseException
аномальный.
Аналогично следует рассмотреть другую ситуацию: если поток выполняет операции ввода-вывода в канале, реализующем этот интерфейс, другой поток может вызвать заблокированный поток.interrupt
метод(Thread#interrupt()
), вызывая закрытие канала, заблокированный поток должен получитьClosedByInterruptException
исключение и установить статус прерывания на заблокированный поток.
В это время, если для потока установлено состояние прерывания, а канал над ним снова вызывает операцию блокировки ввода-вывода, то канал будет закрыт, и поток немедленно получит операцию блокировки ввода-вывода.ClosedByInterruptException
исключение, его состояние прерывания остается неизменным.
Этот интерфейс определяется следующим образом:
public interface InterruptibleChannel
extends Channel
{
/**
* Closes this channel.
*
* <p> Any thread currently blocked in an I/O operation upon this channel
* will receive an {@link AsynchronousCloseException}.
*
* <p> This method otherwise behaves exactly as specified by the {@link
* Channel#close Channel} interface. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;
}
Его конкретная реализация для упомянутой выше логики находится вjava.nio.channels.spi.AbstractInterruptibleChannel
Для анализа этого класса, давайте обратимся к этой статьеInterruptibleChannel и прерываемый ввод-вывод
Дайте каналу возможность быть мультиплексированным
Мы сказали ранее,Channel
возможноSelector
использовать, покаSelector
основан наChannel
состояние для назначения задач, затемChannel
должен предоставить регистрациюSelector
метод выше, приходите иSelector
связывать. то естьChannel
экземпляр для вызоваregister(Selector,int,Object)
. Обратите внимание, потому чтоSelector
должен управляться в соответствии со значением состояния, поэтому этот метод вернетSelectionKey
объект для представления этогоchannel
существуетselector
на статус. оSelectionKey
, он содержит много вещей, поэтому я не буду упоминать его здесь.
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
//java.nio.channels.spi.AbstractSelectableChannel#addKey
private void addKey(SelectionKey k) {
assert Thread.holdsLock(keyLock);
int i = 0;
if ((keys != null) && (keyCount < keys.length)) {
// Find empty element of key array
for (i = 0; i < keys.length; i++)
if (keys[i] == null)
break;
} else if (keys == null) {
keys = new SelectionKey[2];
} else {
// Grow key array
int n = keys.length * 2;
SelectionKey[] ks = new SelectionKey[n];
for (i = 0; i < keys.length; i++)
ks[i] = keys[i];
keys = ks;
i = keyCount;
}
keys[i] = k;
keyCount++;
}
После регистрации вSelector
, канал останется зарегистрированным до тех пор, пока он не будет отменен. При отмене регистрации все ресурсы, выделенные Селектором Каналу, будут освобождены.
То есть Канал напрямую не предоставляет метод дерегистрации, так что давайте изменим образ мыслей: мы можем отменить ключ на Селекторе от имени его регистрации. Здесь вы можете позвонитьSelectionKey#cancel()
метод явной отмены ключа. затем вSelector
Отмените регистрацию канала во время следующей операции выбора.
//java.nio.channels.spi.AbstractSelectionKey#cancel
/**
* Cancels this key.
*
* <p> If this key has not yet been cancelled then it is added to its
* selector's cancelled-key set while synchronized on that set. </p>
*/
public final void cancel() {
// Synchronizing "this" to prevent this key from getting canceled
// multiple times by different threads, which might cause race
// condition between selector's select() and channel's close().
synchronized (this) {
if (valid) {
valid = false;
//还是调用Selector的cancel方法
((AbstractSelector)selector()).cancel(this);
}
}
}
//java.nio.channels.spi.AbstractSelector#cancel
void cancel(SelectionKey k) {
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
//在下一次select操作的时候来解除那些要求cancel的key,即解除Channel注册
//sun.nio.ch.SelectorImpl#select(long)
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
//重点关注此方法
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
//重点关注此方法
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
//sun.nio.ch.WindowsSelectorImpl#doSelect
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue();
//重点关注此方法
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
...
}
/**
* sun.nio.ch.SelectorImpl#processDeregisterQueue
* Invoked by selection operations to process the cancelled-key set
*/
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector
implDereg(ski);
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}
Здесь, когда Канал закрыт, либо по телефонуChannel#close
Или закройте Канал, прервав поток, это неявно отменит все ключи о Канале, а также вызовет его внутренне.k.cancel()
.
//java.nio.channels.spi.AbstractInterruptibleChannel#close
/**
* Closes this channel.
*
* <p> If the channel has already been closed then this method returns
* immediately. Otherwise it marks the channel as closed and then invokes
* the {@link #implCloseChannel implCloseChannel} method in order to
* complete the close operation. </p>
*
* @throws IOException
* If an I/O error occurs
*/
public final void close() throws IOException {
synchronized (closeLock) {
if (closed)
return;
closed = true;
implCloseChannel();
}
}
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
// clone keys to avoid calling cancel when holding keyLock
SelectionKey[] copyOfKeys = null;
synchronized (keyLock) {
if (keys != null) {
copyOfKeys = keys.clone();
}
}
if (copyOfKeys != null) {
for (SelectionKey k : copyOfKeys) {
if (k != null) {
k.cancel(); // invalidate and adds key to cancelledKey set
}
}
}
}
еслиSelector
Если он будет закрыт сам по себе, Канал также будет снят с регистрации, и ключ, зарегистрированный от имени Канала, также станет недействительным:
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
Одинchannel
поддерживаетсяOps
, если несколькоOps
, в конкретномselector
После регистрации вы не сможетеselector
Повторить регистрацию дальше, то есть во второй заходjava.nio.channels.spi.AbstractSelectableChannel#register
Когда метод будет получен, будут внесены только изменения Ops, и повторная регистрация не будет выполнена, потому что при регистрации будет сгенерирован новыйSelectionKey
объект. Мы можем позвонитьjava.nio.channels.SelectableChannel#isRegistered
метод, чтобы определить, следует ли отправить один или несколькоSelector
зарегистрированchannel
.
//java.nio.channels.spi.AbstractSelectableChannel#isRegistered
// -- Registration --
public final boolean isRegistered() {
synchronized (keyLock) {
//我们在之前往Selector上注册的时候调用了addKey方法,即每次往//一个Selector注册一次,keyCount就要自增一次。
return keyCount != 0;
}
}
На этом этапе, после наследования класса SelectableChannel, канал может безопасно использоваться несколькими параллельными потоками.
Здесь следует отметить, что наследственностьAbstractSelectableChannel
После этого класса вновь созданные каналы всегда находятся в режиме блокировки. Однако сSelector
Операции, связанные с мультиплексированием, должны быть основаны на неблокирующем режиме, поэтому после регистрации наSelector
Раньше надо былоchannel
перевести в неблокирующий режим и перед отменой регистрации,channel
Не может вернуться в режим блокировки.
Здесь мы рассмотрели режим блокировки и неблокирующий режим канала. В режиме блокировки, вChannel
Каждая вызываемая операция ввода-вывода будет блокироваться до ее завершения. В неблокирующем режиме операции ввода-вывода никогда не блокируются и могут передавать меньше байтов, чем запрошено, или вообще не передавать байтов. Мы можем определить, находится ли он в режиме блокировки, вызвав метод канала isBlocking.
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
//此处会做判断,假如是阻塞模式,则会返回true,然后就会抛出异常
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
Таким образом, мы можем использовать следующие примеры в качестве ссылки:
public NIOServerSelectorThread(int port)
{
try {
//打开ServerSocketChannel,用于监听客户端的连接,他是所有客户端连接的父管道
serverSocketChannel = ServerSocketChannel.open();
//将管道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//利用ServerSocketChannel创建一个服务端Socket对象,即ServerSocket
serverSocket = serverSocketChannel.socket();
//为服务端Socket绑定监听端口
serverSocket.bind(new InetSocketAddress(port));
//创建多路复用器
selector = Selector.open();
//将ServerSocketChannel注册到Selector多路复用器上,并且监听ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The server is start in port: "+port);
} catch (IOException e) {
e.printStackTrace();
}
}
Из-за нехватки времени эта статья находится здесь временно, а остальное будет объяснено в следующей статье.