Глубокое понимание пула соединений Netty SimpleChannelPool и FixedChannelPool

Netty

В сетевой связи, вообще говоря, клиенту и серверу нужно установить только одно соединение, но в некоторых сценариях нам нужно установить несколько соединений. Например, при использовании балансировки нагрузки, если установлено только одно соединение, может возникнуть сценарий несбалансированной нагрузки.Иногда нам также необходимо установить пул соединений, чтобы увеличить пропускную способность клиента.

Самая большая трудность при создании пула соединений заключается в том, как гарантировать, что указанное нами количество соединений может быть создано в случае высокого параллелизма, и как правильно управлять пулом соединений.Например, что делать, если в пуле соединений нет доступных соединений. ? Как добавить новые соединения в пул соединений после того, как соединение было приостановлено. Netty предоставляет нам два пула соединений для реализации этих функций. SimpleChannelPool инкапсулирует основные функции пула соединений, но не может указывать количество соединений в пуле соединений, поэтому его нельзя применять в рабочей среде. FixedChannelPool — это более мощный пул соединений, который расширяет SimpleChannelPool и может применяться в рабочей среде.

Простейшее использование пула соединений Netty.

public class ClientMock {
    private static SimpleChannelPoolMap poolMap;

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("Client-Event", false));
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        poolMap = new SimpleChannelPoolMap(bootstrap);

        SimpleChannelPool channelPool = poolMap.get(new InetSocketAddress(8090));
        // 从连接池获取一个连接
        channelPool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {
                    Channel channel = future.getNow();
                    channel.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
                    // 将连接放入连接池
                    channelPool.release(channel);
                }
                if (future.cause() != null) {
                    System.out.println(future.cause());
                }
            }
        });
    }
}
public class SimpleChannelPoolMap extends AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> {
    private Bootstrap bootstrap;
    SimpleHandler simpleHandler = new SimpleHandler();

    public SimpleChannelPoolMap(Bootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    protected SimpleChannelPool newPool(InetSocketAddress key) {
        return new SimpleChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() {
            @Override
            public void channelReleased(Channel ch) throws Exception {
                System.out.println("channelReleased: " + ch);
            }

            @Override
            public void channelAcquired(Channel ch) throws Exception {
                System.out.println("channelAcquired: " + ch);

            }
            @Override
            public void channelCreated(Channel ch) throws Exception {
                // 为channel添加handler
                ch.pipeline().addLast(simpleHandler);
            }
        });
    }
}

Создать пул соединений netty очень просто.

SimpleChannelPool

Вопрос 1: Как сделать так, чтобы для ключа создавался только один пул соединений?

1.AbstractChannelPoolMapПередача ConcurrentHashMap при получении пула соединенийputIfAbsentГарантировано создание только одного пула соединений.

private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();

@Override
    public final P get(K key) {
        P pool = map.get(checkNotNull(key, "key"));
        if (pool == null) {
            // 创建连接池
            pool = newPool(key);
            // 如果已经创建了连接池,那么就把新的关闭,然后返回老的
            P old = map.putIfAbsent(key, pool);
            if (old != null) {
                // We need to destroy the newly created pool as we not use it.
                poolCloseAsyncIfSupported(pool);
                pool = old;
            }
        }
        return pool;
    }

Вопрос 2: Как создать подключение?

SimpleChannelPool#acquire()

public class SimpleChannelPool implements ChannelPool {
    private static final AttributeKey<SimpleChannelPool> POOL_KEY =
        AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
    private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
    
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
                handler.channelCreated(ch);
            }
        });
        this.lastRecentUsed = lastRecentUsed;
    }
    
    @Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
    }    
  • Структура данных, которую пул соединений использует для хранения соединений, представляет собой поточно-ориентированную очередь.
  • releaseHealthCheckУказывает, следует ли выполнять проверку работоспособности соединения при получении или освобождении соединения.
  • lastRecentUsedЕсли это правда, это означает, что соединение получено из хвоста очереди при получении соединения. При значении false он берется из головы очереди. Рекомендуется использовать FIFO, иначе это может привести к тому, что все время будет установлено одно соединение.

