Анализ Java NIO (8): подробное объяснение селектора ядра с высокой степенью параллелизма

Java задняя часть

последний разделАнализ Java NIO (7): введение в канал, буфер и селектор ядра NIOпредставилChannel,BufferиSelectorосновное использование
Получив представление о восприятии, давайте посмотрим, как реализован нижний слой Selector.

1. Дизайн селектора

Автор скачалopenjdk8исходный код, нарисуйте диаграмму классов

Ясно видно, что реализация Selector в openjdkSelectorImpl,
Затем SelectorImpl делегирует ответственность конкретной платформе, например той, что показана на рисунке после linux 2.6.EpollSelectorImpl, платформа WindowsWindowsSelectorImpl, MacOSXплатформаKQueueSelectorImpl.

Как можно догадаться из названия, openjdk все равно должен использоваться внизуepoll,kqueue,iocpЭти методы реализуют мультиплексирование ввода/вывода. ФронтАнализ Java NIO (3): выберите системный вызов для мультиплексирования ввода/вывода ,Анализ Java NIO (4): системный вызов опроса для мультиплексирования ввода-вывода , Анализ Java NIO (5): системный вызов epoll для мультиплексирования ввода/выводаЯ написал 3 статьи, чтобы объяснить его использование, и заинтересованные читатели могут оглянуться назад.

2. Получить селектор

Как мы все знаем,Selector.open()может получитьSelectorПример, как этого добиться?

// Selector.java
public static Selector open() throws IOException {
    // 首先找到provider,然后再打开Selector
    return SelectorProvider.provider().openSelector();
}

// java.nio.channels.spi.SelectorProvider
    public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                            // 这里就是打开Selector的真正方法
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

В openjdk каждая ОС имеетsun.nio.ch.DefaultSelectorProviderРеализация, возьмем солярис в качестве примера:

/**
 * Returns the default SelectorProvider.
 */
public static SelectorProvider create() {
    // 获取OS名称
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    // 根据名称来创建不同的Selctor
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

Если имя системыLinux, то, что действительно создаетsun.nio.ch.EPollSelectorProvider. Если ни SunOS, ни Linux, используйтеsun.nio.ch.PollSelectorProvider, оPollSelectorЗаинтересованные читатели сами поймут, в этой статье используются только фактические общеупотребительныеEpollSelectorОбсудить на примере.

Открытымsun.nio.ch.EPollSelectorProviderПроверятьopenSelectorметод

public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorImpl(this);
}

Очень интуитивно понятно, поэтому мы получаем окончательную реализацию Selector на платформе Linux:sun.nio.ch.EPollSelectorImpl

3. Как EPollSelector выбирает

Системный вызов epoll в основном делится на три функции.

  • epoll_create: создать epoll fd и открыть область высокоскоростного кэша ядра epoll, построить красно-черное дерево, выделить объекты памяти нужного размера и создать список связанных списков для хранения готовых событий.
  • epoll_wait: ждать, пока ядро ​​вернет события ввода-вывода
  • epoll_ctl: добавлять, изменять или удалять новые и старые события.

3.1 Создание Epoll fd

EPollSelectorImplКод конструктора выглядит следующим образом:

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // makePipe返回管道的2个文件描述符,编码在一个long类型的变量中
    // 高32位代表读 低32位代表写
    // 使用pipe为了实现Selector的wakeup逻辑
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    // 新建一个EPollArrayWrapper
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}

посмотри сноваEPollArrayWrapperпроцесс инициализации

EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    // 创建epoll fd
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

private native int epollCreate();

вызывается при инициализацииepollCreateметод, это собственный метод.
Открытымjdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
     // 这里的size可以不指定,从Linux2.6.8之后,改用了红黑树结构,指定了大小也没啥用
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

Вы можете видеть, что API операционной системы все еще используется в конце:epoll_createфункция

3.2 Ожидание Epoll ожидает событий ввода-вывода ядра

перечислитьSelector.select(), который в конечном итоге делегирует каждой реализацииdoSelectМетод, ограниченный местом, не буду постить слишком подробно, смотрите здесьEpollSelectorImplизdoSelectметод

protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        // 真正的实现是这行
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();

    // 以下基本都是异常处理
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

Тогда мы идем смотретьpollWrapper.poll, Открытымjdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java:

int poll(long timeout) throws IOException {
    updateRegistrations();
    // 这个epollWait是不是有点熟悉呢?
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

private native int epollWait(long pollAddress, int numfds, long timeout,
                             int epfd) throws IOException;

epollWaitЭто также собственный метод, откройте код c, чтобы увидеть:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        // 发起epoll_wait系统调用等待内核事件
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

Как видите, наконец-то началосьepoll_waitсистемный вызов.

3.3 управление epoll и инкапсуляция управления событиями с помощью openjdk

Отношение события ввода-вывода, зарегистрированное для селектора в JDK, должно использоватьSelectionKeyпредставлять, представлятьChannelинтересные события, такие какRead,Write,Connect,Accept.

перечислитьSelector.register()события хранятся вEpollArrayWrapperпеременная-членeventsLowиeventsHighсередина


// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;


/**
 * Sets the pending update events for the given file descriptor. This
 * method has no effect if the update events is already set to KILLED,
 * unless {@code force} is {@code true}.
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    // 判断fd和数组长度
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

см. вышеEpollArrayWrapper.poll(), первый звонокupdateRegistrations

/**
 * Returns the pending update events for the given file descriptor.
 */
private byte getUpdateEvents(int fd) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        return eventsLow[fd];
    } else {
        Byte result = eventsHigh.get(Integer.valueOf(fd));
        // result should never be null
        return result.byteValue();
    }
}

/**
 * Update the pending registrations.
 */
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            // 从保存的eventsLow和eventsHigh里取出事件
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                // 判断操作类型以传给epoll_ctl
                // 没有指定EPOLLET事件类型
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // 熟悉的epoll_ctl
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}
private native void epollCtl(int epfd, int opcode, int fd, int events);

Делегировать операцию событию после получения событияepollCtl, Это еще один собственный метод, откройте соответствующий код c, чтобы увидеть:

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // 发起epoll_ctl调用来进行IO事件的管理
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}

Это был наш старый другepoll_ctl.
Есть небольшая деталь, которую jdk не указываетET(边缘触发)все ещеLT(水平触发), поэтому по умолчанию используетсяLT:)

существуетAbstractSelectorImplЕсть 3 заданных события сохранения

// Public views of the key sets
// 注册的所有事件
private Set<SelectionKey> publicKeys;             // Immutable
// 内核返回的IO事件封装,表示哪些fd有数据可读可写
private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition

// 取消的事件
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

существуетEpollArrayWrapper.pollПосле завершения вызова он вызоветupdateSelectedKeysобновить три набора выше

private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        int nextFD = pollWrapper.getDescriptor(i);
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}

Код очень простой, возьмите событие и сравните операцию с набором.

4. Резюме

JDKSelectorЭто инкапсуляция вызова мультиплексирования ввода/вывода операционной системы, в Linux этоepollупаковка. epoll по сутиevent loopПередано ядру, поскольку сетевые данные являются первыми для ядра, прямая обработка ядра позволяет избежать ненужных системных вызовов и копирования данных, а производительность является лучшей.

Инкапсуляция событий ввода-вывода в jdkSelectionKey, спастиChannelсобытия, вызывающие озабоченность.

На данный момент передняя и задняя реализации Selector относительно ясны.