Некоторые вещи из BIO в исходный код NIO: Селектор под NIO

Java

предисловие

В этой серии статей будет подробно рассказано о постепенном обогащении функций NIO, что проложит путь к объяснению библиотеки Reactor-Netty.

О методологии программирования на Java: совместное использование видео Reactor и Webflux, Rxjava и Reactor завершены, адрес станции b следующий:

Интерпретация исходного кода Rxjava и совместное использование:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо

Интерпретация и совместное использование исходного кода Reactor:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо

Интерпретация исходного кода этой серии основана на деталях API JDK11, которые могут отличаться от других версий.Пожалуйста, решите проблему с версией JDK самостоятельно.

Предыдущие статьи из этой серии:

Некоторые сведения об исходном коде BIO to NIO BIO

Некоторые сведения об исходном коде BIO to NIO на NIO

Некоторые сведения об исходном коде BIO to NIO в NIO

Введение SelectionKey

Как мы говорили в предыдущем содержании, после того, как студент определен, нам нужно установить его статус, а затем передать его вSelectorДля управления мы можем установить его статус черезSelectionKeyпродолжать.

Тогда здесь мы сначала проходимChannelподробно не объяснилSelectableChannelвнизregisterметод. Как мы упоминали ранее,SelectableChannelбудетchannelдать возможность пройтиSelectorдля мультиплексирования. Как менеджер,channelДля повторного использования необходимо зарегистрироваться у администратора. так,SelectableChannelвнизregisterЭтот метод является ядром нашего второго внимания, а также точкой входа для нашего следующего содержания.registerДля интерпретации метода, пожалуйста, смотрите нашу предыдущую статьюНекоторые сведения об исходном коде BIO to NIO на NIOсерединаДайте каналу возможность быть мультиплексированнымсодержание этого раздела.

Здесь нужно помнитьSelectableChannelпричаливаетchannelособенность (т.е.SelectionKey), что похоже на табличное оформление.Изначально функции можно было задавать в таблице, но для того, чтобы сделать операцию более целенаправленной, то есть для того, чтобы упростить управление кодовыми функциями, мы извлекли и спроектировали первую таблицу Две таблицы чем-то напоминают человеческие органы, в целом все работают вместе для выполнения одной задачи, но внутренние органы сосредоточены на своих основных специфических функциях, а иногда имеют некоторые мелкие функции других органов.

Отсюда мы также можем знать, чтоSelectionKeyозначаетSelectableChannelа такжеSelectorСвязанный тег может быть просто понят какtoken. Он похож на тот, который ресепшн получит из фона после того, как пользователь войдет в систему управления правами.tokenТаким образом, пользователи могут положиться на этоtokenдля доступа к соответствующей информации о ресурсах для операции.

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{       ...
    synchronized (regLock) {
       ...
        synchronized (keyLock) {
           ...
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}

Комбинируя верхний и нижний исходный код, в каждомSelectorиспользоватьregisterрегистрация методаchannel, создаст и вернетSelectionKey.

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}

мы вНекоторые сведения об исходном коде BIO to NIO на NIOсерединаДайте каналу возможность быть мультиплексированнымСодержимое этого раздела известно, что после регистрации наSelectorначальство,Channelостанется зарегистрированным до тех пор, пока его не снимут с учета. Незарегистрированный, когда незарегистрированныйSelectorназначен наChannelвсех ресурсов. то естьSelectionKeyв своем призывеSelectionKey#channelметод, или что этот ключ представляетchannelзакрытый, или ключ, связанный сSelectorОн действителен до тех пор, пока не будет закрыт. Мы также знаем из анализа предыдущей статьи, что отменаSelectionKey, не сразу изSelectorудалено, оно будет добавлено вSelectorизcancelledKeysэтоSetколлекция, которая будет удалена во время следующей операции выбора, мы можем сделать это с помощьюjava.nio.channels.SelectionKey#isValidсудитьSelectionKeyэто эффективно.

SelectionKey содержит четыре набора операций, каждый набор операций представлен Int, а младшие четыре бита значения int используются для представленияchannelПоддерживаемые типы необязательных операций.


   /**
    * Operation-set bit for read operations.
    */
   public static final int OP_READ = 1 << 0;

   /**
    * Operation-set bit for write operations.
    */
   public static final int OP_WRITE = 1 << 2;

   /**
    * Operation-set bit for socket-connect operations.
    */
   public static final int OP_CONNECT = 1 << 3;

   /**
    * Operation-set bit for socket-accept operations.
    */
   public static final int OP_ACCEPT = 1 << 4;

interestOps

пройти черезinterestOpsбыть увереннымselectorКакие категории действий проверяются на готовность во время следующего действия выбора, является ли событие действияchannelобеспокоенный.interestOpsсуществуетSelectionKeyПри создании инициализируется для регистрацииSelectorзначение ops в то время, это значение может быть передано черезsun.nio.ch.SelectionKeyImpl#interestOps(int)меняться, мыSelectorImpl#registerможно ясно увидеть.

//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl
   extends AbstractSelectionKey
{
   private static final VarHandle INTERESTOPS =
           ConstantBootstraps.fieldVarHandle(
                   MethodHandles.lookup(),
                   "interestOps",
                   VarHandle.class,
                   SelectionKeyImpl.class, int.class);

   private final SelChImpl channel;
   private final SelectorImpl selector;

   private volatile int interestOps;
   private volatile int readyOps;

   // registered events in kernel, used by some Selector implementations
   private int registeredEvents;

   // index of key in pollfd array, used by some Selector implementations
   private int index;

   SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
       channel = ch;
       selector = sel;
   }
  ...
}

