Подробное объяснение Apache Thrift Series (2) — Модель сетевых служб

Apache Thrift

предисловие

Thriftкоторый предоставилмодель веб-сервиса:один поток,Многопоточность,управляемый событиями, который делится на:блокирующая сервисная модель,Неблокирующая модель обслуживания.

  • Блокирующая сервисная модель:TSimpleServer,TThreadPoolServer.

  • Неблокирующая модель обслуживания:TNonblockingServer,THsHaServerиTThreadedSelectorServer.

TServerИерархия классов:


текст

TServer

TServerстатический внутренний класс определенArgs,ArgsНаследовать от абстрактного классаAbstractServerArgs.AbstractServerArgsИспользуя модель строителя,TServerДоступны различные фабрики:

Заводская собственность заводской тип эффект
ProcessorFactory TProcessorFactory Класс фабрики слоя обработки для создания конкретных объектов TProcessor
InputTransportFactory TTransportFactory Класс фабрики ввода транспортного уровня для создания конкретных объектов TTransport
OutputTransportFactory TTransportFactory Выходной класс фабрики транспортного уровня для создания конкретных объектов TTransport.
InputProtocolFactory TProtocolFactory Класс фабрики ввода уровня протокола для создания конкретных объектов TProtocol
OutputProtocolFactory TProtocolFactory Класс фабрики вывода уровня протокола для создания конкретных объектов TProtocol

НижеTServerЧасть основного кода:

public abstract class TServer {
    public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }

    public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            serverTransport = transport;
        }
    }

    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;

    protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
        processorFactory_ = args.processorFactory;
        serverTransport_ = args.serverTransport;
        inputTransportFactory_ = args.inputTransportFactory;
        outputTransportFactory_ = args.outputTransportFactory;
        inputProtocolFactory_ = args.inputProtocolFactory;
        outputProtocolFactory_ = args.outputProtocolFactory;
    }

    public abstract void serve();
    public void stop() {}

    public boolean isServing() {
        return isServing;
    }

    protected void setServing(boolean serving) {
        isServing = serving;
    }
}

TServerтри метода:serve(),stop()иisServing().serve()для запуска сервиса,stop()за отключение службы,isServing()Используется для определения состояния запуска и остановки службы.

TServerизразные классы реализациизапускаются по-разному, поэтомуserve()Определяется как абстрактный метод. Не для всех служб требуется изящный выход, поэтомуstop()Метод не определен как абстрактный.


TSimpleServer

TSimpleServerизРабочий режимс помощью самого простогоблокироватьIO, метод реализации лаконичен и прост для понимания, но одновременно может быть получен и обработан только одинsocketсоединение, эффективность относительно низкая. В основном используется для демонстрацииThriftОн редко используется в реальном процессе разработки.

(1) Рабочий процесс

(2) Начало работы

Сервер:

    ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor processor =
            new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
    tArgs.processor(processor);
    tArgs.protocolFactory(protocolFactory);
    // 简单的单线程服务模型 一般用于测试
    TServer tServer = new TSimpleServer(tArgs);
    System.out.println("Running Simple Server");
    tServer.serve();

Клиент:

    TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("Leo");
    System.out.println("Result =: " + result);
    transport.close();

(3) Анализ исходного кода

Глядя на исходный код вышеуказанного процесса, т.е.TSimpleServer.javaсерединаserve()Методы, как показано ниже:

serve()Действие метода:

  1. настраиватьTServerSocketизlisten()способ инициировать соединениемонитор.
  2. отблокироватьспособ принять запрос на подключение от клиента, каждый раз, когдасоединятьто есть создать для него каналTTransportобъект.
  3. Создать для клиентаобъект обработчика,объект входного транспортного канала,объект выходного транспортного канала,объект протокола вводаиобъект выходного протокола.
  4. пройти черезTServerEventHandlerОбъекты обрабатывают определенные бизнес-запросы.

ThreadPoolServer

