Анализ длинного 4D-текста: те вещи, которые касаются Epoll и Java Nio

Java

Epoll — это высокопроизводительный масштабируемый механизм уведомления о событиях ввода-вывода для ядра Linux.

В linux 2.5.44 впервые был представлен epoll, он призван заменить существующие системные функции select и poll, так что большое количествоУправление файловыми дескрипторамиПрограмма может играть лучше (пример из википедии: временная сложность старой системной функции O(n), временная сложность epollO(log n)). Функция, реализованная epoll, аналогична функции poll в том смысле, что она прослушивает события на нескольких файловых дескрипторах.

Нижний слой epoll сделанНастраиваемые объекты ядра операционной системыпостроен и представлен в виде файлового дескрипторапользовательское пространство(из википедии: В операционных системах виртуальная память обычно делится наПользовательское пространство и две части основного пространства. Это часть механизма защиты памяти.ядро**, расширения ядра и драйверы**, работают наосновное пространствоначальство. Другие приложения работают в пользовательском пространстве. Все приложения, работающие в пользовательском пространстве, вместе называются пользовательским пространством).

Еще немного о ядре

этоуправлятьвыдается программным обеспечениемВвод/вывод данныхПрограмма, которая обрабатывает данные ЦП и другими электронными компонентами компьютера, но очень сложна для работы непосредственно на оборудовании.Обычно ядро ​​предоставляет метод аппаратной абстракции для завершения (ядро решает, когда программа должна) оборудование работает), с помощью этих методов для завершения межпроцессного взаимодействия и системных вызовов.

Ядро макроса:

Проще говоря, макроядро сначала определяет высокоуровневый абстрактный интерфейс, называемый системным вызовом (System call), для реализации функций операционной системы, таких как управление процессами, файловой системой, управлением памятью и т. д. Эти функции выполняются несколькими операторами.Программа в режиме ядра завершена.

Микроядро:

Структура микроядра состоит из уровня аппаратной абстракции и системных вызовов и включает в себя несколько частей, необходимых для создания системы, таких как управление потоками, адресное пространство и межпроцессное взаимодействие. Цель микроядра состоит в том, чтобыВнедрение системных сервисовиОсновная работа системыправилоотдельныйНу давай же.

В качестве макроядра используется Linux. Поскольку он может вызывать модули для выполнения во время выполнения, упрощается расширение функциональности ядра.

что сделал эпол?

epoll с помощьюкрасно-черное дерево(RB-tree)поискконтролируемыйфайловый дескриптор(дескриптор файла).

на экземпляре epollПроблема с регистрацией, эпол будетсобытие добавлено вэкземпляр epollкрасно-черное деревосливатьсязарегистрировать функцию обратного вызова,когдакогда произошло событиебудет событиедобавить в список готовыхсередина.

Структура epoll?

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

①epoll_create

подать заявку на место в ядре,Создайте дескриптор для epoll, размер используется, чтобы сообщить ядру, насколько велико количество мониторов. Этот параметр отличается от первого параметра в select() и дает значение fd+1 для максимального прослушивания. В исходной реализации вызывающая программа сообщала ядру, сколько файловых дескрипторов нужно прослушивать, через параметр размера. Если количество отслеживаемых файловых дескрипторов превышает размер, ядро ​​автоматически увеличивает емкость. И теперь размер не имеет такой семантики, но вызывающийразмер по-прежнему должен быть больше 0 при вызове, чтобы обеспечить обратную совместимость. Следует отметить, что когдаСоздайте дескриптор epollпосле этого будетзанимает значение fd, Если вы посмотрите на /proc/process id/fd/ под linux, вы увидите этот fd.

②****epoll_ctl

Добавьте, измените или удалите прослушиватель для события события в fd для экземпляра ядра epoll, соответствующего epfd..opВы можете добавить новые для EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL соответственнособытие, который изменяет тип прослушиваемых событий в файловом дескрипторе и удаляет событие из экземпляра. Если установлено свойство events событияEPOLLET flag, то способ прослушивания событиякрайний триггер.