readyOps

readyOpsвыразить черезSelectorобнаруженchannelСобытия действий, которые готовы. существуетSelectionKeyПри создании (как показано в исходном коде выше)readyOpsзначение равно 0, вSelectorизselectОн может быть обновлен во время операции, но следует отметить, что мы не можем напрямую вызвать обновление.

SelectionKeyизreadyOpsозначаетchannelОн готов к некоторым операциям, но нет гарантии, что во время операции для этого типа события готовности не произойдет блокировка, то есть поток, в котором находится операция, может заблокироваться. по окончанииselectСразу после операции в большинстве случаевreadyOpsобновить в это времяreadyOpsзначение является наиболее точным, если внешнее событие илиchannelЕсть операции ввода-вывода,readyOpsможет быть неточным. Итак, мы увидели, что этоvolatileТипы.

SelectionKeyВсе операционные события определены, но специфичныchannelПоддерживаемые рабочие события зависят от конкретныхchannel, то есть конкретный анализ конкретных проблем. все необязательныеchannel(которыйSelectableChannelподклассы) доступны черезSelectableChannel#validOpsметод для определения того, было ли событие операцииchannelподдерживается, то есть каждый подкласс будет иметь паруvalidOpsреализация, которая возвращает число, которое идентифицирует толькоchannelКакие операции поддерживаются. попробуйте установить или проверитьchannelУстановлены поддерживаемые операции, и будет выдано соответствующее исключение времени выполнения. В различных сценариях применения поддерживаемыеOpsотличается, вырванная часть выглядит так:

//java.nio.channels.SocketChannel#validOps
public final int validOps() {
    //即1|4|8  1101
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
    // 16
    return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
    // 1|4
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE);
}

Если вам нужно часто связывать некоторые данные, указанные в нашей программе, сSelectionKey, например, мы используем объект для представления состояния высокоуровневого протокола на верхнем уровне, а объект используется для уведомления реализации процессора протокола. Итак, SelectionKey поддерживает черезattachметод присоединения объекта кSelectionKeyизattachmentначальство.attachmentв состоянии пройтиjava.nio.channels.SelectionKey#attachmentспособ доступа. Если вы хотите отменить объект, вы можете сделать это следующим образом:selectionKey.attach(null).

Следует отметить, что если прикрепленный объект больше не используется, его необходимо очистить вручную, если нет, если даSelectionKeyОн существовал всегда. Поскольку здесь он является сильной ссылкой, сборщик мусора не будет перерабатывать объект. Если его не очистить, это станет утечкой памяти.

SelectionKey является потокобезопасным при одновременном использовании несколькими потоками. Нам просто нужно знать,SelectorизselectОперация всегда будет использовать текущийinterestOpsустановленное значение.

СелекторИсследовать

До сих пор мы были на связи более или менееSelector, что это за роль, мне должно быть очень ясно, тогда мы будем дальше исследовать то, с чем мы уже соприкоснулисьSelectorконструкция рабочего механизма.

Открытый метод селектора

Вы можете сказать из названияSelectableChannelобъект зависит отSelectorдля достижения мультиплексирования. Мы можем позвонитьjava.nio.channels.Selector#openсоздатьselectorОбъект:

//java.nio.channels.Selector#open
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

об этомSelectorProvider.provider(), который использует реализацию по умолчанию в соответствии с системой, в которой он находится.Я здесь система Windows, тогда его реализация по умолчаниюsun.nio.ch.WindowsSelectorProvider, чтобы можно было вызвать конкретную реализацию на основе соответствующей системы.

//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {

/**
 * Prevent instantiation.
 */
private DefaultSelectorProvider() { }

/**
 * Returns the default SelectorProvider.
 */
public static SelectorProvider create() {
    return new sun.nio.ch.WindowsSelectorProvider();
}

}

Основываясь на окнах, селектор в конечном итоге будет использоваться здесь.sun.nio.ch.WindowsSelectorImplДавайте проведем базовую логику.

public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}

Здесь нам нужно посмотреть наWindowsSelectorImplконструктор:

//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

нам даноPipe.open()просто знаюselectorбудет оставаться открытым до тех пор, пока не вызовет свойcloseметод:

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
    assert !isOpen();
    assert Thread.holdsLock(this);

    // prevent further wakeup
    synchronized (interruptLock) {
        interruptTriggered = true;
    }

    wakeupPipe.sink().close();
    wakeupPipe.source().close();
    pollWrapper.free();

    // Make all remaining helper threads exit
    for (SelectThread t: threads)
            t.makeZombie();
    startLock.startThreads();
}

Как видите, предыдущийwakeupPipeЗакрыто методом close. Здесь снова задействован метод closewakeupPipe.sink()а такжеwakeupPipe.source()закрытие сpollWrapper.free()Освобождение, вот и сложность нашей статьи, вот, давайте разберемся, что это за существование. Во-первых, у нас естьWindowsSelectorImpl(SelectorProvider sp)Этот конструктор делает следующее:

  • СоздаватьPollArrayWrapperобъект (pollWrapper);
  • Pipe.open()открыть трубу;
  • получитьwakeupSourceFdа такжеwakeupSinkFdдва файловых дескриптора;
  • Поместите файловый дескриптор на исходную сторону канала (wakeupSourceFd) вставитьpollWrapperвнутри;

