Сетевое программирование (1): Подробная экономия

Java

Что такое бережливость?

ThriftЭто облегченная межъязыковая платформа удаленного вызова служб, которая поддерживаетC++,Java,Python,PHP,RubyЖдать. Автоматически генерировать RPC-интерфейс с помощью механизма генерации кода.Thrift IDLФайлы, соответствующие различным основным языкамRPCКод шаблона сервера/клиента экономит основную работу по настройке и поддержке кодирования и декодирования интерфейса, передачи сообщений, модели многопоточности сервера и т. д. Серверу нужно только написать класс реализации интерфейса, а клиент вызывает удаленную службу в соответствии с сервисный объект.

Бережливая архитектура

ThriftАрхитектура состоит из транспортного уровня, уровня протокола, уровня обработки и уровня обслуживания снизу вверх.

  • Транспортный уровень в основном отвечает за чтение/запись данных из сети и определяет сетевой транспортный протокол.
  • Уровень протокола определяет формат передачи данных и отвечает за сериализацию и десериализацию сетевых данных.
  • слой обработкиIDLСгенерируйте, инкапсулируйте конкретные базовые методы сетевой передачи и сериализации и делегируйте их пользователю для реализации.Handleдля обработки.
  • Сервисный уровень обеспечивает сервисную модель сетевого ввода-вывода.

Что такое протоколы Thrift?

ThriftПозволяет пользователю выбрать тип протокола связи, передаваемого между клиентом и сервером.

  • TBinaryProtocol: передавать с использованием двоичного формата кодирования,ThriftТранспортный протокол по умолчанию для .
  • TCompactProtocol: передача в сжатом формате.
  • TJSONProtocol:использоватьJSONпередача форматов.
  • TDebugProtocol: Используйте текстовый формат для передачи, который удобен дляdebug.
  • TSimpleJSONProtocol:поставкаJSONПротокол только для записи, подходящий для анализа с помощью языков сценариев.

Что такое транспортные уровни Thrift?

  • TSocket: Блокирующий ввод-вывод, используемый на клиенте.
  • TServerSocket: неблокирующий ввод-вывод для прослушивания на стороне сервераTSocket.
  • TNonblockingSocket: Неблокирующий ввод-вывод для создания асинхронных клиентов.
  • TMemoryInputTransport: инкапсулирует массив байтовbyte[]Сделайте входной поток.
  • TFramedTransport: неблокирующий ввод-вывод, передача блоками (аналогичноNIO).

Что на стороне сервера Thrift?

TServer

TServerстатический внутренний класс определенArgs,ArgsНаследовать от абстрактного классаAbstractServerArgs.AbstractServerArgsИспользуя модель строителя,TServerДоступны различные фабрики.

Атрибуты Типы эффект
processorFactory TProcessorFactory Класс фабрики слоя обработки для создания объектов TProcessor
inputTransportFactory TTransportFactory Класс фабрики ввода транспортного уровня для создания объектов TTransport
outputTransportFactory TTransportFactory Выходной класс фабрики транспортного уровня для создания объектов TTransport
inputProtocolFactory TProtocolFactory Класс фабрики ввода уровня протокола для создания объектов TProtocol
outputProtocolFactory TProtocolFactory Класс фабрики вывода уровня протокола для создания объектов TProtocol

TServerОсновной метод:

serve():启动服务。serve()为抽象方法,不同实现类的启动方式不一样,可各自实现。
stop():关闭服务。
isServing():检测服务状态(启动/关闭)。
setServing(boolean serving):设置服务状态。

TSimpleServer

1. Особенности

Однопоточный блокирующий ввод-вывод.

2. Дизайн-мышление

Основной поток отвечает за мониторинг, чтение и запись данных и бизнес-обработку (одновременно может быть получен и обработан только один поток).socketсоединять).

3. Используйте

Клиент:

public class HelloClient {
    private static final Logger LOGGER = Logger.getLogger(HelloClient.class.getName());
    public static void main(String[] args) {
        TTransport transport = null;
        try {
            //传输层使用阻塞I/O
            transport = new TSocket("127.0.0.1", 9090);
            transport.open();
            //使用二进制协议传输数据
            TProtocol protocol = new TBinaryProtocol(transport);
            //使用同步客户端
            GreetingService.Client client = new GreetingService.Client(protocol);
            String name = "XuDT";
            LOGGER.info("HelloClient 请求参数[name]=" + name);
            //调用接口
            String result = client.sayHello(name);
            LOGGER.info("Server 返回结果为" + result);
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            transport.close();
        }
    }
}

Сервер:

public class SimpleServer {
    private static final Logger LOGGER = Logger.getLogger(SimpleServer.class.getName());
    public static void main(String[] args) {
        try {
            //监听端口9090
            TServerSocket serverTransport = new TServerSocket(9090);
            //使用二进制协议传输数据
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            //关联处理器与HelloService服务实现
            TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
            TSimpleServer.Args serverArgs = new TSimpleServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            //使用TSimpleServer服务端
            TServer server = new TSimpleServer(serverArgs);
            LOGGER.info("Start SimpleServer on port 9090...");
            //启动服务
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}

ProcessorзаHelloServiceвнутренний класс, вызывающийHelloService.Processor(new HelloServiceImpl())создастprocessMap,keyимя интерфейса,valueвызвать объект для метода, а затемTBaseProcessor.process()сквозьprocessMapпровестиprocessMap.get(接口名称)Операция получает интерфейс.

4. Анализ исходного кода TSimpleServer

TSimpleServerнаследоватьTServer, выполненоTServerизserve()а такжеstop()метод.

public class TSimpleServer extends TServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(TSimpleServer.class.getName());

  public TSimpleServer(AbstractServerArgs args) {
    super(args);
  }

  /**
   * 启动服务
   */
  public void serve() {
    try {
      //监听端口
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return;
    }

    // Run the preServe event
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }
    //开启服务
    setServing(true);
    //循环等待客户端请求
    while (!stopped_) {
      TTransport client = null;
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      ServerContext connectionContext = null;
      try {
        //接受连接
        client = serverTransport_.accept();
        if (client != null) {
          //TProcessorFactory处理器
          processor = processorFactory_.getProcessor(client);
          //获取客户端输入通道
          inputTransport = inputTransportFactory_.getTransport(client);
          //获取客户端输出通道
          outputTransport = outputTransportFactory_.getTransport(client);
          //获取客户端输入协议
          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
          //获取客户端输出协议
          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
          if (eventHandler_ != null) {
            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
          }
          //处理请求
          while (true) {
            if (eventHandler_ != null) {
              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
            }
            //处理请求
            processor.process(inputProtocol, outputProtocol);
          }
        }
      } catch (TTransportException ttx) {
        // Client died, just move on
      } catch (TException tx) {
        if (!stopped_) {
          LOGGER.error("Thrift error occurred during processing of message.", tx);
        }
      } catch (Exception x) {
        if (!stopped_) {
          LOGGER.error("Error occurred during processing of message.", x);
        }
      }
      if (eventHandler_ != null) {
        //删除事件
        eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
      }
      //关闭输入通道
      if (inputTransport != null) {
        inputTransport.close();
      }
      //关闭输出通道
      if (outputTransport != null) {
        outputTransport.close();
      }
    }
    //关闭服务
    setServing(false);
  }

