Модель реактора

Java

Чтобы читать эту статью без препятствий, вам нужно иметь общее представление о NIO, по крайней мере, вы должны уметь писать NIO Hello World.

Когда дело доходит до NIO и Netty, модель Reactor должна быть неизбежной, потому что эта архитектура модели слишком классическая, но многие люди склонны игнорировать базовое обучение, когда они учатся.Но я не успокоился и не стал внимательно смотреть на Краеугольный камень Netty - модель Reactor. В этой статье вы познакомитесь с моделью Reactor, чтобы вы могли получить простое и интуитивное представление о модели Reactor.

Когда речь идет о Reactor, я должен упомянуть статью. Автор статьи — знаменитый Дуг Леа. Конкурентный пакет в Java от него. Позвольте мне попытаться выделить из статьи что-то важное и объединить мое понимание. Давайте поговорим о модели Reactor и посмотрим, насколько разные схемы мозга Дуга Ли.

классический сервисный дизайн

图片.png

Это самый традиционный дизайн службы сокетов: к серверу подключаются несколько клиентов, и сервер открывает множество потоков, и один поток обслуживает одного клиента.

В большинстве сценариев обработка сетевого запроса состоит из следующих шагов:

  1. чтение: чтение данных из сокета.
  2. decode: декодирование, так как данные по сети передаются в виде байтов, чтобы получить настоящий запрос, их необходимо декодировать.
  3. вычисления: вычисления, то есть бизнес-обработка, делайте все, что хотите.
  4. encode:coding, по той же причине, потому что данные по сети передаются в виде байтов, то есть сокет получает только байты, поэтому кодирование должно быть обязательным.