Решение Pipe.open()

Здесь мы зададимся вопросом, зачем создается пайп и для чего он используется.

Давайте посмотримPipe.open()Реализация исходного кода:

//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
    return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
    return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException x) {
        throw (IOException)x.getCause();
    }
}
private class Initializer
implements PrivilegedExceptionAction<Void>
{

private final SelectorProvider sp;

private IOException ioe = null;

private Initializer(SelectorProvider sp) {
    this.sp = sp;
}

@Override
public Void run() throws IOException {
    LoopbackConnector connector = new LoopbackConnector();
    connector.run();
    if (ioe instanceof ClosedByInterruptException) {
        ioe = null;
        Thread connThread = new Thread(connector) {
            @Override
            public void interrupt() {}
        };
        connThread.start();
        for (;;) {
            try {
                connThread.join();
                break;
            } catch (InterruptedException ex) {}
        }
        Thread.currentThread().interrupt();
    }

    if (ioe != null)
        throw new IOException("Unable to establish loopback connection", ioe);

    return null;
}

Из приведенного выше исходного кода мы можем знать, чтоPipeImplобъект, вPipeImplбудет выполняться в конструктореAccessController.doPrivileged, который выполняется сразу после его вызоваInitializerизrunметод:

//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {

    @Override
    public void run() {
        ServerSocketChannel ssc = null;
        SocketChannel sc1 = null;
        SocketChannel sc2 = null;

        try {
            // Create secret with a backing array.
            ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
            ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

            // Loopback address
            InetAddress lb = InetAddress.getLoopbackAddress();
            assert(lb.isLoopbackAddress());
            InetSocketAddress sa = null;
            for(;;) {
                // Bind ServerSocketChannel to a port on the loopback
                // address
                if (ssc == null || !ssc.isOpen()) {
                    ssc = ServerSocketChannel.open();
                    ssc.socket().bind(new InetSocketAddress(lb, 0));
                    sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                }

                // Establish connection (assume connections are eagerly
                // accepted)
                sc1 = SocketChannel.open(sa);
                RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                do {
                    sc1.write(secret);
                } while (secret.hasRemaining());
                secret.rewind();

                // Get a connection and verify it is legitimate
                sc2 = ssc.accept();
                do {
                    sc2.read(bb);
                } while (bb.hasRemaining());
                bb.rewind();

                if (bb.equals(secret))
                    break;

                sc2.close();
                sc1.close();
            }

            // Create source and sink channels
            source = new SourceChannelImpl(sp, sc1);
            sink = new SinkChannelImpl(sp, sc2);
        } catch (IOException e) {
            try {
                if (sc1 != null)
                    sc1.close();
                if (sc2 != null)
                    sc2.close();
            } catch (IOException e2) {}
            ioe = e;
        } finally {
            try {
                if (ssc != null)
                    ssc.close();
            } catch (IOException e2) {}
        }
    }
}
}

вот для созданияpipeпроцесс,windowsРеализация ниже заключается в создании двух локальныхsocketChannel, а затем подключиться (процесс подключения выполняется путем записи случайных данных для проверки соединения двух сокетов), дваsocketChannelРеализовал конвейер отдельноpipeизsourceа такжеsinkконец. И мы до сих пор не знаем этогоpipeДля чего это? Если вы знакомы с системными вызовамиC/C++, вы можете знать, что блок вselectСуществует три способа разбудить поток:

  1. Есть данные для чтения/записи, или возникает исключение.
  2. Время блокировки истекло, т.е.time out.
  3. получилnon-blockсигнал о. кkillилиpthread_killпроблема.

так,Selector.wakeup()разбудить заблокированногоselect, то только этими тремя способами, среди которых:

  • Второй способ можно исключить, т.к.selectПосле блокировки его нельзя изменитьtime outвремя.
  • А третий вроде бы только вLinuxпонял выше,WindowsМеханизма такой сигнализации нет.

Кажется, есть только первый способ. Если мы позвоним несколько разSelector.open(), затем вWindowsКаждый раз, когда созывается собрание, оно устанавливает пару из себя и своегоloopbackизTCPСоединение, в Linux пара открывается при каждом вызовеpipe(трубы обычно открываются парами под Linux), в этот момент предполагается, что мы можем угадать - то есть, если вы хотите проснутьсяselect, просто двигайтесь к своимloopbackСоединение отправляет данные в прошлое, поэтому вы можете проснуться и заблокировать вselectна нитке.

Резюмируем вышеизложенное: вWindowsВниз,Javaвиртуальная машина вSelector.open()будет строиться на себе и самостоятельноloopbackизTCPподключиться; вLinuxВниз,Selectorсоздастpipe. Это в основном дляSelector.wakeup()Может легко проснуться заблокированнымselect()Поток в системном вызове (путем вызоваTCPПросто напишите что-нибудь в ссылках и каналах, чтобы разбудить заблокированные темы).

Интерпретация PollArrayWrapper

существуетWindowsSelectorImplКонструктор Наконец, мы видим эту строку кода:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);, то есть поставить дескриптор файла на стороне Source в пайпе (wakeupSourceFd) вставитьpollWrapperвнутри.pollWrapperв видеPollArrayWrapper, что это такое, в этом разделе мы его рассмотрим.

