Введение в API асинхронного канала Java Статьи серии NIO.2

Java задняя часть сервер API

Обзор NIO.2

NIO.2, также известный как AIO, представил улучшенную версию NIO в Java 7, NIO 2, которая представляет собой асинхронный неблокирующий метод ввода-вывода.

Основная концепция AIO заключается в инициировании неблокирующей операции ввода-вывода, немедленном ответе, но не немедленном возврате результата, и уведомлении о завершении операции ввода-вывода.

В этой статье в основном представлено некоторое содержание API асинхронного канала NIO 2, а в последующих статьях будут проанализированы другие функции NIO.2.

API асинхронного канала

Начиная с Java 7, в пакет java.nio.channel добавлено 4 новых асинхронных канала:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

Эти классы похожи по стилю на API NIO Channel, они используют те же методы и структуры параметров, и большинство параметров, доступных для классов NIO Channel, по-прежнему доступны для новой асинхронной версии.

API асинхронного канала предоставляет два механизма для мониторинга и управления запущенными асинхронными операциями:

  • Во-первых, возвращаяjava.util.concurrent.Future объектдля представления результата асинхронной операции
  • Второй — путем передачи объекта нового класса в операциюjava.nio.channels.CompletionHandlerдля завершения он определяет метод обработчика, который будет выполняться после завершения операции.

Future

Начиная с Java 1.5 введен интерфейс Future, с помощью которого можно получить результат выполнения задачи после выполнения задачи.. В NIO 2 объект Future представляет собой результат асинхронной операции, допустим, мы хотим создать сервер для прослушивания клиентских подключений, открыть AsynchronousServerSocketChannel и привязать его к адресу, аналогичному ServerSocketChannel:

AsynchronousServerSocketChannel server 
  = AsynchronousServerSocketChannel.open().bind(null);

Метод bind() принимает адрес сокета в качестве параметра, здесь передается нулевой адрес, он автоматически привязывает сокет к адресу локального хоста и использует свободный эфемерный порт, точно так же, как традиционный ServerSocket с портом 0 Опять же, используйте эфемерный порт, случайно назначенный операционной системой. Затем вызовите метод accept() сервера:

Future<AsynchronousSocketChannel> future = server.accept();

Когда мы вызываем метод accept() для ServerSocketChannel в NIO, он блокируется до тех пор, пока от клиента не будет получено входящее соединение. Но метод accept() AsynchronousServerSocketChannel немедленно возвращает объект Future.

Универсальный тип объекта Future — это возвращаемый тип операции, в приведенном выше примере это AsynchronousSocketChannel, но он также может быть Integer или String, в зависимости от окончательного возвращаемого типа операции.

Мы можем использовать объект Future для запроса статуса операции.

future.isDone();

Этот API возвращает true, если базовая операция завершена. Обратите внимание, что в этом случае завершение может означать корректное завершение, исключение или отмену.

Мы также можем явно проверить, была ли операция отменена, и вернуть true, если операция была отменена до ее нормального завершения. следующим образом:

future.isCancelled();

Фактическая операция отмены выглядит следующим образом:

future.cancel(true)

Метод cancel() может использовать логический флаг, чтобы указать, можно ли прервать принимающий поток.

Чтобы получить результат операции, мы используем метод get(), который блокирует ожидание возврата результата:

AsynchronousSocketChannel client= future.get();

Кроме того, мы также можем установить время блокировки, в следующем примере установлено значение 10 с:

AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS);

CompletionHandler

Альтернативой использованию Futures для обработки операций является использование механизма обратного вызова класса CompletionHandler. Асинхронные каналы позволяют указать обработчик завершения для использования результата операции:

AsynchronousServerSocketChannel listener
  = AsynchronousServerSocketChannel.open().bind(null);
 
listener.accept(
  attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
    public void completed(
      AsynchronousSocketChannel client, Object attachment) {
          // do whatever with client
      }
    public void failed(Throwable exc, Object attachment) {
          // handle failure
      }
  });

Когда операция ввода-вывода завершается успешно, вызывается завершенный API обратного вызова. В случае сбоя операции вызовите отказавший API.

Экземпляр API асинхронного канала

Сервер (с будущим)

Вот как создать сервер с помощью Future.

public class AsyncEchoServer {
    private AsynchronousServerSocketChannel server;
    private Future<AsynchronousSocketChannel> future;
    private AsynchronousSocketChannel worker;

    public AsyncEchoServer() throws IOException, ExecutionException, InterruptedException {
        System.out.println("Open Server Channel");
        server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9090));
        future = server.accept();
    }

    public void runServer() throws ExecutionException, InterruptedException, IOException, TimeoutException {
        //获取操作结果
        worker = future.get();
        if (worker != null && worker.isOpen()) {
            ByteBuffer buffer = ByteBuffer.allocate(100);
            //将通道中的数据写入缓冲区
            worker.read(buffer).get(10,TimeUnit.SECONDS);
            System.out.println("received from client: " + new String(buffer.array()));
        }
        server.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException, TimeoutException {
        AsyncEchoServer server = new AsyncEchoServer();
        server.runServer();
    }
}

Сервер (с CompletionHandler)

Ниже мы увидим, как реализовать тот же код на стороне сервера, используя метод CompletionHandler вместо метода Future.

