1. Сервисный вызов
Сначала потребитель услуг инициирует удаленный вызов через прокси-объект Proxy, а затем отправляет закодированный запрос на сетевой уровень поставщика услуг через клиента сети Клиент, то есть Сервер. Первое, что делает сервер после получения запроса — декодирует пакет. Затем декодированный запрос отправляется диспетчеру Dispatcher, а затем диспетчер отправляет запрос в указанный пул потоков, и, наконец, пул потоков вызывает конкретную службу. Это процесс отправки и получения запроса на удаленный вызов.
Итак, как запрос отправляется в dubbo? И как выглядит модель потоков?
2. Разделение потоков ввода-вывода и бизнес-потоков
-
Если логика обработки событий может быть завершена быстро и не инициируются новые запросы ввода-вывода, например просто запись идентификатора в память, быстрее обрабатывать непосредственно в потоке ввода-вывода, поскольку планирование пула потоков сокращается.
-
Однако, если логика обработки событий медленная или если необходимо инициировать новый запрос ввода-вывода, например запрос к базе данных, он должен быть отправлен в пул потоков, иначе поток ввода-вывода будет заблокирован, а другие запросы не будут получены. .
-
Если для обработки события используется поток ввода-вывода, и в процессе обработки события инициируется новый запрос ввода-вывода, например запрос входа в систему в событии соединения, будет сообщено об исключении «может вызвать взаимоблокировку», но не будет действительно тупик.
Поэтому в реальных бизнес-сценариях необходимо разделять бизнес-потоки и потоки ввода-вывода. В качестве структуры управления службами dubbo использует Netty как компонент сетевого взаимодействия в нижней части и поддерживает различные стратегии распространения при запросе распространения.
Справочная статья:блог woo woo woo.cn на.com/no_life/art…
3. Стратегия распределения запросов
установление соединения
Судя по официальному описанию, duboo поддерживает пять стратегий распространения, посмотрим, как это реализовано. отNtty4.x为
пример:
-
NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
-
ChannelHandlers#wrapInternal
Прошел метод построения NettyServerprotected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 选择调度策略 默认是all return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
ChannelHandlers#wrap
настройки методаMultiMessageHandler
,HeartbeatHandler
И выберите стратегию планирования через расширение SPI. NettyServer#doOpen
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// 多线程模型
// boss线程池,负责和消费者建立新的连接
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// worker线程池,负责连接的数据交换
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //内存池
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) //设置编解码器
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind 端口
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
Установите количество пулов потоков-боссов Netty на 1, пул рабочих потоков (то есть потоков ввода-вывода) на количество ядер процессора + 1 и внедрите обработчик в Netty для кодирования, декодирования и обработки сообщений.
Если мы предоставляем только один сервисный порт Dubbo в процессе JVM, то процесс JVM будет иметь только один экземпляр NettyServer и только один экземпляр NettyHandler. И настройте три обработчика для обработки кодека, создания соединения, чтения и записи сообщений и т. д. Внутри даббо определяетChannelHandler
для использования с НеттиChannel
Связанный, через приведенный выше код найдетNettyServer
сама также являетсяChannelHandler
. пройти черезNettyServer#doOpen
После раскрытия сервисного порта клиент может установить соединение с сервером, а провайдер позвонит после инициализации соединения.NettyHandler#channelActive
метод созданияNettyChannel
NettyChannel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());
//获取或者创建一个NettyChannel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
if (channel != null) {
// <ip:port, channel>
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
// 这里的 handler就是NettyServer
handler.connected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
Точно так же, как у Нетти и Даббо есть свой собственный ChannelHandler, у Нетти и Даббо также есть свой собственный канал. Этот метод в конечном итоге будет называтьсяNettyServer#connected
метод, чтобы проверить, будет ли новый добавленный канал превышать допустимую конфигурацию, настроенную провайдером. Если он превышает, журнал ошибок будет распечатан напрямую, и канал будет закрыт. Таким образом, потребитель, естественно, получит ненормальную информацию о соединении прерывание.Подробнее см.AbstractServer#connected
метод.
AbstractServer#connected
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClosed()) {
logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels = getChannels();
//大于accepts的tcp连接直接关闭
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
super.connected(ch);
}
- В dubbo потребители и провайдеры по умолчанию устанавливают только длинное TCP-соединение (подробный код см. во введении к исходному коду официального веб-сайта, в разделе справки по службам). добавить на стороне потребителя:
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
- Провайдеры могут использовать accepts для управления количеством длинных подключений, чтобы предотвратить слишком много подключений.Конфигурация выглядит следующим образом:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>
запрос на получение
Когда соединение установлено, потребитель может запросить услуги провайдера, а когда запрос придёт, провайдер по очереди пройдёт следующую обработку Handler:
--->NettyServerHandler#channelRead
: получить сообщение с запросом.
--->AbstractPeer#received
: Если служба была закрыта, вернитесь, в противном случае вызовите следующий обработчик для обработки.
--->MultiMessageHandler#received
: Если это пакетный запрос, для обработки запроса будет вызван следующий обработчик.
--->HeartbeatHandler#received
: обработка сообщений пульса.
--->AllChannelHandler#received
: Обработчик Dubbo очень важен, потому что отсюда происходит изоляция пула потоков ввода-вывода и пула бизнес-потоков.
--->DecodeHandler#received
: Расшифровка сообщений.
--->HeaderExchangeHandler#received
: обработка сообщений.
--->DubboProtocol
: позвонить в сервис.
-
AllChannelHandler#received
:
public void received(Channel channel, Object message) throws RemotingException {
// 获取业务线程池
ExecutorService cexecutor = getExecutorService();
try {
// 使用线程池处理消息
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
Исключение перехватывается здесь для выполнения, поскольку пул потоков ввода-вывода не ограничен, но пул бизнес-потоков может быть ограничен, поэтому отправка выполнения может столкнуться с RejectedExecutionException.
Итак, как получить пул бизнес-потоков? ФактическиWrappedChannelHandler
даxxxChannelHandlerd
Класс украшения, который можно узнать по даббо спи, получаетAllChannelHandler
будет создан первымWrappedChannelHandler
.
WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 获取业务线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
резьбовая модель
FixedThreadPool
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 线程池名称DubboServerHanler-server:port
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 缺省线程数量200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 任务队列类型
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
200 потоков используются по умолчанию иSynchronousQueue
Это означает, что если все потоки в пуле потоков работают, новые задачи будут отклонены напрямую.
CachedThreadPool
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程数量 缺省为0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程数量 缺省为Integer.MAX_VALUE
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// queue 缺省为0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 空闲线程存活时间
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
Кэшированный пул потоков, видно, что если скорость отправки задач больше, чем maxThreads, потоки будут создаваться непрерывно, а ресурсы ЦП и памяти будут исчерпаны в экстремальных условиях. Он не подходит для использования при внезапном появлении большого количества трафика.
LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 缺省核心线程数量为0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 缺省最大线程数量200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 任务队列缺省0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
Если он не настроен, он ничем не отличается от FixedThreadPool.
EagerThreadPool
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// Integer.MAX_VALUE
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 60s
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// init queue and executor
// 初始任务队列为1
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
EagerThreadPoolExecutor
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
//已提交任务数量
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) { //大于最大线程数被拒绝任务 重新添加到任务队列
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
TaskQueue
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// 获取当前线程池中的线程数量
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
// 如果已经提交的任务数量小于当前线程池中线程数量(不是很理解这里的操作)
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
//当前线程数小于最大线程程数直接创建新worker
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
Создать первымWorker
Пул потоков. когда количество задач превышаетcorePoolSize
но меньше чемmaximumPoolSize
, приоритетом является созданиеWorker
справиться с задачей. Когда количество задач превышаетmaximumPoolSize
Когда задача ставится в очередь на блокировку. Брошен, когда очередь блокировки заполненаRejectedExecutionException
. (по сравнению с cached: в cached больше задач, чемmaximumPoolSize
напрямую генерировать исключение вместо того, чтобы ставить задачу в очередь блокировки).
Согласно приведенному выше анализу кода, если запрос потребителя слишком быстрый, это может привести к тому, что пул бизнес-потоков поставщика услуг выдастRejectedExecutionException
аномальный. Это исключение является политикой отклонения треда, принятой duboo.AbortPolicyWithReport#rejectedExecution
Он выбрасывается и будет возвращен потребителю.Простым решением в настоящее время является увеличение количества пула потоков, вызывающего службу поставщика, например, в следующей конфигурации:
<dubbo:provider threads="500"/>
或
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>
Чтобы убедиться, что основная служба в модуле имеет доступные потоки (чтобы дополнительная служба не вытесняла слишком много потоков, вызывающих службу), вы можете ограничить параллелизм вторичной службы, например:
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>
Стратегия диспетчера dubbo используется по умолчанию.На самом деле, лучший способ справиться с этим — отделить поток ввода-вывода от бизнес-потока, поэтому лучше использовать сообщение. И если вы используете все, если используемая версия dubo относительно низкая, это, вероятно, вызовет ошибку dubbo. Как только пул бизнес-потоков заполнится, будет выдано исключение отклонения выполнения, и перехваченный метод будет введен для обработки, и этот метод по-прежнему использует пул бизнес-потоков, поэтому весьма вероятно, что пул бизнес-потоков все еще полон в момент на этот раз, что приводит к нисходящему HeaderExchangeHandler. Нет возможности вызвать, и ответное сообщение после обработки исключения точноHeaderExchangeHandler#caught
закончить, так наконецNettyHandler#writeRequested
Если он не вызывается, потребитель может только дождаться тайм-аута и не может получить исключение о том, что пул потоков провайдера заполнен (2.6.x исправил эту проблему).
- Рекомендуемая конфигурация
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />
Справочная статья:медленный указатель. ITeye.com/blog/239117…