class PollArrayWrapper {

    private AllocatedNativeObject pollArray; // The fd array

    long pollArrayAddress; // pollArrayAddress

    @Native private static final short FD_OFFSET     = 0; // fd offset in pollfd
    @Native private static final short EVENT_OFFSET  = 4; // events offset in pollfd

    static short SIZE_POLLFD = 8; // sizeof pollfd struct

    private int size; // Size of the pollArray

    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize;
    }

    ...

    // Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }
    ...
   // Adds Windows wakeup socket at a given index.
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, Net.POLLIN);
    }
}

здесь будетwakeupSourceFdизPOLLINСобытие идентифицируется какpollArrayизEventOpsСоответствующее значение , здесь — память, непосредственно управляемая небезопасным, т. е. относительно этогоpollArrayСмещение адреса памяти, где он находитсяSIZE_POLLFD * i + EVENT_OFFSETпишите по этому адресуNet.POLLINПредставленное значение — это значение, показанное в исходном коде, связанном с локальным методом ниже.putDescriptorЭто такая же операция. когдаsink端Когда данные записываются,sourceсоответствующий файловый дескрипторwakeupSourceFdбудет в состоянии готовности.

//java.base/windows/native/libnio/ch/nio_util.h
    /* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined   */
    /* in Windows Vista / Windows Server 2008 and later. If we are on an      */
    /* older release we just use the Solaris constants as this was previously */
    /* done in PollArrayWrapper.java.                                         */
    #define POLLIN       0x0001
    #define POLLOUT      0x0004
    #define POLLERR      0x0008
    #define POLLHUP      0x0010
    #define POLLNVAL     0x0020
    #define POLLCONN     0x0002

AllocatedNativeObjectРодительский класс этого класса имеет большое количествоunsafeОперации класса, они напрямую основаны на операциях уровня памяти. Из конструктора его родительского класса мы также можем ясно видетьpollArrayчерезunsafe.allocateMemory(size + ps)Выделенный блок системной памяти.

class AllocatedNativeObject                             // package-private
    extends NativeObject
{
    /**
     * Allocates a memory area of at least {@code size} bytes outside of the
     * Java heap and creates a native object for that area.
     */
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }

    /**
     * Frees the native memory area associated with this object.
     */
    synchronized void free() {
        if (allocationAddress != 0) {
            unsafe.freeMemory(allocationAddress);
            allocationAddress = 0;
        }
    }

}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
        if (!pageAligned) {
            this.allocationAddress = unsafe.allocateMemory(size);
            this.address = this.allocationAddress;
        } else {
            int ps = pageSize();
            long a = unsafe.allocateMemory(size + ps);
            this.allocationAddress = a;
            this.address = a + ps - (a & (ps - 1));
        }
    }

Пока мы закончилиSelector.open()интерпретации, его основная задача состоит в том, чтобы завершить установлениеPipe, и положиpipe sourceконецwakeupSourceFdположить вpollArray, этоpollArrayдаSelectorХаб, который выполняет квесты своего персонажа. В этой статье в основном анализируется реализация Windows, то есть через два подключения под WindowssocketChannelДостигнутоPipe,linuxнепосредственно использовать системуpipeВот и все.

Управление SelectionKey в селекторе

SelectionKey зарегистрирован в селекторе

Так называемая регистрация фактически помещает объект в поле-контейнер в зарегистрированном объекте.Это поле может быть массивом, очередью, набором коллекций или списком. Здесь то же самое, за исключением того, что он должен иметь возвращаемое значение, поэтому просто верните объект, который будет помещен в коллекцию.

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        newKeys.addLast(ski);
    }
}

Мы видели этот код раньше, здесь мы просматриваем его снова. Сначала создайте новыйSelectionKeyImplобъект, этот объект является правильнымChannelупаковка, не только это, но и кстати токSelectorОбъект принят, поэтому мы также можем передатьSelectionKeyобъект, чтобы получить соответствующийSelectorобъект.

Далее, исходя изwindowsплатформа реализованаimplRegister, сначала черезensureOpen()обеспечить, чтобыSelectorоткрыт. Тогда поместите этоSelectionKeyImplПрисоединяйсяWindowsSelectorImplвнутренняя цельновая регистрацияУправляется SelectionKeynewKeysсреди,newKeysЯвляетсяArrayDequeобъект. заArrayDequeЕсли вы не понимаете, вы можете обратиться кDeque и ArrayDeque анализа исходного кода контейнера JavaЭта статья.

тогда этоSelectionKeyImplПрисоединяйсяsun.nio.ch.SelectorImpl#keysиди, этоSet<SelectionKey>Коллекция, представляющая тех, кто зарегистрирован в текущемSelectorна объектеSelectionKeyсобирать. Давайте посмотримsun.nio.ch.SelectorImplконструктор:

//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = ConcurrentHashMap.newKeySet();
    selectedKeys = new HashSet<>();
    publicKeys = Collections.unmodifiableSet(keys);
    publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}

То есть здесьpublicKeysпроисходит отkeys,ТолькоpublicKeysдоступен только для чтения, мы хотим знать текущуюSelectorзарегистрирован на объектеkeys, вы можете позвонитьsun.nio.ch.SelectorImpl#keysполучить:

//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
    ensureOpen();
    return publicKeys;
}

