Постоянное соединение Http и пул соединений HttpClient

Java

Советы: Обратите внимание на официальный аккаунт: отчет Songhua Preserved Egg на доске, получайте ежемесячную зарплату программиста 25K + читы, обязательно для входа в BAT!

1. Предпосылки

Протокол HTTP является протоколом без сохранения состояния, то есть каждый запрос не зависит друг от друга. Следовательно, его первоначальная реализация заключается в том, что каждый http-запрос будет открывать соединение с сокетом tcp, и соединение будет закрыто, когда взаимодействие будет завершено.

Протокол HTTP является полнодуплексным протоколом, поэтому для установления соединения и его разрыва требуется три рукопожатия и четыре волны. Очевидно, что в таком дизайне каждый раз, когда отправляется Http-запрос, он будет потреблять много дополнительных ресурсов, то есть на установление и разрушение соединения.

В результате также был разработан протокол HTTP, а мультиплексирование сокетных соединений осуществляется методом постоянного соединения.


Как видно из рисунка:

  1. В последовательном соединении соединение открывается и закрывается для каждого взаимодействия.
  2. В постоянном соединении соединение открывается для первого взаимодействия, и соединение не закрывается после взаимодействия, и процесс установления соединения опускается для следующего взаимодействия.

Существует две реализации постоянных соединений: постоянные соединения HTTP/1.0+ и постоянные соединения HTTP/1.1.

2. Поддержка HTTP/1.0+

С 1996 года многие браузеры и серверы HTTP/1.0 расширили протокол, который является протоколом расширения «поддержки активности».

Обратите внимание, что этот протокол расширения появился как дополнение к «экспериментальному постоянному соединению» версии 1.0. keep-alive больше не используется и не описан в последней спецификации HTTP/1.1, но многие приложения продолжают работать.

Клиенты, использующие HTTP/1.0, добавляют в заголовок «Connection:Keep-Alive», запрашивая сервер о том, чтобы соединение оставалось открытым. Сервер включит тот же заголовок в ответ, если он хочет оставить соединение открытым. Если ответ не содержит заголовка «Connection: Keep-Alive», клиент будет думать, что сервер не поддерживает keep-alive, и закроет текущее соединение после отправки ответного сообщения.

Через дополнительный протокол keep-alive устанавливается постоянное соединение между клиентом и сервером, но есть еще некоторые проблемы:

  • В HTTP/1.0 keep-alive не является стандартным протоколом, клиент должен отправить Connection:Keep-Alive, чтобы активировать соединение проверки активности.
  • Прокси-сервер может не поддерживать поддержку активности, потому что некоторые прокси-серверы являются «слепыми ретрансляторами» и не могут понять значение заголовка, а просто пересылают заголовок шаг за шагом. Следовательно, это может привести к тому, что и клиент, и сервер будут поддерживать соединение, но прокси-сервер не будет принимать данные о соединении.

3. Постоянное соединение HTTP/1.1

HTTP/1.1 заменяет Keep-Alive постоянными соединениями.

Соединения HTTP/1.1 по умолчанию являются постоянными. Если вы хотите закрыть его явно, вам нужно добавить в сообщение заголовок Connection:Close. То есть в HTTP/1.1 все соединения мультиплексируются.

Однако, как и Keep-Alive, незанятые постоянные соединения также могут быть закрыты клиентом и сервером в любое время. Отсутствие отправки Connection:Close не означает, что сервер обещает всегда держать соединение открытым.

В-четвертых, как HttpClient создает постоянные соединения.

Пул соединений используется в HttpClien для управления удерживаемыми соединениями. На том же TCP-канале соединение можно использовать повторно. HttpClient обеспечивает сохранение соединения посредством пула соединений.

По сути, технология «бассейн» представляет собой общую конструкцию, и ее конструкторская мысль не сложна:

  1. Установить соединение, когда соединение используется в первый раз
  2. В конце соответствующее соединение не закрывается и возвращается в пул
  3. Следующее соединение с той же целью может получить доступное соединение из пула.
  4. Периодически очищайте просроченные соединения

Все пулы соединений следуют этой идее, но мы в основном фокусируемся на двух моментах при рассмотрении исходного кода HttpClient:

  • Конкретная схема проектирования пула соединений для дальнейшего использования для настройки пула соединений.
  • Как соответствовать протоколу HTTP, то есть реализация теоретической абстракции в коде

4.1 Реализация пула соединений HttpClient

Обработка постоянных соединений HttpClient может быть сосредоточена в следующем коде. Следующие части извлечены из MainClientExec, связанные с пулом соединений, а другие части удалены:

public class MainClientExec implements ClientExecChain {

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
     //从连接管理器HttpClientConnectionManager中获取一个连接请求ConnectionRequest
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn;
        final int timeout = config.getConnectionRequestTimeout();        //从连接请求ConnectionRequest中获取一个被管理的连接HttpClientConnection
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
     //将连接管理器HttpClientConnectionManager与被管理的连接HttpClientConnection交给一个ConnectionHolder持有
        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            HttpResponse response;
            if (!managedConn.isOpen()) {          //如果当前被管理的连接不是出于打开状态,需要重新建立连接
                establishRoute(proxyAuthState, managedConn, route, request, context);
            }
       //通过连接HttpClientConnection发送请求
            response = requestExecutor.execute(request, managedConn, context);
       //通过连接重用策略判断是否连接可重用         
            if (reuseStrategy.keepAlive(response, context)) {
                //获得连接有效期
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                //设置连接有效期
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);          //将当前连接标记为可重用状态
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
        }
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            //将当前连接释放到池中,供下次调用
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        } else {
            return new HttpResponseProxy(response, connHolder);
        }
}

Здесь видно, что обработка соединения в процессе запроса Http соответствует спецификации протокола, здесь мы обсудим конкретную реализацию.

PoolingHttpClientConnectionManager — это диспетчер соединений по умолчанию для HttpClient. Во-первых, запрос на соединение получается через requestConnection(). Обратите внимание, что это не соединение.

public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
                if (conn.isOpen()) {
                    final HttpHost host;
                    if (route.getProxyHost() != null) {
                        host = route.getProxyHost();
                    } else {
                        host = route.getTargetHost();
                    }
                    final SocketConfig socketConfig = resolveSocketConfig(host);
                    conn.setSocketTimeout(socketConfig.getSoTimeout());
                }
                return conn;
            }
        };
    }

Вы можете видеть, что возвращенный объект ConnectionRequest на самом деле является холдингом Future, а CPoolEntry — реальным экземпляром соединения, управляемым пулом соединений.

Из приведенного выше кода мы должны сосредоточиться на:

  • Future<CPoolEntry> future = this.pool.lease(route, state, null)
    • Как получить асинхронное соединение из пула соединений CPool, Future
  • HttpClientConnection conn = leaseConnection(future, timeout, tunit)
    • Как получить реальное соединение HttpClientConnection путем асинхронного подключения Future

4.2 Future

Посмотрите, как CPool выпускает Future.Основной код AbstractConnPool выглядит следующим образом:

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {
     //首先对当前连接池加锁,当前锁是可重入锁ReentrantLockthis.lock.lock();
        try {        //获得一个当前HttpRoute对应的连接池,对于HttpClient的连接池而言,总池有个大小,每个route对应的连接也是个池,所以是“池中池”
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");          //死循环获得连接
                for (;;) {            //从route对应的池中拿连接,可能是null,也可能是有效连接
                    entry = pool.getFree(state);            //如果拿到null,就退出循环
                    if (entry == null) {
                        break;
                    }            //如果拿到过期连接或者已关闭连接,就释放资源,继续循环获取
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {              //如果拿到有效连接就退出循环
                        break;
                    }
                }          //拿到有效连接就退出
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
          //到这里证明没有拿到有效连接,需要自己生成一个                
                final int maxPerRoute = getMax(route);
                //每个route对应的连接最大数量是可配置的,如果超过了,就需要通过LRU清理掉一些连接
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
          //当前route池中的连接数,没有达到上线
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);            //判断连接池是否超过上线,如果超过了,需要通过LRU清理掉一些连接
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();               //如果空闲连接数已经大于剩余可用空间,则需要清理下空闲连接
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }              //根据route建立一个连接
                        final C conn = this.connFactory.create(route);              //将这个连接放入route对应的“小池”中
                        entry = pool.add(conn);              //将这个连接放入“大池”中
                        this.leased.add(entry);
                        return entry;
                    }
                }
         //到这里证明没有从获得route池中获得有效连接,并且想要自己建立连接时当前route连接池已经到达最大值,即已经有连接在使用,但是对当前线程不可用
                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }            //将future放入route池中等待
                    pool.queue(future);            //将future放入大连接池中等待
                    this.pending.add(future);            //如果等待到了信号量的通知,success为true
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    //从等待队列中移除
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                //如果没有等到信号量通知并且当前时间已经超时,则退出循环
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }       //最终也没有等到信号量通知,没有拿到可用连接,则抛异常
            throw new TimeoutException("Timeout waiting for connection");
        } finally {       //释放对大连接池的锁
            this.lock.unlock();
        }
    }