 /**
   * 停止服务
   */
  public void stop() {
    stopped_ = true;
    serverTransport_.interrupt();
  }
}

TBaseProcessor.process(): вызов интерфейса для обработки запроса. Сначала получите информацию о запросе, включая параметры запроса, имя вызывающей функции и т. д., а затем получите соответствующий вызывающий объект интерфейса из processMap в соответствии с именем вызывающей функции, чтобы вызвать интерфейс для обработки запроса.

  public void process(TProtocol in, TProtocol out) throws TException {
    //获取请求信息:参数、调用函数名等
    TMessage msg = in.readMessageBegin();
    //根据函数名获取处理函数
    ProcessFunction fn = processMap.get(msg.name);
    //异常处理
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
    } else {
      //处理请求
      fn.process(msg.seqid, in, out, iface);
    }
  }

ProcessFunctionявляется абстрактным классом, и подклассы также основаны наIDLгенерируется автоматически, сIDLФункции во взаимно однозначном соответствии являются прокси-процессорами.

ProcessFunction.process(): вызов интерфейса для обработки бизнес-запроса и возврата результата. Сначала инкапсулируйте параметры запроса, вызовите соответствующий интерфейс в соответствии с параметром и определением интерфейса и получите результат обработки запроса, а затем верните результат обработки запроса клиенту.

public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    //获取一个空的参数封装
    T args = getEmptyArgsInstance();
    try {
      //从inputProtocol中获取参数赋给args
      args.read(iprot);
    } catch (TProtocolException e) {
      //异常处理
    }
    iprot.readMessageEnd();
    TSerializable result = null;
    byte msgType = TMessageType.REPLY;
    try {
      //根据参数args调用接口
      result = getResult(iface, args);
    } catch (TTransportException ex) {
      //异常处理
    }
    if(!isOneway()) {
      //输出调用结果到outputProtocol
      oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
      result.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
    }
  }

5. Временная диаграмма

6. Неадекватность

TSimpleServerможет обрабатывать только один за разsocketсвязь менее эффективна.

TThreadPoolServer

1. Особенности

Многопоточный, блокирующий ввод-вывод.

2. Дизайн-мышление

Основной поток отвечает за блокировку слушателяsocket, когда естьsocketКогда все будет готово, инкапсулируйте его вWorkerProcessОбъект передается в пул потоков, а пул потоков отвечает за чтение и запись данных, а также бизнес-обработку и возвращает результат клиенту.

线程池默认最小线程数为5,最大线程数为Integer.MAX_VALUE。

3. Используйте

Клиент тот жеTSimpleServer.

Сервер:

public class ThreadPoolServer {
    private static final Logger LOGGER = Logger.getLogger(ThreadPoolServer.class.getName());

