предисловие
Thriftкоторый предоставилмодель веб-сервиса:один поток,Многопоточность,управляемый событиями, который делится на:блокирующая сервисная модель,Неблокирующая модель обслуживания.
-
Блокирующая сервисная модель:
TSimpleServer,TThreadPoolServer. -
Неблокирующая модель обслуживания:
TNonblockingServer,THsHaServerиTThreadedSelectorServer.
TServerИерархия классов:
текст
TServer
TServerстатический внутренний класс определенArgs,ArgsНаследовать от абстрактного классаAbstractServerArgs.AbstractServerArgsИспользуя модель строителя,TServerДоступны различные фабрики:
| Заводская собственность | заводской тип | эффект |
|---|---|---|
| ProcessorFactory | TProcessorFactory | Класс фабрики слоя обработки для создания конкретных объектов TProcessor |
| InputTransportFactory | TTransportFactory | Класс фабрики ввода транспортного уровня для создания конкретных объектов TTransport |
| OutputTransportFactory | TTransportFactory | Выходной класс фабрики транспортного уровня для создания конкретных объектов TTransport. |
| InputProtocolFactory | TProtocolFactory | Класс фабрики ввода уровня протокола для создания конкретных объектов TProtocol |
| OutputProtocolFactory | TProtocolFactory | Класс фабрики вывода уровня протокола для создания конкретных объектов TProtocol |
НижеTServerЧасть основного кода:
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
TServerтри метода:serve(),stop()иisServing().serve()для запуска сервиса,stop()за отключение службы,isServing()Используется для определения состояния запуска и остановки службы.
TServerизразные классы реализациизапускаются по-разному, поэтомуserve()Определяется как абстрактный метод. Не для всех служб требуется изящный выход, поэтомуstop()Метод не определен как абстрактный.
TSimpleServer
TSimpleServerизРабочий режимс помощью самого простогоблокироватьIO, метод реализации лаконичен и прост для понимания, но одновременно может быть получен и обработан только одинsocketсоединение, эффективность относительно низкая. В основном используется для демонстрацииThriftОн редко используется в реальном процессе разработки.
(1) Рабочий процесс
(2) Начало работы
Сервер:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 简单的单线程服务模型 一般用于测试
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
Клиент:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
Глядя на исходный код вышеуказанного процесса, т.е.TSimpleServer.javaсерединаserve()Методы, как показано ниже:
serve()Действие метода:
- настраивать
TServerSocketизlisten()способ инициировать соединениемонитор. - отблокироватьспособ принять запрос на подключение от клиента, каждый раз, когдасоединятьто есть создать для него канал
TTransportобъект. - Создать для клиентаобъект обработчика,объект входного транспортного канала,объект выходного транспортного канала,объект протокола вводаиобъект выходного протокола.
- пройти через
TServerEventHandlerОбъекты обрабатывают определенные бизнес-запросы.
ThreadPoolServer
TThreadPoolServerрежим принятблокироватьsocketспособ работы, основной поток отвечаетблокировкаСледить за новымиsocketПриходите, конкретная бизнес-обработка будет переданаПул потоковобрабатывать.
(1) Рабочий процесс
(2) Начало работы
Сервер:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
Клиент:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
ThreadPoolServerрешеноTSimpleServerне поддерживаетсяпараллелизминесколько соединенийпроблема, представленнаяПул потоков. Реализованная модельOne Thread Per Connection. Чтобы просмотреть исходный код вышеуказанного процесса, сначала просмотритеПул потоковфрагмент кода:
TThreadPoolServer.javaсерединаserve()Методы, как показано ниже:
serve()Действие метода:
- настраивать
TServerSocketизlisten()способ инициировать соединениемонитор. - отблокироватьспособ принятьклиентиззапрос на подключение, каждый вход всоединять,будетобъект каналаупакован в
WorkerProcessобъект (WorkerProcessДостигнутоRunnabelинтерфейс) и отправитьПул потоков. -
WorkerProcessизrun()ответственный методобработка бизнеса, созданный для клиентаобъект обработчика,объект входного транспортного канала,объект выходного транспортного канала,объект протокола вводаиобъект выходного протокола. - пройти через
TServerEventHandlerОбъекты обрабатывают определенные бизнес-запросы.
WorkerProcessизrun()метод:
(4) Преимущества и недостатки
Преимущества режима TThreadPoolServer
расколотьнить прослушивания(Accept Thread) и обработкаПодключение клиентаизрабочий поток(Worker Thread),чтение данныхиобработка бизнесасдаватьПул потоковиметь дело с. Таким образом, вБольшое количество параллелизмаНовые подключения также могут быть приняты своевременно.
режим пула потоковболее подходящийСервис-терминалМожет предсказать, сколькоКлиентский параллелизмВ этом случае каждый запрос может быть вовремя обработан пулом бизнес-потоков, и производительность также очень высока.
Недостатки режима TThreadPoolServer
Вычислительная мощность режима пула потоков ограниченаПул потоковспособность работать, когдаодновременные запросыбольше, чем в пуле потоковПотоки, новые запросы могут толькождать в очереди.
TNonblockingServer
TNonblockingServerузор тожеоднопоточная работа, но используяNIOрежиме, с помощьюChannel/Selectorмеханизм, использующийIOмодель событийобрабатывать.
всеsocketзарегистрированы наselector, внитьпрошедшийseletorМониторинг циклавсеsocket.
КаждыйselectorВ конце цикла обработайте всесостояние готовностиизsocket, для входящих данныхsocketпровестичтение данныхоперация, для сокетов с данными для отправкиотправка данныхоперация, для прослушиванияsocketсоздать новый бизнесsocketи поставь эторегистрприбытьselectorначальство.
Примечание: TNonblockingServer требует, чтобы базовый транспортный канал использовал TFramedTransport.
(1) Рабочий процесс
(2) Начало работы
Сервер:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
Клиент:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
TNonblockingServerунаследовано отAbstractNonblockingServer, здесь нас больше интересуетNIOизselectorчасть кода ключа.
(4) Преимущества и недостатки
Преимущества режима TNonblockingServer
по сравнению сTSimpleServerПовышение эффективности в основном отражается наIOмультиплекс на,TNonblockingServerиспользоватьнеблокирующийIO,правильноaccept/read/writeЖдатьIOсобытиемониторииметь дело с, мониторинг несколькихsocketизменение статуса.
Недостатки режима TNonblockingServer
TNonblockingServerрежим вобработка бизнесавсе еще используюпоследовательность одного потокаЧто нужно сделать. Сравнение бизнес-процессовсложный,кропотливыйКогда, например, некоторым функциям интерфейса необходимо читать базу данных и выполняться в течение длительного времени, что вызоветвесь сервисодеялоблокироватьв прямом эфире, в это время режимНе очень эффективно,так какНесколько задач запроса вызовавсе ещеприказВыполнять по одному.
THsHaServer
с учетомTNonblockingServerнедостатки,THsHaServerунаследовано отTNonblockingServer, представилПул потоковУлучшенная обработка задачВозможность параллелизма.THsHaServerдаполусинхронный полуасинхронный(Half-Sync/Half-Async) режим обработки,Half-Aysncиспользуется дляIOобработка событий(Accept/Read/Write),Half-Syncдля бизнесаhandlerправильноrpcизСинхронная обработканачальство.
Примечание. THsHaServer, как и TNonblockingServer, требует, чтобы базовый транспортный канал использовал TFramedTransport.
(1) Рабочий процесс
(2) Начало работы
Сервер:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半异步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
Клиент:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
(3) Анализ исходного кода
THsHaServerунаследовано отTNonblockingServer, недавно добавленныйПул потоковДля функции параллельной обработки рабочих задач см. соответствующий код пула потоков:
пул потоков задачПроцесс создания:
Следующий TThreadedSelectorServer включает в себя большинство функций THsHaServer.Для анализа исходного кода обратитесь к TThreadedSelectorServer.
(4) Преимущества и недостатки
Преимущества THsHaServer
THsHaServerиTNonblockingServerрежим по сравнению сTHsHaServerпо окончаниичтение данныхПосле этогообработка бизнесапроцесс с помощьюПул потоковЧтобы сделать,основной потоквернуться прямоследующий циклЭксплуатация, эффективность значительно улучшена.
Недостатки THsHaServer
основной потокеще нужно все закончитьsocketизконтролировать прием,чтение данныхизапись данныхработать. когдаодновременные запросыбольше, и отправитьОбъем данныхбольше, мониторsocketначальствоновый запрос на подключениене может быть принято вовремя.
TThreadedSelectorServer
TThreadedSelectorServerправдаTHsHaServerрасширениеselectorсерединапрочти и напишиIOсобытие(read/write)отосновной потокотделились. импортировать одновременноworkerпул рабочих потоков, это тоже видHalf-Sync/Half-Asyncсервисная модель.
TThreadedSelectorServerрежим в настоящее времяThriftсамый продвинутыймодель обслуживания потоков, внутри него есть несколько частей:
-
Один
AcceptThreadОбъект потока, специально используемый для обработки прослушиванияsocketновое подключение на . -
несколько
SelectorThreadОбъекты специализированы для ведения бизнесаsocketизИнтернетI/Oпрочти и напиширабота, все сетевые данныепрочти и напишиВсе делается этими нитями. - Одинбалансировщик нагрузки
SelectorThreadLoadBalancerобъект, в основном используемый дляAcceptThreadнитьполучил новыйsocketКогда делается запрос на соединение, принимается решение, что этоновое соединениекоторому назначен запросSelectorThreadнить. - Один
ExecutorServiceТиппул рабочих потоков,существуетSelectorThreadВ потоке обнаружен бизнесsocketЕсли есть запрос на звонок, он будетзапросить чтение данныхПосле этого дайтеExecutorServiceПул потоковПоток in завершает конкретное выполнение этого вызова. В основном используется для обработки каждогоrpcпросилhandlerобработка обратного вызова(Эта частьСинхронный).
(1) Рабочий процесс
(2) Начало работы
Сервер:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多线程半同步半异步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多线程半同步半异步的服务模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
Клиент:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭传输通道
transport.close();
}
}
}
}.start();
}
(3) Основной код
Три компонента описанного выше рабочего процессаAcceptThread,SelectorThreadиExecutorServiceОпределение в исходном коде выглядит следующим образом:
TThreadedSelectorServerВ выкройке есть специальная нитьAcceptThreadдля обработкиновый запрос на подключение, так что он может ответить вовремяМного одновременных запросов на подключение; дополнительно будетСетевые операции ввода/выводараспространяться на несколькоSelectorThreadнитьзавершается в середине, поэтому можно быстроИнтернетI/Oпровестиоперации чтения и записи, может хорошо справиться сИнтернетI/Oбольше случаев.
TThreadedSelectorServerПараметры по умолчанию определены следующим образом:
- Количество потоков селектора по умолчанию, отвечающих за чтение и запись сетевых операций ввода-вывода (selectorThreads): 2.
- Количество рабочих потоков по умолчанию, отвечающих за бизнес-процессы (workerThreads): 5
- Размер очереди задач одного потока в пуле рабочих потоков (acceptQueueSizePerThread): 4
Создать, инициализировать и запуститьAcceptThreadиSelectorThreads, начать одновременноselectorрезьбовойбалансировщик нагрузки(selectorThreads).
Исходный код AcceptThread
AcceptThreadунаследовано отThread, видно, что он содержит три важных свойства:неблокирующий канал передачи(TNonblockingServerTransport),NIOСелектор(acceptSelector)ибалансировщик нагрузки потока селектора(threadChooser).
ПроверятьAcceptThreadизrun()метод, видно, чтоacceptКак только поток запустится, он будет продолжать вызыватьselect()метод:
Проверятьselect()метод,acceptSelectorСелекторждатьIOПрибытие события, получитьSelectionKeyТо есть проверить,acceptсобытие. Если да, проходитеhandleAccept()метод получаетновое соединение; иначе, если даIOчтение и запись событий,AcceptThreadНикаких действий предпринято не будет, оно будет переданоSelectorThreadЗаканчивать.
существуетhandleAccept()метод, первый проходdoAccept()получитьканал связи,ПотомSelectorбалансировщик нагрузки потоковВыбери одинSelectorнить, завершить следующуюIOчтение и запись событий.
Продолжить, чтобы увидеть следующееdoAddAccept()Реализация метода, нет ожидания, далее вызываетSelectorThreadизaddAcceptedConnection()метод, положитьобъект неблокирующего транспортного каналаПерейти кселекторная нитьделать дальшеIOоперации чтения и записи.
Исходный код SelectorThreadLoadBalancer
SelectorThreadLoadBalancerКак создать?
SelectorThreadLoadBalancerоснован наалгоритм опросаизSelectorселектор нити,пройти черезитератор потокадля новичковсоединятьпоследовательное присвоениеSelectorThread.
Исходный код SelectorThread
SelectorThreadиAcceptThreadто же самое, даTThreadedSelectorServerодин извнутренний класс-член, каждыйSelectorThreadобъект потокаесть один внутриочередь блокировки, используется для хранения потокаполучилаизканал связи.
очередь блокировкиРазмер может быть указан конструктором:
видно выше, вAcceptThreadизdoAddAccept()метод называетсяSelectorThreadизaddAcceptedConnection()метод.
Этот метод делает две вещи:
- будет это
SelectorThreadполучено потокомканал связиположить вочередь блокировкисередина. - пройти через
wakeup()метод пробужденияSelectorThreadсерединаNIOСелекторselector.
теперь, когдаSelectorThreadтакже унаследовано отThread, см. егоrun()Реализация метода:
SelectorThreadметодselect()мониторIOсобытия, только для обработкичтение данныхизапись данных. Если соединение имеетданные доступны для чтения, прочитайте и начните сframeпуть к кэшу, при необходимости к соединениюввод данных, кэшировать и отправлять данные клиента. И вчтение и запись данныхПосле обработки необходимоNIOизselectorпустойивыйтисвояSelectionKey.
-
операция записи данныхПосле завершения весь
rpcПроцесс вызова завершен.handleWrite()Методы, как показано ниже:
-
операция чтения данныхПосле завершения,
Thriftбуду использоватьчитать данныевоплощать в жизньцелевой метод,handleRead()Методы, как показано ниже:
handleReadметод выполняетсяread()метод, будетчтение данныхКогда это будет сделано, он вызоветrequestInvoke()вызов методацелевой методПолная конкретная бизнес-обработка.requestInvoke()метод будетзапросить данныеупакован какRunnableобъект, представленныйПул рабочих потоков(ExecutorService) для обработки.
select()После завершения метода поток продолжает работатьprocessAcceptedConnections()метод обработкиследующее соединениеизIOсобытие.
Вот несколько основных операций:
- попробовать из
SelectorThreadизочередь блокировкиacceptedQueueполучить одинподключенный канал передачи. Если приобретение прошло успешно, звонитеregisterAccepted()метод; в противном случае введите следующий цикл. -
registerAccepted()метод будетканал передачибазасоединятьзарегистрироваться наNIOизСелекторselectorвыше, получитьSelectionKey. - Создавать
FrameBufferобъект и привязать к полученномуSelectionKeyВыше, используется в середине передачи данныхчтение и запись кеша.
Суммировать
Эта статья оThriftвсе видымодель обслуживания потоковвведено, в том числе 2блокирующая сервисная модель:TSimpleServer,TThreadPoolServer,3 типаНеблокирующая модель обслуживания:TNonblockingServer,THsHaServerиTThreadedSelectorServer. для различных моделей обслуживанияконкретное использование,процесс работы,Принцип и реализация исходного кодаПроведен определенный анализ.
Ввиду длины статьи, пожалуйста, просматривайте ее медленно!
Ссылки по теме
-
Подробное объяснение серии Apache Thrift (1) — обзор и начало работы
-
Подробное объяснение Apache Thrift Series (2) — Модель сетевых служб
-
Подробное объяснение серии Apache Thrift (3) — механизм сериализации
Добро пожаловать в технический публичный аккаунт: Zero One Technology Stack
Эта учетная запись будет продолжать делиться сухими товарами серверных технологий, включая основы виртуальных машин, многопоточное программирование, высокопроизводительные фреймворки, асинхронное ПО, промежуточное ПО для кэширования и обмена сообщениями, распределенные и микросервисы, материалы для обучения архитектуре и расширенные учебные материалы и статьи.