TThreadPoolServerрежим принятблокироватьsocketспособ работы, основной поток отвечаетблокировкаСледить за новымиsocketПриходите, конкретная бизнес-обработка будет переданаПул потоковобрабатывать.

(1) Рабочий процесс

(2) Начало работы

Сервер:

    ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor<HelloWorldService.Iface> processor =
            new HelloWorldService.Processor<>(new HelloWorldServiceImpl());

    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
    ttpsArgs.processor(processor);
    ttpsArgs.protocolFactory(protocolFactory);

    // 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求
    TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
    System.out.println("Running ThreadPool Server");
    ttpsServer.serve();

Клиент:

    TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();
    String result = client.say("ThreadPoolClient");
    System.out.println("Result =: " + result);
    transport.close();

(3) Анализ исходного кода

ThreadPoolServerрешеноTSimpleServerне поддерживаетсяпараллелизминесколько соединенийпроблема, представленнаяПул потоков. Реализованная модельOne Thread Per Connection. Чтобы просмотреть исходный код вышеуказанного процесса, сначала просмотритеПул потоковфрагмент кода:

TThreadPoolServer.javaсерединаserve()Методы, как показано ниже:

serve()Действие метода:

  1. настраиватьTServerSocketизlisten()способ инициировать соединениемонитор.
  2. отблокироватьспособ принятьклиентиззапрос на подключение, каждый вход всоединять,будетобъект каналаупакован вWorkerProcessобъект (WorkerProcessДостигнутоRunnabelинтерфейс) и отправитьПул потоков.
  3. WorkerProcessизrun()ответственный методобработка бизнеса, созданный для клиентаобъект обработчика,объект входного транспортного канала,объект выходного транспортного канала,объект протокола вводаиобъект выходного протокола.
  4. пройти черезTServerEventHandlerОбъекты обрабатывают определенные бизнес-запросы.

WorkerProcessизrun()метод:

(4) Преимущества и недостатки

Преимущества режима TThreadPoolServer

расколотьнить прослушивания(Accept Thread) и обработкаПодключение клиентаизрабочий поток(Worker Thread),чтение данныхиобработка бизнесасдаватьПул потоковиметь дело с. Таким образом, вБольшое количество параллелизмаНовые подключения также могут быть приняты своевременно.

режим пула потоковболее подходящийСервис-терминалМожет предсказать, сколькоКлиентский параллелизмВ этом случае каждый запрос может быть вовремя обработан пулом бизнес-потоков, и производительность также очень высока.

Недостатки режима TThreadPoolServer

Вычислительная мощность режима пула потоков ограниченаПул потоковспособность работать, когдаодновременные запросыбольше, чем в пуле потоковПотоки, новые запросы могут толькождать в очереди.


TNonblockingServer

TNonblockingServerузор тожеоднопоточная работа, но используяNIOрежиме, с помощьюChannel/Selectorмеханизм, использующийIOмодель событийобрабатывать.

всеsocketзарегистрированы наselector, внитьпрошедшийseletorМониторинг циклавсеsocket.

КаждыйselectorВ конце цикла обработайте всесостояние готовностиизsocket, для входящих данныхsocketпровестичтение данныхоперация, для сокетов с данными для отправкиотправка данныхоперация, для прослушиванияsocketсоздать новый бизнесsocketи поставь эторегистрприбытьselectorначальство.

Примечание: TNonblockingServer требует, чтобы базовый транспортный канал использовал TFramedTransport.

(1) Рабочий процесс

(2) Начало работы

Сервер:

    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    tnbArgs.processor(tprocessor);
    tnbArgs.transportFactory(new TFramedTransport.Factory());
    tnbArgs.protocolFactory(new TCompactProtocol.Factory());

    // 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式
    TServer server = new TNonblockingServer(tnbArgs);
    System.out.println("Running Non-blocking Server");
    server.serve();

Клиент:

    TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // 协议要和服务端一致
    TProtocol protocol = new TCompactProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("NonBlockingClient");
    System.out.println("Result =: " + result);
    transport.close();

(3) Анализ исходного кода