    public static void main(String[] args) {
        try {
            //监听端口9090
            TServerSocket serverTransport = new TServerSocket(9090);
            //使用二进制协议传输数据
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            //关联处理器与HelloService服务实现
            TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
            TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            //使用TThreadPoolServer服务端
            TServer server = new TThreadPoolServer(serverArgs);
            LOGGER.info("Start ThreadPoolServer on port 9090...");
            //启动服务
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}

4. Анализ исходного кода

TThreadPoolServerнаследоватьTServer, выполненоTServerизserve()а такжеstop()метод.

public class TThreadPoolServer extends TServer {
  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
  //参数初始化
  public static class Args extends AbstractServerArgs<Args> {
  	//线程池参数
    public int minWorkerThreads = 5;
    public int maxWorkerThreads = Integer.MAX_VALUE;
    public ExecutorService executorService;
    public int stopTimeoutVal = 60;
    public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
    public int requestTimeout = 20;
    public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
    public int beBackoffSlotLength = 100;
    public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;

    public Args(TServerTransport transport) {
      super(transport);
    }
    public Args minWorkerThreads(int n) {
      minWorkerThreads = n;
      return this;
    }
    public Args maxWorkerThreads(int n) {
      maxWorkerThreads = n;
      return this;
    }
    public Args stopTimeoutVal(int n) {
      stopTimeoutVal = n;
      return this;
    }
    public Args stopTimeoutUnit(TimeUnit tu) {
      stopTimeoutUnit = tu;
      return this;
    }
    public Args requestTimeout(int n) {
      requestTimeout = n;
      return this;
    }
    public Args requestTimeoutUnit(TimeUnit tu) {
      requestTimeoutUnit = tu;
      return this;
    }
    //Binary exponential backoff slot length
    public Args beBackoffSlotLength(int n) {
      beBackoffSlotLength = n;
      return this;
    }
    //Binary exponential backoff slot time unit
    public Args beBackoffSlotLengthUnit(TimeUnit tu) {
      beBackoffSlotLengthUnit = tu;
      return this;
    }
    public Args executorService(ExecutorService executorService) {
      this.executorService = executorService;
      return this;
    }
  }
  //工作线程池
  private ExecutorService executorService_;
  private final TimeUnit stopTimeoutUnit;
  private final long stopTimeoutVal;
  private final TimeUnit requestTimeoutUnit;
  private final long requestTimeout;
  private final long beBackoffSlotInMillis;
  private Random random = new Random(System.currentTimeMillis());
  
  //TThreadPoolServer构造函数会实例化一个线程池
  public TThreadPoolServer(Args args) {
    super(args);
    stopTimeoutUnit = args.stopTimeoutUnit;
    stopTimeoutVal = args.stopTimeoutVal;
    requestTimeoutUnit = args.requestTimeoutUnit;
    requestTimeout = args.requestTimeout;
    beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
    //实例化线程池(可以选择自己创建线程池后以参数形式传进来或者由TThreadPoolServer创建)
    executorService_ = args.executorService != null ?
        args.executorService : createDefaultExecutorService(args);
  }

  //创建线程池
  private static ExecutorService createDefaultExecutorService(Args args) {
    //线程池等待队列
    SynchronousQueue<Runnable> executorQueue =
      new SynchronousQueue<Runnable>();
    return new ThreadPoolExecutor(args.minWorkerThreads,
                                  args.maxWorkerThreads,
                                  args.stopTimeoutVal,
                                  args.stopTimeoutUnit,
                                  executorQueue);
  }

  protected ExecutorService getExecutorService() {
    return executorService_;
  }
  
  //开启服务器进行监听
  protected boolean preServe() {
  	try {
  	  //监听端口9090
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return false;
    }

    // Run the preServe event
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }
    stopped_ = false;
    //开启服务
    setServing(true);
    
    return true;
  }

  //启动服务
  public void serve() {
  	if (!preServe()) {
  		return;
  	}
    //处理请求
  	execute();
  	//服务停止后关闭线程池
  	waitForShutdown();
    //关闭服务
    setServing(false);
  }
  
  //处理请求
  protected void execute() {
    int failureCount = 0;
    //循环等待请求
    while (!stopped_) {
      try {
        //接受连接
        TTransport client = serverTransport_.accept();
        //将客户端请求封装成一个WorkerProcess对象后丢给线程池进行处理
        WorkerProcess wp = new WorkerProcess(client);
        //记录加入线程池的重试次数
        int retryCount = 0;
        //剩余的重试时间
        long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
        while(true) {
          try {
            //提交线程池处理请求
            executorService_.execute(wp);
            break;
          } catch(Throwable t) {
            //抛异常则重试
            if (t instanceof RejectedExecutionException) {
              retryCount++;
              try {
                if (remainTimeInMillis > 0) {
                  //do a truncated 20 binary exponential backoff sleep
                  long sleepTimeInMillis = ((long) (random.nextDouble() *
                      (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
                  sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
                  TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
                  remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
                } else {
                  client.close();
                  wp = null;
                  LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
                      + " times till timedout, reason: " + t);
                  break;
                }
              } catch (InterruptedException e) {
                LOGGER.warn("Interrupted while waiting to place client on executor queue.");
                Thread.currentThread().interrupt();
                break;
              }
            } else if (t instanceof Error) {
              LOGGER.error("ExecutorService threw error: " + t, t);
              throw (Error)t;
            } else {
              //for other possible runtime errors from ExecutorService, should also not kill serve
              LOGGER.warn("ExecutorService threw error: " + t, t);
              break;
            }
          }
        }
      } catch (TTransportException ttx) {
        if (!stopped_) {
          ++failureCount;
          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
        }
      }
    }
  }
  
  //服务停止后关闭线程池
  protected void waitForShutdown() {
    //不再接受新的线程,等待之前提交的线程都处理完毕后关闭线程池
  	executorService_.shutdown();

    long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
        //阻塞,唤醒条件:所有任务执行完毕且shutdown请求被调用或timeoutMS时间到达或当前线程被中断
        executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
  }

  //停止服务
  public void stop() {
    stopped_ = true;
    serverTransport_.interrupt();
  }

  //WorkerProcess实现Runnable,在run()方法中进行具体的业务处理
  private class WorkerProcess implements Runnable {
    private TTransport client_;
    private WorkerProcess(TTransport client) {
      client_ = client;
    }

    //具体的业务处理(其实就是将TSimpleServer中业务处理部分剥离出来放到run()方法中)
    public void run() {
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      TServerEventHandler eventHandler = null;
      ServerContext connectionContext = null;

      try {
        processor = processorFactory_.getProcessor(client_);
        inputTransport = inputTransportFactory_.getTransport(client_);
        outputTransport = outputTransportFactory_.getTransport(client_);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);

        eventHandler = getEventHandler();
        if (eventHandler != null) {
          connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
        }
       
        while (true) {
            if (eventHandler != null) {
              eventHandler.processContext(connectionContext, inputTransport, outputTransport);
            }

            if (stopped_) {
              break;
            }
            processor.process(inputProtocol, outputProtocol);
        }
      } catch (Exception x) {
        if (!isIgnorableException(x)) {
          LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x);
        }
      } finally {
        if (eventHandler != null) {
          eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
        }
        if (inputTransport != null) {
          inputTransport.close();
        }
        if (outputTransport != null) {
          outputTransport.close();
        }
        if (client_.isOpen()) {
          client_.close();
        }
      }
    }
    ... ...
  }
}

5. Преимущества

TThreadPoolServerсуществуетTSimpleServerОсновываясь на введении пула потоков, основной поток отвечает за мониторинг, пул потоков отвечает за чтение и запись данных и конкретную бизнес-обработку, а также может принимать соединения вовремя, когда параллелизм увеличивается. Это подходит для ситуации, когда сервер может предсказать максимальное количество одновременных клиентов.

6. Неадекватность

TThreadPoolServerЭто по-прежнему метод блокировки для мониторинга клиентских подключений, а возможности бизнес-обработки ограничены пулом потоков.Когда количество одновременных запросов превышает количество потоков в пуле потоков, новые запросы могут только блокироваться и ждать.

TNonblockingServer

1. Особенности

Однопоточный неблокирующий ввод-вывод.

2. Дизайн-мышление

TNonblockingServerсоздалSelectAcceptThreadнить, черезNIOрежим монитора несколькоsocketи читать и записывать данные.

  • поставивsocketзарегистрироваться наSelectorРеализуйте поток для мониторинга несколькихsocket,каждый разSelector.select()Конец цикла возвращает все готовоsocket:

    • для чтенияsocketПосле считывания данных запускается функция обратного вызова для выполнения соответствующей бизнес-обработки, и результат обработки возвращается клиенту;
    • для записиsocketвыполнять операции записи данных;
    • Принимайте запросы клиентов на подключение и регистрируйтесь наSelectorначальство;

3. Используйте

Клиент:

public class HelloClient {
    private static final Logger LOGGER = Logger.getLogger(HelloClient.class.getName());
    public static void main(String[] args) {
        TTransport transport = null;
        try {
            //传输层使用非阻塞I/O
            transport = new TFramedTransport.Factory().getTransport(new TSocket("127.0.0.1", 9090));
            transport.open();
            //使用二进制协议传输数据
            TProtocol protocol = new TBinaryProtocol(transport);
            //使用同步客户端
            HelloService.Client client = new HelloService.Client(protocol);
            String name = "XuDT";
            LOGGER.info("HelloClient 请求参数[name]=" + name);
            //调用接口
            String result = client.sayHello(name);
            LOGGER.info("Server 返回结果为" + result);
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            transport.close();
        }
    }
}

Сервер:

public class NonblockingServer {
    private static final Logger LOGGER = Logger.getLogger(NonblockingServer.class.getName());