Приведенная выше логика кода имеет несколько важных моментов:

  • Пул соединений имеет максимальное количество соединений, каждый маршрут соответствует небольшому пулу соединений, а также имеет максимальное количество соединений.
  • Будь то большой пул соединений или небольшой пул соединений, при превышении числа некоторые соединения должны быть освобождены через LRU.
  • Если получено доступное соединение, оно возвращается на верхний уровень для использования.
  • Если доступного соединения нет, HttpClient определит, превышает ли текущий пул соединений маршрута максимальное число, и создаст новое соединение, если оно не достигает верхнего предела, и поместит его в пул.
  • Если достигнут верхний предел, подождите в очереди, дождитесь семафора, получите его снова и сгенерируйте исключение тайм-аута, если он не дождется.
  • Чтобы получить соединение через пул потоков, его необходимо заблокировать с помощью ReetrantLock, чтобы обеспечить потокобезопасность.

На данный момент программа имеет доступный экземпляр CPoolEntry или выдает исключение для завершения программы.

4.3 HttpClientConnection

    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {       //从异步操作Future<CPoolEntry>中获得CPoolEntry
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
            }       //获得一个CPoolEntry的代理对象,对其操作都是使用同一个底层的HttpClientConnection
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }

5. Как HttpClient повторно использует постоянные соединения?

В предыдущей главе мы видели, что HttpClient получает соединения через пул соединений и получает их из пула, когда ему нужно использовать соединение.

Отвечая на вопрос в главе 3:

  1. Установить соединение, когда соединение используется в первый раз
  2. В конце соответствующее соединение не закрывается и возвращается в пул
  3. Следующее соединение с той же целью может получить доступное соединение из пула.
  4. Периодически очищайте просроченные соединения

В главе 4 мы видели, как HttpClient обрабатывает вопросы 1 и 3, так как же решается вопрос 2?

То есть, как HttpClient решает, должно ли соединение быть закрыто после использования или должно быть помещено в пул для повторного использования другими? Взгляните еще раз на код MainClientExec


          //发送Http连接                response = requestExecutor.execute(request, managedConn, context);
                //根据重用策略判断当前连接是否要复用
                if (reuseStrategy.keepAlive(response, context)) {
                    //需要复用的连接,获取连接超时时间,以response中的timeout为准
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;               //timeout的是毫秒数,如果没有设置则为-1,即没有超时时间
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }            //设置超时时间,当请求结束时连接管理器会根据超时时间决定是关闭还是放回到池中
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    //将连接标记为可重用            connHolder.markReusable();
                } else {            //将连接标记为不可重用
                    connHolder.markNonReusable();
                }

Можно видеть, что когда запрос выполняется с использованием соединения, существует стратегия повторной попытки соединения, чтобы решить, следует ли повторно использовать соединение. конец.

Итак, какова логика стратегии повторного использования соединения?

public class DefaultClientConnectionReuseStrategy extends DefaultConnectionReuseStrategy {

    public static final DefaultClientConnectionReuseStrategy INSTANCE = new DefaultClientConnectionReuseStrategy();

    @Override
    public boolean keepAlive(final HttpResponse response, final HttpContext context) {
     //从上下文中拿到request
        final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST);
        if (request != null) {       //获得Connection的Header
            final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION);
            if (connHeaders.length != 0) {
                final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null));
                while (ti.hasNext()) {
                    final String token = ti.nextToken();            //如果包含Connection:Close首部,则代表请求不打算保持连接,会忽略response的意愿,该头部这是HTTP/1.1的规范
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;
                    }
                }
            }
        }     //使用父类的的复用策略
        return super.keepAlive(response, context);
    }

}