TNonblockingServerунаследовано отAbstractNonblockingServer, здесь нас больше интересуетNIOизselectorчасть кода ключа.

(4) Преимущества и недостатки

Преимущества режима TNonblockingServer

по сравнению сTSimpleServerПовышение эффективности в основном отражается наIOмультиплекс на,TNonblockingServerиспользоватьнеблокирующийIO,правильноaccept/read/writeЖдатьIOсобытиемониторииметь дело с, мониторинг несколькихsocketизменение статуса.

Недостатки режима TNonblockingServer

TNonblockingServerрежим вобработка бизнесавсе еще используюпоследовательность одного потокаЧто нужно сделать. Сравнение бизнес-процессовсложный,кропотливыйКогда, например, некоторым функциям интерфейса необходимо читать базу данных и выполняться в течение длительного времени, что вызоветвесь сервисодеялоблокироватьв прямом эфире, в это время режимНе очень эффективно,так какНесколько задач запроса вызовавсе ещеприказВыполнять по одному.

THsHaServer

с учетомTNonblockingServerнедостатки,THsHaServerунаследовано отTNonblockingServer, представилПул потоковУлучшенная обработка задачВозможность параллелизма.THsHaServerдаполусинхронный полуасинхронный(Half-Sync/Half-Async) режим обработки,Half-Aysncиспользуется дляIOобработка событий(Accept/Read/Write),Half-Syncдля бизнесаhandlerправильноrpcизСинхронная обработканачальство.

Примечание. THsHaServer, как и TNonblockingServer, требует, чтобы базовый транспортный канал использовал TFramedTransport.

(1) Рабочий процесс

(2) Начало работы

Сервер:

    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // 半同步半异步
    THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
    thhsArgs.processor(tprocessor);
    thhsArgs.transportFactory(new TFramedTransport.Factory());
    thhsArgs.protocolFactory(new TBinaryProtocol.Factory());

    TServer server = new THsHaServer(thhsArgs);
    System.out.println("Running HsHa Server");
    server.serve();

Клиент:

    TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // 协议要和服务端一致
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("HsHaClient");
    System.out.println("Result =: " + result);
    transport.close();

(3) Анализ исходного кода

THsHaServerунаследовано отTNonblockingServer, недавно добавленныйПул потоковДля функции параллельной обработки рабочих задач см. соответствующий код пула потоков:

пул потоков задачПроцесс создания:

Следующий TThreadedSelectorServer включает в себя большинство функций THsHaServer.Для анализа исходного кода обратитесь к TThreadedSelectorServer.

(4) Преимущества и недостатки

Преимущества THsHaServer

THsHaServerиTNonblockingServerрежим по сравнению сTHsHaServerпо окончаниичтение данныхПосле этогообработка бизнесапроцесс с помощьюПул потоковЧтобы сделать,основной потоквернуться прямоследующий циклЭксплуатация, эффективность значительно улучшена.

Недостатки THsHaServer

основной потокеще нужно все закончитьsocketизконтролировать прием,чтение данныхизапись данныхработать. когдаодновременные запросыбольше, и отправитьОбъем данныхбольше, мониторsocketначальствоновый запрос на подключениене может быть принято вовремя.


TThreadedSelectorServer

TThreadedSelectorServerправдаTHsHaServerрасширениеselectorсерединапрочти и напишиIOсобытие(read/write)отосновной потокотделились. импортировать одновременноworkerпул рабочих потоков, это тоже видHalf-Sync/Half-Asyncсервисная модель.