    public static void main(String[] args) {
        try {
            //监听端口9090
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
            //使用二进制协议传输数据
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            //关联处理器与HelloService服务实现
            TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
            TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            serverArgs.transportFactory(new TFramedTransport.Factory());
            //使用TNonblockingServer服务端
            TServer server = new TNonblockingServer(serverArgs);
            LOGGER.info("Start NonblockingServer on port 9090...");
            //启动服务
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}
TNonblockingServer传输层只能使用TFramedTransport。

4. Анализ исходного кода

TNonblockingServerнаследоватьAbstractNonblockingServer,AbstractNonblockingServerнаследоватьTServer.

AbstractNonblockingServer:

public abstract class AbstractNonblockingServer extends TServer {
  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());

  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
    public long maxReadBufferBytes = 256 * 1024 * 1024;

    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
      super(transport);
      transportFactory(new TFramedTransport.Factory());
    }
  }

  final long MAX_READ_BUFFER_BYTES;
  final AtomicLong readBufferBytesAllocated = new AtomicLong(0);

  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
  }

  /**
   * 启动服务
   */
  public void serve() {
    //启动SelectAcceptThread线程
    if (!startThreads()) {
      return;
    }
    //启动监听
    if (!startListening()) {
      return;
    }
    //开启服务
    setServing(true);
    //阻塞等待请求并处理
    waitForShutdown();
    //关闭服务
    setServing(false);
    //停止监听,关闭ServerSocket
    stopListening();
  }
  //启动SelectAcceptThread线程
  protected abstract boolean startThreads();
  //阻塞等待请求并处理
  protected abstract void waitForShutdown();
  //启动监听
  protected boolean startListening() {
    try {
      serverTransport_.listen();
      return true;
    } catch (TTransportException ttx) {
      LOGGER.error("Failed to start listening on server socket!", ttx);
      return false;
    }
  }
  //停止监听,关闭ServerSocket
  protected void stopListening() {
    serverTransport_.close();
  }
  //业务处理回调方法
  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);

  /**
   * AbstractSelectThread继承了Thread
   */
  protected abstract class AbstractSelectThread extends Thread {
    protected Selector selector;

    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();

    public AbstractSelectThread() throws IOException {
      this.selector = SelectorProvider.provider().openSelector();
    }

	//唤醒阻塞在Selector.select()上的线程
    public void wakeupSelector() {
      selector.wakeup();
    }
	
    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
      synchronized (selectInterestChanges) {
        selectInterestChanges.add(frameBuffer);
      }
      selector.wakeup();
    }

	//改变事件所感兴趣的类型,比如从读转为写(读取了客户端的请求,执行完相应的业务处理后,要将数据返回给客户端),从写转为读(要将数据返回给客户端后,重新开始接受客户端新的的请求)
    protected void processInterestChanges() {
      synchronized (selectInterestChanges) {
        for (FrameBuffer fb : selectInterestChanges) {
          fb.changeSelectInterests();
        }
        selectInterestChanges.clear();
      }
    }

    //读取客户端数据
    protected void handleRead(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      //读取客户端数据失败,则清除该selectionKey
      if (!buffer.read()) {
        cleanupSelectionKey(key);
        return;
      }

      //读取客户端数据成功
      if (buffer.isFrameFullyRead()) {
        //触发回调(调用相应的方法进行业务处理)
        if (!requestInvoke(buffer)) {
          //清除该selectionKey
          cleanupSelectionKey(key);
        }
      }
    }
    
    //向客户端写入数据
    protected void handleWrite(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.write()) {
        cleanupSelectionKey(key);
      }
    }
    
    //清除selectionKey
    protected void cleanupSelectionKey(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (buffer != null) {
        buffer.close();
      }
      key.cancel();
    }
  } 
  ... ...
}

TNonblockingServer:

public class TNonblockingServer extends AbstractNonblockingServer {

  public static class Args extends AbstractNonblockingServerArgs<Args> {
    public Args(TNonblockingServerTransport transport) {
      super(transport);
    }
  }

  //监听线程
  private SelectAcceptThread selectAcceptThread_;