Давайте посмотрим на традиционный код BIO:

      public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(9696);
            Socket socket = serverSocket.accept();
            new Thread(() -> {
                try {
                    byte[] byteRead = new byte[1024];
                    socket.getInputStream().read(byteRead);

                    String req = new String(byteRead, StandardCharsets.UTF_8);//encode
                    // do something

                    byte[] byteWrite = "Hello".getBytes(StandardCharsets.UTF_8);//decode
                    socket.getOutputStream().write(byteWrite);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

Этот код не нужно объяснять, его нужно понять, иначе что вам здесь поможет. . .

Каковы недостатки этого метода обработки, вы можете узнать ответ с первого взгляда: нужно открыть большое количество потоков.

Итак, нам нужно его улучшить, чтобы улучшить вещь, должна быть цель, в чем наша цель? Без полостей. Парень, ты ошибся набором.

наша цель:

  1. Изящная деградация по мере увеличения нагрузки;
  2. С улучшением ресурсов производительность можно постоянно улучшать;
  3. Также соблюдайте показатели доступности и производительности: 3.1 Низкая задержка 3.2 Удовлетворение пикового спроса 3.3 Настраиваемое качество обслуживания

Давайте подумаем, почему традиционные сокеты имеют такие недостатки:

  1. блокировать Будь то ожидание подключения клиента или ожидание данных клиента, оно заблокировано. Один человек закрыт, а десять тысяч человек не открыты. Независимо от того, когда вы подключитесь ко мне, независимо от того, когда вы дадите мне данные, я буду все еще ждать вас. Давайте подумаем: если два метода accept() и read() неблокирующие, будет ли традиционная проблема Socket решена наполовину?
  2. Синхронизировать Сервер смотрит на клиента, чтобы увидеть, подключен ли клиент ко мне и отправил ли он мне данные. Если я могу пить чай и использовать пестициды, а вы отправляете данные и подключаетесь ко мне, система уведомляет меня, и я разбираюсь с этим, это было бы здорово, так что традиционная проблема с сокетом была решена наполовину.

Итак, Бог сказал, что если есть NIO, то есть NIO.

NIO

Что означает НИО? сокращение для чего? Неблокирующая, неблокирующая модель ввода-вывода — это основное утверждение, но я думаю, что оно понимается как Новый ввод-вывод — новое поколение модели ввода-вывода может быть лучше, по крайней мере, в области Java. Как это понимать, зависит от судей.

NIO очень хорошо решает традиционную проблему сокетов:

  1. Поток может отслеживать несколько сокетов, и это больше не отношения одного человека, и десять тысяч человек не могут его открыть;
  2. Основано на событии: Когда происходят различные события, система может уведомить меня, и я буду с этим разбираться.

Здесь не будет объясняться больше концепций NIO, все вышесказанное предназначено только для того, чтобы представить главного героя сегодняшнего дня: Реактора.

Reactor

Прежде чем говорить о модели Rector, я сначала опубликую клиентский код, который позже будет использован для реализации модели Reactor:

public class Client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket();
            socket.connect(new InetSocketAddress("localhost", 9090));
            new Thread(() -> {
                while (true) {
                    try {
                        InputStream inputStream = socket.getInputStream();
                        byte[] bytes = new byte[1024];
                        inputStream.read(bytes);
                        System.out.println(new String(bytes, StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            while (true) {
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine();
                    socket.getOutputStream().write(s.getBytes());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Однопоточная модель с одним реактором

图片.png

Это самая простая модель Reactor, вы можете видеть, что к Reactor подключено несколько клиентов, а внутри Reactor есть диспетчер (распределитель).

После того, как есть запрос на соединение, Reactor отправит запрос акцептору через диспетчеризацию для обработки, а после того, как произойдет событие чтения и записи ввода-вывода, он будет передан конкретному обработчику через диспетчеризацию для обработки.

В настоящее время, поскольку Reactor отвечает за обработку запросов на подключение, он также отвечает за обработку запросов на чтение и запись.Вообще говоря, обработка запросов на подключение происходит быстро, но обработка конкретных запросов на чтение и запись требует обработки бизнес-логики, что является относительно медленным. . . . Когда Reactor обрабатывает запросы на чтение и запись, другие запросы могут только ждать, а следующий запрос может быть обработан только после завершения обработки.

Голос за кадром: Скажу слабо, когда я изучал NIO и Reactor, у меня была проблема, которую я не мог понять: Разве не сказано, что NIO очень мощный, когда поток не открыт, сервер может многократно клиенты обрабатываются одновременно? Почему здесь сказано, что только один запрос может быть обработан до того, как будет обработан следующий запрос. Не уверен, что у кого-то есть идея со мной, надеюсь, я не единственный. . . Когда NIO не открывает поток, сервер может обрабатывать несколько клиентов одновременно, а это означает, что клиент может отслеживать соединение, читать и записывать события нескольких клиентов, а реальная бизнес-обработка по-прежнему «один муж — единственный». один, эффект «Вкл.» Ванфумо.

Однопоточная модель Reactor проста в программировании и больше подходит для сценариев, в которых каждый запрос может быть выполнен быстро, но не может использовать преимущества многоядерных процессоров.В общем случае однопоточная модель с одним реактором не используется. .

Истина, которая остается неизменной на протяжении тысячелетий, есть много вещей, которые можно вспомнить только после того, как вы на практике их попрактикуете. время, я думаю, это займет меньше полумесяца.Забудьте, так что вам все равно придется печатать на клавиатуре самостоятельно, чтобы реализовать однопотоковую модель с одним Reactor.

public class Reactor implements Runnable {
    ServerSocketChannel serverSocketChannel;
    Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(selector, serverSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}

Определен класс Reactor.

В методе построения регистрируется событие подключения, а объект Acceptor присоединяется к объекту selectionKey, который является классом, используемым для обработки запросов на подключение.

Класс Reactor реализует интерфейс Runnable и реализует метод run. Слушайте различные события. После события вызовите метод диспетчера. В методе диспетчера получите объект, прикрепленный к selectionKey, а затем вызовите метод запуска. Обратите внимание, что в это время вызывается метод запуска, а поток не начался, просто обычный звонок.

public class Acceptor implements Runnable {
    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            selectionKey.attach(new WorkHandler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

В настоящее время, если событие происходит, оно должно быть событием подключения, поскольку в конструкторе класса Reactor зарегистрировано только событие подключения, а события чтения и записи не зарегистрированы.

После того, как событие подключения происходит, метод диспетчера класса Reactor получает прикрепленный объект Acceptor, вызывает метод запуска Acceptor, регистрирует событие чтения в методе запуска, а затем прикрепляет объект WorkHandler к selectionKey.

После выполнения метода run акцептора он продолжит возвращаться к методу run в классе Reactor, который отвечает за мониторинг событий.

На этом этапе Reactor прослушивает два события: одно — событие подключения, а другое — событие чтения.

Когда происходит событие записи клиента, Reactor снова вызывает метод диспетчера.Дополнительный объект, полученный в это время, — это WorkHandler, поэтому он переходит к методу запуска в WorkHandler.

public class WorkHandler implements Runnable {
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
            socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

WorkHandler действительно отвечает за обработку событий записи клиента.

public class Main {
    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}

Теперь мы можем протестировать:

有客户端连接上来了,/127.0.0.1:63912
/127.0.0.1:63912发来的消息是:你好
有客户端连接上来了,/127.0.0.1:49290
有客户端连接上来了,/127.0.0.1:49428
/127.0.0.1:49290发来的消息是:我不好
/127.0.0.1:49428发来的消息是:嘻嘻嘻嘻 

Голос за кадром: Цель этой статьи просто сделать так, чтобы всем было проще и проще понять модель Reactor, поэтому многие вещи были удалены, такие как регистрация событий записи, переключение чтения и записи, пробуждение и т. д. Если эти тривиальные добавляются вещи, это скорее всего пускает всех в заблуждение, интересно, зачем нужно регистрироваться и писать события, можно ли еще писать, если вы не регистрируетесь, зачем просыпаться, не можете ли вы все еще слушать только что добавленные события, если вы не просыпаетесь, и это имеет мало общего с моделью Reactor.

Многопоточная модель с одним реактором

Мы знаем, что у однопоточной модели single-Reactor так много недостатков, что мы можем целенаправленно их решить. Давайте рассмотрим недостатки однопоточной модели Reactor для заказа: пока обрабатывается запрос клиента, другие запросы могут только ждать.

Так что нам просто нужно + к концепции многопоточности, верно? Да, это многопоточная модель с одним реактором.

图片.png

Видно, что Reactor по-прежнему отвечает за обработку событий подключения и запись событий на стороне клиента, разница в том, что есть дополнительное понятие пула потоков.

Когда клиент инициирует запрос на подключение, Reactor передает задачу акцептору для обработки.Если клиент инициирует запрос на запись, Reactor передает задачу в пул потоков для обработки, чтобы сервер мог обслуживать N клиентов одновременно.

Продолжим печатать на клавиатуре и реализуем многопоточную модель с одним Reactor:

public class Reactor implements Runnable {

    ServerSocketChannel serverSocketChannel;

    Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(9090));
            serverSocketChannel.configureBlocking(false);
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(serverSocketChannel, selector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}
public class Acceptor implements Runnable {
    ServerSocketChannel serverSocketChannel;

    Selector selector;

    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }


    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("acceptor thread:" + Thread.currentThread().getName());
            selectionKey.attach(new WorkHandler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class WorkHandler implements Runnable {

    static ExecutorService pool = Executors.newFixedThreadPool(2);

    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            System.out.println("workHandler thread:" + Thread.currentThread().getName());
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            socketChannel.read(buffer);
            pool.execute(new Process(socketChannel, buffer));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Process implements Runnable {

    private SocketChannel socketChannel;

    private ByteBuffer byteBuffer;

    public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            System.out.println("process thread:" + Thread.currentThread().getName());
            String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
            socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Main {
    public static void main(String[] args) {
        Reactor reactor = new Reactor(9100);
        reactor.run();
    }
}

Существует небольшая разница между однопоточным кодом single-Reactor и многопоточным кодом single-Reactor, но есть концепция многопоточности.

Проверим еще раз:

有客户端连接上来了,/127.0.0.1:55789
acceptor thread:main
有客户端连接上来了,/127.0.0.1:56681
acceptor thread:main
有客户端连接上来了,/127.0.0.1:56850
acceptor thread:main
workHandler thread:main
process thread:pool-1-thread-1
/127.0.0.1:55789发来的消息是:我是客户端1
workHandler thread:main
process thread:pool-1-thread-2
/127.0.0.1:56681发来的消息是:我是客户端2
workHandler thread:main
process thread:pool-1-thread-1
/127.0.0.1:56850发来的消息是:我是客户端3

Хорошо видно, что акцептор и workHandler по-прежнему являются основным потоком, но при достижении процесса включается многопоточность.

Многопоточная модель с одним Reactor выглядит очень хорошо, но у нее все еще есть недостатки: Reactor по-прежнему отвечает за запросы на подключение, а также запросы на чтение и запись, запросы на подключение выполняются очень быстро, а клиенту обычно достаточно подключиться только один раз. . , но будет много запросов на запись. Если может быть несколько Reactors, один Reactor отвечает за обработку событий подключения, а несколько Reactor — за обработку событий записи клиента. Это больше соответствует единой ответственности, поэтому мастер -Родилась модель Slave Reactor.

Модель главного-ведомого реактора

图片.png
Это модель master-slave Reactor.Вы можете видеть, что mainReactor отвечает только за запросы на подключение, а subReactor Отвечает только за обработку событий записи клиента.

Давайте реализуем модель Master-Slave Reactor.Следует отметить, что модель Master-Slave Reactor, которую я реализовал, отличается от изображения. На картинке показан один мастер и один слейв, а то, что я реализовал, это один мастер и восемь слейвов.На картинке пул потоков открыт под субреактором, а под реализованным мной субреактором пула потоков нет.Хотя это другое, основная идея такая же.

public class Reactor implements Runnable {
    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(serverSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}
public class Acceptor implements Runnable {
    private ServerSocketChannel serverSocketChannel;
    private final int CORE = 8;

    private int index;

    private SubReactor[] subReactors = new SubReactor[CORE];
    private Thread[] threads = new Thread[CORE];
    private final Selector[] selectors = new Selector[CORE];

    public Acceptor(ServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
        for (int i = 0; i < CORE; i++) {
            try {
                selectors[i] = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
            subReactors[i] = new SubReactor(selectors[i]);
            threads[i] = new Thread(subReactors[i]);
            threads[i].start();
        }
    }

    @Override
    public void run() {
        try {
            System.out.println("acceptor thread:" + Thread.currentThread().getName());
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            selectors[index].wakeup();
            SelectionKey selectionKey = socketChannel.register(selectors[index], SelectionKey.OP_READ);
            selectionKey.attach(new WorkHandler(socketChannel));
            if (++index == threads.length) {
                index = 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class SubReactor implements Runnable {
    private Selector selector;

    public SubReactor(Selector selector) {
        this.selector = selector;
    }


    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                System.out.println("selector:" + selector.toString() + "thread:" + Thread.currentThread().getName());
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}
public class WorkHandler implements Runnable {
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
            socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Main {
    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}

Самая большая разница заключается в методе построения класса Acceptor. Я открыл потоки 8, субреакторы 8 и селекторы 8. После запуска программы будут выполняться потоки 8. Выполнение - это метод запуска, определенный в субреакторе для отслеживания событий. В методе запуска в Acceptor регистрируется событие чтения, поэтому метод запуска, определенный в ubReactor, прослушивает событие чтения.

Давайте проверим это:

acceptor thread:main
有客户端连接上来了,/127.0.0.1:57986
selector:sun.nio.ch.WindowsSelectorImpl@94f1d6thread:Thread-0
acceptor thread:main
有客户端连接上来了,/127.0.0.1:58142
selector:sun.nio.ch.WindowsSelectorImpl@1819b93thread:Thread-1
acceptor thread:main
有客户端连接上来了,/127.0.0.1:58183
selector:sun.nio.ch.WindowsSelectorImpl@1d04799thread:Thread-2
selector:sun.nio.ch.WindowsSelectorImpl@94f1d6thread:Thread-0
/127.0.0.1:57986发来的消息是:1
selector:sun.nio.ch.WindowsSelectorImpl@1819b93thread:Thread-1
/127.0.0.1:58142发来的消息是:2
selector:sun.nio.ch.WindowsSelectorImpl@1d04799thread:Thread-2
/127.0.0.1:58183发来的消息是:3
acceptor thread:main
有客户端连接上来了,/127.0.0.1:59462
selector:sun.nio.ch.WindowsSelectorImpl@11d3ebfthread:Thread-3
selector:sun.nio.ch.WindowsSelectorImpl@11d3ebfthread:Thread-3
/127.0.0.1:59462发来的消息是:1111

Хорошо видно, что от начала до конца у акцептора всего один основной поток, а за обработку клиентских запросов на запись отвечают разные потоки, а также разные реакторы и селекторы.

Структурная схема модели реактора

После прочтения трех моделей Reactor нам также необходимо взглянуть на структурную схему модели Reactor.Рисунки взяты из лучших работ по модели Reactor в отрасли, ни одна из которых не признана.

图片.png

Это выглядит немного сложно, давайте пройдемся по ним один за другим.

  • Демультиплексор синхронных событий: демультиплексор синхронных событий, используемый для мониторинга различных событий, вызывающий объект будет заблокирован при вызове метода прослушивания и не вернется, пока не произойдет событие. Для Linux разделитель синхронных событий относится к модели мультиплексирования ввода-вывода, такой как epoll, poll и т. д. Для Java NIO компонент, соответствующий разделителю синхронных событий, является селектором, а соответствующий метод блокировки — select.
  • Обработчик: по сути, это дескриптор файла. Это абстрактное понятие. Его можно просто понимать как одно событие. Событие может поступать извне, например, события подключения клиента, события записи клиента и т. д., или внутренние события. , такие как события таймера, генерируемые операционной системой, и т. д.
  • Обработчик событий: обработчик событий, по сути, является методом обратного вызова. Когда происходит событие, платформа вызывает соответствующий метод обратного вызова в соответствии с обработчиком. В большинстве случаев это виртуальная функция, и пользователю необходимо реализовать интерфейс и реализовать конкретный метод.
  • Конкретный обработчик событий. Конкретный обработчик событий — это конкретная реализация обработчика событий.
  • Диспетчер инициализации: первоначальный распространитель, который на самом деле является ролью Reactor, предоставляет ряд методов для регистрации и удаления обработчика событий; он также вызывает синхронный демультиплексор событий для прослушивания различных событий; когда происходит событие, он также вызывает соответствующий Обработчик события.

Эта статья заканчивается здесь.