События могут быть набором следующих макросов:

  • EPOLLIN: это событие запускается, указывая на то, что в соответствующем файловом дескрипторе есть доступные для чтения данные. (включая нормальное отключение пира SOCKET);
  • EPOLLOUT: Инициировать это событие, указывающее, что данные могут быть записаны в соответствующий файловый дескриптор;
  • EPOLLPRI: Указывает, что соответствующий файловый дескриптор имеет срочные данные для чтения (должно указывать, что поступают внеполосные данные);
  • EPOLLERR: Указывает, что соответствующий файловый дескриптор содержит ошибку;
  • EPOLLHUP: указывает, что соответствующий файловый дескриптор завис;
  • EPOLLET: Установите EPOLL в режим «Запуск по фронту», который соответствует режиму «Запуск по уровню».
  • EPOLLONESHOT: прослушать событие только один раз.После прослушивания этого события, если вам нужно продолжить мониторинг сокета, вам нужно снова добавить сокет в очередь EPOLL.

Например:

struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

③****epoll_wait

Linux-2.6.19 представил epoll_wait, который может блокировать указанный сигнал:epoll_pwait

Получайте интересующие пользователя события ввода-вывода, происходящие в прослушиваемом дескрипторе. Проще говоря: через цикл постоянно контролируйте открытые порты, чтобы увидеть, какой fd доступен для чтения и записи~

когдавремя ожидания равно 0, epoll_wait всегда будетвернуться сейчас. итайм-аут -1, epoll_wait будетпродолжай блокироватьпока какое-либо зарегистрированное событие не станет готовым. когдатайм-аут — положительное целое число, epoll блокирует до тех пор, покатайм-аутили зарегистрировансобытие становится готовым. Время блокировки может немного превышать время ожидания (миллисекунды) из-за задержек планирования ядра.

После того, как дескриптор файла epoll израсходован, используйте его напрямуюcloseзакрыто и будетавтоматическийУдалить из набора прослушиваемых файловых дескрипторов

Эполл бой

Сказав столько принципов, мозг боится гудеть, давайте посмотрим настоящий бой и проснемся~

Как вы знаете выше: каждый раз, когда вы добавляете/модифицируете/удаляете прослушиваемый файловый дескриптор, вам нужно вызывать epoll_ctl, поэтому вы должны вызывать epoll_ctl как можно реже, чтобы не допустить, чтобы накладные расходы, вызванные этим, компенсировали его преимущества. Иногда в приложении (например, веб-сервере) может быть большое количество коротких соединений, и epoll_ctl будет вызываться часто, что может стать узким местом системы.

традиционныйвыбрать и опроситьЭффективность сети будет уменьшаться квадратично или даже кубически из-за линейного увеличения количества онлайн-пользователей, что напрямую приводит к относительно очевидному ограничению количества людей, которое может поддерживать сетевой сервер. Это связано с их ограниченными файловыми дескрипторами и неэффективностью перебора всех fd.

Суть~

когда у тебя естьБольшая коллекция розеток., но потому чтосетевая задержка, только в любой моментчастьизсокет "активен", но выбрать/опроситьКаждый вызов будет линейно сканировать всю коллекцию, в результате чего эффективностьЛинейный спад.epollЭтой проблемы не существует, онаТолько для "активных" сокетовработать --- это потому, что в реализации ядра epoll основан наобратный вызов для каждого fdфункция реализована. Так,Только «активные» сокеты будут активно вызывать функцию обратного вызова., другие сокеты в состоянии простоя (бездействия) не будут.На данный момент epoll реализует «псевдо» AIO, потому что движущей силой в это время является ядро ​​ОС. В некоторых бенчмарках, если все сокеты в основном активны — например, в среде высокоскоростной локальной сети, epoll не эффективнее, чем select/poll, наоборот, если epoll_ctl используется слишком часто, эффективность немного ниже, чем у этого. Но когда для имитации среды WAN используются незанятые соединения, epoll становится намного эффективнее, чем select/poll.

