Некоторые сведения об исходном коде BIO to NIO на NIO

Java

предисловие

Эта статья подробно объяснит постепенное обогащение функций NIO, прокладывая путь к объяснению библиотеки Reactor-Netty.

О методологии программирования на Java: совместное использование видео Reactor и Webflux, Rxjava и Reactor завершены, адрес станции b следующий:

Интерпретация исходного кода Rxjava и совместное использование:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо

Интерпретация и совместное использование исходного кода Reactor:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо

замена сцены

Продолжение предыдущей статьиНекоторые сведения об исходном коде BIO to NIO BIO, давайте узнаем кое-что о NIO.

В предыдущей статье мы видели, что нам нужно добиться асинхронности и неблокировки.То, что мы делаем сами, — это создание пула потоков и изменение времени ожидания некоторого кода для подключения к клиенту, но недостатки также очень очевидны. Давайте изменим наше мышление. Вот пример сценария, класс A и класс B должны выполнить задание один на один, каждая пара людей получает разные задания, и время, затраченное на них, различается. Поскольку задание имеет вознаграждение, учащиеся возьмут, в традиционном режиме учащиеся в классе A и классе B должны идти вместе, даже если это просто задача обнаружения сердцебиения без управления, В этом случае у клиента вообще нет данных для отправки, он просто хочет сказать самому серверу Он еще жив.В таком случае,если есть еще одноклассник в классе Б сделать стыковку будет очень проблематично.Каждого одноклассника в классе Б можно рассматривать как нить на стороне сервера. Итак, нам нужен менеджер, поэтомуSelectorОказалось, что нам, как менеджерам, здесь часто нужно управлять статусом учеников, ждут ли они заданий, получают ли информацию, выводят ли информацию и т. д.,SelectorОн больше фокусируется на действиях, и для этих меток состояний достаточно делать вещи, на самом деле этими метками состояний тоже нужно управлять, поэтомуSelectionKeyОно тоже возникло. Затем нам нужно улучшить упаковку этих студентов, чтобы носить такие ярлыки. Точно так же для одноклассников мы должны еще больше развязать руки, например, дать им компьютер, чтобы ученики могли делать больше вещей, тогда этот компьютер является существованием буфера здесь. Итак, в NIO в основном три роли,Bufferбуфер,Channelряд,SelectorС селектором мы разобрались, теперь мы будем шаг за шагом анализировать и интерпретировать его исходный код.

Интерпретация канала

Предоставьте каналу возможность быть асинхронным и прерываемым

Из приведенного выше видно, что студенты фактически представляют один за другим.Socketсуществования, то здесьChannelЭто усиленная упаковка для него, котораяChannelКонкретная реализация должна иметьSocketС этим полем все в порядке, и тогда конкретный класс реализации также тесно окружен им.SocketИмеет функцию сделать статью. Итак, давайте сначала посмотрим наjava.nio.channels.ChannelНастройка интерфейса:

public interface Channel extends Closeable {

    /**
     * Tells whether or not this channel is open.
     *
     * @return {@code true} if, and only if, this channel is open
     */
    public boolean isOpen();