Вернувшись в этот конструктор,selectedKeys, как следует из названия, принадлежит выбранным Ключам, то есть во время предыдущей операцииChannelсоответствующийSelectionKey. Эта коллекцияkeysподмножество . пройти черезselector.selectedKeys()Получать.

//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}

Мы видим, что он возвращаетсяpublicSelectedKeys, элементы в этом поле можно удалять, но нельзя добавлять. В предыдущем материале мы рассмотрелиSelectionKeyотмена, так что мы вjava.nio.channels.spi.AbstractSelectorметод, определяетсяcancelledKeysДа, такжеHashSetобъект. представитель которого аннулирован, но не снят с регистрацииSelectionKey. Эта коллекция Set недоступна напрямую, и опять же, это подмножество keys().

для новыхSelectorНапример, все приведенные выше наборы пусты. Как видно из приведенного выше исходного кода, черезchannel.registerбудетSelectionKeyДобавить кkeys, это источник ключа. еслиselectionKey.cancel()вызывается, то ключ будет добавлен вcancelledKeysэтой коллекции, затем при следующем вызове селектораselectметод, в настоящее времяcanceldKeysне пусто, вызовет этоSelectionKeyизderegisterоперация (высвобождение ресурсов иkeysудаленный). будь то черезchannel.close()или черезselectionKey.cancel(), приведет кSelectionKeyбыть добавленным кcannceldKeyсередина.

Во время каждой операции выбора (выбрать) ключ может быть добавлен вselectedKeysв или изcancelledKeysудалено в.

Интерпретация метода выбора Selector

Зная вышеизложенное, перейдем кselectметод, обратите внимание на его детали. Зависит отSelectorАпи известно,selectЕсть две формы операции, одна из них select(), selectNow(), select(long timeout); другойselect(Consumer<SelectionKey> action, long timeout),select(Consumer<SelectionKey> action),selectNow(Consumer<SelectionKey> action). Последний представляет собой недавно добавленный API в JDK11, который в основном предназначен для пользовательской операции, выполняемой с соответствующим ключом каналами, готовыми к операциям ввода-вывода в процессе выбора.

Следует отметить, что существуютConsumer<SelectionKey> actionОперация выбора параметра блокируется, она будет вызываться только в том случае, если выбран хотя бы один канал.Selectorпримерwakeupметод пробуждения, аналогично поток, в котором он находится, может быть прерван.

//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
    throws IOException
{
    Objects.requireNonNull(action);
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }

Мы можем заметить, что в любом случае все они оказываются вlockAndDoSelectВ этом методе конкретная система в конечном итоге будет выполненаdoSelect(action, timeout)выполнить. Здесь мы беремsun.nio.ch.WindowsSelectorImpl#doSelectВ качестве примера для описания шагов его работы:

// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
    throws IOException
    {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();  // <1>
        processDeregisterQueue(); // <2>
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();  // <3>
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();  // <4>
        int updated = updateSelectedKeys(action); // <5>
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket(); // <6>
        return updated;
    }

Интерпретация процессаUpdateQueue

  1. Во-первых, мы можем узнать, что через конкретную реализацию соответствующего класса реализации операционной системы (здесь — WindowsSelectorImpl), через<1>кудаprocessUpdateQueue()узнать о каждом оставшемсяChannel(Некоторые каналы были отменены) на данный моментinterestOps, в том числе вновь зарегистрированных иupdateKeys, и сделай этоpollWrapperуправленческие операции.

    • то есть для вновь зарегистрированныхSelectionKeyImpl, мы относимся к этомуpollArrayСмещение адреса памяти, где он находитсяSIZE_POLLFD * totalChannels + FD_OFFSETа такжеSIZE_POLLFD * totalChannels + EVENT_OFFSETдепозит отдельноSelectionKeyImplфайловый дескрипторfdсоответствующий емуEventOps(изначально 0).

    • правильноupdateKeys, потому что раньшеpollArrayОн был сохранен в относительной позиции , здесь нам также нужно судить о достоверности полученного ключа.SelectionKeyImplобъектinterestOpsнаписатьpollWrapperхранить его вEventOpsпозиция.

    Уведомление: правильноnewKeysПосле оценки действительности ключа, если он действителен, он вызоветgrowIfNeeded()метод, здесь мы сначала оценимchannelArray.length == totalChannels, этоSelectionKeyImplМассив с начальной емкостью 8.channelArrayэто на самом деле удобноSelectorуправление в книгахSelectionKeyImplЭто просто массив чисел, судя по длине и размеру массива, если иtotalChannels(начальное значение равно 1) равно, а не только дляchannelArrayрасширение и, что более важно, помощьpollWrapper,ПозволятьpollWrapperРасширение – это цель.

    и когдаtotalChannels % MAX_SELECTABLE_FDS == 0, затем откройте дополнительный поток для обработкиselector.windowsначальствоselectСистемные вызовы имеют максимальное ограничение файлового дескриптора и могут быть опрошены только за один раз.1024При наличии более 1024 файловых дескрипторов требуется многопоточный опрос. звонить в то же времяpollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)в связи с этимpollArrayСмещение адреса памяти, где он находитсяSIZE_POLLFD * totalChannels + FD_OFFSETпишите по этому адресуwakeupSourceFdпредставленыfdValценность. Таким образом, только что запущенный поток может пройти черезMAX_SELECTABLE_FDSопределить это для мониторингаwakeupSourceFd, легко проснутьсяselector. пройти черезski.setIndex(totalChannels)записыватьSelectionKeyImplПозиция индекса в массиве для последующего использования.

   /**
    * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue
    * Process new registrations and changes to the interest ops.
    */