int epfd = epoll_create(POLL_SIZE);
    struct epoll_event ev;
    struct epoll_event *events = NULL;
    nfds = epoll_wait(epfd, events, 20, 500);
    {
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                //如果是主socket的事件的话,则表示
                //有新连接进入了,进行新连接的处理。
                client = accept(listener, (structsockaddr *)&local, &addrlen);
                if (client < 0) {
                    perror("accept");
                    continue;
                }
                setnonblocking(client);        //将新连接置于非阻塞模式
                ev.events = EPOLLIN | EPOLLET; //并且将新连接也加入EPOLL的监听队列。
                //注意,这里的参数EPOLLIN|EPOLLET并没有设置对写socket的监听,
                //如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作
                //也监听的话,应该是EPOLLIN|EPOLLOUT|EPOLLET
                ev.data.fd = client;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                    //设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,
                    //这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个
                    //epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。
                    fprintf(stderr, "epollsetinsertionerror:fd=%d", client);
                    return -1;
                }
            }
            else if(event[n].events & EPOLLIN)
            {
                //如果是已经连接的用户,并且收到数据,
                //那么进行读入
                int sockfd_r;
                if ((sockfd_r = event[n].data.fd) < 0)
                    continue;
                read(sockfd_r, buffer, MAXSIZE);
                //修改sockfd_r上要处理的事件为EPOLLOUT
                ev.data.fd = sockfd_r;
                ev.events = EPOLLOUT | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
            }
            else if(event[n].events & EPOLLOUT)
            {
                //如果有数据发送
                int sockfd_w = events[n].data.fd;
                write(sockfd_w, buffer, sizeof(buffer));
                //修改sockfd_w上要处理的事件为EPOLLIN
                ev.data.fd = sockfd_w;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)
            }
            do_use_fd(events[n].data.fd);
        }
    }

Кратко опишу процесс:

  • При обнаружении нового соединения оно обрабатывается;
  • Если это подключенный пользователь и получает данные, после чтения измените событие, которое будет обрабатываться на sockfd_r, на EPOLLOUT (доступно для записи);
  • Если есть данные для отправки, после записи измените событие для обработки на sockfd_w на EPOLLIN (читаемое)

Как вызвать epoll на Java?

Базовые знания:

дескриптор файла:

  • (См. примечание переводчика из «Сетевого программирования Unix»)
  • файловый дескриптор — это целое число, которое идентифицирует файл в системе Unix,Философия Unix все является файлом, поэтому соответствующие ресурсы (включая обычныефайлы, каталоги, каналы, POSIX IPC, сокеты) можно рассматривать как файлы.

В мире Java NIOСелектор является центральным контроллером,Буфер — это контейнер, в котором хранятся данные.ChannelМожно сказать, что это самый простой фасад, этоместныйI/Oоборудование,Сетевой ввод-выводкоммуникационный мост.

  • Сетевые устройства ввода/вывода:
  • DatagramChannel: чтение и запись данных для связи UDP, соответствующие классу DatagramSocket.
  • SocketChannel: чтение и запись данных для связи TCP, соответствующих классу Socket.
  • ServerSocketChannel: прослушивает новые соединения TCP и создает доступный для чтения и записи SocketChannel, соответствующий классу ServerSocket.
  • Локальные устройства ввода-вывода:
  • FileChannel: чтение и запись данных локальных файлов, не поддерживает управление Selector, соответствует классу File

Начните с самого простого ServerSocketChannel

ServerSocketChannel — это прослушиватель сокетов, подобный ServerSocket, основное отличие которого состоит в том, что первый может работать в неблокирующем режиме;

// 创建一个ServerSocketChannel,将会关联一个未绑定的ServerSocket
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

ServerSocketChannelСоздание также зависит от реализации базовой операционной системы, и ее класс реализации в основном **ServerSocketChannelImpl**, ** давайте посмотрим на его метод построения.

     ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
        super(var1);
        // 创建一个文件操作符
        this.fd = Net.serverSocket(true);
        // 得到文件操作符是索引
        this.fdVal = IOUtil.fdVal(this.fd);
        this.state = 0;
     }

Суть создания нового ServerSocketChannelImpl заключается в создании fd (т.е. дескриптора файла) в базовой операционной системе, что эквивалентно установлению канала для сетевого взаимодействия, вызову метода bind() сокета для привязки и вызову операционной системы. через accept(). Получите TCP-соединение