    /**
     * Closes this channel.
     *
     * <p> After a channel is closed, any further attempt to invoke I/O
     * operations upon it will cause a {@link ClosedChannelException} to be
     * thrown.
     *
     * <p> If this channel is already closed then invoking this method has no
     * effect.
     *
     * <p> This method may be invoked at any time.  If some other thread has
     * already invoked it, however, then another invocation will block until
     * the first invocation is complete, after which it will return without
     * effect. </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close() throws IOException;

}

Здесь очень прямая настройка, судя по тому, находится ли Канал в открытом состоянии, и действие по закрытию Канала, о нем мы поговорим далее.ClosedChannelExceptionкак это происходит конкретно в коде. Иногда Канал может быть закрыт и прерван асинхронно, что нам и нужно. Итак, чтобы добиться этого эффекта, мы должны установить интерфейс, который может выполнять этот эффект. Достигаемый конкретный эффект должен заключаться в том, что если поток выполняет операции ввода-вывода в канале, который реализует этот интерфейс, другой поток может вызвать метод закрытия канала. В результате блокирующий поток, выполняющий операцию ввода-вывода, получитAsynchronousCloseExceptionаномальный.

Аналогично следует рассмотреть другую ситуацию: если поток выполняет операции ввода-вывода в канале, реализующем этот интерфейс, другой поток может вызвать заблокированный поток.interruptметод(Thread#interrupt()), вызывая закрытие канала, заблокированный поток должен получитьClosedByInterruptExceptionисключение и установить статус прерывания на заблокированный поток.

В это время, если для потока установлено состояние прерывания, а канал над ним снова вызывает операцию блокировки ввода-вывода, то канал будет закрыт, и поток немедленно получит операцию блокировки ввода-вывода.ClosedByInterruptExceptionисключение, его состояние прерывания остается неизменным. Этот интерфейс определяется следующим образом:

public interface InterruptibleChannel
    extends Channel
{

    /**
     * Closes this channel.
     *
     * <p> Any thread currently blocked in an I/O operation upon this channel
     * will receive an {@link AsynchronousCloseException}.
     *
     * <p> This method otherwise behaves exactly as specified by the {@link
     * Channel#close Channel} interface.  </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close() throws IOException;

}

Его конкретная реализация для упомянутой выше логики находится вjava.nio.channels.spi.AbstractInterruptibleChannelДля анализа этого класса, давайте обратимся к этой статьеInterruptibleChannel и прерываемый ввод-вывод

Дайте каналу возможность быть мультиплексированным

Мы сказали ранее,ChannelвозможноSelectorиспользовать, покаSelectorоснован наChannelсостояние для назначения задач, затемChannelдолжен предоставить регистрациюSelectorметод выше, приходите иSelectorсвязывать. то естьChannelэкземпляр для вызоваregister(Selector,int,Object). Обратите внимание, потому чтоSelectorдолжен управляться в соответствии со значением состояния, поэтому этот метод вернетSelectionKeyобъект для представления этогоchannelсуществуетselectorна статус. оSelectionKey, он содержит много вещей, поэтому я не буду упоминать его здесь.

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
    {
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (!isOpen())
            throw new ClosedChannelException();
        synchronized (regLock) {
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                return k;
            }
        }
    }
//java.nio.channels.spi.AbstractSelectableChannel#addKey
    private void addKey(SelectionKey k) {
        assert Thread.holdsLock(keyLock);
        int i = 0;
        if ((keys != null) && (keyCount < keys.length)) {
            // Find empty element of key array
            for (i = 0; i < keys.length; i++)
                if (keys[i] == null)
                    break;
        } else if (keys == null) {
            keys = new SelectionKey[2];
        } else {
            // Grow key array
            int n = keys.length * 2;
            SelectionKey[] ks =  new SelectionKey[n];
            for (i = 0; i < keys.length; i++)
                ks[i] = keys[i];
            keys = ks;
            i = keyCount;
        }
        keys[i] = k;
        keyCount++;
    }

После регистрации вSelector, канал останется зарегистрированным до тех пор, пока он не будет отменен. При отмене регистрации все ресурсы, выделенные Селектором Каналу, будут освобождены. То есть Канал напрямую не предоставляет метод дерегистрации, так что давайте изменим образ мыслей: мы можем отменить ключ на Селекторе от имени его регистрации. Здесь вы можете позвонитьSelectionKey#cancel()метод явной отмены ключа. затем вSelectorОтмените регистрацию канала во время следующей операции выбора.

//java.nio.channels.spi.AbstractSelectionKey#cancel
    /**
     * Cancels this key.
     *
     * <p> If this key has not yet been cancelled then it is added to its
     * selector's cancelled-key set while synchronized on that set.  </p>
     */
    public final void cancel() {
        // Synchronizing "this" to prevent this key from getting canceled
        // multiple times by different threads, which might cause race
        // condition between selector's select() and channel's close().
        synchronized (this) {
            if (valid) {
                valid = false;
                //还是调用Selector的cancel方法
                ((AbstractSelector)selector()).cancel(this);
            }
        }
    }


//java.nio.channels.spi.AbstractSelector#cancel
    void cancel(SelectionKey k) {                       
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }


//在下一次select操作的时候来解除那些要求cancel的key,即解除Channel注册
//sun.nio.ch.SelectorImpl#select(long)
    @Override
    public final int select(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
            //重点关注此方法
        return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
    }
//sun.nio.ch.SelectorImpl#lockAndDoSelect
    private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    //重点关注此方法
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }
//sun.nio.ch.WindowsSelectorImpl#doSelect
    protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();
        //重点关注此方法
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        ...
    }

     /**
     * sun.nio.ch.SelectorImpl#processDeregisterQueue
     * Invoked by selection operations to process the cancelled-key set
     */
    protected final void processDeregisterQueue() throws IOException {
        assert Thread.holdsLock(this);
        assert Thread.holdsLock(publicSelectedKeys);

        Set<SelectionKey> cks = cancelledKeys();
        synchronized (cks) {
            if (!cks.isEmpty()) {
                Iterator<SelectionKey> i = cks.iterator();
                while (i.hasNext()) {
                    SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                    i.remove();

                    // remove the key from the selector
                    implDereg(ski);

                    selectedKeys.remove(ski);
                    keys.remove(ski);

                    // remove from channel's key set
                    deregister(ski);

                    SelectableChannel ch = ski.channel();
                    if (!ch.isOpen() && !ch.isRegistered())
                        ((SelChImpl)ch).kill();
                }
            }
        }
    }