private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;

        // new registrations
        while ((ski = newKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                growIfNeeded();
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
                pollWrapper.putEntry(totalChannels, ski);
                totalChannels++;
                MapEntry previous = fdMap.put(ski);
                assert previous == null;
            }
        }

        // changes to interest ops
        while ((ski = updateKeys.pollFirst()) != null) {
            int events = ski.translateInterestOps();
            int fd = ski.getFDVal();
            if (ski.isValid() && fdMap.containsKey(fd)) {
                int index = ski.getIndex();
                assert index >= 0 && index < totalChannels;
                pollWrapper.putEventOps(index, events);
            }
        }
    }
}

//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
    putDescriptor(index, ski.getFDVal());
    putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
    if (channelArray.length == totalChannels) {
        int newSize = totalChannels * 2; // Make a larger array
        SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
        System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
        channelArray = temp;
        pollWrapper.grow(newSize);
    }
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
        pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
        totalChannels++;
        threadsCount++;
    }
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;

// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array,  where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in  poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;

//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
    PollArrayWrapper temp = new PollArrayWrapper(newSize);
    for (int i = 0; i < size; i++)
        replaceEntry(this, i, temp, i);
    pollArray.free();
    pollArray = temp.pollArray;
    this.size = temp.size;
    pollArrayAddress = pollArray.address();
}

// Maps file descriptors to their indices in  pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
    static final long serialVersionUID = 0L;
    private MapEntry get(int desc) {
        return get(Integer.valueOf(desc));
    }
    private MapEntry put(SelectionKeyImpl ski) {
        return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
    }
    private MapEntry remove(SelectionKeyImpl ski) {
        Integer fd = Integer.valueOf(ski.getFDVal());
        MapEntry x = get(fd);
        if ((x != null) && (x.ski.channel() == ski.channel()))
            return remove(fd);
        return null;
    }
}

// class for fdMap entries
private static final class MapEntry {
    final SelectionKeyImpl ski;
    long updateCount = 0;
    MapEntry(SelectionKeyImpl ski) {
        this.ski = ski;
    }
}
private final FdMap fdMap = new FdMap();
интерпретация processDeregisterQueue
  1. затем через上面WindowsSelectorImpl#doSelect展示源码中<2>кудаprocessDeregisterQueue().
    • правильноcancelledKeysочищать, проходитьcancelledKeys, и для каждогоkeyпровестиderegisterоперации, то изcancelledKeysудалить из коллекции, изkeysколлекция сselectedKeysУдалите его, чтобы освободить ссылку, которую gc удобно перерабатывать,
    • позвонить в пределахimplDeregметод, начнется сchannelArrayудалить соответствующийChannelпредставленыSelectionKeyImpl,КорректированиеtotalChannelsи количество потоков, отmapа такжеkeysудалено вSelectionKeyImpl, УдалитьChannelВверхSelectionKeyImplи закрытьChannel.
    • Также было установлено, чтоprocessDeregisterQueue()метод вызываетpollМетод вызывается до и после, чтобы убедиться, что вызов обрабатывается правильно.pollКлючи, отмененные в течение периода времени, когда метод заблокирован, могут быть своевременно очищены.
    • Наконец, рассудит этоcancelledKeyпредставленыchannelОткрывать и отменять регистрацию, если закрыть и отменять регистрацию, то ресурсы, занятые соответствующим файловым дескриптором, должны быть закрыты.
   /**
    * sun.nio.ch.SelectorImpl#processDeregisterQueue
    * Invoked by selection operations to process the cancelled-key set
    */
protected final void processDeregisterQueue() throws IOException {
    assert Thread.holdsLock(this);
    assert Thread.holdsLock(publicSelectedKeys);

    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if (!cks.isEmpty()) {
            Iterator<SelectionKey> i = cks.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                i.remove();

                // remove the key from the selector
                implDereg(ski);

                selectedKeys.remove(ski);
                keys.remove(ski);

                // remove from channel's key set
                deregister(ski);

                SelectableChannel ch = ski.channel();
                if (!ch.isOpen() && !ch.isRegistered())
                    ((SelChImpl)ch).kill();
            }
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
    assert !ski.isValid();
    assert Thread.holdsLock(this);

    if (fdMap.remove(ski) != null) {
        int i = ski.getIndex();
        assert (i >= 0);

        if (i != totalChannels - 1) {
            // Copy end one over it
            SelectionKeyImpl endChannel = channelArray[totalChannels-1];
            channelArray[i] = endChannel;
            endChannel.setIndex(i);
            pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
        }
        ski.setIndex(-1);

        channelArray[totalChannels - 1] = null;
        totalChannels--;
        if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
            totalChannels--;
            threadsCount--; // The last thread has become redundant.
        }
    }
}