public SocketChannel accept() throws IOException {
    // 忽略一些校验及无关代码
    ....

    SocketChannelImpl var2 = null;
    // var3的作用主要是说明当前的IO状态,主要有
    /**
    * EOF = -1;
    * UNAVAILABLE = -2;
    * INTERRUPTED = -3;
    * UNSUPPORTED = -4;
    * THROWN = -5;
    * UNSUPPORTED_CASE = -6;
    */
    int var3 = 0;
    // 这里本质也是用fd来获取连接
    FileDescriptor var4 = new FileDescriptor();
    // 用来存储TCP连接的地址信息
    InetSocketAddress[] var5 = new InetSocketAddress[1];

    try {
        // 这里设置了一个中断器,中断时会将连接关闭
        this.begin();
        // 这里当IO被中断时,会重新获取连接
        do {
            var3 = this.accept(this.fd, var4, var5);
        } while(var3 == -3 && this.isOpen());
    }finally {
        // 当连接被关闭且accept失败时或抛出AsynchronousCloseException
        this.end(var3 > 0);
        // 验证连接是可用的
        assert IOStatus.check(var3);
    }

    if (var3 < 1) {
        return null;
    } {
        // 默认连接是阻塞的
        IOUtil.configureBlocking(var4, true);
        // 创建一个SocketChannel的引用
        var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);
        // 下面是是否连接成功校验,这里忽略...

        return var2;
    }
}

// 依赖底层操作系统实现的accept0方法
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {
    return this.accept0(var1, var2, var3);
}

SocketChannel

Данные, используемые для чтения и записи связи TCP, эквивалентные клиенту

  1. пройти черезopenметод создания SocketChannel,
  2. затем используйтеconnectметод инициации установления соединения с сервером, а также поддерживает некоторые методы оценки установления соединения;
  3. readиwriteПоддерживает самые основные операции чтения и записи

open

  public static SocketChannel open() throws IOException {    return SelectorProvider.provider().openSocketChannel();  }
    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }
    // State, increases monotonically
    private static final int ST_UNINITIALIZED = -1;
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_PENDING = 1;
    private static final int ST_CONNECTED = 2;
    private static final int ST_KILLPENDING = 3;
    private static final int ST_KILLED = 4;
    private int state = ST_UNINITIALIZED;    
    SocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        // 创建一个scoket通道,即fd(fd的作用可参考上面的描述)
        this.fd = Net.socket(true);
        // 得到该fd的索引
        this.fdVal = IOUtil.fdVal(fd);
        // 设置为未连接
        this.state = ST_UNCONNECTED;
    }

connect устанавливает соединение

    // 代码均来自JDK1.8 部分代码
    public boolean connect(SocketAddress var1) throws IOException {
        boolean var2 = false;
        // 读写都锁住
        synchronized(this.readLock) {
            synchronized(this.writeLock) {
                 /****状态检查,channel和address****/
                // 判断channel是否open
                this.ensureOpenAndUnconnected();
                InetSocketAddress var5 = Net.checkAddress(var1);
                SecurityManager var6 = System.getSecurityManager();
                if (var6 != null) {
                    var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());
                }

                boolean var10000;
                 /****连接建立****/
                // 阻塞状态变更的锁也锁住
                synchronized(this.blockingLock()) {
                    int var8 = 0;

                    try {
                        try {
                            this.begin(); 
                            // 如果当前socket未绑定本地端口,则尝试着判断和服务端是否能建立连接
                            synchronized(this.stateLock) {
                                if (!this.isOpen()) {
                                    boolean var10 = false;
                                    return var10;
                                }

                                if (this.localAddress == null) {
                                  // 和远程建立连接后关闭连接
                                   NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());
                                }

                                this.readerThread = NativeThread.current();
                            }

                            do {
                                InetAddress var9 = var5.getAddress();
                                if (var9.isAnyLocalAddress()) {
                                    var9 = InetAddress.getLocalHost();
                                }
                                // 建立连接
                                var8 = Net.connect(this.fd, var9, var5.getPort());
                            } while(var8 == -3 && this.isOpen());
                    synchronized(this.stateLock) {
                        this.remoteAddress = var5;
                        if (var8 <= 0) {
                            if (!this.isBlocking()) {
                                this.state = 1;
                            } else {
                                assert false;
                            }
                        } else {
                            this.state = 2;// 连接成功
                            if (this.isOpen()) {
                                this.localAddress = Net.localAddress(this.fd);
                            }

                            var10000 = true;
                            return var10000;
                        }
                    }
                }

                var10000 = false;
                return var10000;
            }
        }
    }

Прежде чем установить адрес привязки, нам нужно вызватьNetHooks.beforeTcpBind, этот метод заключается в преобразовании fd в сокет SDP (Sockets Direct Protocol, Java Socket Direct Protocol). SDP требует, чтобы сетевая карта поддерживала технологию высокоскоростной сетевой связи InfiniBand, которая не поддерживается Windows.