При создании пула соединений метод ChannelPoolHandler#channelCreated вызывается для инициализации канала.

Утверждения здесь действительно очень подробные, вызывающиеinitChannelПри вызове метода EventLoop канала был инициализирован, поэтому здесь делается утверждение.

Далее рассмотрим реальный способ подключенияacquire

    @Override
    public final Future<Channel> acquire() {
        // 获取线程选择器,并创建一个Promise
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
    }

    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                //创建连接
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

Метод получения соединения - полностью асинхронное программирование.Если вы не понимаете исходный код Netty и принцип EevntLoop, то здесь действительно сложно разобраться.

public void promiseTest() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        EventExecutor executorA = new DefaultEventExecutor(new DefaultThreadFactory("EventA"));
        EventExecutor executorB = new DefaultEventExecutor();
        Channel channel = new NioSocketChannel();
        // 为EventLoop注册一个Promise
        Promise<Channel> newPromise = executorA.<Channel>newPromise();
        System.out.println(Thread.currentThread().getName());
        newPromise.addListener(f -> {
            if (f.isSuccess()) {
                Assert.assertEquals(channel, f.getNow());
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            }
        });
        Assert.assertEquals(false, executorB.inEventLoop());
        executorB.execute(new Runnable() {
            @Override
            public void run() {
                newPromise.setSuccess(channel);
            }
        });
        latch.await();
    }

Этот код может помочь понять стиль асинхронного программирования Netty. Затем мы рассмотрим метод получения SimpleChannelPool.

Во-первых, EventLoop получается через селектор потока EventLoop клиента Netty BootStrap, и создается Promise. Затем EventLoop будет использоваться для выполнения операции подключения канала, а Promise будет уведомлен об успешном или неудачном завершении канала.

затем согласноlastRecentUsedЗначение определяет, использовать ли LIFO или FIFO для получения соединения.

protected Channel pollChannel() {
        return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
    }

Далее, если соединение не получено, будет выполнен метод установления соединения, если соединение получено из пула соединений, будет выполнена проверка работоспособности соединения.

 ChannelFuture f = connectChannel(bs);

connectChannelЭто реальный метод для установления соединения.Этот метод следует той же логике, что и вызов BootStrap#connect().В основном он создает канал и привязывает EventLoop к этому каналу, а также отправляет операцию установления соединения в EventLoop's taskQueue.

private void notifyConnect(ChannelFuture future, Promise<Channel> promise) throws Exception {
        //执行成功
        if (future.isSuccess()) {
            Channel channel = future.channel();
            handler.channelAcquired(channel);
            // 回写结果
            if (!promise.trySuccess(channel)) {
                // Promise was completed in the meantime (like cancelled), just release the channel again
                release(channel);
            }
        } else {
            promise.tryFailure(future.cause());
        }
    }

Если соединение установлено успешно, получите канал через будущее, выполните метод ChannelPoolHandler#channelAcquired и вызовите метод trySuccess объекта Promise, чтобы попытаться установить канал в соответствии с результатом обещания.

⚠️Здесь есть очень важная информация, то есть метод Acquire не кладет соединение в пул соединений после установления соединения и записи результата в промис. Вместо этого соединение будет помещено в пул соединений, когда обещание записи не будет выполнено.

еслиpollChannel()Если полученное соединение не является пустым, для этого соединения будет выполнена проверка работоспособности.

Если соединение активно, выполните ChannelPoolHandler#channelAcquired и запишите канал в промис.

Если соединение неактивно, соединение будет закрыто. и повторно выполнитьacquireHealthyFromPoolOrNewспособ получить новое соединение из пула соединений,

SimpleChannelPool#relesae()

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            // 判断是不是eventloop线程
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    private void doReleaseChannel(Channel channel, Promise<Void> promise) {
        assert channel.eventLoop().inEventLoop();
        // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
        if (channel.attr(POOL_KEY).getAndSet(null) != this) {
            closeAndFail(channel,
                         // Better include a stacktrace here as this is an user error.
                         new IllegalArgumentException(
                                 "Channel " + channel + " was not acquired from this ChannelPool"),
                         promise);
        } else {
            try {
                if (releaseHealthCheck) {
                    doHealthCheckOnRelease(channel, promise);
                } else {
                    releaseAndOffer(channel, promise);
                }
            } catch (Throwable cause) {
                closeAndFail(channel, cause, promise);
            }
        }
    }

    private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
        final Future<Boolean> f = healthCheck.isHealthy(channel);
        if (f.isDone()) {
            releaseAndOfferIfHealthy(channel, promise, f);
        } else {
            f.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    releaseAndOfferIfHealthy(channel, promise, f);
                }
            });
        }
    }

