Асинхронный, ваш убийца параллелизма

Java задняя часть

Сегодня поговорим о том, как сделать проекты асинхронными.

1. Синхронные и асинхронные, блокирующие и неблокирующие

Синхронизация и асинхронность, блокировка и неблокировка, эти слова уже стали клише, но все еще есть много студентов, которые часто путаются, думая, что синхронизация должна быть блокирующей, а асинхронность должна быть неблокирующей.На самом деле это не одно и то же вещь.

Синхронный и асинхронный фокус на механизме передачи сообщения результата

  • Синхронизация: Синхронизация означает, что вызывающая сторона должна активно ждать возврата результата.
  • Асинхронный: асинхронный означает, что нет необходимости активно ждать возврата результата, но с помощью других средств, таких как уведомление о состоянии, функция обратного вызова и т. д.

Блокировка и неблокировка в основном связаны с ожиданием возврата результата в состояние вызывающего объекта.

  • Блокировка: означает, что текущий поток приостановлен и ничего не делает, пока не будет возвращен результат.
  • Неблокирующий: означает, что до того, как результат будет возвращен, поток может выполнять некоторые другие действия и не будет приостановлен.

Видно, что синхронные и асинхронные, блокирующие и неблокирующие в основном связаны с разными точками.Некоторые люди спросят, может ли синхронизация быть неблокирующей, а асинхронная также может блокировать? Конечно, можно.Для того, чтобы лучше проиллюстрировать смысл их сочетаний, ниже используются несколько простых примеров: 1. Синхронная блокировка: Синхронная блокировка в основном самая распространенная модель в программировании.Например, вы идете в магазин, чтобы купить одежду, и после того, как вы идете, вы обнаруживаете, что одежда распродана, затем вы ждете в магазине и делаете ничего в течение периода (включая просмотр мобильного телефона), ожидая, пока продавец пополнит запасы до тех пор, пока запасы не будут доступны, что очень неэффективно.

2. Синхронная неблокировка: Синхронная неблокировка может быть абстрагирована в режим опроса в программировании.После того, как вы заходите в магазин, вы обнаруживаете, что одежда распродана.В это время вам не нужно тупо ждать. Вы можете пойти в другие места, такие как магазины чая с молоком, купить стакан воды, но вам все равно нужно время от времени заходить в магазин и спрашивать владельца, прибыла ли новая одежда.

3. Асинхронная блокировка: асинхронная блокировка используется в программировании реже, она похожа на написание пула потоков, сразу же отправить и затем future.get(), поэтому поток фактически приостанавливается. Это немного похоже на то, когда вы идете в магазин за одеждой и обнаруживаете, что в это время одежды нет.В это время вы оставляете номер телефона начальнику и звоните мне, когда одежда прибудет.Затем вы охраняете телефон и Подожди, пока он что-нибудь прозвонит. Не делай этого. Это кажется немного глупым, поэтому этот режим используется реже.

4. Асинхронная неблокируемость. Асинхронная неблокировка сегодня также является ядром высокопараллельного программирования, и это также ядро, которое в основном обсуждается сегодня. Например, если вы идете в магазин за одеждой, а одежды нет, вам нужно просто сказать начальнику, что это мой номер телефона, и позвонить, когда одежда прибудет. Тогда вы можете играть, как вам нравится, и вам не нужно беспокоиться о том, когда прибудет одежда.Как только одежда прибудет, вы можете отправиться за одеждой, как только зазвонит телефон.

2. Синхронный блокирующий ПК асинхронный неблокирующий

Выше было видно, насколько низка эффективность синхронной блокировки.Если вы используете синхронную блокировку для покупки одежды, вы можете купить только один предмет одежды в день, и вы не можете делать что-либо еще.Если вы используете асинхронную неблокирующую блокировку купить одежду, покупка одежды - это всего лишь одна маленькая вещь, которую вы делаете в течение дня.

Мы сопоставляем это с нашим кодом.Когда наш поток выполняет вызов rpc или http-вызов или какие-либо другие трудоемкие вызовы ввода-вывода, после его инициации, если он синхронно заблокирован, наш поток будет заблокирован и приостановлен до тех пор, пока результат не будет возвращается, представьте, что если вызов ввода-вывода очень частый, то использование нашего ЦП на самом деле очень низкое. Так называемый, чтобы наилучшим образом использовать его.Поскольку использование ЦП очень низкое при вызове ввода-вывода, мы можем использовать асинхронный неблокирующий вызов.Когда происходит вызов ввода-вывода, я не забочусь о результате немедленно, я нужно только написать функцию обратного вызова.Этот вызов ввода-вывода, я могу продолжать обрабатывать новые запросы в это время, и когда вызов ввода-вывода завершится, будет вызвана функция обратного вызова. И наш поток всегда занят, поэтому он может делать более значимые вещи.