Давайте посмотрим на NetHooks.java под openjdk: src\solaris\classes\sun\net

   private static final Provider provider = new sun.net.sdp.SdpProvider();

    public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpBind(fdObj, address, port);
    }
    public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpConnect(fdObj, address, port);
    }

Вы можете видеть, что implBeforeTcpBind в SdpProvider на самом деле вызывается

 @Override
    public void implBeforeTcpBind(FileDescriptor fdObj,
                              InetAddress address,
                              int port)
        throws IOException
    {
        if (enabled)
            convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
    }
  // converts unbound TCP socket to a SDP socket if it matches the rules
    private void convertTcpToSdpIfMatch(FileDescriptor fdObj,
                                               Action action,
                                               InetAddress address,
                                               int port)
        throws IOException
    {
        boolean matched = false;
        // 主要是先通过规则校验器判断入参是否符合,一般有PortRangeRule校验器
        // 然后再执行将fd转换为socket
        for (Rule rule: rules) {
            if (rule.match(action, address, port)) {
                SdpSupport.convertSocket(fdObj);
                matched = true;
                break;
            }
        }

    }
    public static void convertSocket(FileDescriptor fd) throws IOException {
      ...
      //获取fd索引
      int fdVal = fdAccess.get(fd);
      convert0(fdVal);
    }


    // convert0
   JNIEXPORT void JNICALL
   Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd)
  {
    // create方法实际是通过socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一个socket
    int s = create(env);

    if (s >= 0) {
        socklen_t len;
        int arg, res;
        struct linger linger;

        /* copy socket options that are relevant to SDP */
        len = sizeof(arg);
        // 重用TIME_WAIT的端口
        if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);
        len = sizeof(arg);
        // 紧急数据放入普通数据流
        if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);
        len = sizeof(linger);
        // 延迟关闭连接
        if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);

        // 将fd也引用到s所持有的通道
        RESTARTABLE(dup2(s, fd), res);
        if (res < 0)
            JNU_ThrowIOExceptionWithLastError(env, "dup2");
        // 执行close方法,关闭s这个引用
        RESTARTABLE(close(s), res);
    }
  }

читать

public int read(ByteBuffer var1) throws IOException {
            // 省略一些判断
            synchronized(this.readLock) {
                  this.begin();
                  synchronized(this.stateLock) {
                                do {
                                // 通过IOUtil的读取fd的数据至buf
                                // 这里的nd是SocketDispatcher,用于调用底层的read和write操作
                                    var3 = IOUtil.read(this.fd, var1, -1L, nd);
                                } while(var3 == -3 && this.isOpen());
                                // 这个方法主要是将UNAVAILABLE(原为-2)这个状态返回0,否则返回n
                                var4 = IOStatus.normalize(var3);
                                var20 = false;
                                break label367;
                            }

                             this.readerCleanup();
                             assert IOStatus.check(var3);
                        }    
            }
        }
    }
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
    // 临时缓冲区,大小为buf的remain(limit - position),堆外内存,使用ByteBuffer.allocateDirect(size)分配
    // Notes:这里分配后后面有个try-finally块会释放该部分内存
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

            int var7;
            try {
                // 将网络中的buf读进direct buffer
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();// 待读取
                if (var6 > 0) {
                    var1.put(var5);// 成功时写入
                }

                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }

            return var7;
        }
    }
private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
            // 忽略变量init
            if (var2 != -1L) {
                // pread方法只有在同步状态下才能使用
                var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);
            } else {
                // 其调用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法
                var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);
            }

            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            return var9;
        }
    }
// 同样找到openjdk:src\solaris\native\sun\nio\ch 
//FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
                             jobject fdo, jlong address, jint len)
{
    jint fd = fdval(env, fdo);// 获取fd索引
    void *buf = (void *)jlong_to_ptr(address);
    // 调用底层read方法
    return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}

Подведите итоги процесса чтения.

  1. Инициализировать прямой буфер, если его собственный буфер является прямым, его не нужно инициализировать
  2. Вызовите базовый метод чтения для записи в прямой буфер.
  3. Наконец, запишите прямой буфер в объект входящего буфера.

написать написать

После чтения предыдущего чтения весь процесс выполнения записи в основном такой же, а конкретные детали заключаются в следующем.

