Что такое бережливость?
Thrift
Это облегченная межъязыковая платформа удаленного вызова служб, которая поддерживаетC++
,Java
,Python
,PHP
,Ruby
Ждать. Автоматически генерировать RPC-интерфейс с помощью механизма генерации кода.Thrift IDL
Файлы, соответствующие различным основным языкамRPC
Код шаблона сервера/клиента экономит основную работу по настройке и поддержке кодирования и декодирования интерфейса, передачи сообщений, модели многопоточности сервера и т. д. Серверу нужно только написать класс реализации интерфейса, а клиент вызывает удаленную службу в соответствии с сервисный объект.
Бережливая архитектура
Thrift
Архитектура состоит из транспортного уровня, уровня протокола, уровня обработки и уровня обслуживания снизу вверх.
- Транспортный уровень в основном отвечает за чтение/запись данных из сети и определяет сетевой транспортный протокол.
- Уровень протокола определяет формат передачи данных и отвечает за сериализацию и десериализацию сетевых данных.
- слой обработки
IDL
Сгенерируйте, инкапсулируйте конкретные базовые методы сетевой передачи и сериализации и делегируйте их пользователю для реализации.Handle
для обработки. - Сервисный уровень обеспечивает сервисную модель сетевого ввода-вывода.
Что такое протоколы Thrift?
Thrift
Позволяет пользователю выбрать тип протокола связи, передаваемого между клиентом и сервером.
-
TBinaryProtocol
: передавать с использованием двоичного формата кодирования,Thrift
Транспортный протокол по умолчанию для . -
TCompactProtocol
: передача в сжатом формате. -
TJSONProtocol
:использоватьJSON
передача форматов. -
TDebugProtocol
: Используйте текстовый формат для передачи, который удобен дляdebug
. -
TSimpleJSONProtocol
:поставкаJSON
Протокол только для записи, подходящий для анализа с помощью языков сценариев.
Что такое транспортные уровни Thrift?
-
TSocket
: Блокирующий ввод-вывод, используемый на клиенте. -
TServerSocket
: неблокирующий ввод-вывод для прослушивания на стороне сервераTSocket
. -
TNonblockingSocket
: Неблокирующий ввод-вывод для создания асинхронных клиентов. -
TMemoryInputTransport
: инкапсулирует массив байтовbyte[]
Сделайте входной поток. -
TFramedTransport
: неблокирующий ввод-вывод, передача блоками (аналогичноNIO
).
Что на стороне сервера Thrift?
TServer
TServer
статический внутренний класс определенArgs
,Args
Наследовать от абстрактного классаAbstractServerArgs
.AbstractServerArgs
Используя модель строителя,TServer
Доступны различные фабрики.
Атрибуты | Типы | эффект |
---|---|---|
processorFactory | TProcessorFactory | Класс фабрики слоя обработки для создания объектов TProcessor |
inputTransportFactory | TTransportFactory | Класс фабрики ввода транспортного уровня для создания объектов TTransport |
outputTransportFactory | TTransportFactory | Выходной класс фабрики транспортного уровня для создания объектов TTransport |
inputProtocolFactory | TProtocolFactory | Класс фабрики ввода уровня протокола для создания объектов TProtocol |
outputProtocolFactory | TProtocolFactory | Класс фабрики вывода уровня протокола для создания объектов TProtocol |
TServer
Основной метод:
serve():启动服务。serve()为抽象方法,不同实现类的启动方式不一样,可各自实现。
stop():关闭服务。
isServing():检测服务状态(启动/关闭)。
setServing(boolean serving):设置服务状态。
TSimpleServer
1. Особенности
Однопоточный блокирующий ввод-вывод.
2. Дизайн-мышление
Основной поток отвечает за мониторинг, чтение и запись данных и бизнес-обработку (одновременно может быть получен и обработан только один поток).socket
соединять).
3. Используйте
Клиент:
public class HelloClient {
private static final Logger LOGGER = Logger.getLogger(HelloClient.class.getName());
public static void main(String[] args) {
TTransport transport = null;
try {
//传输层使用阻塞I/O
transport = new TSocket("127.0.0.1", 9090);
transport.open();
//使用二进制协议传输数据
TProtocol protocol = new TBinaryProtocol(transport);
//使用同步客户端
GreetingService.Client client = new GreetingService.Client(protocol);
String name = "XuDT";
LOGGER.info("HelloClient 请求参数[name]=" + name);
//调用接口
String result = client.sayHello(name);
LOGGER.info("Server 返回结果为" + result);
} catch (TException e) {
e.printStackTrace();
} finally {
transport.close();
}
}
}
Сервер:
public class SimpleServer {
private static final Logger LOGGER = Logger.getLogger(SimpleServer.class.getName());
public static void main(String[] args) {
try {
//监听端口9090
TServerSocket serverTransport = new TServerSocket(9090);
//使用二进制协议传输数据
TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
//关联处理器与HelloService服务实现
TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
TSimpleServer.Args serverArgs = new TSimpleServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.protocolFactory(proFactory);
//使用TSimpleServer服务端
TServer server = new TSimpleServer(serverArgs);
LOGGER.info("Start SimpleServer on port 9090...");
//启动服务
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
Processor
заHelloService
внутренний класс, вызывающийHelloService.Processor(new HelloServiceImpl())
создастprocessMap
,key
имя интерфейса,value
вызвать объект для метода, а затемTBaseProcessor.process()
сквозьprocessMap
провестиprocessMap.get(接口名称)
Операция получает интерфейс.
4. Анализ исходного кода TSimpleServer
TSimpleServer
наследоватьTServer
, выполненоTServer
изserve()
а такжеstop()
метод.
public class TSimpleServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TSimpleServer.class.getName());
public TSimpleServer(AbstractServerArgs args) {
super(args);
}
/**
* 启动服务
*/
public void serve() {
try {
//监听端口
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
}
// Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
}
//开启服务
setServing(true);
//循环等待客户端请求
while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
try {
//接受连接
client = serverTransport_.accept();
if (client != null) {
//TProcessorFactory处理器
processor = processorFactory_.getProcessor(client);
//获取客户端输入通道
inputTransport = inputTransportFactory_.getTransport(client);
//获取客户端输出通道
outputTransport = outputTransportFactory_.getTransport(client);
//获取客户端输入协议
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
//获取客户端输出协议
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
//处理请求
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
//处理请求
processor.process(inputProtocol, outputProtocol);
}
}
} catch (TTransportException ttx) {
// Client died, just move on
} catch (TException tx) {
if (!stopped_) {
LOGGER.error("Thrift error occurred during processing of message.", tx);
}
} catch (Exception x) {
if (!stopped_) {
LOGGER.error("Error occurred during processing of message.", x);
}
}
if (eventHandler_ != null) {
//删除事件
eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
//关闭输入通道
if (inputTransport != null) {
inputTransport.close();
}
//关闭输出通道
if (outputTransport != null) {
outputTransport.close();
}
}
//关闭服务
setServing(false);
}
/**
* 停止服务
*/
public void stop() {
stopped_ = true;
serverTransport_.interrupt();
}
}
TBaseProcessor.process()
: вызов интерфейса для обработки запроса. Сначала получите информацию о запросе, включая параметры запроса, имя вызывающей функции и т. д., а затем получите соответствующий вызывающий объект интерфейса из processMap в соответствии с именем вызывающей функции, чтобы вызвать интерфейс для обработки запроса.
public void process(TProtocol in, TProtocol out) throws TException {
//获取请求信息:参数、调用函数名等
TMessage msg = in.readMessageBegin();
//根据函数名获取处理函数
ProcessFunction fn = processMap.get(msg.name);
//异常处理
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
} else {
//处理请求
fn.process(msg.seqid, in, out, iface);
}
}
ProcessFunction
является абстрактным классом, и подклассы также основаны наIDL
генерируется автоматически, сIDL
Функции во взаимно однозначном соответствии являются прокси-процессорами.
ProcessFunction.process()
: вызов интерфейса для обработки бизнес-запроса и возврата результата. Сначала инкапсулируйте параметры запроса, вызовите соответствующий интерфейс в соответствии с параметром и определением интерфейса и получите результат обработки запроса, а затем верните результат обработки запроса клиенту.
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
//获取一个空的参数封装
T args = getEmptyArgsInstance();
try {
//从inputProtocol中获取参数赋给args
args.read(iprot);
} catch (TProtocolException e) {
//异常处理
}
iprot.readMessageEnd();
TSerializable result = null;
byte msgType = TMessageType.REPLY;
try {
//根据参数args调用接口
result = getResult(iface, args);
} catch (TTransportException ex) {
//异常处理
}
if(!isOneway()) {
//输出调用结果到outputProtocol
oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
}
5. Временная диаграмма
6. Неадекватность
TSimpleServer
может обрабатывать только один за разsocket
связь менее эффективна.
TThreadPoolServer
1. Особенности
Многопоточный, блокирующий ввод-вывод.
2. Дизайн-мышление
Основной поток отвечает за блокировку слушателяsocket
, когда естьsocket
Когда все будет готово, инкапсулируйте его вWorkerProcess
Объект передается в пул потоков, а пул потоков отвечает за чтение и запись данных, а также бизнес-обработку и возвращает результат клиенту.
线程池默认最小线程数为5,最大线程数为Integer.MAX_VALUE。
3. Используйте
Клиент тот же
TSimpleServer
.
Сервер:
public class ThreadPoolServer {
private static final Logger LOGGER = Logger.getLogger(ThreadPoolServer.class.getName());
public static void main(String[] args) {
try {
//监听端口9090
TServerSocket serverTransport = new TServerSocket(9090);
//使用二进制协议传输数据
TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
//关联处理器与HelloService服务实现
TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.protocolFactory(proFactory);
//使用TThreadPoolServer服务端
TServer server = new TThreadPoolServer(serverArgs);
LOGGER.info("Start ThreadPoolServer on port 9090...");
//启动服务
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
4. Анализ исходного кода
TThreadPoolServer
наследоватьTServer
, выполненоTServer
изserve()
а такжеstop()
метод.
public class TThreadPoolServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
//参数初始化
public static class Args extends AbstractServerArgs<Args> {
//线程池参数
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public ExecutorService executorService;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
public int requestTimeout = 20;
public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
public int beBackoffSlotLength = 100;
public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
public Args(TServerTransport transport) {
super(transport);
}
public Args minWorkerThreads(int n) {
minWorkerThreads = n;
return this;
}
public Args maxWorkerThreads(int n) {
maxWorkerThreads = n;
return this;
}
public Args stopTimeoutVal(int n) {
stopTimeoutVal = n;
return this;
}
public Args stopTimeoutUnit(TimeUnit tu) {
stopTimeoutUnit = tu;
return this;
}
public Args requestTimeout(int n) {
requestTimeout = n;
return this;
}
public Args requestTimeoutUnit(TimeUnit tu) {
requestTimeoutUnit = tu;
return this;
}
//Binary exponential backoff slot length
public Args beBackoffSlotLength(int n) {
beBackoffSlotLength = n;
return this;
}
//Binary exponential backoff slot time unit
public Args beBackoffSlotLengthUnit(TimeUnit tu) {
beBackoffSlotLengthUnit = tu;
return this;
}
public Args executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
}
//工作线程池
private ExecutorService executorService_;
private final TimeUnit stopTimeoutUnit;
private final long stopTimeoutVal;
private final TimeUnit requestTimeoutUnit;
private final long requestTimeout;
private final long beBackoffSlotInMillis;
private Random random = new Random(System.currentTimeMillis());
//TThreadPoolServer构造函数会实例化一个线程池
public TThreadPoolServer(Args args) {
super(args);
stopTimeoutUnit = args.stopTimeoutUnit;
stopTimeoutVal = args.stopTimeoutVal;
requestTimeoutUnit = args.requestTimeoutUnit;
requestTimeout = args.requestTimeout;
beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
//实例化线程池(可以选择自己创建线程池后以参数形式传进来或者由TThreadPoolServer创建)
executorService_ = args.executorService != null ?
args.executorService : createDefaultExecutorService(args);
}
//创建线程池
private static ExecutorService createDefaultExecutorService(Args args) {
//线程池等待队列
SynchronousQueue<Runnable> executorQueue =
new SynchronousQueue<Runnable>();
return new ThreadPoolExecutor(args.minWorkerThreads,
args.maxWorkerThreads,
args.stopTimeoutVal,
args.stopTimeoutUnit,
executorQueue);
}
protected ExecutorService getExecutorService() {
return executorService_;
}
//开启服务器进行监听
protected boolean preServe() {
try {
//监听端口9090
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return false;
}
// Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
}
stopped_ = false;
//开启服务
setServing(true);
return true;
}
//启动服务
public void serve() {
if (!preServe()) {
return;
}
//处理请求
execute();
//服务停止后关闭线程池
waitForShutdown();
//关闭服务
setServing(false);
}
//处理请求
protected void execute() {
int failureCount = 0;
//循环等待请求
while (!stopped_) {
try {
//接受连接
TTransport client = serverTransport_.accept();
//将客户端请求封装成一个WorkerProcess对象后丢给线程池进行处理
WorkerProcess wp = new WorkerProcess(client);
//记录加入线程池的重试次数
int retryCount = 0;
//剩余的重试时间
long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
while(true) {
try {
//提交线程池处理请求
executorService_.execute(wp);
break;
} catch(Throwable t) {
//抛异常则重试
if (t instanceof RejectedExecutionException) {
retryCount++;
try {
if (remainTimeInMillis > 0) {
//do a truncated 20 binary exponential backoff sleep
long sleepTimeInMillis = ((long) (random.nextDouble() *
(1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
} else {
client.close();
wp = null;
LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
+ " times till timedout, reason: " + t);
break;
}
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while waiting to place client on executor queue.");
Thread.currentThread().interrupt();
break;
}
} else if (t instanceof Error) {
LOGGER.error("ExecutorService threw error: " + t, t);
throw (Error)t;
} else {
//for other possible runtime errors from ExecutorService, should also not kill serve
LOGGER.warn("ExecutorService threw error: " + t, t);
break;
}
}
}
} catch (TTransportException ttx) {
if (!stopped_) {
++failureCount;
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
}
}
//服务停止后关闭线程池
protected void waitForShutdown() {
//不再接受新的线程,等待之前提交的线程都处理完毕后关闭线程池
executorService_.shutdown();
long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
//阻塞,唤醒条件:所有任务执行完毕且shutdown请求被调用或timeoutMS时间到达或当前线程被中断
executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}
//停止服务
public void stop() {
stopped_ = true;
serverTransport_.interrupt();
}
//WorkerProcess实现Runnable,在run()方法中进行具体的业务处理
private class WorkerProcess implements Runnable {
private TTransport client_;
private WorkerProcess(TTransport client) {
client_ = client;
}
//具体的业务处理(其实就是将TSimpleServer中业务处理部分剥离出来放到run()方法中)
public void run() {
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
TServerEventHandler eventHandler = null;
ServerContext connectionContext = null;
try {
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
eventHandler = getEventHandler();
if (eventHandler != null) {
connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler != null) {
eventHandler.processContext(connectionContext, inputTransport, outputTransport);
}
if (stopped_) {
break;
}
processor.process(inputProtocol, outputProtocol);
}
} catch (Exception x) {
if (!isIgnorableException(x)) {
LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x);
}
} finally {
if (eventHandler != null) {
eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
if (inputTransport != null) {
inputTransport.close();
}
if (outputTransport != null) {
outputTransport.close();
}
if (client_.isOpen()) {
client_.close();
}
}
}
... ...
}
}
5. Преимущества
TThreadPoolServer
существуетTSimpleServer
Основываясь на введении пула потоков, основной поток отвечает за мониторинг, пул потоков отвечает за чтение и запись данных и конкретную бизнес-обработку, а также может принимать соединения вовремя, когда параллелизм увеличивается.
Это подходит для ситуации, когда сервер может предсказать максимальное количество одновременных клиентов.
6. Неадекватность
TThreadPoolServer
Это по-прежнему метод блокировки для мониторинга клиентских подключений, а возможности бизнес-обработки ограничены пулом потоков.Когда количество одновременных запросов превышает количество потоков в пуле потоков, новые запросы могут только блокироваться и ждать.
TNonblockingServer
1. Особенности
Однопоточный неблокирующий ввод-вывод.
2. Дизайн-мышление
TNonblockingServer
создалSelectAcceptThread
нить, черезNIO
режим монитора несколькоsocket
и читать и записывать данные.
-
поставив
socket
зарегистрироваться наSelector
Реализуйте поток для мониторинга несколькихsocket
,каждый разSelector.select()
Конец цикла возвращает все готовоsocket
:- для чтения
socket
После считывания данных запускается функция обратного вызова для выполнения соответствующей бизнес-обработки, и результат обработки возвращается клиенту; - для записи
socket
выполнять операции записи данных; - Принимайте запросы клиентов на подключение и регистрируйтесь на
Selector
начальство;
- для чтения
3. Используйте
Клиент:
public class HelloClient {
private static final Logger LOGGER = Logger.getLogger(HelloClient.class.getName());
public static void main(String[] args) {
TTransport transport = null;
try {
//传输层使用非阻塞I/O
transport = new TFramedTransport.Factory().getTransport(new TSocket("127.0.0.1", 9090));
transport.open();
//使用二进制协议传输数据
TProtocol protocol = new TBinaryProtocol(transport);
//使用同步客户端
HelloService.Client client = new HelloService.Client(protocol);
String name = "XuDT";
LOGGER.info("HelloClient 请求参数[name]=" + name);
//调用接口
String result = client.sayHello(name);
LOGGER.info("Server 返回结果为" + result);
} catch (TException e) {
e.printStackTrace();
} finally {
transport.close();
}
}
}
Сервер:
public class NonblockingServer {
private static final Logger LOGGER = Logger.getLogger(NonblockingServer.class.getName());
public static void main(String[] args) {
try {
//监听端口9090
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
//使用二进制协议传输数据
TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
//关联处理器与HelloService服务实现
TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.protocolFactory(proFactory);
serverArgs.transportFactory(new TFramedTransport.Factory());
//使用TNonblockingServer服务端
TServer server = new TNonblockingServer(serverArgs);
LOGGER.info("Start NonblockingServer on port 9090...");
//启动服务
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
TNonblockingServer传输层只能使用TFramedTransport。
4. Анализ исходного кода
TNonblockingServer
наследоватьAbstractNonblockingServer
,AbstractNonblockingServer
наследоватьTServer
.
AbstractNonblockingServer
:
public abstract class AbstractNonblockingServer extends TServer {
protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
public long maxReadBufferBytes = 256 * 1024 * 1024;
public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
super(transport);
transportFactory(new TFramedTransport.Factory());
}
}
final long MAX_READ_BUFFER_BYTES;
final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
}
/**
* 启动服务
*/
public void serve() {
//启动SelectAcceptThread线程
if (!startThreads()) {
return;
}
//启动监听
if (!startListening()) {
return;
}
//开启服务
setServing(true);
//阻塞等待请求并处理
waitForShutdown();
//关闭服务
setServing(false);
//停止监听,关闭ServerSocket
stopListening();
}
//启动SelectAcceptThread线程
protected abstract boolean startThreads();
//阻塞等待请求并处理
protected abstract void waitForShutdown();
//启动监听
protected boolean startListening() {
try {
serverTransport_.listen();
return true;
} catch (TTransportException ttx) {
LOGGER.error("Failed to start listening on server socket!", ttx);
return false;
}
}
//停止监听,关闭ServerSocket
protected void stopListening() {
serverTransport_.close();
}
//业务处理回调方法
protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
/**
* AbstractSelectThread继承了Thread
*/
protected abstract class AbstractSelectThread extends Thread {
protected Selector selector;
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
public AbstractSelectThread() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
}
//唤醒阻塞在Selector.select()上的线程
public void wakeupSelector() {
selector.wakeup();
}
public void requestSelectInterestChange(FrameBuffer frameBuffer) {
synchronized (selectInterestChanges) {
selectInterestChanges.add(frameBuffer);
}
selector.wakeup();
}
//改变事件所感兴趣的类型,比如从读转为写(读取了客户端的请求,执行完相应的业务处理后,要将数据返回给客户端),从写转为读(要将数据返回给客户端后,重新开始接受客户端新的的请求)
protected void processInterestChanges() {
synchronized (selectInterestChanges) {
for (FrameBuffer fb : selectInterestChanges) {
fb.changeSelectInterests();
}
selectInterestChanges.clear();
}
}
//读取客户端数据
protected void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
//读取客户端数据失败,则清除该selectionKey
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
//读取客户端数据成功
if (buffer.isFrameFullyRead()) {
//触发回调(调用相应的方法进行业务处理)
if (!requestInvoke(buffer)) {
//清除该selectionKey
cleanupSelectionKey(key);
}
}
}
//向客户端写入数据
protected void handleWrite(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.write()) {
cleanupSelectionKey(key);
}
}
//清除selectionKey
protected void cleanupSelectionKey(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (buffer != null) {
buffer.close();
}
key.cancel();
}
}
... ...
}
TNonblockingServer
:
public class TNonblockingServer extends AbstractNonblockingServer {
public static class Args extends AbstractNonblockingServerArgs<Args> {
public Args(TNonblockingServerTransport transport) {
super(transport);
}
}
//监听线程
private SelectAcceptThread selectAcceptThread_;
public TNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
}
/**
* 重写AbstractNonblockingServer.startThreads()方法,开启线程处理客户端请求
*/
@Override
protected boolean startThreads() {
try {
//实例化selectAcceptThread_
selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
//启动线程
selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
//重写AbstractNonblockingServer.waitForShutdown(),主线程阻塞等待SelectAcceptThread线程返回
@Override
protected void waitForShutdown() {
joinSelector();
}
/**
* 启动Selector监听线程
*/
protected void joinSelector() {
try {
//主线程阻塞等待selectAcceptThread线程返回
selectAcceptThread_.join();
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for accept thread", e);
Thread.currentThread().interrupt();
}
}
/**
* 停止服务
*/
@Override
public void stop() {
stopped_ = true;
if (selectAcceptThread_ != null) {
selectAcceptThread_.wakeupSelector();
}
}
/**
* 客户端可读时触发的回调方法,具体回调操作定义在AbstractNonblockingServer的内部类FrameBuffer.invoke(),invoke()通过处理器TProcessorFactory的processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_)方法进行连接数据读取、接口调用、处理结果返回客户端等
*/
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
return true;
}
public boolean isStopped() {
return selectAcceptThread_.isStopped();
}
/**
* SelectAcceptThread继承了AbstractSelectThread,AbstractSelectThread继承了Thread,AbstractSelectThread是AbstractNonblockingServer的内部类
*/
protected class SelectAcceptThread extends AbstractSelectThread {
//服务端传输通道serverTransport
private final TNonblockingServerTransport serverTransport;
public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
//将服务端通道serverTransport注册到Selector实现一个线程监听多个通道,Selector定义在AbstractSelectThread
serverTransport.registerSelector(selector);
}
public boolean isStopped() {
return stopped_;
}
/**
* 处理请求
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
//循环等待请求
while (!stopped_) {
//阻塞监听所有注册在Selector上的socket,并处理就绪socket
select();
//处理读写事件切换
processInterestChanges();
}
//服务端停止后删除Selector中的所有监听的selectionKey
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
stopped_ = true;
}
}
/**
* 阻塞监听所有注册在Selector上的socket
*/
private void select() {
try {
//阻塞监听请求,每次select()结束获取所有就绪socket
selector.select();
//获取所有就绪socket
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
//处理就绪socket
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isAcceptable()) {
//接受连接
handleAccept();
} else if (key.isReadable()) {
//读取数据
handleRead(key);
} else if (key.isWritable()) {
//写数据
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
final AbstractSelectThread selectThread) {
return processorFactory_.isAsyncProcessor() ?
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
new FrameBuffer(trans, selectionKey, selectThread);
}
/**
* 处理客户端连接请求
*/
private void handleAccept() throws IOException {
SelectionKey clientKey = null;
TNonblockingTransport client = null;
try {
//接受客户端连接请求
client = (TNonblockingTransport)serverTransport.accept();
//将客户端连接注册到Selector上,为了后续处理该客户端连接上的请求
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
//实例化FrameBuffer
FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
//将frameBuffer添加到clientKey,后续handleRead()和handleWrite()读写数据都是基于这个frameBuffer
clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
// something went wrong accepting.
LOGGER.warn("Exception trying to accept!", tte);
if (clientKey != null) cleanupSelectionKey(clientKey);
if (client != null) client.close();
}
}
}
}
5. Преимущества
Благодаря мультиплексированию ввода-вывода один поток контролирует несколькоsocket
.
6. Неадекватность
TNonblockingServer
Чтение и запись данных, а также бизнес-обработка выполняются с использованием однопоточной блокировки, когда бизнес-обработка сложна или требует много времени, сервисы будут заблокированы, а эффективность невысока.
THsHaServer
1. Особенности
Полусинхронный и полуасинхронный, неблокирующий ввод-вывод.
2. Дизайн-мышление
Синхронный неблокирующий: черезSelectAcceptThread
Потоки реализуют мониторинг (неблокирующий ввод-вывод), чтение и запись данных (синхронизацию) через NIO.
Асинхронный: бизнес-обработка выполняется через пул потоков.
3. Используйте
Клиент тот же
TNonblockingServer
.
Сервер:
public class HsHaServer {
private static final Logger LOGGER = Logger.getLogger(HsHaServer.class.getName());
public static void main(String[] args) {
try {
//监听端口9090
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
//使用二进制协议传输数据
TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
//关联处理器与HelloService服务实现
TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.protocolFactory(proFactory);
serverArgs.transportFactory(new TFramedTransport.Factory());
//使用THsHaServer服务端
TServer server = new THsHaServer(serverArgs);
LOGGER.info("Start HsHaServer on port 9090...");
//启动服务
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
4. Анализ исходного кода
THsHaServer
наследоватьTNonblockingServer
.
public class THsHaServer extends TNonblockingServer {
public static class Args extends AbstractNonblockingServerArgs<Args> {
//设置线程池参数
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
private int stopTimeoutVal = 60;
private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
private ExecutorService executorService = null;
public Args(TNonblockingServerTransport transport) {
super(transport);
}
//设置线程池最少线程数
public Args minWorkerThreads(int n) {
minWorkerThreads = n;
return this;
}
//设置线程池最大线程数
public Args maxWorkerThreads(int n) {
maxWorkerThreads = n;
return this;
}
//获取线程池最少线程数
public int getMinWorkerThreads() {
return minWorkerThreads;
}
//获取线程池最大线程数
public int getMaxWorkerThreads() {
return maxWorkerThreads;
}
public int getStopTimeoutVal() {
return stopTimeoutVal;
}
public Args stopTimeoutVal(int stopTimeoutVal) {
this.stopTimeoutVal = stopTimeoutVal;
return this;
}
public TimeUnit getStopTimeoutUnit() {
return stopTimeoutUnit;
}
public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
this.stopTimeoutUnit = stopTimeoutUnit;
return this;
}
public ExecutorService getExecutorService() {
return executorService;
}
public Args executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
}
//指向工作线程池,方便Selector调用工作线程
private final ExecutorService invoker;
private final Args args;
public THsHaServer(Args args) {
super(args);
//线程池可以自己定义后以参数的形式传入或者由THsHaServer创建
invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
this.args = args;
}
//重写AbstractNonblockingServer.waitForShutdown()
@Override
protected void waitForShutdown() {
//主线程阻塞等待SelectAcceptThread线程返回,调用的是TNonblockingServer.joinSelector()
joinSelector();
//关闭线程池
gracefullyShutdownInvokerPool();
}
//创建线程池
protected static ExecutorService createInvokerPool(Args options) {
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
protected ExecutorService getInvoker() {
return invoker;
}
//关闭线程池
protected void gracefullyShutdownInvokerPool() {
//shutdown()会等待正在处理的任务结束后再关闭线程池
invoker.shutdown();
long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
//awaitTermination()会一直等待直到线程池状态为TERMINATED或者超时时间到
invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}
//重写AbstractNonblockingServer.requestInvoke(),调用线程池执行业务处理
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
try {
//获取要执行的业务处理
Runnable invocation = getRunnable(frameBuffer);
//执行业务处理
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
}
protected Runnable getRunnable(FrameBuffer frameBuffer){
return new Invocation(frameBuffer);
}
}
class Invocation implements Runnable {
private final FrameBuffer frameBuffer;
public Invocation(final FrameBuffer frameBuffer) {
this.frameBuffer = frameBuffer;
}
public void run() {
//调用的是AbstractNonblockingServer的内部类FrameBuffer..invoke()
frameBuffer.invoke();
}
}
5. Преимущества
THsHaServer
существуетTNonblockingServer
На базе реализован пул потоков, когда чтение данных завершено, они передаются в пул потоков для бизнес-обработки, а основной поток возвращается непосредственно для мониторинга следующего цикла, что значительно повышает эффективность.
6. Неадекватность
THsHaServer
все ещеSelectAcceptThread
Потоки отвечают за мониторинг, чтение и запись данных, и при большом количестве одновременных запросов запросы могут быть не приняты вовремя.
TThreadedSelectorServer
1. Особенности
Многопоточный неблокирующий ввод-вывод.
2. Дизайн-мышление
Раздельный мониторинг, чтение и запись данных и бизнес-обработка:
- Один
AcceptThread
Объект потока специально используется для прослушивания клиентских запросов на подключение; - несколько
SelectorThread
Объекты потоков (по умолчанию 2) используются для чтения и записи данных; - балансировщик нагрузки
SelectorThreadLoadBalancer
объект используется дляAcceptThread
Новые запросы на соединение, полученные потоком, назначаютсяSelectorThread
нить. - Один
ExecutorService
Пул потоков используется для бизнес-обработки. когдаSelectorThread
Метод обратного вызова запускается после того, как объект потока прочитал данныеrequestInvoke()
Передайте запрос в пул потоков для бизнес-обработки.
3. Используйте
Клиент тот же
TNonblockingServer
.
Сервер:
public class ThreadedSelectorServer {
private static final Logger LOGGER = Logger.getLogger(ThreadedSelectorServer.class.getName());
public static void main(String[] args) {
try {
//监听端口9090
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
//使用二进制协议传输数据
TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
//关联处理器与HelloService服务实现
TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.protocolFactory(proFactory);
//使用TThreadedSelectorServer服务端
TServer server = new TThreadedSelectorServer(serverArgs);
LOGGER.info("Start ThreadedSelectorServer on port 9090...");
//启动服务,调用到的是AbstractNonblockingServer.serve()
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
4. Анализ исходного кода
TThreadedSelectorServer
наследоватьAbstractNonblockingServer
, который содержит внутренний классAcceptThread
,SelectorThread
,SelectorThreadLoadBalancer
.
TThreadedSelectorServer
:
public class TThreadedSelectorServer extends AbstractNonblockingServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
public static class Args extends AbstractNonblockingServerArgs<Args> {
//selectorThreads线程数
public int selectorThreads = 2;
//工作线程池核心线程数,如果设置为0则业务处理会交由selectorThreads负责
private int workerThreads = 5;
private int stopTimeoutVal = 60;
private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
//工作线程池
private ExecutorService executorService = null;
//selectorThreads缓存队列大小
private int acceptQueueSizePerThread = 4;
public static enum AcceptPolicy {
FAIR_ACCEPT,
//立即接受请求
FAST_ACCEPT
}
private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
public Args(TNonblockingServerTransport transport) {
super(transport);
}
public Args selectorThreads(int i) {
selectorThreads = i;
return this;
}
public int getSelectorThreads() {
return selectorThreads;
}
public Args workerThreads(int i) {
workerThreads = i;
return this;
}
public int getWorkerThreads() {
return workerThreads;
}
public int getStopTimeoutVal() {
return stopTimeoutVal;
}
public Args stopTimeoutVal(int stopTimeoutVal) {
this.stopTimeoutVal = stopTimeoutVal;
return this;
}
public TimeUnit getStopTimeoutUnit() {
return stopTimeoutUnit;
}
public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
this.stopTimeoutUnit = stopTimeoutUnit;
return this;
}
public ExecutorService getExecutorService() {
return executorService;
}
public Args executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
public int getAcceptQueueSizePerThread() {
return acceptQueueSizePerThread;
}
public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
this.acceptQueueSizePerThread = acceptQueueSizePerThread;
return this;
}
public AcceptPolicy getAcceptPolicy() {
return acceptPolicy;
}
public Args acceptPolicy(AcceptPolicy acceptPolicy) {
this.acceptPolicy = acceptPolicy;
return this;
}
//参数校验
public void validate() {
if (selectorThreads <= 0) {
throw new IllegalArgumentException("selectorThreads must be positive.");
}
if (workerThreads < 0) {
throw new IllegalArgumentException("workerThreads must be non-negative.");
}
if (acceptQueueSizePerThread <= 0) {
throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
}
}
}
//监听线程
private AcceptThread acceptThread;
//数据读写线程
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
//工作线程池
private final ExecutorService invoker;
private final Args args;
public TThreadedSelectorServer(Args args) {
super(args);
//参数校验
args.validate();
//线程池可以自己定义后以参数的形式传入或者由TThreadedSelectorServer创建
invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
this.args = args;
}
//重写AbstractNonblockingServer.startThreads(),启动AcceptThread、SelectorThread线程
@Override
protected boolean startThreads() {
try {
//实例化SelectorThread线程并放进一个Set集合中
for (int i = 0; i < args.selectorThreads; ++i) {
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
}
//实例化AcceptThread线程
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));
//启动SelectorThread线程
for (SelectorThread thread : selectorThreads) {
thread.start();
}
//启动AcceptThread线程
acceptThread.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start threads!", e);
return false;
}
}
//重写AbstractNonblockingServer.waitForShutdown()
@Override
protected void waitForShutdown() {
try {
//阻塞主线程等待AcceptThread、SelectorThread线程返回
joinThreads();
} catch (InterruptedException e) {
LOGGER.error("Interrupted while joining threads!", e);
}
//关闭线程池
gracefullyShutdownInvokerPool();
}
//阻塞主线程等待AcceptThread、SelectorThread线程返回
protected void joinThreads() throws InterruptedException {
acceptThread.join();
for (SelectorThread thread : selectorThreads) {
thread.join();
}
}
//重写AbstractNonblockingServer.stop(),wakeupSelector()调用了Selector.wakeup(),将会唤醒Selector执行select()时阻塞的线程,进行新的循环,Selector.select()的循环条件是while (!stopped_ ),stopped_已被设置为true,所以新的一次循环将会跳出,从而停止SelectorThread、AcceptThread线程
@Override
public void stop() {
stopped_ = true;
//停止监听
stopListening();
//关闭AcceptThread线程
if (acceptThread != null) {
//wakeupSelector()
acceptThread.wakeupSelector();
}
//关闭SelectorThread线程
if (selectorThreads != null) {
for (SelectorThread thread : selectorThreads) {
if (thread != null)
thread.wakeupSelector();
}
}
}
//关闭线程池
protected void gracefullyShutdownInvokerPool() {
invoker.shutdown();
long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}
//重写AbstractNonblockingServer.requestInvoke(),调用线程池执行业务处理
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
Runnable invocation = getRunnable(frameBuffer);
if (invoker != null) {
try {
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
} else {
invocation.run();
return true;
}
}
protected Runnable getRunnable(FrameBuffer frameBuffer) {
return new Invocation(frameBuffer);
}
//创建线程池
protected static ExecutorService createDefaultExecutor(Args options) {
return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
}
private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
if (queueSize == 0) {
return new LinkedBlockingQueue<TNonblockingTransport>();
}
return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
}
//创建一个负载均衡器
protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
return new SelectorThreadLoadBalancer(threads);
}
}
AcceptThread
: слушать клиентские подключения
//监听连接线程
protected class AcceptThread extends Thread {
//获取客户端传输通道
private final TNonblockingServerTransport serverTransport;
//NIO选择器
private final Selector acceptSelector;
//负载均衡器
private final SelectorThreadLoadBalancer threadChooser;
public AcceptThread(TNonblockingServerTransport serverTransport,
SelectorThreadLoadBalancer threadChooser) throws IOException {
this.serverTransport = serverTransport;
this.threadChooser = threadChooser;
//实例化Selector
this.acceptSelector = SelectorProvider.provider().openSelector();
//将客户端传输通道注册到Selector上
this.serverTransport.registerSelector(acceptSelector);
}
//监听客户端请求
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
//循环监听等待客户端请求
while (!stopped_) {
select();
}
} catch (Throwable t) {
LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
} finally {
try {
acceptSelector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing accept selector!", e);
}
TThreadedSelectorServer.this.stop();
}
}
//关闭线程
public void wakeupSelector() {
acceptSelector.wakeup();
}
//监听客户端请求
private void select() {
try {
//接收客户端请求
acceptSelector.select();
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
//如果是连接请求则调用handleAccept()进行处理,否则不做任何处理
if (key.isAcceptable()) {
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
//接受客户端连接请求
private void handleAccept() {
//获取客户端传输通道
final TNonblockingTransport client = doAccept();
if (client != null) {
final SelectorThread targetThread = threadChooser.nextThread();
//接受连接
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
//
doAddAccept(targetThread, client);
} else {
try {
invoker.submit(new Runnable() {
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
client.close();
}
}
}
}
//获取客户端传输通道
private TNonblockingTransport doAccept() {
try {
return (TNonblockingTransport) serverTransport.accept();
} catch (TTransportException tte) {
LOGGER.warn("Exception trying to accept!", tte);
return null;
}
}
//接受连接(将客户端通道加入SelectorThread的缓存队列中)
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
//调用SelectorThread.addAcceptedConnection()
if (!thread.addAcceptedConnection(client)) {
client.close();
}
}
}
SelectorThread
: прослушивание событий чтения и записи
//读写数据线程
protected class SelectorThread extends AbstractSelectThread {
//缓存队列
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
public SelectorThread() throws IOException {
this(new LinkedBlockingQueue<TNonblockingTransport>());
}
public SelectorThread(int maxPendingAccepts) throws IOException {
this(createDefaultAcceptQueue(maxPendingAccepts));
}
public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
this.acceptedQueue = acceptedQueue;
}
//将客户端通道加入SelectorThread的缓存队列中(如果队列满了将会阻塞线程)
public boolean addAcceptedConnection(TNonblockingTransport accepted) {
try {
//通过队列的put()方法放入队列中
acceptedQueue.put(accepted);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while adding accepted connection!", e);
return false;
}
//唤醒阻塞在Selector.select()上的线程
selector.wakeup();
return true;
}
//读写数据
public void run() {
try {
//循环监听I/O事件
while (!stopped_) {
//处理就绪的可读写事件
select();
//处理客户端连接
processAcceptedConnections();
//监听客户端读写切换,调用的是AbstractNonblockingServer.processInterestChanges()
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
TThreadedSelectorServer.this.stop();
}
}
//处理就绪的可读写事件
private void select() {
try {
doSelect();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isReadable()) {
//处理可读事件
handleRead(key);
} else if (key.isWritable()) {
//处理可写事件
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
//解决epoll空轮询导致的CPU 100%bug
private void doSelect() throws IOException {
long beforeSelect = System.currentTimeMillis();
int selectedNums = selector.select();
long afterSelect = System.currentTimeMillis();
if (selectedNums == 0) {
jvmBug++;
} else {
jvmBug = 0;
}
long selectedTime = afterSelect - beforeSelect;
if (selectedTime >= MONITOR_PERIOD) {
jvmBug = 0;
} else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
rebuildSelector();
selector.selectNow();
jvmBug = 0;
}
}
private synchronized void rebuildSelector() {
final Selector oldSelector = selector;
if (oldSelector == null) {
return;
}
Selector newSelector = null;
try {
newSelector = Selector.open();
LOGGER.warn("Created new Selector.");
} catch (IOException e) {
LOGGER.error("Create new Selector error.", e);
}
for (SelectionKey key : oldSelector.selectedKeys()) {
if (!key.isValid() && key.readyOps() == 0)
continue;
SelectableChannel channel = key.channel();
Object attachment = key.attachment();
try {
if (attachment == null) {
channel.register(newSelector, key.readyOps());
} else {
channel.register(newSelector, key.readyOps(), attachment);
}
} catch (ClosedChannelException e) {
LOGGER.error("Register new selector key error.", e);
}
}
selector = newSelector;
try {
oldSelector.close();
} catch (IOException e) {
LOGGER.error("Close old selector error.", e);
}
LOGGER.warn("Replace new selector success.");
}
//处理客户端连接
private void processAcceptedConnections() {
//循环取出缓存队列中的客户端传输通道,并注册到Selector上
while (!stopped_) {
TNonblockingTransport accepted = acceptedQueue.poll();
if (accepted == null) {
break;
}
registerAccepted(accepted);
}
}
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
final AbstractSelectThread selectThread) {
return processorFactory_.isAsyncProcessor() ?
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
new FrameBuffer(trans, selectionKey, selectThread);
}
//将客户端传输通道注册到Selector上
private void registerAccepted(TNonblockingTransport accepted) {
SelectionKey clientKey = null;
try {
//将客户端传输通道注册到Selector上
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
//创建一个FrameBuffer,后续这个客户端传输通道上的数据读写都是基于这个FrameBuffer
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
clientKey.attach(frameBuffer);
} catch (IOException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
if (clientKey != null) {
cleanupSelectionKey(clientKey);
}
accepted.close();
}
}
}
SelectorThreadLoadBalancer
: балансировщик нагрузки
//负载均衡器
protected static class SelectorThreadLoadBalancer {
private final Collection<? extends SelectorThread> threads;
private Iterator<? extends SelectorThread> nextThreadIterator;
//实例化一个SelectorThreadLoadBalancer,实例化时会将SelectorThread以参数形式传进来
public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
if (threads.isEmpty()) {
throw new IllegalArgumentException("At least one selector thread is required");
}
this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
nextThreadIterator = this.threads.iterator();
}
//SelectorThreadLoadBalancer通过轮询的方式来选择SelectorThread线程
public SelectorThread nextThread() {
if (!nextThreadIterator.hasNext()) {
nextThreadIterator = threads.iterator();
}
return nextThreadIterator.next();
}
}
5. Преимущества
TThreadedSelectorServer
даThrift
превосходная степеньServer
, который разделяет мониторинг, чтение и запись данных и бизнес-обработку:
- есть специальный
AcceptThread
Потоки используются для обработки запросов на подключение и могут своевременно обрабатывать запросы на подключение; - Распределите операции ввода-вывода по нескольким
SelectorThread
Thread, который может быстро читать и записывать данные; - Через пул потоков для бизнес-обработки улучшите возможность параллельной обработки.
Сравнение каждого сервера
-
TSimpleServer
: один поток блокирует ввод-вывод, а основной поток отвечает за мониторинг, чтение и запись данных и бизнес-обработку. -
TThreadPoolServer
: многопоточный блокирующий ввод-вывод, основной поток отвечает за мониторинг, а пул потоков отвечает за чтение и запись данных и бизнес-обработку. -
TNonblockingServer
: однопоточный неблокирующий ввод-вывод, используемый основным потокомNIO
Отвечает за мониторинг (неблокирующий ввод-вывод), чтение и запись данных и бизнес-обработку. -
THsHaServer
: полусинхронный и полуасинхронный, основной поток использует NIO для мониторинга (неблокирующий ввод-вывод), чтения и записи данных, а пул потоков отвечает за бизнес-обработку. -
TThreadedSelectorServer
: многопоточный неблокирующий ввод/вывод,AcceptThread
Потоки используют NIO для мониторинга (неблокирующий ввод-вывод), несколькоSelectorThread
Потоки отвечают за чтение и запись данных, а пул потоков отвечает за бизнес-обработку.
Пример бережливого использования
- Создайте файл интерфейса службы
(HelloService.thrift
):HelloService
включена 1 услугаsayHello
метод.
namespace java com.xudt.thrift.service
service HelloService {
string sayHello(1:string name)
}
-
http://thrift.apache.org/download
скачатьThrift IDL
переводчик. -
будет
thrift exe
а такжеHelloService.thrift
в той же папке,cmd
зайдите в эту папку и выполните командуthrift-0.13.0.exe -gen java hello.thrift
, который генерирует сервисный интерфейсHelloService.java
документ. -
Создавать
thrift-demo Maven
Проект является родительским модулем, а затем создаются 4 подмодуля:
-
thrift-demo-interface
: место храненияHelloService.thrift
генерируется служебным файломHelloService.java
код. -
thrift-demo-service
: реализует интерфейс службы. -
thrift-demo-server
:Сервис-терминал. -
thrift-demo-client
: Клиент.
Сгенерировать сервисный интерфейс.java
Файл содержит:
- сервер синхронизации
Iface
- Асинхронный сервер
AsyncIface
- Клиент синхронизации
Client
- Асинхронный клиент
AsyncClient
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-04")
public class HelloService {
/**
* 同步服务端
*/
public interface Iface {
public String sayHello(String name) throws org.apache.thrift.TException;
}
/**
* 异步服务端
*/
public interface AsyncIface {
public void sayHello(String name, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws org.apache.thrift.TException;
}
/**
* 同步客户端
*/
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
public Factory() {}
public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
return new Client(prot);
}
public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
return new Client(iprot, oprot);
}
}
public Client(org.apache.thrift.protocol.TProtocol prot) {
super(prot, prot);
}
public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
super(iprot, oprot);
}
//调用sayHello接口并接收处理结果
public String sayHello(String name) throws org.apache.thrift.TException {
send_sayHello(name);
return recv_sayHello();
}
//发送接口调用请求
public void send_sayHello(String name) throws org.apache.thrift.TException {
sayHello_args args = new sayHello_args();
args.setName(name);
sendBase("sayHello", args);
}
//接收接口调用结果
public String recv_sayHello() throws org.apache.thrift.TException {
sayHello_result result = new sayHello_result();
receiveBase(result, "sayHello");
if (result.isSetSuccess()) {
return result.success;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
}
}
/**
* 处理器
*/
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
//初始化processMap
public Processor(I iface) {
super(iface, getProcessMap(new java.util.HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends Iface> java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("sayHello", new sayHello());
return processMap;
}
public static class sayHello<I extends Iface> extends org.apache.thrift.ProcessFunction<I, sayHello_args> {
public sayHello() {
super("sayHello");
}
public sayHello_args getEmptyArgsInstance() {
return new sayHello_args();
}
protected boolean isOneway() {
return false;
}
@Override
protected boolean rethrowUnhandledExceptions() {
return false;
}
//调用接口(真正调用到HelloService的实现类HelloServiceImpl中sayHello())
public sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {
sayHello_result result = new sayHello_result();
result.success = iface.sayHello(args.name);
return result;
}
}
}
... ...
}