  public TNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
  }

  /**
   * 重写AbstractNonblockingServer.startThreads()方法,开启线程处理客户端请求
   */
  @Override
  protected boolean startThreads() {
    try {
      //实例化selectAcceptThread_
      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
      //启动线程
      selectAcceptThread_.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start selector thread!", e);
      return false;
    }
  }

  //重写AbstractNonblockingServer.waitForShutdown(),主线程阻塞等待SelectAcceptThread线程返回
  @Override
  protected void waitForShutdown() {
    joinSelector();
  }

  /**
   * 启动Selector监听线程
   */
  protected void joinSelector() {
    try {
      //主线程阻塞等待selectAcceptThread线程返回
      selectAcceptThread_.join();
    } catch (InterruptedException e) {
      LOGGER.debug("Interrupted while waiting for accept thread", e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * 停止服务
   */
  @Override
  public void stop() {
    stopped_ = true;
    if (selectAcceptThread_ != null) {
      selectAcceptThread_.wakeupSelector();
    }
  }

  /**
   * 客户端可读时触发的回调方法,具体回调操作定义在AbstractNonblockingServer的内部类FrameBuffer.invoke(),invoke()通过处理器TProcessorFactory的processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_)方法进行连接数据读取、接口调用、处理结果返回客户端等
   */
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    frameBuffer.invoke();
    return true;
  }

  public boolean isStopped() {
    return selectAcceptThread_.isStopped();
  }

  /**
   * SelectAcceptThread继承了AbstractSelectThread,AbstractSelectThread继承了Thread,AbstractSelectThread是AbstractNonblockingServer的内部类
   */
  protected class SelectAcceptThread extends AbstractSelectThread {

    //服务端传输通道serverTransport
    private final TNonblockingServerTransport serverTransport;

    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
    throws IOException {
      this.serverTransport = serverTransport;
      //将服务端通道serverTransport注册到Selector实现一个线程监听多个通道,Selector定义在AbstractSelectThread
      serverTransport.registerSelector(selector);
    }

    public boolean isStopped() {
      return stopped_;
    }

    /**
     * 处理请求
     */
    public void run() {
      try {
        if (eventHandler_ != null) {
          eventHandler_.preServe();
        }
        //循环等待请求
        while (!stopped_) {
          //阻塞监听所有注册在Selector上的socket,并处理就绪socket
          select();
          //处理读写事件切换
          processInterestChanges();
        }
        //服务端停止后删除Selector中的所有监听的selectionKey
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        try {
          selector.close();
        } catch (IOException e) {
          LOGGER.error("Got an IOException while closing selector!", e);
        }
        stopped_ = true;
      }
    }

    /**
     * 阻塞监听所有注册在Selector上的socket
     */
    private void select() {
      try {
        //阻塞监听请求,每次select()结束获取所有就绪socket
        selector.select();

        //获取所有就绪socket
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        //处理就绪socket
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }
          if (key.isAcceptable()) {
            //接受连接
            handleAccept();
          } else if (key.isReadable()) {
            //读取数据
            handleRead(key);
          } else if (key.isWritable()) {
            //写数据
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

    protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
        final SelectionKey selectionKey,
        final AbstractSelectThread selectThread) {
        return processorFactory_.isAsyncProcessor() ?
                  new AsyncFrameBuffer(trans, selectionKey, selectThread) :
                  new FrameBuffer(trans, selectionKey, selectThread);
    }

    /**
     * 处理客户端连接请求
     */
    private void handleAccept() throws IOException {
      SelectionKey clientKey = null;
      TNonblockingTransport client = null;
      try {
        //接受客户端连接请求
        client = (TNonblockingTransport)serverTransport.accept();
        //将客户端连接注册到Selector上,为了后续处理该客户端连接上的请求
        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
        //实例化FrameBuffer
        FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
        //将frameBuffer添加到clientKey,后续handleRead()和handleWrite()读写数据都是基于这个frameBuffer
        clientKey.attach(frameBuffer);
      } catch (TTransportException tte) {
        // something went wrong accepting.
        LOGGER.warn("Exception trying to accept!", tte);
        if (clientKey != null) cleanupSelectionKey(clientKey);
        if (client != null) client.close();
      }
    }
  } 
}

5. Преимущества

Благодаря мультиплексированию ввода-вывода один поток контролирует несколькоsocket.

6. Неадекватность

TNonblockingServerЧтение и запись данных, а также бизнес-обработка выполняются с использованием однопоточной блокировки, когда бизнес-обработка сложна или требует много времени, сервисы будут заблокированы, а эффективность невысока.

THsHaServer

1. Особенности

Полусинхронный и полуасинхронный, неблокирующий ввод-вывод.

2. Дизайн-мышление

Синхронный неблокирующий: черезSelectAcceptThreadПотоки реализуют мониторинг (неблокирующий ввод-вывод), чтение и запись данных (синхронизацию) через NIO. Асинхронный: бизнес-обработка выполняется через пул потоков.

3. Используйте

Клиент тот жеTNonblockingServer.

Сервер:

public class HsHaServer {
    private static final Logger LOGGER = Logger.getLogger(HsHaServer.class.getName());

    public static void main(String[] args) {
        try {
            //监听端口9090
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
            //使用二进制协议传输数据
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            //关联处理器与HelloService服务实现
            TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
            THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            serverArgs.transportFactory(new TFramedTransport.Factory());
            //使用THsHaServer服务端
            TServer server = new THsHaServer(serverArgs);
            LOGGER.info("Start HsHaServer on port 9090...");
            //启动服务
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}

4. Анализ исходного кода

THsHaServerнаследоватьTNonblockingServer.

public class THsHaServer extends TNonblockingServer {

  public static class Args extends AbstractNonblockingServerArgs<Args> {
	//设置线程池参数
    public int minWorkerThreads = 5;
    public int maxWorkerThreads = Integer.MAX_VALUE;
    private int stopTimeoutVal = 60;
    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
    private ExecutorService executorService = null;

    public Args(TNonblockingServerTransport transport) {
      super(transport);
    }

	//设置线程池最少线程数
    public Args minWorkerThreads(int n) {
      minWorkerThreads = n;
      return this;
    }

	//设置线程池最大线程数
    public Args maxWorkerThreads(int n) {
      maxWorkerThreads = n;
      return this;
    }

	//获取线程池最少线程数
    public int getMinWorkerThreads() {
      return minWorkerThreads;
    }

	//获取线程池最大线程数
    public int getMaxWorkerThreads() {
      return maxWorkerThreads;
    }

    public int getStopTimeoutVal() {
      return stopTimeoutVal;
    }

    public Args stopTimeoutVal(int stopTimeoutVal) {
      this.stopTimeoutVal = stopTimeoutVal;
      return this;
    }

    public TimeUnit getStopTimeoutUnit() {
      return stopTimeoutUnit;
    }

    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
      this.stopTimeoutUnit = stopTimeoutUnit;
      return this;
    }

    public ExecutorService getExecutorService() {
      return executorService;
    }

    public Args executorService(ExecutorService executorService) {
      this.executorService = executorService;
      return this;
    }
  }


  //指向工作线程池,方便Selector调用工作线程
  private final ExecutorService invoker;

  private final Args args;

  public THsHaServer(Args args) {
    super(args);
	//线程池可以自己定义后以参数的形式传入或者由THsHaServer创建
    invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
    this.args = args;
  }

  //重写AbstractNonblockingServer.waitForShutdown()
  @Override
  protected void waitForShutdown() {
	//主线程阻塞等待SelectAcceptThread线程返回,调用的是TNonblockingServer.joinSelector()
    joinSelector();
	//关闭线程池
    gracefullyShutdownInvokerPool();
  }

  //创建线程池
  protected static ExecutorService createInvokerPool(Args options) {
    int minWorkerThreads = options.minWorkerThreads;
    int maxWorkerThreads = options.maxWorkerThreads;
    int stopTimeoutVal = options.stopTimeoutVal;
    TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
      maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

    return invoker;
  }

  protected ExecutorService getInvoker() {
    return invoker;
  }

  //关闭线程池
  protected void gracefullyShutdownInvokerPool() {
	//shutdown()会等待正在处理的任务结束后再关闭线程池
    invoker.shutdown();

    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
		//awaitTermination()会一直等待直到线程池状态为TERMINATED或者超时时间到
        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
  }

  //重写AbstractNonblockingServer.requestInvoke(),调用线程池执行业务处理
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    try {
	  //获取要执行的业务处理
      Runnable invocation = getRunnable(frameBuffer);
	  //执行业务处理
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  }

  protected Runnable getRunnable(FrameBuffer frameBuffer){
    return new Invocation(frameBuffer);
  }
}


class Invocation implements Runnable {
  private final FrameBuffer frameBuffer;

  public Invocation(final FrameBuffer frameBuffer) {
    this.frameBuffer = frameBuffer;
  }

  public void run() {
	//调用的是AbstractNonblockingServer的内部类FrameBuffer..invoke()
    frameBuffer.invoke();
  }
}

5. Преимущества

THsHaServerсуществуетTNonblockingServerНа базе реализован пул потоков, когда чтение данных завершено, они передаются в пул потоков для бизнес-обработки, а основной поток возвращается непосредственно для мониторинга следующего цикла, что значительно повышает эффективность.

6. Неадекватность

THsHaServerвсе ещеSelectAcceptThreadПотоки отвечают за мониторинг, чтение и запись данных, и при большом количестве одновременных запросов запросы могут быть не приняты вовремя.

TThreadedSelectorServer

1. Особенности

Многопоточный неблокирующий ввод-вывод.

2. Дизайн-мышление

Раздельный мониторинг, чтение и запись данных и бизнес-обработка:

  • ОдинAcceptThreadОбъект потока специально используется для прослушивания клиентских запросов на подключение;
  • несколькоSelectorThreadОбъекты потоков (по умолчанию 2) используются для чтения и записи данных;
  • балансировщик нагрузкиSelectorThreadLoadBalancerобъект используется дляAcceptThreadНовые запросы на соединение, полученные потоком, назначаютсяSelectorThreadнить.
  • ОдинExecutorServiceПул потоков используется для бизнес-обработки. когдаSelectorThreadМетод обратного вызова запускается после того, как объект потока прочитал данныеrequestInvoke()Передайте запрос в пул потоков для бизнес-обработки.

3. Используйте

Клиент тот жеTNonblockingServer.

Сервер:

public class ThreadedSelectorServer {
    private static final Logger LOGGER = Logger.getLogger(ThreadedSelectorServer.class.getName());

    public static void main(String[] args) {
        try {
            //监听端口9090
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
            //使用二进制协议传输数据
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            //关联处理器与HelloService服务实现
            TProcessor processor = new HelloService.Processor(new HelloServiceImpl());
            TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            //使用TThreadedSelectorServer服务端
            TServer server = new TThreadedSelectorServer(serverArgs);
            LOGGER.info("Start ThreadedSelectorServer on port 9090...");
            //启动服务,调用到的是AbstractNonblockingServer.serve()
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}

4. Анализ исходного кода

TThreadedSelectorServerнаследоватьAbstractNonblockingServer, который содержит внутренний классAcceptThread,SelectorThread,SelectorThreadLoadBalancer.

TThreadedSelectorServer:

public class TThreadedSelectorServer extends AbstractNonblockingServer {
  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());

  public static class Args extends AbstractNonblockingServerArgs<Args> {
	//selectorThreads线程数
    public int selectorThreads = 2;
	//工作线程池核心线程数,如果设置为0则业务处理会交由selectorThreads负责
    private int workerThreads = 5;
    private int stopTimeoutVal = 60;
    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
	//工作线程池
    private ExecutorService executorService = null;
	//selectorThreads缓存队列大小
    private int acceptQueueSizePerThread = 4;

    public static enum AcceptPolicy {
      FAIR_ACCEPT,
	  //立即接受请求
      FAST_ACCEPT
    }

    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;

    public Args(TNonblockingServerTransport transport) {
      super(transport);
    }

    public Args selectorThreads(int i) {
      selectorThreads = i;
      return this;
    }

    public int getSelectorThreads() {
      return selectorThreads;
    }

    public Args workerThreads(int i) {
      workerThreads = i;
      return this;
    }

    public int getWorkerThreads() {
      return workerThreads;
    }

    public int getStopTimeoutVal() {
      return stopTimeoutVal;
    }

    public Args stopTimeoutVal(int stopTimeoutVal) {
      this.stopTimeoutVal = stopTimeoutVal;
      return this;
    }

    public TimeUnit getStopTimeoutUnit() {
      return stopTimeoutUnit;
    }

    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
      this.stopTimeoutUnit = stopTimeoutUnit;
      return this;
    }

    public ExecutorService getExecutorService() {
      return executorService;
    }

    public Args executorService(ExecutorService executorService) {
      this.executorService = executorService;
      return this;
    }

    public int getAcceptQueueSizePerThread() {
      return acceptQueueSizePerThread;
    }

    public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
      this.acceptQueueSizePerThread = acceptQueueSizePerThread;
      return this;
    }

    public AcceptPolicy getAcceptPolicy() {
      return acceptPolicy;
    }

    public Args acceptPolicy(AcceptPolicy acceptPolicy) {
      this.acceptPolicy = acceptPolicy;
      return this;
    }
	
	//参数校验
    public void validate() {
      if (selectorThreads <= 0) {
        throw new IllegalArgumentException("selectorThreads must be positive.");
      }
      if (workerThreads < 0) {
        throw new IllegalArgumentException("workerThreads must be non-negative.");
      }
      if (acceptQueueSizePerThread <= 0) {
        throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
      }
    }
  }
  
  //监听线程
  private AcceptThread acceptThread;
  //数据读写线程
  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
  //工作线程池
  private final ExecutorService invoker;

  private final Args args;

  public TThreadedSelectorServer(Args args) {
    super(args);
	//参数校验
    args.validate();
	//线程池可以自己定义后以参数的形式传入或者由TThreadedSelectorServer创建
    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
    this.args = args;
  }

  //重写AbstractNonblockingServer.startThreads(),启动AcceptThread、SelectorThread线程
  @Override
  protected boolean startThreads() {
    try {
	  //实例化SelectorThread线程并放进一个Set集合中
      for (int i = 0; i < args.selectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
      }
	  //实例化AcceptThread线程
      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
        createSelectorThreadLoadBalancer(selectorThreads));
	  //启动SelectorThread线程
      for (SelectorThread thread : selectorThreads) {
        thread.start();
      }
	  //启动AcceptThread线程
      acceptThread.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start threads!", e);
      return false;
    }
  }

  //重写AbstractNonblockingServer.waitForShutdown()
  @Override
  protected void waitForShutdown() {
    try {
	  //阻塞主线程等待AcceptThread、SelectorThread线程返回
      joinThreads();
    } catch (InterruptedException e) {
      LOGGER.error("Interrupted while joining threads!", e);
    }
	//关闭线程池
    gracefullyShutdownInvokerPool();
  }

  //阻塞主线程等待AcceptThread、SelectorThread线程返回
  protected void joinThreads() throws InterruptedException {
    acceptThread.join();
    for (SelectorThread thread : selectorThreads) {
      thread.join();
    }
  }

  //重写AbstractNonblockingServer.stop(),wakeupSelector()调用了Selector.wakeup(),将会唤醒Selector执行select()时阻塞的线程,进行新的循环,Selector.select()的循环条件是while (!stopped_ ),stopped_已被设置为true,所以新的一次循环将会跳出,从而停止SelectorThread、AcceptThread线程
  @Override
  public void stop() {
    stopped_ = true;
	//停止监听
    stopListening();
	//关闭AcceptThread线程
    if (acceptThread != null) {
	  //wakeupSelector()
      acceptThread.wakeupSelector();
    }
	//关闭SelectorThread线程
    if (selectorThreads != null) {
      for (SelectorThread thread : selectorThreads) {
        if (thread != null)
          thread.wakeupSelector();
      }
    }
  }
  
  //关闭线程池
  protected void gracefullyShutdownInvokerPool() {
    invoker.shutdown();
   
    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
  }

  //重写AbstractNonblockingServer.requestInvoke(),调用线程池执行业务处理
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    Runnable invocation = getRunnable(frameBuffer);
    if (invoker != null) {
      try {
        invoker.execute(invocation);
        return true;
      } catch (RejectedExecutionException rx) {
        LOGGER.warn("ExecutorService rejected execution!", rx);
        return false;
      }
    } else {
      invocation.run();
      return true;
    }
  }

  protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
  }

  //创建线程池
  protected static ExecutorService createDefaultExecutor(Args options) {
    return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
  }

  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
    if (queueSize == 0) {
      return new LinkedBlockingQueue<TNonblockingTransport>();
    }
    return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
  }

  //创建一个负载均衡器
  protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
    return new SelectorThreadLoadBalancer(threads);
  }
}