public int write(ByteBuffer var1) throws IOException {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            synchronized(this.writeLock) {
                this.ensureWriteOpen();
                        this.begin();
                        synchronized(this.stateLock) {
                            if (!this.isOpen()) {
                                var5 = 0;
                                var20 = false;
                                break label310;
                            }
                            this.writerThread = NativeThread.current();
                        }
                        do {
                            // 通过IOUtil的读取fd的数据至buf
                            // 这里的nd是SocketDispatcher,用于调用底层的read和write操作
                            var3 = IOUtil.write(this.fd, var1, -1L, nd);
                        } while(var3 == -3 && this.isOpen());

                        var4 = IOStatus.normalize(var3);
                        var20 = false;
                    this.writerCleanup();
                    assert IOStatus.check(var3);
                    return var4;
                }
            }
        }
    }
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {

            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);

            int var10;
            try {
                // 这里的pos为buf初始的position,意思是将buf重置为最初的状态;因为目前还没有真实的写入到channel中
                var8.put(var1);
                var8.flip();
                var1.position(var5);
                // 调用
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }

                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }

            return var10;
        }
    }
IOUtil.writeFromNativeBuffer(fd , buf , position , nd)
{
    // ... 忽略一些获取buf变量的代码    
    int written = 0;
    if (position != -1) {
        // pread方法只有在同步状态下才能使用
        written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);
    } else {
        // 其调用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法
        written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    //....
}
FileDispatcherImpl.write0
{
    // 调用底层的write方法写入
    return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}
}

Подведем итоги написания:

  1. Если buf является прямым буфером, начните запись напрямую, иначе вам нужно инициализировать прямой буфер, размер которого равен остатку от buf
  2. Запишите содержимое buf в прямой буфер и восстановите позицию buf
  3. Вызовите базовый метод записи для записи в канал
  4. Обновите позицию buf, то есть позицию после того, как содержимое будет прочитано прямым буфером

Потерпите, скоро будет Epoll

После понимания некоторых предыдущих основ, следующая часть будет посвящена тому, как Java использует epoll.

Краткое описание селектора

Роль Selector заключается в управлении группой мультиплексоров в Java NIO.SelectableChannelобъект и можетИдентифицироватьКанал такой какПодготовка к событиям чтения и записиКомпоненты --Java doc

img

Процесс создания Selector выглядит следующим образом:

// 1.创建Selector
Selector selector = Selector.open();

// 2.将Channel注册到选择器中
// ....... new channel的过程 ....

//Notes:channel要注册到Selector上就必须是非阻塞的,所以FileChannel是不可以
//使用Selector的,因为FileChannel是阻塞的
channel.configureBlocking(false);

// 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);

// 也可以使用或运算|来组合多个事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);

// 不过值得注意的是,一个 Channel 仅仅可以被注册到一个 Selector 一次,
// 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey 
//的 interest set.

Канал, зарегистрированный в Selector, представляет событие SelectionKey. Типы SelectionKey включают:

  • OP_READ: событие, доступное для чтения, значение: 1
  • OP_WRITE: записываемое событие; значение: 1
  • OP_CONNECT: событие подключения клиента к серверу (соединение TCP), обычно создающее клиентский канал SocketChannel; значение: 1
  • OP_ACCEPT: сервер получает событие подключения клиента, обычно создавая серверный канал ServerSocketChannel; значение: 1

Селектор внутренне поддерживает три набора ключей:

  1. key set: все ключи, зарегистрированные в селекторе по текущему каналу; вы можете вызвать keys(), чтобы получить их
  2. selected-key set: Событие готовности текущего канала, его можно получить вызовом selectedKeys()
  3. cancelled-key: Активно срабатывающий метод SelectionKey#cancel() будет помещен в эту коллекцию при условии, что канал не был разрегистрирован, его нельзя вызвать внешним методом