Метод выпуска сначала определяет, является ли текущий поток потоком цикла событий канала, и если нет, отправляет задачу, помещенную в пул соединений, в поток цикла событий для выполнения.

Когда соединение помещается в пул соединений, оно будет основано наreleaseHealthCheckОпределите, следует ли выполнять проверку работоспособности соединения.

  • Проверка работоспособности включена: если соединение активно, поместите соединение в пул соединений. Если соединение установлено успешно, выполните метод ChannelPoolHandler#channelRelease. В противном случае соединение будет закрыто, а результат будет отправлен в Promise. Если соединение не активно, просто выполните ChannelPoolHandler#channelRelease и уведомите промис о результате.
  • Закрытие проверки работоспособности: поместите соединение непосредственно в пул соединений, в случае сбоя закройте соединение.

SimpleChannelPool реализует основные функции пула соединений, но не может поддерживать ограничение количества соединений в пуле соединений, поэтому в производственной среде нам нужно использоватьFixedChannelPool

FixedChannelPool

public FixedChannelPool(Bootstrap bootstrap,
                            ChannelPoolHandler handler,
                            ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
                            final long acquireTimeoutMillis,
                            int maxConnections, int maxPendingAcquires,
                            boolean releaseHealthCheck, boolean lastRecentUsed) {
        super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
        }
        if (maxPendingAcquires < 1) {
            throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
        }
        if (action == null && acquireTimeoutMillis == -1) {
            timeoutTask = null;
            acquireTimeoutNanos = -1;
        } else if (action == null && acquireTimeoutMillis != -1) {
            throw new NullPointerException("action");
        } else if (action != null && acquireTimeoutMillis < 0) {
            throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
        } else {
            acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
            switch (action) {
            case FAIL:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Fail the promise as we timed out.
                        task.promise.setFailure(new TimeoutException(
                                "Acquire operation took longer then configured maximum time") {
                            @Override
                            public Throwable fillInStackTrace() {
                                return this;
                            }
                        });
                    }
                };
                break;
            case NEW:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Increment the acquire count and delegate to super to actually acquire a Channel which will
                        // create a new connection.
                        task.acquired();

                        FixedChannelPool.super.acquire(task.promise);
                    }
                };
                break;
            default:
                throw new Error();
            }
        }
        executor = bootstrap.config().group().next();
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
    }
  • maxConnectionsМаксимальное количество соединений в пуле соединений.
  • acquireTimeoutNanosМаксимальное время ожидания соединения из пула соединений в миллисекундах.
  • maxPendingAcquiresКогда запрос на получение/установку соединения превышает количество maxConnections, создайте максимальное количество задач с синхронизацией, ожидающих установления соединения. Например, maxConnections=2, на данный момент установлено 2 соединения, но они не попали в пул соединений.Следующий запрос будет помещен в запланированную задачу, выполняемую в фоновом режиме.Если в соединении нет соединения пул в то время, он может быть установлен не более чемmaxPendingAcquiresКоличество соединений, если есть соединение в пуле соединений, оно будет получено из пула соединений.
  • executorEventLoop для получения и освобождения соединения.
  • TimeoutTask.FAIL: если в пуле соединений больше нет доступных соединений, подождитеacquireTimeoutNanosПосле этого выдается исключение тайм-аута.
  • TimeoutTask.NEW: если в пуле соединений больше нет доступных соединений, подождитеacquireTimeoutNanosПосле этого создайте новое подключение.

