предисловие
Thrift
который предоставилмодель веб-сервиса:один поток,Многопоточность,управляемый событиями, который делится на:блокирующая сервисная модель,Неблокирующая модель обслуживания.
-
Блокирующая сервисная модель:
TSimpleServer
,TThreadPoolServer
. -
Неблокирующая модель обслуживания:
TNonblockingServer
,THsHaServer
иTThreadedSelectorServer
.
TServer
Иерархия классов:
текст
TServer
TServer
статический внутренний класс определенArgs
,Args
Наследовать от абстрактного классаAbstractServerArgs
.AbstractServerArgs
Используя модель строителя,TServer
Доступны различные фабрики:
Заводская собственность | заводской тип | эффект |
---|---|---|
ProcessorFactory | TProcessorFactory | Класс фабрики слоя обработки для создания конкретных объектов TProcessor |
InputTransportFactory | TTransportFactory | Класс фабрики ввода транспортного уровня для создания конкретных объектов TTransport |
OutputTransportFactory | TTransportFactory | Выходной класс фабрики транспортного уровня для создания конкретных объектов TTransport. |
InputProtocolFactory | TProtocolFactory | Класс фабрики ввода уровня протокола для создания конкретных объектов TProtocol |
OutputProtocolFactory | TProtocolFactory | Класс фабрики вывода уровня протокола для создания конкретных объектов TProtocol |
НижеTServer
Часть основного кода:
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
TServer
три метода:serve()
,stop()
иisServing()
.serve()
для запуска сервиса,stop()
за отключение службы,isServing()
Используется для определения состояния запуска и остановки службы.
TServer
изразные классы реализациизапускаются по-разному, поэтомуserve()
Определяется как абстрактный метод. Не для всех служб требуется изящный выход, поэтомуstop()
Метод не определен как абстрактный.
TSimpleServer
TSimpleServer
изРабочий режимс помощью самого простогоблокироватьIO, метод реализации лаконичен и прост для понимания, но одновременно может быть получен и обработан только одинsocket
соединение, эффективность относительно низкая. В основном используется для демонстрацииThrift
Он редко используется в реальном процессе разработки.
(1) Рабочий процесс
(2) Начало работы
Сервер:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 简单的单线程服务模型 一般用于测试
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
Клиент:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
Глядя на исходный код вышеуказанного процесса, т.е.TSimpleServer.java
серединаserve()
Методы, как показано ниже:
serve()
Действие метода:
- настраивать
TServerSocket
изlisten()
способ инициировать соединениемонитор. - отблокироватьспособ принять запрос на подключение от клиента, каждый раз, когдасоединятьто есть создать для него канал
TTransport
объект. - Создать для клиентаобъект обработчика,объект входного транспортного канала,объект выходного транспортного канала,объект протокола вводаиобъект выходного протокола.
- пройти через
TServerEventHandler
Объекты обрабатывают определенные бизнес-запросы.
ThreadPoolServer
TThreadPoolServer
режим принятблокироватьsocket
способ работы, основной поток отвечаетблокировкаСледить за новымиsocket
Приходите, конкретная бизнес-обработка будет переданаПул потоковобрабатывать.
(1) Рабочий процесс
(2) Начало работы
Сервер:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
Клиент:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
ThreadPoolServer
решеноTSimpleServer
не поддерживаетсяпараллелизминесколько соединенийпроблема, представленнаяПул потоков. Реализованная модельOne Thread Per Connection
. Чтобы просмотреть исходный код вышеуказанного процесса, сначала просмотритеПул потоковфрагмент кода:
TThreadPoolServer.java
серединаserve()
Методы, как показано ниже:
serve()
Действие метода:
- настраивать
TServerSocket
изlisten()
способ инициировать соединениемонитор. - отблокироватьспособ принятьклиентиззапрос на подключение, каждый вход всоединять,будетобъект каналаупакован в
WorkerProcess
объект (WorkerProcess
ДостигнутоRunnabel
интерфейс) и отправитьПул потоков. -
WorkerProcess
изrun()
ответственный методобработка бизнеса, созданный для клиентаобъект обработчика,объект входного транспортного канала,объект выходного транспортного канала,объект протокола вводаиобъект выходного протокола. - пройти через
TServerEventHandler
Объекты обрабатывают определенные бизнес-запросы.
WorkerProcess
изrun()
метод:
(4) Преимущества и недостатки
Преимущества режима TThreadPoolServer
расколотьнить прослушивания(Accept Thread
) и обработкаПодключение клиентаизрабочий поток(Worker Thread
),чтение данныхиобработка бизнесасдаватьПул потоковиметь дело с. Таким образом, вБольшое количество параллелизмаНовые подключения также могут быть приняты своевременно.
режим пула потоковболее подходящийСервис-терминалМожет предсказать, сколькоКлиентский параллелизмВ этом случае каждый запрос может быть вовремя обработан пулом бизнес-потоков, и производительность также очень высока.
Недостатки режима TThreadPoolServer
Вычислительная мощность режима пула потоков ограниченаПул потоковспособность работать, когдаодновременные запросыбольше, чем в пуле потоковПотоки, новые запросы могут толькождать в очереди.
TNonblockingServer
TNonblockingServer
узор тожеоднопоточная работа, но используяNIO
режиме, с помощьюChannel/Selector
механизм, использующийIO
модель событийобрабатывать.
всеsocket
зарегистрированы наselector
, внитьпрошедшийseletor
Мониторинг циклавсеsocket
.
Каждыйselector
В конце цикла обработайте всесостояние готовностиизsocket
, для входящих данныхsocket
провестичтение данныхоперация, для сокетов с данными для отправкиотправка данныхоперация, для прослушиванияsocket
создать новый бизнесsocket
и поставь эторегистрприбытьselector
начальство.
Примечание: TNonblockingServer требует, чтобы базовый транспортный канал использовал TFramedTransport.
(1) Рабочий процесс
(2) Начало работы
Сервер:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
Клиент:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
TNonblockingServer
унаследовано отAbstractNonblockingServer
, здесь нас больше интересуетNIO
изselector
часть кода ключа.
(4) Преимущества и недостатки
Преимущества режима TNonblockingServer
по сравнению сTSimpleServer
Повышение эффективности в основном отражается наIO
мультиплекс на,TNonblockingServer
использоватьнеблокирующийIO
,правильноaccept/read/write
ЖдатьIO
событиемониторииметь дело с, мониторинг несколькихsocket
изменение статуса.
Недостатки режима TNonblockingServer
TNonblockingServer
режим вобработка бизнесавсе еще используюпоследовательность одного потокаЧто нужно сделать. Сравнение бизнес-процессовсложный,кропотливыйКогда, например, некоторым функциям интерфейса необходимо читать базу данных и выполняться в течение длительного времени, что вызоветвесь сервисодеялоблокироватьв прямом эфире, в это время режимНе очень эффективно,так какНесколько задач запроса вызовавсе ещеприказВыполнять по одному.
THsHaServer
с учетомTNonblockingServer
недостатки,THsHaServer
унаследовано отTNonblockingServer
, представилПул потоковУлучшенная обработка задачВозможность параллелизма.THsHaServer
даполусинхронный полуасинхронный(Half-Sync/Half-Async
) режим обработки,Half-Aysnc
используется дляIO
обработка событий(Accept/Read/Write
),Half-Sync
для бизнесаhandler
правильноrpc
изСинхронная обработканачальство.
Примечание. THsHaServer, как и TNonblockingServer, требует, чтобы базовый транспортный канал использовал TFramedTransport.
(1) Рабочий процесс
(2) Начало работы
Сервер:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半异步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
Клиент:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
THsHaServer
унаследовано отTNonblockingServer
, недавно добавленныйПул потоковДля функции параллельной обработки рабочих задач см. соответствующий код пула потоков:
пул потоков задачПроцесс создания:
Следующий TThreadedSelectorServer включает в себя большинство функций THsHaServer.Для анализа исходного кода обратитесь к TThreadedSelectorServer.
(4) Преимущества и недостатки
Преимущества THsHaServer
THsHaServer
иTNonblockingServer
режим по сравнению сTHsHaServer
по окончаниичтение данныхПосле этогообработка бизнесапроцесс с помощьюПул потоковЧтобы сделать,основной потоквернуться прямоследующий циклЭксплуатация, эффективность значительно улучшена.
Недостатки THsHaServer
основной потокеще нужно все закончитьsocket
изконтролировать прием,чтение данныхизапись данныхработать. когдаодновременные запросыбольше, и отправитьОбъем данныхбольше, мониторsocket
начальствоновый запрос на подключениене может быть принято вовремя.
TThreadedSelectorServer
TThreadedSelectorServer
правдаTHsHaServer
расширениеselector
серединапрочти и напишиIO
событие(read/write
)отосновной потокотделились. импортировать одновременноworker
пул рабочих потоков, это тоже видHalf-Sync/Half-Async
сервисная модель.
TThreadedSelectorServer
режим в настоящее времяThrift
самый продвинутыймодель обслуживания потоков, внутри него есть несколько частей:
-
Один
AcceptThread
Объект потока, специально используемый для обработки прослушиванияsocket
новое подключение на . -
несколько
SelectorThread
Объекты специализированы для ведения бизнесаsocket
изИнтернетI/O
прочти и напиширабота, все сетевые данныепрочти и напишиВсе делается этими нитями. - Одинбалансировщик нагрузки
SelectorThreadLoadBalancer
объект, в основном используемый дляAcceptThread
нитьполучил новыйsocket
Когда делается запрос на соединение, принимается решение, что этоновое соединениекоторому назначен запросSelectorThread
нить. - Один
ExecutorService
Типпул рабочих потоков,существуетSelectorThread
В потоке обнаружен бизнесsocket
Если есть запрос на звонок, он будетзапросить чтение данныхПосле этого дайтеExecutorService
Пул потоковПоток in завершает конкретное выполнение этого вызова. В основном используется для обработки каждогоrpc
просилhandler
обработка обратного вызова(Эта частьСинхронный).
(1) Рабочий процесс
(2) Начало работы
Сервер:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多线程半同步半异步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多线程半同步半异步的服务模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
Клиент:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭传输通道
transport.close();
}
}
}
}.start();
}
(3) Основной код
Три компонента описанного выше рабочего процессаAcceptThread
,SelectorThread
иExecutorService
Определение в исходном коде выглядит следующим образом:
TThreadedSelectorServer
В выкройке есть специальная нитьAcceptThread
для обработкиновый запрос на подключение, так что он может ответить вовремяМного одновременных запросов на подключение; дополнительно будетСетевые операции ввода/выводараспространяться на несколькоSelectorThread
нитьзавершается в середине, поэтому можно быстроИнтернетI/O
провестиоперации чтения и записи, может хорошо справиться сИнтернетI/O
больше случаев.
TThreadedSelectorServer
Параметры по умолчанию определены следующим образом:
- Количество потоков селектора по умолчанию, отвечающих за чтение и запись сетевых операций ввода-вывода (selectorThreads): 2.
- Количество рабочих потоков по умолчанию, отвечающих за бизнес-процессы (workerThreads): 5
- Размер очереди задач одного потока в пуле рабочих потоков (acceptQueueSizePerThread): 4
Создать, инициализировать и запуститьAcceptThread
иSelectorThreads
, начать одновременноselector
резьбовойбалансировщик нагрузки(selectorThreads
).
Исходный код AcceptThread
AcceptThread
унаследовано отThread
, видно, что он содержит три важных свойства:неблокирующий канал передачи(TNonblockingServerTransport
),NIO
Селектор(acceptSelector
)ибалансировщик нагрузки потока селектора(threadChooser
).
ПроверятьAcceptThread
изrun()
метод, видно, чтоaccept
Как только поток запустится, он будет продолжать вызыватьselect()
метод:
Проверятьselect()
метод,acceptSelector
СелекторждатьIO
Прибытие события, получитьSelectionKey
То есть проверить,accept
событие. Если да, проходитеhandleAccept()
метод получаетновое соединение; иначе, если даIO
чтение и запись событий,AcceptThread
Никаких действий предпринято не будет, оно будет переданоSelectorThread
Заканчивать.
существуетhandleAccept()
метод, первый проходdoAccept()
получитьканал связи,ПотомSelector
балансировщик нагрузки потоковВыбери одинSelector
нить, завершить следующуюIO
чтение и запись событий.
Продолжить, чтобы увидеть следующееdoAddAccept()
Реализация метода, нет ожидания, далее вызываетSelectorThread
изaddAcceptedConnection()
метод, положитьобъект неблокирующего транспортного каналаПерейти кселекторная нитьделать дальшеIO
операции чтения и записи.
Исходный код SelectorThreadLoadBalancer
SelectorThreadLoadBalancer
Как создать?
SelectorThreadLoadBalancer
основан наалгоритм опросаизSelector
селектор нити,пройти черезитератор потокадля новичковсоединятьпоследовательное присвоениеSelectorThread
.
Исходный код SelectorThread
SelectorThread
иAcceptThread
то же самое, даTThreadedSelectorServer
один извнутренний класс-член, каждыйSelectorThread
объект потокаесть один внутриочередь блокировки, используется для хранения потокаполучилаизканал связи.
очередь блокировкиРазмер может быть указан конструктором:
видно выше, вAcceptThread
изdoAddAccept()
метод называетсяSelectorThread
изaddAcceptedConnection()
метод.
Этот метод делает две вещи:
- будет это
SelectorThread
получено потокомканал связиположить вочередь блокировкисередина. - пройти через
wakeup()
метод пробужденияSelectorThread
серединаNIO
Селекторselector
.
теперь, когдаSelectorThread
также унаследовано отThread
, см. егоrun()
Реализация метода:
SelectorThread
методselect()
мониторIO
события, только для обработкичтение данныхизапись данных. Если соединение имеетданные доступны для чтения, прочитайте и начните сframe
путь к кэшу, при необходимости к соединениюввод данных, кэшировать и отправлять данные клиента. И вчтение и запись данныхПосле обработки необходимоNIO
изselector
пустойивыйтисвояSelectionKey
.
-
операция записи данныхПосле завершения весь
rpc
Процесс вызова завершен.handleWrite()
Методы, как показано ниже:
-
операция чтения данныхПосле завершения,
Thrift
буду использоватьчитать данныевоплощать в жизньцелевой метод,handleRead()
Методы, как показано ниже:
handleRead
метод выполняетсяread()
метод, будетчтение данныхКогда это будет сделано, он вызоветrequestInvoke()
вызов методацелевой методПолная конкретная бизнес-обработка.requestInvoke()
метод будетзапросить данныеупакован какRunnable
объект, представленныйПул рабочих потоков(ExecutorService
) для обработки.
select()
После завершения метода поток продолжает работатьprocessAcceptedConnections()
метод обработкиследующее соединениеизIO
событие.
Вот несколько основных операций:
- попробовать из
SelectorThread
изочередь блокировкиacceptedQueue
получить одинподключенный канал передачи. Если приобретение прошло успешно, звонитеregisterAccepted()
метод; в противном случае введите следующий цикл. -
registerAccepted()
метод будетканал передачибазасоединятьзарегистрироваться наNIO
изСелекторselector
выше, получитьSelectionKey
. - Создавать
FrameBuffer
объект и привязать к полученномуSelectionKey
Выше, используется в середине передачи данныхчтение и запись кеша.
Суммировать
Эта статья оThrift
все видымодель обслуживания потоковвведено, в том числе 2блокирующая сервисная модель:TSimpleServer
,TThreadPoolServer
,3 типаНеблокирующая модель обслуживания:TNonblockingServer
,THsHaServer
иTThreadedSelectorServer
. для различных моделей обслуживанияконкретное использование,процесс работы,Принцип и реализация исходного кодаПроведен определенный анализ.
Ввиду длины статьи, пожалуйста, просматривайте ее медленно!
Ссылки по теме
-
Подробное объяснение серии Apache Thrift (1) — обзор и начало работы
-
Подробное объяснение Apache Thrift Series (2) — Модель сетевых служб
-
Подробное объяснение серии Apache Thrift (3) — механизм сериализации
Добро пожаловать в технический публичный аккаунт: Zero One Technology Stack
Эта учетная запись будет продолжать делиться сухими товарами серверных технологий, включая основы виртуальных машин, многопоточное программирование, высокопроизводительные фреймворки, асинхронное ПО, промежуточное ПО для кэширования и обмена сообщениями, распределенные и микросервисы, материалы для обучения архитектуре и расширенные учебные материалы и статьи.