Всего класс Selector содержит следующие 10 методов:

  • open(): создать объект селектора
  • isOpen(): Независимо от того, находится ли он в открытом состоянии, если вызывается метод close(), он вернет false
  • provider(): получить поставщика текущего селектора
  • keys(): Как упоминалось выше, получить все ключи текущего канала, зарегистрированные в селекторе.
  • selectedKeys(): получить список событий, готовых для текущего канала
  • selectNow(): Подготовить текущее событие, этот метод возвращает результат немедленно, без блокировки; если возвращаемое значение > 0, это означает, что имеется одно или несколько событий.
  • select(длительный тайм-аут): метод тайм-аута блокировки selectNow в течение периода тайм-аута вернется, когда событие будет готово; в противном случае он также вернется после превышения времени.
  • select(): метод блокировки selectNow, который не вернется, пока событие не будет готово.
  • wakeup(): При вызове этого метода поток, заблокированный в select(), немедленно вернется; (ps: следующее предложение является ключевым моментом) Метод select() Поток также немедленно возвращает результат, что эквивалентно однократному выполнению метода selectNow().
  • close(): вызов его метода close() после использования селектора закроет селектор и сделает недействительными все экземпляры SelectionKey, зарегистрированные в селекторе. Сам канал не закрывается.

О SelectionKey

Когда дело доходит до Selector, мы должны упомянуть SelectionKey. Они тесно связаны и используются вместе. Как показано выше, регистрация Selector с Channel вернет объект SelectionKey, который содержит следующее содержимое:

  • interest set, набор событий, в котором заинтересован текущий Канал, то есть набор интересов, заданный вызовом метода register
  • ready set
  • channel
  • selector
  • attached object, необязательный дополнительный объект

①****процентный наборНабор интересов можно получить и установить с помощью методов класса SelectionKey.

// 返回当前感兴趣的事件列表
int interestSet = key.interestOps();

// 也可通过interestSet判断其中包含的事件
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    

// 可以通过interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);

готовый список событий, к которым готов текущий канал

int readySet = key.readyOps();

// 也可通过四个方法来分别判断不同事件是否就绪
key.isReadable();    //读事件是否就绪
key.isWritable();    //写事件是否就绪
key.isConnectable(); //客户端连接事件是否就绪
key.isAcceptable();  //服务端连接事件是否就绪

канал и селектор Мы можем получить текущий канал и селектор через SelectionKey

// 返回当前事件关联的通道,可转换的选项包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();

//返回当前事件所关联的Selector对象
Selector selector = key.selector();

прикрепленный объект Мы можем прикрепить объект к selectionKey или прикрепить его непосредственно при регистрации:

key.attach(theObject);
Object attachedObj = key.attachment();
// 在注册时直接附加
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

Начиная с многоэтажки, базовые знания почти одинаковые.Если вы это понимаете, то можете найти какие-нибудь nio demos или netty demos для отработки своих навыков. Далее я объясню более важный ~epoll в этом разделе.

Openjdk уже упоминался много раз, и конкретная реализация селектора определенно связана с операционной системой, давайте посмотрим.

img

Видно, что реализация Selector — это SelectorImpl, а затем SelectorImpl делегирует обязанности конкретным платформам, таким как linux2.6 на рисункеEpollSelectorImpl, окна естьWindowsSelectorImpl, MacOSX естьKQueueSelectorImpl

Согласно предыдущему мы знаем, Selector.open() может получить экземпляр Selector, как этого добиться?

// Selector.java
public static Selector open() throws IOException {
    // 首先找到provider,然后再打开Selector
    return SelectorProvider.provider().openSelector();
}

// java.nio.channels.spi.SelectorProvider
    public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                            // 这里就是打开Selector的真正方法
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

В openjdk каждая операционная система имеет реализацию sun.nio.ch.DefaultSelectorProvider с srcsolarisПример DefaultSelectorProvider в \classes\sun\nio\ch:

/**
 * Returns the default SelectorProvider.
 */
public static SelectorProvider create() {
    // 获取OS名称
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    // 根据名称来创建不同的Selctor
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

открыть источникsolarisEPollSelectorProvider.java в \classes\sun\nio\ch

public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

Платформа Linux получает окончательную реализацию Selector: srcsolarisEPollSelectorImpl.java в \classes\sun\nio\ch

Давайте посмотрим на конструктор, который он реализует:

    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        // makePipe返回管道的2个文件描述符,编码在一个long类型的变量中
        // 高32位代表读 低32位代表写
        // 使用pipe为了实现Selector的wakeup逻辑
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        // 新建一个EPollArrayWrapper
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    }

EPollArrayWrapper.c в \src\solaris\native\sun\nio\ch

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

epoll_create уже упоминался ранее, поэтому я не буду повторяться здесь.

②epoll wait ожидает событий ввода/вывода ядра