//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
    synchronized (stateLock) {
        if (state == ST_KILLPENDING) {
            state = ST_KILLED;
            nd.close(fd);
        }
    }
}
//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/SocketChannelImpl.java:1126
static {
    IOUtil.load();
    nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
    close0(fd);
}
Интерпретация AdjustThreadsCount
  1. Затем мы видим上面WindowsSelectorImpl#doSelectпокажи исходный кодadjustThreadsCount()вызов метода.
    • Ранее упоминалось, что еслиtotalChannels % MAX_SELECTABLE_FDS == 0, затем откройте еще один поток для обработкиselector. Здесь основано наКоличество выделенных потоковЧтобы увеличить или уменьшить потоки, это фактически максимумselectОграничение дескриптора файла для операций корректируется с учетом количества потоков.
    • Посмотрим, что делает созданный поток, то есть понаблюдаемSelectThreadизrunреализация метода. Наблюдая за его исходным кодом, вы можете увидеть, что он первыйwhile (true),пройти черезstartLock.waitForStart(this)Чтобы контролировать, запущен ли поток или ожидает, если он запущен, он вызоветsubSelector.poll(index)(Мы подробно объясним это позже),
    • когда эта темаpollконец, и если есть несколько потоков относительно текущего основного потокаSelectThreadДля дочерних потоков текущийSelectThreadНить заканчивается первойpollтогда позвониfinishLock.threadFinished()для уведомления основного потока. Просто создайте этот поток и вызовите егоrunметод, в настоящее времяlastRun = 0, при первом запускеsun.nio.ch.WindowsSelectorImpl.StartLock#runsCounterТо же самое 0, поэтому он вызоветstartLock.wait()Затем войдите в состояние ожидания.

Уведомление:

  • sun.nio.ch.WindowsSelectorImpl.StartLockОн также будет судить, заброшен ли текущий обнаруженный поток, и если он заброшен, он вернетtrue, так что обнаруженный поток также может выйти из своего метода запуска.whileТаким образом, цикл завершает выполнение потока.
  • При регулировке резьбы (вызовadjustThreadsCountметод) сSelectorперечислитьcloseметод будет вызываться косвенноsun.nio.ch.WindowsSelectorImpl#implClose, оба метода предполагаютSelectorОсвобождение потока, то есть вызовsun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie.
  • finishLock.threadFinished()позвонюwakeup()метод для уведомления основного потока, здесь мы можем узнать подробности, если поток блокируется вselectметод, вы можете позвонитьwakeupметод приведет к немедленному возврату блокирующей операции выбора черезWindowsСоответствующая реализация, принцип на самом делеpipeизsinkБайт записывается в конец,sourceДескриптор файла находится в состоянии готовности,pollметод возвращается, что приводит кselectметод возвращает. В других системах Solaris или Linux фактически используются системные вызовы.pipeЧтобы завершить создание конвейера, это эквивалентно непосредственному использованию системного конвейера. пройти черезwakeup()Соответствующую реализацию также можно увидеть, вызвавwakeupустановитinterruptTriggeredБит флага, поэтому вызывайте несколько раз подрядwakeupЭффект эквивалентен звонку и не приведет к появлению незначительных ошибок.
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
    if (threadsCount > threads.size()) {
        // More threads needed. Start more threads.
        for (int i = threads.size(); i < threadsCount; i++) {
            SelectThread newThread = new SelectThread(i);
            threads.add(newThread);
            newThread.setDaemon(true);
            newThread.start();
        }
    } else if (threadsCount < threads.size()) {
        // Some threads become redundant. Remove them from the threads List.
        for (int i = threads.size() - 1 ; i >= threadsCount; i--)
            threads.remove(i).makeZombie();
    }
}

//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
    private final int index; // index of this thread
    final SubSelector subSelector;
    private long lastRun = 0; // last run number
    private volatile boolean zombie;
    // Creates a new thread
    private SelectThread(int i) {
        super(null, null, "SelectorHelper", 0, false);
        this.index = i;
        this.subSelector = new SubSelector(i);
        //make sure we wait for next round of poll
        this.lastRun = startLock.runsCounter;
    }
    void makeZombie() {
        zombie = true;
    }
    boolean isZombie() {
        return zombie;
    }
    public void run() {
        while (true) { // poll loop
            // wait for the start of poll. If this thread has become
            // redundant, then exit.
            if (startLock.waitForStart(this))
                return;
            // call poll()
            try {
                subSelector.poll(index);
            } catch (IOException e) {
                // Save this exception and let other threads finish.
                finishLock.setException(e);
            }
            // notify main thread, that this thread has finished, and
            // wakeup others, if this thread is the first to finish.
            finishLock.threadFinished();
        }
    }
}

// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
    if (threadsToFinish == threads.size()) { // finished poll() first
        // if finished first, wakeup others
        wakeup();
    }
    threadsToFinish--;
    if (threadsToFinish == 0) // all helper threads finished poll().
        notify();             // notify the main thread
}

//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
    setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}
Интерпретация метода опроса subSelector
  1. subSelector.poll()является ядром select,nativeфункцияpoll0осознать и поставитьpollWrapper.pollArrayAddressпередается как параметр вpoll0,readFds,writeFdsа такжеexceptFdsМассивы используются для хранения основныхselectВ результате первая позиция массива предназначена для хранения произошедшего события.socketОбщее количество и остальные места храненияsocketручкаfd. Мы знаем из следующего кода: этоpoll0()будет контролироватьpollWrapperсерединаFDВходящие и исходящие данные отсутствуют, что приведет кIOБлокировать до тех пор, пока не произойдет событие чтения или записи данных. потому чтоpollWrapperтакже сохранено вServerSocketChannelизFD, так что покаClientSocketотправить данные наServerSocket,Такpoll0()вернется; и потомуpollWrapperтакже сохранено вpipeизwriteконецFD, так что покаpipeизwriteконецFDОтправка части данных также вызоветpoll0()return; если ни одно из этих условий не выполняется, тоpoll0()блокировать все время, т.selector.select()будет заблокирован навсегда; если что-то из этого произойдет, тоselector.select()вернется, все вSelectThreadизrun()использовать вwhile (true) {}, что гарантирует, чтоselectorПродолжить прослушивание после получения данных и их обработкиpoll();