TThreadedSelectorServerрежим в настоящее времяThriftсамый продвинутыймодель обслуживания потоков, внутри него есть несколько частей:

  1. ОдинAcceptThreadОбъект потока, специально используемый для обработки прослушиванияsocketновое подключение на .
  2. несколькоSelectorThreadОбъекты специализированы для ведения бизнесаsocketизИнтернетI/Oпрочти и напиширабота, все сетевые данныепрочти и напишиВсе делается этими нитями.
  3. Одинбалансировщик нагрузкиSelectorThreadLoadBalancerобъект, в основном используемый дляAcceptThreadнитьполучил новыйsocketКогда делается запрос на соединение, принимается решение, что этоновое соединениекоторому назначен запросSelectorThreadнить.
  4. ОдинExecutorServiceТиппул рабочих потоков,существуетSelectorThreadВ потоке обнаружен бизнесsocketЕсли есть запрос на звонок, он будетзапросить чтение данныхПосле этого дайтеExecutorServiceПул потоковПоток in завершает конкретное выполнение этого вызова. В основном используется для обработки каждогоrpcпросилhandlerобработка обратного вызова(Эта частьСинхронный).

(1) Рабочий процесс

(2) Начало работы

Сервер:

    TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // 多线程半同步半异步
    TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
    ttssArgs.processor(processor);
    ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
    // 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport
    ttssArgs.transportFactory(new TFramedTransport.Factory());

    // 多线程半同步半异步的服务模型
    TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
    System.out.println("Running ThreadedSelector Server");
    server.serve();

Клиент:

for (int i = 0; i < 10; i++) {
    new Thread("Thread " + i) {
        @Override
        public void run() {
            // 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)
            for (int j = 0; j < 10; j++) {
                TTransport transport = null;
                try {
                    transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
                    TProtocol protocol = new TBinaryProtocol(transport);
                    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
                    transport.open();
                    String result = client.say("ThreadedSelector Client");
                    System.out.println("Result =: " + result);
                    transport.close();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 关闭传输通道
                    transport.close();
                }
            }
        }
    }.start();
}

(3) Основной код

Три компонента описанного выше рабочего процессаAcceptThread,SelectorThreadиExecutorServiceОпределение в исходном коде выглядит следующим образом:

TThreadedSelectorServerВ выкройке есть специальная нитьAcceptThreadдля обработкиновый запрос на подключение, так что он может ответить вовремяМного одновременных запросов на подключение; дополнительно будетСетевые операции ввода/выводараспространяться на несколькоSelectorThreadнитьзавершается в середине, поэтому можно быстроИнтернетI/Oпровестиоперации чтения и записи, может хорошо справиться сИнтернетI/Oбольше случаев.

TThreadedSelectorServerПараметры по умолчанию определены следующим образом:

  • Количество потоков селектора по умолчанию, отвечающих за чтение и запись сетевых операций ввода-вывода (selectorThreads): 2.
  • Количество рабочих потоков по умолчанию, отвечающих за бизнес-процессы (workerThreads): 5
  • Размер очереди задач одного потока в пуле рабочих потоков (acceptQueueSizePerThread): 4

Создать, инициализировать и запуститьAcceptThreadиSelectorThreads, начать одновременноselectorрезьбовойбалансировщик нагрузки(selectorThreads).

Исходный код AcceptThread