Вызов Selector.select (количество возвращаемых ключей, которое может быть равно нулю) будет, наконец, делегирован методу doSelect каждой реализации. Из-за нехватки места я не буду публиковать слишком много подробностей. Вот посмотрите на EpollSelectorImpldoSelectметод

protected int doSelect(long timeout) throws IOException {    if (closed)      throw new ClosedSelectorException();    processDeregisterQueue();    try {      begin();      //EPollArrayWrapper pollWrapper      pollWrapper.poll(timeout);//重点在这里    } finally {      end();    }    processDeregisterQueue();    int numKeysUpdated = updateSelectedKeys();// 后面会讲到    if (pollWrapper.interrupted()) {      // Clear the wakeup pipe      pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);      synchronized (interruptLock) {        pollWrapper.clearInterrupted();        IOUtil.drain(fd0);        interruptTriggered = false;      }    }    return numKeysUpdated;  }
int poll(long timeout) throws IOException {
    updateRegistrations();// 这个代码在下面讲,涉及到epoo_ctl
    // 这个epollWait是不是有点熟悉呢?
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;

Взгляните на EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        //系统调用等待内核事件
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

Вы можете видеть, что Selector.select() в Linux на самом деле вызывает epoll_wait.

③Контроль epoll и инкапсуляция openjdk для управления событиями

Отношение события ввода-вывода, зарегистрированное для селектора в JDK, должно использоватьSelectionKeyЧтобы указать, что он представляет события, в которых заинтересован канал, напримерRead,Write,Connect,Accept.

При вызове **Selector.register()** события будут храниться в переменных-членах eventsLow и eventsHigh файла EpollArrayWrapper.java.

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;


/**
 * Sets the pending update events for the given file descriptor. This
 * method has no effect if the update events is already set to KILLED,
 * unless {@code force} is {@code true}.
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    // 判断fd和数组长度
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}
  /**
     * Returns the pending update events for the given file descriptor.
     */
    private byte getUpdateEvents(int fd) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            return eventsLow[fd];
        } else {
            Byte result = eventsHigh.get(Integer.valueOf(fd));
            // result should never be null
            return result.byteValue();
        }
  

Участвует в коде опроса выше

    int poll(long timeout) throws IOException {
    updateRegistrations();/

   /**
     * Update the pending registrations.
     */
    private void updateRegistrations() {
        synchronized (updateLock) {
            int j = 0;
            while (j < updateCount) {
                int fd = updateDescriptors[j];
                // 从保存的eventsLow和eventsHigh里取出事件
                short events = getUpdateEvents(fd);
                boolean isRegistered = registered.get(fd);
                int opcode = 0;

                if (events != KILLED) {
                    if (isRegistered) {
                        // 判断操作类型以传给epoll_ctl
                        // 没有指定EPOLLET事件类型
                        opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                    } else {
                        opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                    }
                    if (opcode != 0) {
                         // 熟悉的epoll_ctl
                        epollCtl(epfd, opcode, fd, events);
                        if (opcode == EPOLL_CTL_ADD) {
                            registered.set(fd);
                        } else if (opcode == EPOLL_CTL_DEL) {
                            registered.clear(fd);
                        }
                    }
                }
                j++;
            }
            updateCount = 0;
        }
  private native void epollCtl(int epfd, int opcode, int fd, int events);

Вы видите нативный метод, вызываемый epollCtl, мы вводим EpollArrayWrapper.c

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;
    // epoll_ctl这里就不用多说了吧
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}

После выполнения опроса метода doSelect будет обновлен updateSelectedKeys в EpollSelectorImpl.java, который представляет собой три набора наборов в Selector.Подробнее см. в начале.

/**

*更新已被epoll选择fd的键。

*将就绪兴趣集添加到就绪队列。

*/
private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

Суммировать

Из этой статьи вы должны узнать основные принципы Channel, Selector и как использовать Epoll в Java. (Включая более подробную взаимосвязь преобразования между fd, каналом и сокетом). Овладение этими базовыми знаниями, а затем просмотр исходного кода сетевой инфраструктуры NIO может быть не таким сложным. В следующей статье я расскажу о Netty, в конце концов, это стало основным направлением распределенной сетевой коммуникации!

благодарный

zh.wikipedia.org/wiki/EpollВикипедия

Encyclopedia.Baidu.com/item/epoll/…

nuggets.capable/post/684490…

woo woo Краткое описание.com/afraid/post 26 post 1 oh ah ah 7…