public class AsyncEchoServerWithCallBack {
    private AsynchronousServerSocketChannel server;
    private AsynchronousSocketChannel worker;
    private AsynchronousChannelGroup group;
    public AsyncEchoServerWithCallBack() throws IOException, ExecutionException, InterruptedException {
        System.out.println("Open Server Channel");
        group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
        server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("127.0.0.1", 9090));
        //当有新连接建立时会调用 CompletionHandler接口实现对象中的 completed()方法
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                if (server.isOpen()) {
                    server.accept(null, this);
                }
                worker = result;
                if ((worker != null) && (worker.isOpen())) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(100);
                    worker.read(byteBuffer);
                    System.out.println("received the client: "+new String(byteBuffer.array()));
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                //TODO
            }
        });
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException, TimeoutException {
        AsyncEchoServerWithCallBack server = new AsyncEchoServerWithCallBack();
    }
}

При установлении нового соединения будет вызываться метод complete() в объекте реализации интерфейса CompletionHandler, а при возникновении ошибки будет вызываться неудавшийся метод.

Первым параметром метода accept может быть объект любого типа, называемый «присоединенным объектом» во время вызова. Объект вложения передается при вызове метода accept() и может быть получен из параметров (вложения) завершенных и неудачных методов в объекте реализации интерфейса CompletionHandler, чтобы можно было передавать данные. Все методы, использующие интерфейс CompletionHandler, поддерживают использование объектов вложений для передачи данных.

Класс асинхронной группы каналов

Когда асинхронный канал обрабатывает запросы ввода-вывода, ему необходимо использовать класс AsynchronousChannelGroup.Объект этого класса представляет группу асинхронных каналов, и каждой группе соответствует соответствующий пул потоков., вам нужно использовать статический фабричный метод withFixedThreadPool, withCachedThreadPool или withThreaPool класса AsynchronousChannelGroup для установки пула потоков. Потоки в этом пуле потоков используются для обработки событий ввода-вывода. Несколько асинхронных каналов могут совместно использовать сгруппированный ресурс пула потоков.

При вызове метода open классов AsynchronousSocketChannel и AsynchronousServerSocketChannel для открытия канала асинхронного сокета можно передать объект класса AsynchronousChannelGroup. Если открытый метод вызывается без объекта класса AsynchronousChannelGroup, по умолчанию используется группа, предоставленная системой.Поток в пуле потоков, соответствующий системной группе,Нить демона, если используется группировка по умолчанию, программа завершает работу вскоре после запуска,Потому что поток демона, используемый системной группой, не препятствует выходу виртуальной машины.

клиент

public class AsyncEchoClient {
    private AsynchronousSocketChannel client;
    private Future<Void> future;

    public AsyncEchoClient() throws IOException {
        //打开一个异步channel
        System.out.println("Open client channel");
        client = AsynchronousSocketChannel.open();
        //连接本地端口和地址,在连接成功后不返回任何内容,但是,我们仍然可以使用Future对象来监视异步操作的状态
        System.out.println("Connect to server");
        future = client.connect(new InetSocketAddress("127.0.0.1", 9090));
    }

    /**
     * 向服务端发送消息
     *
     * @param message
     * @return
     */
    public void sendMessage(String message) throws ExecutionException, InterruptedException {
        if (!future.isDone()) {
            future.cancel(true);
            return;
        }
        //将一个字节数组封装到ByteBuffer中
        ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
        System.out.println("Sending message to the server");
        //将数据写入通道
        int numberBytes = client.write(byteBuffer).get();
        byteBuffer.clear();
    }

    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        AsyncEchoClient client = new AsyncEchoClient();
        client.sendMessage("hello world");
    }
}

Результаты теста

Клиент:

Open client channel
Connect to server

Сервер:

Open Server Channel
received the client: hello world

Вариант реализации асинхронного ввода-вывода Java NIO 2

Все мы знаем, что NIO 2.0, предоставляемый JDK 1.7, добавляет асинхронный канал сокета, который является настоящим асинхронным вводом-выводом.Переменные могут передаваться во время асинхронных операций ввода-вывода, и связанные методы будут вызываться после завершения операции. Так как же отражается асинхронная неблокирующая функция NIO 2? Многие детали можно увидеть из предыдущего описания:

Асинхронное проявление

кAsynchronousServerSocketChannelНапример, когда вызывается метод accept() объекта этого класса, он возвращаетFuture<AsynchronousSocketChannel>объект, вызов метода accept() аналогичен вызову традиционного метода ввода-вывода.ServerSocket的accept()То же самое, по сути, и получение клиентских запросов на подключение, ноAsynchronousServerSocketChannelОбъект не блокирует ожидание все время, а сразу возвращаетFutureобъект, использованиеFutureизgetспособ получения результата соединения,FutureОбъект является результатом асинхронной операции, мы также можем использовать функцию Future.isDoneМетод запрашивает статус завершения операции, что является воплощением асинхронности.

Разумеется, так же, как и метод CompletionHandler, при установлении нового соединения будет вызван методpleted() в объекте реализации интерфейса CompletionHandler, а при возникновении ошибки будет вызван сбойный метод.

Неблокирующий вариант

при звонкеAsynchronousServerSocketChannelПосле метода accept() объекта возвращается объект Future. В это время поток может продолжать заниматься другими делами. Это не блокирует. Чтобы получить результат операции, вызовите Future'sisDoneметод, чтобы запросить, завершена ли операция, используйтеget()чтобы получить результат, типичная неблокирующая операция. В традиционной модели ввода-вывода объект класса сокетаacceptМетод заблокируется и будет ждать, пока не появится новое соединение.

резюме

NIO.2, также известный как AIO, понимание его API асинхронного канала также может помочь нам лучше понять асинхронные операции ввода-вывода. Когда мы изучаем API NIO2, мы также можем учиться у API канала в NIO, и у них все еще есть много общего.

Ссылки и благодарности