предисловие
В этой серии статей будет подробно рассказано о постепенном обогащении функций NIO, что проложит путь к объяснению библиотеки Reactor-Netty.
О методологии программирования на Java: совместное использование видео Reactor и Webflux, Rxjava и Reactor завершены, адрес станции b следующий:
Интерпретация исходного кода Rxjava и совместное использование:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
Интерпретация и совместное использование исходного кода Reactor:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
Интерпретация исходного кода этой серии основана на деталях API JDK11, которые могут отличаться от других версий.Пожалуйста, решите проблему с версией JDK самостоятельно.
Предыдущие статьи из этой серии:
Некоторые сведения об исходном коде BIO to NIO BIO
Некоторые сведения об исходном коде BIO to NIO на NIO
Некоторые сведения об исходном коде BIO to NIO в NIO
Введение SelectionKey
Как мы говорили в предыдущем содержании, после того, как студент определен, нам нужно установить его статус, а затем передать его вSelector
Для управления мы можем установить его статус черезSelectionKey
продолжать.
Тогда здесь мы сначала проходимChannel
подробно не объяснилSelectableChannel
внизregister
метод. Как мы упоминали ранее,SelectableChannel
будетchannel
дать возможность пройтиSelector
для мультиплексирования. Как менеджер,channel
Для повторного использования необходимо зарегистрироваться у администратора. так,SelectableChannel
внизregister
Этот метод является ядром нашего второго внимания, а также точкой входа для нашего следующего содержания.register
Для интерпретации метода, пожалуйста, смотрите нашу предыдущую статьюНекоторые сведения об исходном коде BIO to NIO на NIOсерединаДайте каналу возможность быть мультиплексированнымсодержание этого раздела.
Здесь нужно помнитьSelectableChannel
причаливаетchannel
особенность (т.е.SelectionKey
), что похоже на табличное оформление.Изначально функции можно было задавать в таблице, но для того, чтобы сделать операцию более целенаправленной, то есть для того, чтобы упростить управление кодовыми функциями, мы извлекли и спроектировали первую таблицу Две таблицы чем-то напоминают человеческие органы, в целом все работают вместе для выполнения одной задачи, но внутренние органы сосредоточены на своих основных специфических функциях, а иногда имеют некоторые мелкие функции других органов.
Отсюда мы также можем знать, чтоSelectionKey
означаетSelectableChannel
а такжеSelector
Связанный тег может быть просто понят какtoken
. Он похож на тот, который ресепшн получит из фона после того, как пользователь войдет в систему управления правами.token
Таким образом, пользователи могут положиться на этоtoken
для доступа к соответствующей информации о ресурсах для операции.
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{ ...
synchronized (regLock) {
...
synchronized (keyLock) {
...
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
Комбинируя верхний и нижний исходный код, в каждомSelector
использоватьregister
регистрация методаchannel
, создаст и вернетSelectionKey
.
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
мы вНекоторые сведения об исходном коде BIO to NIO на NIOсерединаДайте каналу возможность быть мультиплексированнымСодержимое этого раздела известно, что после регистрации наSelector
начальство,Channel
останется зарегистрированным до тех пор, пока его не снимут с учета. Незарегистрированный, когда незарегистрированныйSelector
назначен наChannel
всех ресурсов.
то естьSelectionKey
в своем призывеSelectionKey#channel
метод, или что этот ключ представляетchannel
закрытый, или ключ, связанный сSelector
Он действителен до тех пор, пока не будет закрыт. Мы также знаем из анализа предыдущей статьи, что отменаSelectionKey
, не сразу изSelector
удалено, оно будет добавлено вSelector
изcancelledKeys
этоSet
коллекция, которая будет удалена во время следующей операции выбора, мы можем сделать это с помощьюjava.nio.channels.SelectionKey#isValid
судитьSelectionKey
это эффективно.
SelectionKey содержит четыре набора операций, каждый набор операций представлен Int, а младшие четыре бита значения int используются для представленияchannel
Поддерживаемые типы необязательных операций.
/**
* Operation-set bit for read operations.
*/
public static final int OP_READ = 1 << 0;
/**
* Operation-set bit for write operations.
*/
public static final int OP_WRITE = 1 << 2;
/**
* Operation-set bit for socket-connect operations.
*/
public static final int OP_CONNECT = 1 << 3;
/**
* Operation-set bit for socket-accept operations.
*/
public static final int OP_ACCEPT = 1 << 4;
interestOps
пройти черезinterestOps
быть увереннымselector
Какие категории действий проверяются на готовность во время следующего действия выбора, является ли событие действияchannel
обеспокоенный.interestOps
существуетSelectionKey
При создании инициализируется для регистрацииSelector
значение ops в то время, это значение может быть передано черезsun.nio.ch.SelectionKeyImpl#interestOps(int)
меняться, мыSelectorImpl#register
можно ясно увидеть.
//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl
extends AbstractSelectionKey
{
private static final VarHandle INTERESTOPS =
ConstantBootstraps.fieldVarHandle(
MethodHandles.lookup(),
"interestOps",
VarHandle.class,
SelectionKeyImpl.class, int.class);
private final SelChImpl channel;
private final SelectorImpl selector;
private volatile int interestOps;
private volatile int readyOps;
// registered events in kernel, used by some Selector implementations
private int registeredEvents;
// index of key in pollfd array, used by some Selector implementations
private int index;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
...
}
readyOps
readyOps
выразить черезSelector
обнаруженchannel
События действий, которые готовы. существуетSelectionKey
При создании (как показано в исходном коде выше)readyOps
значение равно 0, вSelector
изselect
Он может быть обновлен во время операции, но следует отметить, что мы не можем напрямую вызвать обновление.
SelectionKey
изreadyOps
означаетchannel
Он готов к некоторым операциям, но нет гарантии, что во время операции для этого типа события готовности не произойдет блокировка, то есть поток, в котором находится операция, может заблокироваться. по окончанииselect
Сразу после операции в большинстве случаевreadyOps
обновить в это времяreadyOps
значение является наиболее точным, если внешнее событие илиchannel
Есть операции ввода-вывода,readyOps
может быть неточным. Итак, мы увидели, что этоvolatile
Типы.
SelectionKey
Все операционные события определены, но специфичныchannel
Поддерживаемые рабочие события зависят от конкретныхchannel
, то есть конкретный анализ конкретных проблем.
все необязательныеchannel
(которыйSelectableChannel
подклассы) доступны черезSelectableChannel#validOps
метод для определения того, было ли событие операцииchannel
поддерживается, то есть каждый подкласс будет иметь паруvalidOps
реализация, которая возвращает число, которое идентифицирует толькоchannel
Какие операции поддерживаются. попробуйте установить или проверитьchannel
Установлены поддерживаемые операции, и будет выдано соответствующее исключение времени выполнения.
В различных сценариях применения поддерживаемыеOps
отличается, вырванная часть выглядит так:
//java.nio.channels.SocketChannel#validOps
public final int validOps() {
//即1|4|8 1101
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
// 16
return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
// 1|4
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
Если вам нужно часто связывать некоторые данные, указанные в нашей программе, сSelectionKey
, например, мы используем объект для представления состояния высокоуровневого протокола на верхнем уровне, а объект используется для уведомления реализации процессора протокола. Итак, SelectionKey поддерживает черезattach
метод присоединения объекта кSelectionKey
изattachment
начальство.attachment
в состоянии пройтиjava.nio.channels.SelectionKey#attachment
способ доступа. Если вы хотите отменить объект, вы можете сделать это следующим образом:selectionKey.attach(null)
.
Следует отметить, что если прикрепленный объект больше не используется, его необходимо очистить вручную, если нет, если даSelectionKey
Он существовал всегда. Поскольку здесь он является сильной ссылкой, сборщик мусора не будет перерабатывать объект. Если его не очистить, это станет утечкой памяти.
SelectionKey является потокобезопасным при одновременном использовании несколькими потоками. Нам просто нужно знать,Selector
изselect
Операция всегда будет использовать текущийinterestOps
установленное значение.
СелекторИсследовать
До сих пор мы были на связи более или менееSelector
, что это за роль, мне должно быть очень ясно, тогда мы будем дальше исследовать то, с чем мы уже соприкоснулисьSelector
конструкция рабочего механизма.
Открытый метод селектора
Вы можете сказать из названияSelectableChannel
объект зависит отSelector
для достижения мультиплексирования.
Мы можем позвонитьjava.nio.channels.Selector#open
создатьselector
Объект:
//java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
об этомSelectorProvider.provider()
, который использует реализацию по умолчанию в соответствии с системой, в которой он находится.Я здесь система Windows, тогда его реализация по умолчаниюsun.nio.ch.WindowsSelectorProvider
, чтобы можно было вызвать конкретную реализацию на основе соответствующей системы.
//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
}
Основываясь на окнах, селектор в конечном итоге будет использоваться здесь.sun.nio.ch.WindowsSelectorImpl
Давайте проведем базовую логику.
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
Здесь нам нужно посмотреть наWindowsSelectorImpl
конструктор:
//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
нам даноPipe.open()
просто знаюselector
будет оставаться открытым до тех пор, пока не вызовет свойclose
метод:
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
Как видите, предыдущийwakeupPipe
Закрыто методом close. Здесь снова задействован метод closewakeupPipe.sink()
а такжеwakeupPipe.source()
закрытие сpollWrapper.free()
Освобождение, вот и сложность нашей статьи, вот, давайте разберемся, что это за существование.
Во-первых, у нас естьWindowsSelectorImpl(SelectorProvider sp)
Этот конструктор делает следующее:
- Создавать
PollArrayWrapper
объект (pollWrapper
); -
Pipe.open()
открыть трубу; - получить
wakeupSourceFd
а такжеwakeupSinkFd
два файловых дескриптора; - Поместите файловый дескриптор на исходную сторону канала (
wakeupSourceFd
) вставитьpollWrapper
внутри;
Решение Pipe.open()
Здесь мы зададимся вопросом, зачем создается пайп и для чего он используется.
Давайте посмотримPipe.open()
Реализация исходного кода:
//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private class Initializer
implements PrivilegedExceptionAction<Void>
{
private final SelectorProvider sp;
private IOException ioe = null;
private Initializer(SelectorProvider sp) {
this.sp = sp;
}
@Override
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
if (ioe instanceof ClosedByInterruptException) {
ioe = null;
Thread connThread = new Thread(connector) {
@Override
public void interrupt() {}
};
connThread.start();
for (;;) {
try {
connThread.join();
break;
} catch (InterruptedException ex) {}
}
Thread.currentThread().interrupt();
}
if (ioe != null)
throw new IOException("Unable to establish loopback connection", ioe);
return null;
}
Из приведенного выше исходного кода мы можем знать, чтоPipeImpl
объект, вPipeImpl
будет выполняться в конструктореAccessController.doPrivileged
, который выполняется сразу после его вызоваInitializer
изrun
метод:
//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {
@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// Create secret with a backing array.
ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);
// Loopback address
InetAddress lb = InetAddress.getLoopbackAddress();
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
do {
sc1.write(secret);
} while (secret.hasRemaining());
secret.rewind();
// Get a connection and verify it is legitimate
sc2 = ssc.accept();
do {
sc2.read(bb);
} while (bb.hasRemaining());
bb.rewind();
if (bb.equals(secret))
break;
sc2.close();
sc1.close();
}
// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}
вот для созданияpipe
процесс,windows
Реализация ниже заключается в создании двух локальныхsocketChannel
, а затем подключиться (процесс подключения выполняется путем записи случайных данных для проверки соединения двух сокетов), дваsocketChannel
Реализовал конвейер отдельноpipe
изsource
а такжеsink
конец.
И мы до сих пор не знаем этогоpipe
Для чего это?
Если вы знакомы с системными вызовамиC/C++
, вы можете знать, что блок вselect
Существует три способа разбудить поток:
- Есть данные для чтения/записи, или возникает исключение.
- Время блокировки истекло, т.е.
time out
. - получил
non-block
сигнал о. кkill
илиpthread_kill
проблема.
так,Selector.wakeup()
разбудить заблокированногоselect
, то только этими тремя способами, среди которых:
- Второй способ можно исключить, т.к.
select
После блокировки его нельзя изменитьtime out
время. - А третий вроде бы только в
Linux
понял выше,Windows
Механизма такой сигнализации нет.
Кажется, есть только первый способ. Если мы позвоним несколько разSelector.open()
, затем вWindows
Каждый раз, когда созывается собрание, оно устанавливает пару из себя и своегоloopback
изTCP
Соединение, в Linux пара открывается при каждом вызовеpipe
(трубы обычно открываются парами под Linux), в этот момент предполагается, что мы можем угадать - то есть, если вы хотите проснутьсяselect
, просто двигайтесь к своимloopback
Соединение отправляет данные в прошлое, поэтому вы можете проснуться и заблокировать вselect
на нитке.
Резюмируем вышеизложенное: вWindows
Вниз,Java
виртуальная машина вSelector.open()
будет строиться на себе и самостоятельноloopback
изTCP
подключиться; вLinux
Вниз,Selector
создастpipe
. Это в основном дляSelector.wakeup()
Может легко проснуться заблокированнымselect()
Поток в системном вызове (путем вызоваTCP
Просто напишите что-нибудь в ссылках и каналах, чтобы разбудить заблокированные темы).
Интерпретация PollArrayWrapper
существуетWindowsSelectorImpl
Конструктор Наконец, мы видим эту строку кода:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
, то есть поставить дескриптор файла на стороне Source в пайпе (wakeupSourceFd
) вставитьpollWrapper
внутри.pollWrapper
в видеPollArrayWrapper
, что это такое, в этом разделе мы его рассмотрим.
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}
здесь будетwakeupSourceFd
изPOLLIN
Событие идентифицируется какpollArray
изEventOps
Соответствующее значение , здесь — память, непосредственно управляемая небезопасным, т. е. относительно этогоpollArray
Смещение адреса памяти, где он находитсяSIZE_POLLFD * i + EVENT_OFFSET
пишите по этому адресуNet.POLLIN
Представленное значение — это значение, показанное в исходном коде, связанном с локальным методом ниже.putDescriptor
Это такая же операция. когдаsink端
Когда данные записываются,source
соответствующий файловый дескрипторwakeupSourceFd
будет в состоянии готовности.
//java.base/windows/native/libnio/ch/nio_util.h
/* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined */
/* in Windows Vista / Windows Server 2008 and later. If we are on an */
/* older release we just use the Solaris constants as this was previously */
/* done in PollArrayWrapper.java. */
#define POLLIN 0x0001
#define POLLOUT 0x0004
#define POLLERR 0x0008
#define POLLHUP 0x0010
#define POLLNVAL 0x0020
#define POLLCONN 0x0002
AllocatedNativeObject
Родительский класс этого класса имеет большое количествоunsafe
Операции класса, они напрямую основаны на операциях уровня памяти. Из конструктора его родительского класса мы также можем ясно видетьpollArray
черезunsafe.allocateMemory(size + ps)
Выделенный блок системной памяти.
class AllocatedNativeObject // package-private
extends NativeObject
{
/**
* Allocates a memory area of at least {@code size} bytes outside of the
* Java heap and creates a native object for that area.
*/
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
/**
* Frees the native memory area associated with this object.
*/
synchronized void free() {
if (allocationAddress != 0) {
unsafe.freeMemory(allocationAddress);
allocationAddress = 0;
}
}
}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
if (!pageAligned) {
this.allocationAddress = unsafe.allocateMemory(size);
this.address = this.allocationAddress;
} else {
int ps = pageSize();
long a = unsafe.allocateMemory(size + ps);
this.allocationAddress = a;
this.address = a + ps - (a & (ps - 1));
}
}
Пока мы закончилиSelector.open()
интерпретации, его основная задача состоит в том, чтобы завершить установлениеPipe
, и положиpipe
source
конецwakeupSourceFd
положить вpollArray
, этоpollArray
даSelector
Хаб, который выполняет квесты своего персонажа. В этой статье в основном анализируется реализация Windows, то есть через два подключения под WindowssocketChannel
ДостигнутоPipe
,linux
непосредственно использовать системуpipe
Вот и все.
Управление SelectionKey в селекторе
SelectionKey зарегистрирован в селекторе
Так называемая регистрация фактически помещает объект в поле-контейнер в зарегистрированном объекте.Это поле может быть массивом, очередью, набором коллекций или списком. Здесь то же самое, за исключением того, что он должен иметь возвращаемое значение, поэтому просто верните объект, который будет помещен в коллекцию.
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
Мы видели этот код раньше, здесь мы просматриваем его снова.
Сначала создайте новыйSelectionKeyImpl
объект, этот объект является правильнымChannel
упаковка, не только это, но и кстати токSelector
Объект принят, поэтому мы также можем передатьSelectionKey
объект, чтобы получить соответствующийSelector
объект.
Далее, исходя изwindows
платформа реализованаimplRegister
, сначала черезensureOpen()
обеспечить, чтобыSelector
открыт. Тогда поместите этоSelectionKeyImpl
ПрисоединяйсяWindowsSelectorImpl
внутренняя цельновая регистрацияУправляется SelectionKeynewKeys
среди,newKeys
ЯвляетсяArrayDeque
объект. заArrayDeque
Если вы не понимаете, вы можете обратиться кDeque и ArrayDeque анализа исходного кода контейнера JavaЭта статья.
тогда этоSelectionKeyImpl
Присоединяйсяsun.nio.ch.SelectorImpl#keys
иди, этоSet<SelectionKey>
Коллекция, представляющая тех, кто зарегистрирован в текущемSelector
на объектеSelectionKey
собирать. Давайте посмотримsun.nio.ch.SelectorImpl
конструктор:
//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
То есть здесьpublicKeys
происходит отkeys
,ТолькоpublicKeys
доступен только для чтения, мы хотим знать текущуюSelector
зарегистрирован на объектеkeys
, вы можете позвонитьsun.nio.ch.SelectorImpl#keys
получить:
//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}
Вернувшись в этот конструктор,selectedKeys
, как следует из названия, принадлежит выбранным Ключам, то есть во время предыдущей операцииChannel
соответствующийSelectionKey
. Эта коллекцияkeys
подмножество . пройти черезselector.selectedKeys()
Получать.
//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}
Мы видим, что он возвращаетсяpublicSelectedKeys
, элементы в этом поле можно удалять, но нельзя добавлять.
В предыдущем материале мы рассмотрелиSelectionKey
отмена, так что мы вjava.nio.channels.spi.AbstractSelector
метод, определяетсяcancelledKeys
Да, такжеHashSet
объект. представитель которого аннулирован, но не снят с регистрацииSelectionKey
. Эта коллекция Set недоступна напрямую, и опять же, это подмножество keys().
для новыхSelector
Например, все приведенные выше наборы пусты. Как видно из приведенного выше исходного кода, черезchannel.register
будетSelectionKey
Добавить кkeys
, это источник ключа.
еслиselectionKey.cancel()
вызывается, то ключ будет добавлен вcancelledKeys
этой коллекции, затем при следующем вызове селектораselect
метод, в настоящее времяcanceldKeys
не пусто, вызовет этоSelectionKey
изderegister
операция (высвобождение ресурсов иkeys
удаленный). будь то черезchannel.close()
или черезselectionKey.cancel()
, приведет кSelectionKey
быть добавленным кcannceldKey
середина.
Во время каждой операции выбора (выбрать) ключ может быть добавлен вselectedKeys
в или изcancelledKeys
удалено в.
Интерпретация метода выбора Selector
Зная вышеизложенное, перейдем кselect
метод, обратите внимание на его детали. Зависит отSelector
Апи известно,select
Есть две формы операции, одна из них
select(), selectNow(), select(long timeout); другойselect(Consumer<SelectionKey> action, long timeout)
,select(Consumer<SelectionKey> action)
,selectNow(Consumer<SelectionKey> action)
. Последний представляет собой недавно добавленный API в JDK11, который в основном предназначен для пользовательской операции, выполняемой с соответствующим ключом каналами, готовыми к операциям ввода-вывода в процессе выбора.
Следует отметить, что существуютConsumer<SelectionKey> action
Операция выбора параметра блокируется, она будет вызываться только в том случае, если выбран хотя бы один канал.Selector
примерwakeup
метод пробуждения, аналогично поток, в котором он находится, может быть прерван.
//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
throws IOException
{
Objects.requireNonNull(action);
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
Мы можем заметить, что в любом случае все они оказываются вlockAndDoSelect
В этом методе конкретная система в конечном итоге будет выполненаdoSelect(action, timeout)
выполнить.
Здесь мы беремsun.nio.ch.WindowsSelectorImpl#doSelect
В качестве примера для описания шагов его работы:
// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue(); // <1>
processDeregisterQueue(); // <2>
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll(); // <3>
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue(); // <4>
int updated = updateSelectedKeys(action); // <5>
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket(); // <6>
return updated;
}
Интерпретация процессаUpdateQueue
-
Во-первых, мы можем узнать, что через конкретную реализацию соответствующего класса реализации операционной системы (здесь — WindowsSelectorImpl), через
<1>
кудаprocessUpdateQueue()
узнать о каждом оставшемсяChannel
(Некоторые каналы были отменены) на данный моментinterestOps
, в том числе вновь зарегистрированных иupdateKeys
, и сделай этоpollWrapper
управленческие операции.-
то есть для вновь зарегистрированных
SelectionKeyImpl
, мы относимся к этомуpollArray
Смещение адреса памяти, где он находитсяSIZE_POLLFD * totalChannels + FD_OFFSET
а такжеSIZE_POLLFD * totalChannels + EVENT_OFFSET
депозит отдельноSelectionKeyImpl
файловый дескрипторfd
соответствующий емуEventOps
(изначально 0). -
правильно
updateKeys
, потому что раньшеpollArray
Он был сохранен в относительной позиции , здесь нам также нужно судить о достоверности полученного ключа.SelectionKeyImpl
объектinterestOps
написатьpollWrapper
хранить его вEventOps
позиция.
Уведомление: правильно
newKeys
После оценки действительности ключа, если он действителен, он вызоветgrowIfNeeded()
метод, здесь мы сначала оценимchannelArray.length == totalChannels
, этоSelectionKeyImpl
Массив с начальной емкостью 8.channelArray
это на самом деле удобноSelector
управление в книгахSelectionKeyImpl
Это просто массив чисел, судя по длине и размеру массива, если иtotalChannels
(начальное значение равно 1) равно, а не только дляchannelArray
расширение и, что более важно, помощьpollWrapper
,ПозволятьpollWrapper
Расширение – это цель.и когда
totalChannels % MAX_SELECTABLE_FDS == 0
, затем откройте дополнительный поток для обработкиselector
.windows
начальствоselect
Системные вызовы имеют максимальное ограничение файлового дескриптора и могут быть опрошены только за один раз.1024
При наличии более 1024 файловых дескрипторов требуется многопоточный опрос. звонить в то же времяpollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)
в связи с этимpollArray
Смещение адреса памяти, где он находитсяSIZE_POLLFD * totalChannels + FD_OFFSET
пишите по этому адресуwakeupSourceFd
представленыfdVal
ценность. Таким образом, только что запущенный поток может пройти черезMAX_SELECTABLE_FDS
определить это для мониторингаwakeupSourceFd
, легко проснутьсяselector
. пройти черезski.setIndex(totalChannels)
записыватьSelectionKeyImpl
Позиция индекса в массиве для последующего использования. -
/**
* sun.nio.ch.WindowsSelectorImpl#processUpdateQueue
* Process new registrations and changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.putEntry(totalChannels, ski);
totalChannels++;
MapEntry previous = fdMap.put(ski);
assert previous == null;
}
}
// changes to interest ops
while ((ski = updateKeys.pollFirst()) != null) {
int events = ski.translateInterestOps();
int fd = ski.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex();
assert index >= 0 && index < totalChannels;
pollWrapper.putEventOps(index, events);
}
}
}
}
//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.getFDVal());
putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
pollWrapper.grow(newSize);
}
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
threadsCount++;
}
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;
//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
replaceEntry(this, i, temp, i);
pollArray.free();
pollArray = temp.pollArray;
this.size = temp.size;
pollArrayAddress = pollArray.address();
}
// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(Integer.valueOf(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = Integer.valueOf(ski.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel() == ski.channel()))
return remove(fd);
return null;
}
}
// class for fdMap entries
private static final class MapEntry {
final SelectionKeyImpl ski;
long updateCount = 0;
MapEntry(SelectionKeyImpl ski) {
this.ski = ski;
}
}
private final FdMap fdMap = new FdMap();
интерпретация processDeregisterQueue
- затем через
上面WindowsSelectorImpl#doSelect展示源码中<2>
кудаprocessDeregisterQueue()
.- правильно
cancelledKeys
очищать, проходитьcancelledKeys
, и для каждогоkey
провестиderegister
операции, то изcancelledKeys
удалить из коллекции, изkeys
коллекция сselectedKeys
Удалите его, чтобы освободить ссылку, которую gc удобно перерабатывать, - позвонить в пределах
implDereg
метод, начнется сchannelArray
удалить соответствующийChannel
представленыSelectionKeyImpl
,КорректированиеtotalChannels
и количество потоков, отmap
а такжеkeys
удалено вSelectionKeyImpl
, УдалитьChannel
ВверхSelectionKeyImpl
и закрытьChannel
. - Также было установлено, что
processDeregisterQueue()
метод вызываетpoll
Метод вызывается до и после, чтобы убедиться, что вызов обрабатывается правильно.poll
Ключи, отмененные в течение периода времени, когда метод заблокирован, могут быть своевременно очищены. - Наконец, рассудит это
cancelledKey
представленыchannel
Открывать и отменять регистрацию, если закрыть и отменять регистрацию, то ресурсы, занятые соответствующим файловым дескриптором, должны быть закрыты.
- правильно
/**
* sun.nio.ch.SelectorImpl#processDeregisterQueue
* Invoked by selection operations to process the cancelled-key set
*/
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector
implDereg(ski);
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
assert !ski.isValid();
assert Thread.holdsLock(this);
if (fdMap.remove(ski) != null) {
int i = ski.getIndex();
assert (i >= 0);
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
}
ski.setIndex(-1);
channelArray[totalChannels - 1] = null;
totalChannels--;
if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
}
}
//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
}
}
//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/SocketChannelImpl.java:1126
static {
IOUtil.load();
nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
close0(fd);
}
Интерпретация AdjustThreadsCount
- Затем мы видим
上面WindowsSelectorImpl#doSelect
покажи исходный кодadjustThreadsCount()
вызов метода.- Ранее упоминалось, что если
totalChannels % MAX_SELECTABLE_FDS == 0
, затем откройте еще один поток для обработкиselector
. Здесь основано наКоличество выделенных потоковЧтобы увеличить или уменьшить потоки, это фактически максимумselect
Ограничение дескриптора файла для операций корректируется с учетом количества потоков. - Посмотрим, что делает созданный поток, то есть понаблюдаем
SelectThread
изrun
реализация метода. Наблюдая за его исходным кодом, вы можете увидеть, что он первыйwhile (true)
,пройти черезstartLock.waitForStart(this)
Чтобы контролировать, запущен ли поток или ожидает, если он запущен, он вызоветsubSelector.poll(index)
(Мы подробно объясним это позже), - когда эта тема
poll
конец, и если есть несколько потоков относительно текущего основного потокаSelectThread
Для дочерних потоков текущийSelectThread
Нить заканчивается первойpoll
тогда позвониfinishLock.threadFinished()
для уведомления основного потока. Просто создайте этот поток и вызовите егоrun
метод, в настоящее времяlastRun = 0
, при первом запускеsun.nio.ch.WindowsSelectorImpl.StartLock#runsCounter
То же самое 0, поэтому он вызоветstartLock.wait()
Затем войдите в состояние ожидания.
- Ранее упоминалось, что если
Уведомление:
sun.nio.ch.WindowsSelectorImpl.StartLock
Он также будет судить, заброшен ли текущий обнаруженный поток, и если он заброшен, он вернетtrue
, так что обнаруженный поток также может выйти из своего метода запуска.while
Таким образом, цикл завершает выполнение потока.- При регулировке резьбы (вызов
adjustThreadsCount
метод) сSelector
перечислитьclose
метод будет вызываться косвенноsun.nio.ch.WindowsSelectorImpl#implClose
, оба метода предполагаютSelector
Освобождение потока, то есть вызовsun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie
.finishLock.threadFinished()
позвонюwakeup()
метод для уведомления основного потока, здесь мы можем узнать подробности, если поток блокируется вselect
метод, вы можете позвонитьwakeup
метод приведет к немедленному возврату блокирующей операции выбора черезWindows
Соответствующая реализация, принцип на самом делеpipe
изsink
Байт записывается в конец,source
Дескриптор файла находится в состоянии готовности,poll
метод возвращается, что приводит кselect
метод возвращает. В других системах Solaris или Linux фактически используются системные вызовы.pipe
Чтобы завершить создание конвейера, это эквивалентно непосредственному использованию системного конвейера. пройти черезwakeup()
Соответствующую реализацию также можно увидеть, вызвавwakeup
установитinterruptTriggered
Бит флага, поэтому вызывайте несколько раз подрядwakeup
Эффект эквивалентен звонку и не приведет к появлению незначительных ошибок.
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// Some threads become redundant. Remove them from the threads List.
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread
private SelectThread(int i) {
super(null, null, "SelectorHelper", 0, false);
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
void makeZombie() {
zombie = true;
}
boolean isZombie() {
return zombie;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}
// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
Интерпретация метода опроса subSelector
-
subSelector.poll()
является ядром select,native
функцияpoll0
осознать и поставитьpollWrapper.pollArrayAddress
передается как параметр вpoll0
,readFds
,writeFds
а такжеexceptFds
Массивы используются для хранения основныхselect
В результате первая позиция массива предназначена для хранения произошедшего события.socket
Общее количество и остальные места храненияsocket
ручкаfd
. Мы знаем из следующего кода: этоpoll0()
будет контролироватьpollWrapper
серединаFD
Входящие и исходящие данные отсутствуют, что приведет кIO
Блокировать до тех пор, пока не произойдет событие чтения или записи данных. потому чтоpollWrapper
также сохранено вServerSocketChannel
изFD
, так что покаClientSocket
отправить данные наServerSocket
,Такpoll0()
вернется; и потомуpollWrapper
также сохранено вpipe
изwrite
конецFD
, так что покаpipe
изwrite
конецFD
Отправка части данных также вызоветpoll0()
return; если ни одно из этих условий не выполняется, тоpoll0()
блокировать все время, т.selector.select()
будет заблокирован навсегда; если что-то из этого произойдет, тоselector.select()
вернется, все вSelectThread
изrun()
использовать вwhile (true) {}
, что гарантирует, чтоselector
Продолжить прослушивание после получения данных и их обработкиpoll()
;
Видно, что NIO по-прежнему блокирует IO, так в чем же разница между ним и BIO. На самом деле разница заключается в расположении блока,
BIO
заблокирован вread
метод (recvfrom), в то время какNIO
заблокирован вselect
метод. Так какая польза от этого. Если просто поменять место засора, то ничего не изменится, ноepoll等
Хитрость реализации в том, что она использует механизм обратного вызова, так что слушатель может знать только то, чтоsocket
Данные в потоке готовы, просто обработайте данные в этих потоках. использоватьBIO
, при условии, что1000
соединение, нужно открыть1000
нити, то есть1000
индивидуальныйread
Позиция блока заблокирована (мы уже демонстрировали это через Demo в части BIO), используяNIO
программирование, просто1нить, которая используетselect
со стратегией опросаepoll
Механизм событий и красно-черная древовидная структура данных уменьшают накладные расходы на его внутренний опрос и значительно сокращают накладные расходы на переключение контекста потока.
//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
// 保存发生read的FD
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
// 保存发生write的FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
//保存发生except的FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private SubSelector() {
this.pollArrayIndex = 0; // main thread
}
private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
...
}
Интерпретация updateSelectedKeys
- будет проходить дальше
上面WindowsSelectorImpl#doSelect展示源码中<5>
кудаupdateSelectedKeys(action)
обрабатывать каждыйchannel
изГотовыйИнформация.
- Если канал
key
еще нетselectedKeys
существует, добавьте его в коллекцию. - Если канал
key
уже существуетselectedKeys
, то есть этоchannel
существуют для поддержкиReadyOps
Готовая операция должна содержать одну из этих операций (путем(ski.nioReadyOps() & ski.nioInterestOps()) != 0
для подтверждения), измените егоReadyOps
для текущей операции. и что мы видели раньшеConsumer<SelectionKey>
Это действие также осуществляется здесь. Как видно из следующего исходного кода, ранее записанного вReadyOps
любая готовая информация при вызове этогоaction
Раньше он был отброшен и установлен напрямую.
//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
}
return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps,
SelectionKeyImpl ski,
Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else {
assert Thread.holdsLock(publicSelectedKeys);
if (selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
return 1;
}
}
}
return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, action, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, action, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
/**
* sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet
* updateCount is used to tell if a key has been counted as updated
* in this select operation.
*
* me.updateCount <= updateCount
*/
private int processFDSet(long updateCount,
Consumer<SelectionKey> action,
int[] fds, int rOps,
boolean isExceptFds)
{
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//我们应该关注的
int updated = processReadyEvents(rOps, sk, action);
if (updated > 0 && me.updateCount != updateCount) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
return numKeysUpdated;
}
На этом содержание Selector пока подошло к концу.В следующей статье я буду интерпретировать Java NIO Buffer.