AcceptThread: слушать клиентские подключения

  //监听连接线程
  protected class AcceptThread extends Thread {
	//获取客户端传输通道
    private final TNonblockingServerTransport serverTransport;
	//NIO选择器
    private final Selector acceptSelector;
	//负载均衡器
    private final SelectorThreadLoadBalancer threadChooser;


    public AcceptThread(TNonblockingServerTransport serverTransport,
        SelectorThreadLoadBalancer threadChooser) throws IOException {
      this.serverTransport = serverTransport;
      this.threadChooser = threadChooser;
	  //实例化Selector
      this.acceptSelector = SelectorProvider.provider().openSelector();
	  //将客户端传输通道注册到Selector上
      this.serverTransport.registerSelector(acceptSelector);
    }

	//监听客户端请求
    public void run() {
      try {
        if (eventHandler_ != null) {
          eventHandler_.preServe();
        }
		//循环监听等待客户端请求
        while (!stopped_) {  
          select();
        }
      } catch (Throwable t) {
        LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
      } finally {
        try {
          acceptSelector.close();
        } catch (IOException e) {
          LOGGER.error("Got an IOException while closing accept selector!", e);
        }
        TThreadedSelectorServer.this.stop();
      }
    }

	//关闭线程
    public void wakeupSelector() {
      acceptSelector.wakeup();
    }

	//监听客户端请求
    private void select() {
      try {
		//接收客户端请求
        acceptSelector.select();

        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }
		  //如果是连接请求则调用handleAccept()进行处理,否则不做任何处理
          if (key.isAcceptable()) {
            handleAccept();
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

	//接受客户端连接请求
    private void handleAccept() {
	  //获取客户端传输通道
      final TNonblockingTransport client = doAccept();
      if (client != null) {
        final SelectorThread targetThread = threadChooser.nextThread();
		//接受连接
        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
		  //
          doAddAccept(targetThread, client);
        } else {
          try {
            invoker.submit(new Runnable() {
              public void run() {
                doAddAccept(targetThread, client);
              }
            });
          } catch (RejectedExecutionException rx) {
            LOGGER.warn("ExecutorService rejected accept registration!", rx);
         
            client.close();
          }
        }
      }
    }

	//获取客户端传输通道
    private TNonblockingTransport doAccept() {
      try {
        return (TNonblockingTransport) serverTransport.accept();
      } catch (TTransportException tte) {
       
        LOGGER.warn("Exception trying to accept!", tte);
        return null;
      }
    }

	//接受连接(将客户端通道加入SelectorThread的缓存队列中)
    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
	  //调用SelectorThread.addAcceptedConnection()
      if (!thread.addAcceptedConnection(client)) {
        client.close();
      }
    }
  } 