Первое, на что следует обратить внимание, это то, что асинхронность не является панацеей, асинхронность не может сократить время вызова всей ссылки, но может значительно увеличить максимальное количество запросов в секунду. Как правило, есть две части нашего бизнеса, которые требуют больше времени:

  • Процессор: Потребление процессора относится к нашей общей бизнес-логике, такую ​​как операция, последовательность некоторых объектов данных. Эти асинхронные технологии не могут быть решены, приходится полагаться на некоторые алгоритмы оптимизации или некоторой высокопроизводительной структуры.
  • жду: Io занимает много времени, как мы сказали выше, обычно происходит при сетевых вызовах, передаче файлов, на этот раз поток обычно зависает. И наша асинхронизация обычно используется для решения этой части проблемы.

3. Что может быть асинхронным?

Как упоминалось выше, асинхронность используется для решения проблемы блокировки ввода-вывода, и мы можем использовать асинхронность в общих проектах следующим образом:

  • асинхронный сервлет, асинхронный springmvc
  • Вызовы RPC, такие как (dubbo, thrift), HTTP-вызовы, являются асинхронными.
  • Вызовы базы данных, вызовы кеша являются асинхронными

Далее я представлю асинхронность из вышеупомянутых аспектов.

4.Сервлет асинхронный

Сервлеты знакомы программистам-разработчикам Java.Используете ли вы в проекте struts2 или springmvc, они по сути являются инкапсулированными сервлетами. Но наша общая разработка фактически использует режим синхронной блокировки следующим образом:

这里写图片描述

Преимущество вышеуказанного режима заключается в том, что кодирование простое и подходит для проектов с меньшим доступом или большим количеством операций ЦП на ранней стадии запуска проекта.

Недостатком является то, что поток бизнес-логики и поток контейнера сервлета одинаковы, а общая бизнес-логика должна иметь некоторые операции ввода-вывода, такие как запрос к базе данных, например, генерация вызовов RPC, и в это время произойдет блокировка, и наш сервлет поток контейнера определенно ограничен. Когда потоки контейнера сервлета заблокированы, нашему сервису будет отказано в доступе в это время. В противном случае мы, конечно, можем решить эту проблему, добавив ряд методов к машине, но, как говорится, полагаясь на людей хуже, чем полагаться на себя.Лучше я сделаю это сам, если кто-то другой разделит просьбу за меня. Итак, после того, как сервлет 3.0 поддержит асинхронность, после того, как мы примем асинхронность, она станет следующей:

这里写图片描述

Здесь мы используем новый поток для обработки бизнес-логики, блокировка вызовов ввода-вывода не повлияет на наш серввет, а код для реализации асинхронного серввета относительно прост:

@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true)
public class WorkServlet extends HttpServlet{
    private static final long serialVersionUID = 1L;
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doPost(req, resp);
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        //设置ContentType,关闭缓存
        resp.setContentType("text/plain;charset=UTF-8");
        resp.setHeader("Cache-Control","private");
        resp.setHeader("Pragma","no-cache");
        final PrintWriter writer= resp.getWriter();
        writer.println("老师检查作业了");
        writer.flush();
        List<String> zuoyes=new ArrayList<String>();
        for (int i = 0; i < 10; i++) {
            zuoyes.add("zuoye"+i);;
        }
        //开启异步请求
        final AsyncContext ac=req.startAsync();
        doZuoye(ac, zuoyes);
        writer.println("老师布置作业");
        writer.flush();
    }

    private void doZuoye(final AsyncContext ac, final List<String> zuoyes) {
        ac.setTimeout(1*60*60*1000L);
        ac.start(new Runnable() {
            @Override
            public void run() {
                //通过response获得字符输出流
                try {
                    PrintWriter writer=ac.getResponse().getWriter();
                    for (String zuoye:zuoyes) {
                        writer.println("\""+zuoye+"\"请求处理中");
                        Thread.sleep(1*1000L);
                        writer.flush();
                    }
                    ac.complete();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

Ключ к реализации serlvet заключается в том, что http использует длинное соединение, то есть когда приходит запрос, он не будет закрыт, даже если есть возврат, потому что данные могут быть еще до тех пор, пока не будет возвращена команда закрытия. AsyncContext ac=req.startAsync(); используется для получения асинхронного контекста, а затем мы используем этот асинхронный контекст для обратного вызова и возврата данных, немного похоже на то, когда мы покупаем одежду, мы звоним боссу, и этот контекст также является звонок, когда прибудет одежда Когда данные будут готовы, вы можете сделать звонок для отправки данных. ac.complete(); используется для закрытия длинных ссылок.

4.1 SpringMVC Asynchronous

На самом деле в программирование Serlvet приходит очень мало людей, и все они напрямую используют какие-то готовые фреймворки, такие как struts2, springmvc. Ниже описано, как использовать springmvc для асинхронности:

  • Сначала убедитесь, что Servlet в вашем проекте выше 3.0! ! , а затем springMVC4.0+
<dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>4.2.3.RELEASE</version>
    </dependency>
  • Объявление заголовка web.xml должно быть 3.0, фильтр и серверлет настроены на асинхронность
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 
    http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
    <filter>
        <filter-name>testFilter</filter-name>
        <filter-class>com.TestFilter</filter-class>
        <async-supported>true</async-supported>
    </filter>
   
    <servlet>
        <servlet-name>mvc-dispatcher</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        .........
        <async-supported>true</async-supported>
    </servlet>
  • Используя springmvc для инкапсуляции AsyncContext сервлета, его относительно просто использовать. В прошлом контроллер нашего синхронного режима возвращал ModelAndView, в то время как асинхронный режим напрямую генерировал defrredResult (поддерживая наше расширение тайм-аута) для сохранения контекста.Ниже приведена простая демонстрация того, как сопоставить наш HttpClient
@RequestMapping(value="/asynctask", method = RequestMethod.GET)
    public DeferredResult<String> asyncTask() throws IOReactorException {
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
        PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);
        conManager.setMaxTotal(100);
        conManager.setDefaultMaxPerRoute(100);
        CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();
        // Start the client
        httpclient.start();
        //设置超时时间200ms
        final DeferredResult<String> deferredResult = new DeferredResult<String>(200L);
        deferredResult.onTimeout(new Runnable() {
            @Override
            public void run() {
                System.out.println("异步调用执行超时!thread id is : " + Thread.currentThread().getId());
                deferredResult.setResult("超时了");
            }
        });
        System.out.println("/asynctask 调用!thread id is : " + Thread.currentThread().getId());
        final HttpGet request2 = new HttpGet("http://www.apache.org/");
        httpclient.execute(request2, new FutureCallback<HttpResponse>() {
​
            public void completed(final HttpResponse response2) {
                System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
                deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine());
            }
​
            public void failed(final Exception ex) {
                System.out.println(request2.getRequestLine() + "->" + ex);
            }
​
            public void cancelled() {
                System.out.println(request2.getRequestLine() + " cancelled");
            }
​
        });
        return deferredResult;
    }
​

Примечание: существует проблема в асинхронизации serlvet, заключающаяся в том, что нельзя использовать пострезультатную обработку фильтра.Для некоторых наших менеджеров невозможно напрямую использовать асинхронный serlvet для статистики результатов. В springmvc эта проблема решена очень хорошо, Springmvc использует хитрый способ переадресации запроса, чтобы запрос можно было снова отфильтровать. Однако была введена новая проблема, то есть фильтр будет обрабатываться дважды.Здесь мы можем использовать метод самооценки в исходном коде SpringMVC.Мы можем использовать следующее предложение в фильтре, чтобы судить, принадлежит ли оно на запрос, перенаправленный springmvc, Таким образом, предварительное событие фильтра не обрабатывается, а обрабатывается только пост-событие:

Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
return asyncManagerAttr instanceof WebAsyncManager ;

5. Асинхронная полная ссылка

Над нами индукция serlvet, я полагаю, что студенты внимательны и, кажется, видят, что это не решает основную проблему, мое препятствие ввода-вывода все еще существует, оно просто изменило позиции, часто вызывается, когда тот же пул потоков ввода-вывода будет вносить быстрые изменения в дело полное, хотя поток контейнера серлвет не блокируется, но дело все равно становится недоступным.

Итак, как мы можем решить вышеуказанную проблему? Ответ: полносвязная асинхронность. Стремление к полносвязной асинхронности состоит в том, чтобы не блокировать, загружать ЦП и максимально сжимать производительность машины. Диаграмма модели выглядит следующим образом:

Что именно делает конкретный клиент NIO, как показано в следующей модели:

Выше показана наша полносвязная асинхронная диаграмма (часть пула потоков может быть оптимизирована). Суть всей ссылки заключается в том, что пока мы сталкиваемся с вызовами ввода-вывода, мы можем использовать NIO, чтобы избежать блокировки, что также решает неловкий сценарий переполнения пула бизнес-потоков.

5.1 Асинхронные удаленные вызовы

Обычно мы используем rpc или http для удаленных вызовов. Для RPC thrift, http, motan и т. д. обычно поддерживают асинхронные вызовы, а внутренние принципы также являются моделью NIO, управляемой событиями.Для http общие apachehttpclient и okhttp также обеспечивают асинхронные вызовы. Ниже приводится краткое введение в то, как выполняются асинхронные вызовы Http: Сначала посмотрите на пример:

public class HTTPAsyncClientDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException {
      //具体参数含义下文会讲
       //apache提供了ioReactor的参数配置,这里我们配置IO 线程为1
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
      //根据这个配置创建一个ioReactor
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
      //asyncHttpClient使用PoolingNHttpClientConnectionManager管理我们客户端连接
        PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);
      //设置总共的连接的最大数量
        conManager.setMaxTotal(100);
      //设置每个路由的连接的最大数量
        conManager.setDefaultMaxPerRoute(100);
      //创建一个Client
        CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();
        // Start the client
        httpclient.start();

        // Execute request
        final HttpGet request1 = new HttpGet("http://www.apache.org/");
        Future<HttpResponse> future = httpclient.execute(request1, null);
        // and wait until a response is received
        HttpResponse response1 = future.get();
        System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine());

        // One most likely would want to use a callback for operation result
        final HttpGet request2 = new HttpGet("http://www.apache.org/");
        httpclient.execute(request2, new FutureCallback<HttpResponse>() {
						//Complete成功后会回调这个方法
            public void completed(final HttpResponse response2) {
                System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
            }

            public void failed(final Exception ex) {
                System.out.println(request2.getRequestLine() + "->" + ex);
            }

            public void cancelled() {
                System.out.println(request2.getRequestLine() + " cancelled");
            }

        });
    }
}