Здесь, когда Канал закрыт, либо по телефонуChannel#closeИли закройте Канал, прервав поток, это неявно отменит все ключи о Канале, а также вызовет его внутренне.k.cancel().

//java.nio.channels.spi.AbstractInterruptibleChannel#close
    /**
     * Closes this channel.
     *
     * <p> If the channel has already been closed then this method returns
     * immediately.  Otherwise it marks the channel as closed and then invokes
     * the {@link #implCloseChannel implCloseChannel} method in order to
     * complete the close operation.  </p>
     *
     * @throws  IOException
     *          If an I/O error occurs
     */
    public final void close() throws IOException {
        synchronized (closeLock) {
            if (closed)
                return;
            closed = true;
            implCloseChannel();
        }
    }
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
     protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if (keys != null) {
                copyOfKeys = keys.clone();
            }
        }

        if (copyOfKeys != null) {
            for (SelectionKey k : copyOfKeys) {
                if (k != null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set
                }
            }
        }
    }

еслиSelectorЕсли он будет закрыт сам по себе, Канал также будет снят с регистрации, и ключ, зарегистрированный от имени Канала, также станет недействительным:

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
        boolean open = selectorOpen.getAndSet(false);
        if (!open)
            return;
        implCloseSelector();
    }
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}

ОдинchannelподдерживаетсяOps, если несколькоOps, в конкретномselectorПосле регистрации вы не сможетеselectorПовторить регистрацию дальше, то есть во второй заходjava.nio.channels.spi.AbstractSelectableChannel#registerКогда метод будет получен, будут внесены только изменения Ops, и повторная регистрация не будет выполнена, потому что при регистрации будет сгенерирован новыйSelectionKeyобъект. Мы можем позвонитьjava.nio.channels.SelectableChannel#isRegisteredметод, чтобы определить, следует ли отправить один или несколькоSelectorзарегистрированchannel.

//java.nio.channels.spi.AbstractSelectableChannel#isRegistered
 // -- Registration --

    public final boolean isRegistered() {
        synchronized (keyLock) {
            //我们在之前往Selector上注册的时候调用了addKey方法,即每次往//一个Selector注册一次,keyCount就要自增一次。
            return keyCount != 0;
        }
    }

На этом этапе, после наследования класса SelectableChannel, канал может безопасно использоваться несколькими параллельными потоками. Здесь следует отметить, что наследственностьAbstractSelectableChannelПосле этого класса вновь созданные каналы всегда находятся в режиме блокировки. Однако сSelectorОперации, связанные с мультиплексированием, должны быть основаны на неблокирующем режиме, поэтому после регистрации наSelectorРаньше надо былоchannelперевести в неблокирующий режим и перед отменой регистрации,channelНе может вернуться в режим блокировки. Здесь мы рассмотрели режим блокировки и неблокирующий режим канала. В режиме блокировки, вChannelКаждая вызываемая операция ввода-вывода будет блокироваться до ее завершения. В неблокирующем режиме операции ввода-вывода никогда не блокируются и могут передавать меньше байтов, чем запрошено, или вообще не передавать байтов. Мы можем определить, находится ли он в режиме блокировки, вызвав метод канала isBlocking.

//java.nio.channels.spi.AbstractSelectableChannel#register
 public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
    {
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (!isOpen())
            throw new ClosedChannelException();
        synchronized (regLock) {
     //此处会做判断,假如是阻塞模式,则会返回true,然后就会抛出异常
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                return k;
            }
        }
    }

Таким образом, мы можем использовать следующие примеры в качестве ссылки:

public NIOServerSelectorThread(int port)
	{
		try {
			//打开ServerSocketChannel,用于监听客户端的连接,他是所有客户端连接的父管道
			serverSocketChannel = ServerSocketChannel.open();
			//将管道设置为非阻塞模式
			serverSocketChannel.configureBlocking(false);
			//利用ServerSocketChannel创建一个服务端Socket对象,即ServerSocket
			serverSocket = serverSocketChannel.socket();
			//为服务端Socket绑定监听端口
			serverSocket.bind(new InetSocketAddress(port));
			//创建多路复用器
			selector = Selector.open();
			//将ServerSocketChannel注册到Selector多路复用器上,并且监听ACCEPT事件
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("The server is start in port: "+port);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

Из-за нехватки времени эта статья находится здесь временно, а остальное будет объяснено в следующей статье.