SelectorThread: прослушивание событий чтения и записи

  //读写数据线程 
  protected class SelectorThread extends AbstractSelectThread {
	//缓存队列
    private final BlockingQueue<TNonblockingTransport> acceptedQueue;

    public SelectorThread() throws IOException {
      this(new LinkedBlockingQueue<TNonblockingTransport>());
    }

    public SelectorThread(int maxPendingAccepts) throws IOException {
      this(createDefaultAcceptQueue(maxPendingAccepts));
    }


    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
      this.acceptedQueue = acceptedQueue;
    }

	//将客户端通道加入SelectorThread的缓存队列中(如果队列满了将会阻塞线程)
    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
      try {
		//通过队列的put()方法放入队列中
        acceptedQueue.put(accepted);
      } catch (InterruptedException e) {
        LOGGER.warn("Interrupted while adding accepted connection!", e);
        return false;
      }
	  //唤醒阻塞在Selector.select()上的线程
      selector.wakeup();
      return true;
    }

	//读写数据
    public void run() {
      try {
		//循环监听I/O事件
        while (!stopped_) {
		  //处理就绪的可读写事件
          select();
		  //处理客户端连接
          processAcceptedConnections();
		  //监听客户端读写切换,调用的是AbstractNonblockingServer.processInterestChanges()
          processInterestChanges();
        }
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
      } finally {
        try {
          selector.close();
        } catch (IOException e) {
          LOGGER.error("Got an IOException while closing selector!", e);
        }
        TThreadedSelectorServer.this.stop();
      }
    }

	//处理就绪的可读写事件
    private void select() {
      try {
        doSelect();
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          if (key.isReadable()) {
			//处理可读事件
            handleRead(key);
          } else if (key.isWritable()) {
			//处理可写事件
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

	//解决epoll空轮询导致的CPU 100%bug
    private void doSelect() throws IOException {
      long beforeSelect = System.currentTimeMillis();
      int selectedNums = selector.select();
      long afterSelect = System.currentTimeMillis();

      if (selectedNums == 0) {
        jvmBug++;
      } else {
        jvmBug = 0;
      }

      long selectedTime = afterSelect - beforeSelect;
      if (selectedTime >= MONITOR_PERIOD) {
        jvmBug = 0;
      } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
        LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
        rebuildSelector();
        selector.selectNow();
        jvmBug = 0;
      }

    }

    private synchronized void rebuildSelector() {
      final Selector oldSelector = selector;
      if (oldSelector == null) {
        return;
      }
      Selector newSelector = null;
      try {
        newSelector = Selector.open();
        LOGGER.warn("Created new Selector.");
      } catch (IOException e) {
        LOGGER.error("Create new Selector error.", e);
      }

      for (SelectionKey key : oldSelector.selectedKeys()) {
        if (!key.isValid() && key.readyOps() == 0)
          continue;
        SelectableChannel channel = key.channel();
        Object attachment = key.attachment();

        try {
          if (attachment == null) {
            channel.register(newSelector, key.readyOps());
          } else {
            channel.register(newSelector, key.readyOps(), attachment);
          }
        } catch (ClosedChannelException e) {
          LOGGER.error("Register new selector key error.", e);
        }

      }

      selector = newSelector;
      try {
        oldSelector.close();
      } catch (IOException e) {
        LOGGER.error("Close old selector error.", e);
      }
      LOGGER.warn("Replace new selector success.");
    }

	//处理客户端连接
    private void processAcceptedConnections() {
	  //循环取出缓存队列中的客户端传输通道,并注册到Selector上
      while (!stopped_) {
        TNonblockingTransport accepted = acceptedQueue.poll();
        if (accepted == null) {
          break;
        }
        registerAccepted(accepted);
      }
    }

    protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
        final SelectionKey selectionKey,
        final AbstractSelectThread selectThread) {
        return processorFactory_.isAsyncProcessor() ?
                  new AsyncFrameBuffer(trans, selectionKey, selectThread) :
                  new FrameBuffer(trans, selectionKey, selectThread);
    }

	//将客户端传输通道注册到Selector上
    private void registerAccepted(TNonblockingTransport accepted) {
      SelectionKey clientKey = null;
      try {
		//将客户端传输通道注册到Selector上
        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
		//创建一个FrameBuffer,后续这个客户端传输通道上的数据读写都是基于这个FrameBuffer
        FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);

        clientKey.attach(frameBuffer);
      } catch (IOException e) {
        LOGGER.warn("Failed to register accepted connection to selector!", e);
        if (clientKey != null) {
          cleanupSelectionKey(clientKey);
        }
        accepted.close();
      }
    }
  } 