Видно, что NIO по-прежнему блокирует IO, так в чем же разница между ним и BIO. На самом деле разница заключается в расположении блока,BIOзаблокирован вreadметод (recvfrom), в то время какNIOзаблокирован вselectметод. Так какая польза от этого. Если просто поменять место засора, то ничего не изменится, ноepoll等Хитрость реализации в том, что она использует механизм обратного вызова, так что слушатель может знать только то, чтоsocketДанные в потоке готовы, просто обработайте данные в этих потоках. использоватьBIO, при условии, что1000соединение, нужно открыть1000нити, то есть1000индивидуальныйreadПозиция блока заблокирована (мы уже демонстрировали это через Demo в части BIO), используяNIOпрограммирование, просто1нить, которая используетselectсо стратегией опросаepollМеханизм событий и красно-черная древовидная структура данных уменьшают накладные расходы на его внутренний опрос и значительно сокращают накладные расходы на переключение контекста потока.

//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
        private final int pollArrayIndex; // starting index in pollArray to poll
        // These arrays will hold result of native select().
        // The first element of each array is the number of selected sockets.
        // Other elements are file descriptors of selected sockets.
        // 保存发生read的FD
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
        // 保存发生write的FD
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
        //保存发生except的FD
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];

        private SubSelector() {
            this.pollArrayIndex = 0; // main thread
        }

        private SubSelector(int threadIndex) { // helper threads
            this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
        }

        private int poll() throws IOException{ // poll for the main thread
            return poll0(pollWrapper.pollArrayAddress,
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
                         readFds, writeFds, exceptFds, timeout);
        }

        private int poll(int index) throws IOException {
            // poll for helper threads
            return  poll0(pollWrapper.pollArrayAddress +
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                     Math.min(MAX_SELECTABLE_FDS,
                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
        }

        private native int poll0(long pollAddress, int numfds,
             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
             ...
}
Интерпретация updateSelectedKeys
  1. будет проходить дальше上面WindowsSelectorImpl#doSelect展示源码中<5>кудаupdateSelectedKeys(action)обрабатывать каждыйchannelизГотовыйИнформация.
  • Если каналkeyеще нетselectedKeysсуществует, добавьте его в коллекцию.
  • Если каналkeyуже существуетselectedKeys, то есть этоchannelсуществуют для поддержкиReadyOpsГотовая операция должна содержать одну из этих операций (путем(ski.nioReadyOps() & ski.nioInterestOps()) != 0для подтверждения), измените егоReadyOpsдля текущей операции. и что мы видели раньшеConsumer<SelectionKey>Это действие также осуществляется здесь. Как видно из следующего исходного кода, ранее записанного вReadyOpsлюбая готовая информация при вызове этогоactionРаньше он был отброшен и установлен напрямую.
//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
    updateCount++;
    int numKeysUpdated = 0;
    numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
    for (SelectThread t: threads) {
        numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
    }
    return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps,
                                        SelectionKeyImpl ski,
                                        Consumer<SelectionKey> action) {
    if (action != null) {
        ski.translateAndSetReadyOps(rOps);
        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
            action.accept(ski);
            ensureOpen();
            return 1;
        }
    } else {
        assert Thread.holdsLock(publicSelectedKeys);
        if (selectedKeys.contains(ski)) {
            if (ski.translateAndUpdateReadyOps(rOps)) {
                return 1;
            }
        } else {
            ski.translateAndSetReadyOps(rOps);
            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                selectedKeys.add(ski);
                return 1;
            }
        }
    }
    return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, action, readFds,
                                    Net.POLLIN,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                    Net.POLLIN |
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    true);
    return numKeysUpdated;
}

    /**
    * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet
    * updateCount is used to tell if a key has been counted as updated
    * in this select operation.
    *
    * me.updateCount <= updateCount
    */
private int processFDSet(long updateCount,
                            Consumer<SelectionKey> action,
                            int[] fds, int rOps,
                            boolean isExceptFds)
{
    int numKeysUpdated = 0;
    for (int i = 1; i <= fds[0]; i++) {
        int desc = fds[i];
        if (desc == wakeupSourceFd) {
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        MapEntry me = fdMap.get(desc);
        // If me is null, the key was deregistered in the previous
        // processDeregisterQueue.
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // The descriptor may be in the exceptfds set because there is
        // OOB data queued to the socket. If there is OOB data then it
        // is discarded and the key is not added to the selected set.
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }
        //我们应该关注的
        int updated = processReadyEvents(rOps, sk, action);
        if (updated > 0 && me.updateCount != updateCount) {
            me.updateCount = updateCount;
            numKeysUpdated++;
        }
    }
    return numKeysUpdated;
}

На этом содержание Selector пока подошло к концу.В следующей статье я буду интерпретировать Java NIO Buffer.