Полная диаграмма классов httpAsync приведена ниже:

Для нашего HTTPAysncClient в конце фактически используется InternalHttpAsyncClient.В InternalHttpAsyncClient есть ConnectionManager, который является менеджером нашего управления соединениями, и есть только одна реализация в httpAsync, которая является PoolingNHttpClientConnectionManager.Есть два диспетчера соединений, которые мы больше беспокоит.Один из них - Reactor, а другой - Cpool.

  • Реактор : Вся реактор здесь достигается интерфейс IOREACTOR. В PooliphnhttpClientConnectionManager будет иметь реактор, то есть defaultconnectingioreactor, этот defaultconnectingioreactor, отвечающий за обработку акцептора. Там метод вырезка Укажите DolexConnectingioreactor, генерируя IOREActor IS, мы рисуем Baseioreactor, операцию IO. Эта модель является моделью нашего выше 1.2.2.

  • CPool : В PoolingNHttpClientConnectionManager есть CPool, который в основном отвечает за управление нашим соединением. Через него контролируются упомянутые выше maxTotal и defaultMaxPerRoute. Если каждый маршрут заполнен, он отключит самый старый. быть помещен в арендованную очередь, и он будет повторно подключен, когда освободится место.

5.2 Асинхронные вызовы базы данных

Общий фреймворк для обращений к БД не предусматривает асинхронного метода, рекомендуется инкапсулировать его самостоятельно или использовать открытый исходный код онлайн, здесь у нашей компании есть открытый исходный код.GitHub.com/люблю тебя, жизнь/ тогда…Может очень хорошо поддерживать асинхронность

6. Наконец

Индукция высокого параллелизма не является серебряной пулей, но с асинхронной технологией действительно может улучшить вашу машину qps, пропускную способность и так далее. Некоторые из приведенных выше моделей говорят о том, что если мы можем разумно провести некоторую оптимизацию, то применим ее, что, мы надеемся, очень поможет вашему сервису.

Для получения дополнительной информации, пожалуйста, подпишитесь на мой официальный аккаунт

Наконец, эта статья была включена в JGrowing, всеобъемлющий и отличный маршрут изучения Java, совместно созданный сообществом.Если вы хотите участвовать в обслуживании проектов с открытым исходным кодом, вы можете создать его вместе.Адрес github:GitHub.com/Java растет…Пожалуйста, дайте мне маленькую звезду.