SelectorThreadLoadBalancer: балансировщик нагрузки

  //负载均衡器
  protected static class SelectorThreadLoadBalancer {
    private final Collection<? extends SelectorThread> threads;
    private Iterator<? extends SelectorThread> nextThreadIterator;

	//实例化一个SelectorThreadLoadBalancer,实例化时会将SelectorThread以参数形式传进来
    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
      if (threads.isEmpty()) {
        throw new IllegalArgumentException("At least one selector thread is required");
      }
      this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
      nextThreadIterator = this.threads.iterator();
    }

	//SelectorThreadLoadBalancer通过轮询的方式来选择SelectorThread线程
    public SelectorThread nextThread() {
      if (!nextThreadIterator.hasNext()) {
        nextThreadIterator = threads.iterator();
      }
      return nextThreadIterator.next();
    }
  }

5. Преимущества

TThreadedSelectorServerдаThriftпревосходная степеньServer, который разделяет мониторинг, чтение и запись данных и бизнес-обработку:

  • есть специальныйAcceptThreadПотоки используются для обработки запросов на подключение и могут своевременно обрабатывать запросы на подключение;
  • Распределите операции ввода-вывода по несколькимSelectorThreadThread, который может быстро читать и записывать данные;
  • Через пул потоков для бизнес-обработки улучшите возможность параллельной обработки.

Сравнение каждого сервера

  • TSimpleServer: один поток блокирует ввод-вывод, а основной поток отвечает за мониторинг, чтение и запись данных и бизнес-обработку.
  • TThreadPoolServer: многопоточный блокирующий ввод-вывод, основной поток отвечает за мониторинг, а пул потоков отвечает за чтение и запись данных и бизнес-обработку.
  • TNonblockingServer: однопоточный неблокирующий ввод-вывод, используемый основным потокомNIOОтвечает за мониторинг (неблокирующий ввод-вывод), чтение и запись данных и бизнес-обработку.
  • THsHaServer: полусинхронный и полуасинхронный, основной поток использует NIO для мониторинга (неблокирующий ввод-вывод), чтения и записи данных, а пул потоков отвечает за бизнес-обработку.
  • TThreadedSelectorServer: многопоточный неблокирующий ввод/вывод,AcceptThreadПотоки используют NIO для мониторинга (неблокирующий ввод-вывод), несколькоSelectorThreadПотоки отвечают за чтение и запись данных, а пул потоков отвечает за бизнес-обработку.

Пример бережливого использования

  1. Создайте файл интерфейса службы(HelloService.thrift):HelloServiceвключена 1 услугаsayHelloметод.
namespace java com.xudt.thrift.service
service HelloService {
  string sayHello(1:string name)
}
  1. http://thrift.apache.org/downloadскачатьThrift IDLпереводчик.

  2. будетthrift exeа такжеHelloService.thriftв той же папке,cmdзайдите в эту папку и выполните командуthrift-0.13.0.exe -gen java hello.thrift, который генерирует сервисный интерфейсHelloService.javaдокумент.

  3. Создаватьthrift-demo MavenПроект является родительским модулем, а затем создаются 4 подмодуля:

  • thrift-demo-interface: место храненияHelloService.thriftгенерируется служебным файломHelloService.javaкод.
  • thrift-demo-service: реализует интерфейс службы.
  • thrift-demo-server:Сервис-терминал.
  • thrift-demo-client: Клиент.

Сгенерировать сервисный интерфейс.javaФайл содержит:

  • сервер синхронизацииIface
  • Асинхронный серверAsyncIface
  • Клиент синхронизацииClient
  • Асинхронный клиентAsyncClient
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-07-04")
public class HelloService {

  /**
   * 同步服务端
   */
  public interface Iface {
    public String sayHello(String name) throws org.apache.thrift.TException;
  }
  /**
   * 异步服务端
   */
  public interface AsyncIface {
    public void sayHello(String name, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws org.apache.thrift.TException;
  }
  /**
   * 同步客户端
   */
  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
      public Factory() {}
      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
        return new Client(prot);
      }
      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
        return new Client(iprot, oprot);
      }
    }

    public Client(org.apache.thrift.protocol.TProtocol prot) {
      super(prot, prot);
    }

    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
      super(iprot, oprot);
    }
    
    //调用sayHello接口并接收处理结果
    public String sayHello(String name) throws org.apache.thrift.TException {
      send_sayHello(name);
      return recv_sayHello();
    }

    //发送接口调用请求
    public void send_sayHello(String name) throws org.apache.thrift.TException {
      sayHello_args args = new sayHello_args();
      args.setName(name);
      sendBase("sayHello", args);
    }

    //接收接口调用结果
    public String recv_sayHello() throws org.apache.thrift.TException {
      sayHello_result result = new sayHello_result();
      receiveBase(result, "sayHello");
      if (result.isSetSuccess()) {
        return result.success;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
    }
  }
    /**
     * 处理器
     */
    public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
    //初始化processMap
    public Processor(I iface) {
      super(iface, getProcessMap(new java.util.HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> java.util.Map<String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("sayHello", new sayHello());
      return processMap;
    }

    public static class sayHello<I extends Iface> extends org.apache.thrift.ProcessFunction<I, sayHello_args> {
      public sayHello() {
        super("sayHello");
      }

      public sayHello_args getEmptyArgsInstance() {
        return new sayHello_args();
      }

      protected boolean isOneway() {
        return false;
      }

      @Override
      protected boolean rethrowUnhandledExceptions() {
        return false;
      }

      //调用接口(真正调用到HelloService的实现类HelloServiceImpl中sayHello())
      public sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {
        sayHello_result result = new sayHello_result();
        result.success = iface.sayHello(args.name);
        return result;
      }
    }
  }
  ... ...
}