AcceptThreadунаследовано отThread, видно, что он содержит три важных свойства:неблокирующий канал передачи(TNonblockingServerTransport),NIOСелектор(acceptSelectorбалансировщик нагрузки потока селектора(threadChooser).

ПроверятьAcceptThreadизrun()метод, видно, чтоacceptКак только поток запустится, он будет продолжать вызыватьselect()метод:

Проверятьselect()метод,acceptSelectorСелекторждатьIOПрибытие события, получитьSelectionKeyТо есть проверить,acceptсобытие. Если да, проходитеhandleAccept()метод получаетновое соединение; иначе, если даIOчтение и запись событий,AcceptThreadНикаких действий предпринято не будет, оно будет переданоSelectorThreadЗаканчивать.

существуетhandleAccept()метод, первый проходdoAccept()получитьканал связи,ПотомSelectorбалансировщик нагрузки потоковВыбери одинSelectorнить, завершить следующуюIOчтение и запись событий.

Продолжить, чтобы увидеть следующееdoAddAccept()Реализация метода, нет ожидания, далее вызываетSelectorThreadизaddAcceptedConnection()метод, положитьобъект неблокирующего транспортного каналаПерейти кселекторная нитьделать дальшеIOоперации чтения и записи.

Исходный код SelectorThreadLoadBalancer

SelectorThreadLoadBalancerКак создать?

SelectorThreadLoadBalancerоснован наалгоритм опросаизSelectorселектор нити,пройти черезитератор потокадля новичковсоединятьпоследовательное присвоениеSelectorThread.

Исходный код SelectorThread

SelectorThreadиAcceptThreadто же самое, даTThreadedSelectorServerодин извнутренний класс-член, каждыйSelectorThreadобъект потокаесть один внутриочередь блокировки, используется для хранения потокаполучилаизканал связи.

очередь блокировкиРазмер может быть указан конструктором:

видно выше, вAcceptThreadизdoAddAccept()метод называетсяSelectorThreadизaddAcceptedConnection()метод.

Этот метод делает две вещи:

  1. будет этоSelectorThreadполучено потокомканал связиположить вочередь блокировкисередина.
  2. пройти черезwakeup()метод пробужденияSelectorThreadсерединаNIOСелекторselector.

теперь, когдаSelectorThreadтакже унаследовано отThread, см. егоrun()Реализация метода:

SelectorThreadметодselect()мониторIOсобытия, только для обработкичтение данныхизапись данных. Если соединение имеетданные доступны для чтения, прочитайте и начните сframeпуть к кэшу, при необходимости к соединениюввод данных, кэшировать и отправлять данные клиента. И вчтение и запись данныхПосле обработки необходимоNIOизselectorпустойивыйтисвояSelectionKey.

  • операция записи данныхПосле завершения весьrpcПроцесс вызова завершен.handleWrite()Методы, как показано ниже:

  • операция чтения данныхПосле завершения,Thriftбуду использоватьчитать данныевоплощать в жизньцелевой метод,handleRead()Методы, как показано ниже:

handleReadметод выполняетсяread()метод, будетчтение данныхКогда это будет сделано, он вызоветrequestInvoke()вызов методацелевой методПолная конкретная бизнес-обработка.requestInvoke()метод будетзапросить данныеупакован какRunnableобъект, представленныйПул рабочих потоков(ExecutorService) для обработки.

select()После завершения метода поток продолжает работатьprocessAcceptedConnections()метод обработкиследующее соединениеизIOсобытие.

Вот несколько основных операций:

  1. попробовать изSelectorThreadизочередь блокировкиacceptedQueueполучить одинподключенный канал передачи. Если приобретение прошло успешно, звонитеregisterAccepted()метод; в противном случае введите следующий цикл.
  2. registerAccepted()метод будетканал передачибазасоединятьзарегистрироваться наNIOизСелекторselectorвыше, получитьSelectionKey.
  3. СоздаватьFrameBufferобъект и привязать к полученномуSelectionKeyВыше, используется в середине передачи данныхчтение и запись кеша.

Суммировать

Эта статья оThriftвсе видымодель обслуживания потоковвведено, в том числе 2блокирующая сервисная модель:TSimpleServer,TThreadPoolServer,3 типаНеблокирующая модель обслуживания:TNonblockingServer,THsHaServerиTThreadedSelectorServer. для различных моделей обслуживанияконкретное использование,процесс работы,Принцип и реализация исходного кодаПроведен определенный анализ.

Ввиду длины статьи, пожалуйста, просматривайте ее медленно!

Ссылки по теме

  1. Подробное объяснение серии Apache Thrift (1) — обзор и начало работы

  2. Подробное объяснение Apache Thrift Series (2) — Модель сетевых служб

  3. Подробное объяснение серии Apache Thrift (3) — механизм сериализации


Добро пожаловать в технический публичный аккаунт: Zero One Technology Stack

零壹技术栈

Эта учетная запись будет продолжать делиться сухими товарами серверных технологий, включая основы виртуальных машин, многопоточное программирование, высокопроизводительные фреймворки, асинхронное ПО, промежуточное ПО для кэширования и обмена сообщениями, распределенные и микросервисы, материалы для обучения архитектуре и расширенные учебные материалы и статьи.