Взгляните на стратегию повторного использования родительского класса.

            if (canResponseHaveBody(request, response)) {
                final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN);
                //如果reponse的Content-Length没有正确设置,则不复用连接          //因为对于持久化连接,两次传输之间不需要重新建立连接,则需要根据Content-Length确认内容属于哪次请求,以正确处理“粘包”现象                //所以,没有正确设置Content-Length的response连接不能复用
                if (clhs.length == 1) {
                    final Header clh = clhs[0];
                    try {
                        final int contentLen = Integer.parseInt(clh.getValue());
                        if (contentLen < 0) {
                            return false;
                        }
                    } catch (final NumberFormatException ex) {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        if (headerIterator.hasNext()) {
            try {
                final TokenIterator ti = new BasicTokenIterator(headerIterator);
                boolean keepalive = false;
                while (ti.hasNext()) {
                    final String token = ti.nextToken();            //如果response有Connection:Close首部,则明确表示要关闭,则不复用
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;            //如果response有Connection:Keep-Alive首部,则明确表示要持久化,则复用
                    } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {
                        keepalive = true;
                    }
                }
                if (keepalive) {
                    return true;
                }
            } catch (final ParseException px) {
                return false;
            }
        }
     //如果response中没有相关的Connection首部说明,则高于HTTP/1.0版本的都复用连接  
        return !ver.lessEquals(HttpVersion.HTTP_1_0);

в заключении:

  • Если заголовок запроса содержит Connection:Close, не используйте повторно
  • Если длина Content-Length в ответе установлена ​​неправильно, она не будет использоваться повторно.
  • Если заголовок ответа содержит Connection:Close, не используйте повторно
  • Если заголовок ответа содержит Connection:Keep-Alive, повторно используйте
  • В случае отсутствия обращений использовать повторно, если версия HTTP выше 1.0.

Как видно из кода, стратегия его реализации согласуется с ограничениями уровня протокола в главах 2 и 3.

6. Как HttpClient очищает подключения с истекшим сроком действия

До версии HttpClient 4.4, когда повторно используемое соединение получается из пула соединений, он проверяет, не истек ли срок его действия, и если оно истекает, оно очищается.

Более поздние версии отличаются: будет отдельный поток для сканирования соединений в пуле соединений, и он будет очищен, когда будет обнаружено, что самое последнее использование превышает установленное время. Тайм-аут по умолчанию составляет 2 секунды.

    public CloseableHttpClient build() {            //如果指定了要清理过期连接与空闲连接,才会启动清理线程,默认是不启动的
            if (evictExpiredConnections || evictIdleConnections) {          //创造一个连接池的清理线程
                final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
                        maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS,
                        maxIdleTime, maxIdleTimeUnit);
                closeablesCopy.add(new Closeable() {
                    @Override
                    public void close() throws IOException {
                        connectionEvictor.shutdown();
                        try {
                            connectionEvictor.awaitTermination(1L, TimeUnit.SECONDS);
                        } catch (final InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }

                });          //执行该清理线程
                connectionEvictor.start();
}

Вы можете увидеть, когда строить в HttpClientBuilder, если вы открываете указанную функцию очистки, она создает поток очистки пула соединений и запускает его.

    public IdleConnectionEvictor(
            final HttpClientConnectionManager connectionManager,
            final ThreadFactory threadFactory,
            final long sleepTime, final TimeUnit sleepTimeUnit,
            final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {
        this.connectionManager = Args.notNull(connectionManager, "Connection manager");
        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
        this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime;
        this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime;
        this.thread = this.threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                try {            //死循环,线程一直执行
                    while (!Thread.currentThread().isInterrupted()) {              //休息若干秒后执行,默认10秒
                        Thread.sleep(sleepTimeMs);               //清理过期连接
                        connectionManager.closeExpiredConnections();               //如果指定了最大空闲时间,则清理空闲连接
                        if (maxIdleTimeMs > 0) {
                            connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (final Exception ex) {
                    exception = ex;
                }

            }
        });
    }

в заключении:

  • Только после того, как HttpClientBuilder будет установлен вручную, будет включена очистка просроченных и простаивающих соединений.
  • После ручной настройки поток будет выполняться в бесконечном цикле.Каждый раз, когда спящий режим выполняется в течение определенного периода времени, вызывается метод очистки HttpClientConnectionManager для очистки просроченных и простаивающих соединений.

7. Резюме этой статьи

  • Протокол HTTP устраняет проблему чрезмерного количества соединений в ранней разработке благодаря постоянному соединению.
  • Существует два способа постоянного подключения: Keep-Avlive для HTTP/1.0+ и постоянное подключение по умолчанию для HTTP/1.1.
  • HttpClient управляет постоянными соединениями через пулы соединений. Пулы соединений разделены на два, один из которых является общим пулом соединений, а другой — пулом соединений, соответствующим каждому маршруту.
  • HttpClient получает соединение из пула через асинхронное Future
  • Политика повторного использования соединения по умолчанию согласуется с ограничениями протокола HTTP. Согласно ответу, если сначала будет оценено Connection:Close, оно будет закрыто, а если будет оценено Connection:Keep-Alive, оно будет включено.Последняя версия больше 1,0 и включен.
  • Соединения в пуле соединений будут очищаться только после того, как в HttpClientBuilder вручную будет включен переключатель для очистки просроченных и простаивающих соединений.
  • Версии после HttpClient 4.4 очищают просроченные и простаивающие соединения через поток бесконечного цикла, который приостанавливается на некоторое время при каждом выполнении для достижения эффекта регулярного выполнения.

Вышеупомянутое исследование основано на личном понимании исходного кода HttpClient.Если есть какая-либо ошибка, я надеюсь, что вы активно оставите сообщение для обсуждения.


Источник статьи:www.liangsonghua.me

Обратите внимание на общедоступную учетную запись WeChat: отчет о консервированных яйцах Songhua Preserved Egg на доске, становитесь более захватывающим!

Введение в общедоступную учетную запись: поделитесь техническими знаниями о работе на JD.com, а также технологиями JAVA и лучшими отраслевыми практиками, большинство из которых являются прагматичными, понятными и воспроизводимыми.