FixedChannelPool#acquire

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        // 使用同一个executor保证线程安全
        try {
            if (executor.inEventLoop()) {
                acquire0(promise);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        acquire0(promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
        return promise;
    }
    
    private void acquire0(final Promise<Channel> promise) {
        assert executor.inEventLoop();

        if (closed) {
            promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
            return;
        }
        if (acquiredChannelCount.get() < maxConnections) {
            assert acquiredChannelCount.get() >= 0;

            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop
            // 创建一个新的Promise
            Promise<Channel> p = executor.newPromise();
            AcquireListener l = new AcquireListener(promise);
            l.acquired();
            p.addListener(l);
            super.acquire(p);
        } else {
            if (pendingAcquireCount >= maxPendingAcquires) {
                tooManyOutstanding(promise);
            } else {
                AcquireTask task = new AcquireTask(promise);
                if (pendingAcquireQueue.offer(task)) {
                    ++pendingAcquireCount;

                    if (timeoutTask != null) {
                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                    }
                } else {
                    tooManyOutstanding(promise);
                }
            }

            assert pendingAcquireCount > 0;
        }
    }

FixedChannelPool переопределяет SimpleChannelPoolacquire(final Promise<Channel> promise)способ, поставить все задачи по получению соединенияacquire0Передайте его EventLoop для выполнения, позвольте мне получить его вместо использования селектора потоков для выбора EventLoop каждый раз, когда соединение устанавливается в SimpleChannelPool, чтобы гарантировать, что в случае высокого параллелизмаacquiredChannelCountd < maxConnectionsбезопасность, может быть создано ожидаемое количество подключений. так вacquire0Вам нужно создать новое обещание для EventLoop в FixedChannelPool, а затем вызвать SimpleChannelPool.acquire(final Promise<Channel> promise)Метод используется для установления нового соединения или получения соединения из пула соединений.

Каждый раз, когда соединение создается и соединение не помещается в пул соединенийacquiredChannelCountувеличится на 1, чтобы гарантировать, что не болееmaxConnectionsколичество подключений.acquiredChannelCountd > maxConnections, FixedChannelPool будет основан наpendingAcquireCountЗначение, определяющее, следует ли создавать синхронизированную задачу для установления нового соединения.

private abstract class TimeoutTask implements Runnable {              
    @Override                                                         
    public final void run() {                                         
        assert executor.inEventLoop();                                
        long nanoTime = System.nanoTime();                            
        for (;;) {                                                    
            AcquireTask task = pendingAcquireQueue.peek();            
            // 检查是否到了执行时间        
            if (task == null || nanoTime - task.expireNanoTime < 0) { 
                break;                                                
            }                                                         
            pendingAcquireQueue.remove();                             
                                                                      
            --pendingAcquireCount;                                    
            onTimeout(task);                                          
        }                                                             
    }                                                                 
                                                                      
    public abstract void onTimeout(AcquireTask task);                 
}                                                                     
                                                                      

FixedChannelPool#release

При разрыве соединения EventLoop в FixedChannelPool также используется для обеспечения безопасности потоков при высоком уровне параллелизма и будет выполняться при разрыве соединения.decrementAndRunTaskQueue()метод, после успеха он попытается завершить запланированную задачу и вернуть соединение с промисом из пула соединений.

private void decrementAndRunTaskQueue() {
        // We should never have a negative value.
        int currentCount = acquiredChannelCount.decrementAndGet();
        assert currentCount >= 0;
        runTaskQueue();
    }
    
private void runTaskQueue() {
        while (acquiredChannelCount.get() < maxConnections) {
            AcquireTask task = pendingAcquireQueue.poll();
            if (task == null) {
                break;
            }

            // Cancel the timeout if one was scheduled
            ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
            if (timeoutFuture != null) {
                timeoutFuture.cancel(false);
            }

            --pendingAcquireCount;
            task.acquired();

            super.acquire(task.promise);
        }

        // We should never have a negative value.
        assert pendingAcquireCount >= 0;
        assert acquiredChannelCount.get() >= 0;
    }    
    
    

Самое главное — понять, как используется и работает пул соединений Netty, и как пул соединений Netty решает проблему создания заданного количества соединений в условиях высокой параллелизма. В следующей статье я проанализирую разницу между пулом соединений в Sofa-bolt и реализацией пула соединений Netty.