Анализ и исследование сложного сетевого бизнеса типа ввода-вывода — сопрограмма и отзывчивость

Java
Анализ и исследование сложного сетевого бизнеса типа ввода-вывода — сопрограмма и отзывчивость

Недавно, когда Caffe попыталась оптимизировать бизнес, было обнаружено, что многие потоки информации о сердцебиении службы находились в нерабочем состоянии.waiting,Как показано ниже:

thread count="608" 
daemon-count="420" 
peak-count="611" 
total-started-count="13722" 
deadlocked="0" new="0" runnable="169" blocked="0" 
waiting="314"

Затем посмотрите на загрузку процессора слева направо соответственно.CPU的任务等待数/CPU核数,CPU的执行时间占比总时间(CPU执行时间+CPU空闲时间+ CPU等待时间),当前JAVA进程执行时间占比总时间

Из рисунка ясно видно, что не вычислительная служба вызывает ожидание потока, а увеличение количества потоков в очереди из-за сетевого ввода-вывода, ожидающего службы для базового запроса данных, поэтому было решено рассмотреть возможность оптимизации. эта часть. Цель оптимизации состоит в том, чтобы гарантировать, что информация пульсации службы и базового хранилища находится в безопасном диапазоне, а также максимально увеличить пропускную способность службы.

идея-сопрограмма

Во время оптимизации я впервые подумал о знаменитомquasarтрехсторонняя библиотека.quasarМожет пониматься как облегченная реализация потока, знакомый с языком go должен знатьgoroutine, мы знаем, что язык Java не поддерживает сопрограммы, и многие сценарии в бизнесе необходимо оптимизировать с помощью пулов потоков, но стоимость использования пулов потоков также очень высока, как использование памяти, так и переключение между потоками ограничивают потоки приложения. нельзя создавать бесконечно.

К счастью, сообщество открыло исходный кодJava coroutineРамкаquasar, позвольте мне плюнуть, этот фреймворк действительно написан натуралами-программистами-мужчинами (потянулись писать корутины JDK, я очень надеюсь, что JDK сможет поддерживать корутины раньше), документации очень не хватает, из-за чего я начал работать над ним локально. Я ошибся, и стартовый опыт был не очень комфортным.

Конечно, преимущества также очень заметны.quasarможет значительно улучшитьCPUпропускная способность. Краткое описание заключается в том, что большее количество запросов может быть обработано за меньшее время. не потому что в треде网络IOзаблокировать и оставить нить позадиwaitingв, когда заблокированCPUОн не рабочий, поэтому пропускную способность всей системы тянут за промежность.

В документации на официальном сайте предусмотрено два способа использования, в целях экономии места первый способ используется для демонстрации способа использования:

  1. Запуск инструментального Java-агента (переплетение загрузчика)
  2. Инструментарий с опережением времени (AOT)

Здесь я используюGradleПроект используется как 🌰, чтобы подробно объяснить, как его использовать.

один,Gradleмодуль конфигурации


configurations {
    quasar
}
//
tasks.withType(JavaExec) {
    jvmArgs "-javaagent:${configurations.quasar.iterator().next()}"
}
//
dependencies {
    compile "org.antlr:antlr4:4.7.2"
    compile "co.paralleluniverse:quasar-core:0.7.5"
    quasar "co.paralleluniverse:quasar-core:0.7.5:jdk8@jar"
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

Во-вторых, добиться привычногоechoсервер

дваFiber(эквивалент JavaThread) общаться друг с другом,increasingОтправитьintцифры даютecho,echoВернуться кincreasing,increasingполученоechoВозвращаемое сообщение сначала печатается, а затем выполняется++операции, а затем распечатайте окончательный результат. Пример кода выглядит следующим образом:

  1. increasing
        final IntChannel increasingToEcho = Channels.newIntChannel(0);
        final IntChannel echoToIncreasing = Channels.newIntChannel(0);
        //
        Fiber<Integer> increasing = new Fiber<>("INCREASING", new SuspendableCallable<Integer>() {
            @Override
            public Integer run() throws SuspendExecution, InterruptedException {
                int curr = 0;
                for (int i = 0; i < 10; ++i) {
                    Fiber.sleep(10);
                    System.out.println("INCREASING sending curr = " + curr);
                    increasingToEcho.send(curr);
                    curr = echoToIncreasing.receive();
                    System.out.println("INCREASING received curr = " + curr);
                    curr++;
                    System.out.println("INCREASING now curr = " + curr);
                }
                //
                System.out.println("INCREASING closing channel and exiting");
                increasingToEcho.close();
                return curr;
            }
        }).start();
  1. echo
        Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                Integer curr;
                while(true) {
                    Fiber.sleep(1000);
                    curr = increasingToEcho.receive();
                    System.out.println("ECHO received curr = " + curr);
                    //
                    if (curr != null) {
                        System.out.println("ECHO sending curr = " + curr);
                        echoToIncreasing.send(curr);
                    } else {
                        System.out.println("ECHO 检测到关闭channel,closing and existing");
                        echoToIncreasing.close();
                        return;
                    }
                }
            }
        }).start();
  1. бегатьincreasingа такжеincreasing
        try {
            increasing.join();
            echo.join();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Как видите, использование такое же, как и в Java.Threadотносительно похожи,APIСемантика понятна и понятна, что снижает затраты пользователей.

3. Принцип и меры предосторожности при использовании

1. Running the Instrumentation Java Agent

Как следует из названия, путем измененияjavaagentкстати принцип такойclassloadingЭтап динамически изменяет байт-код. такие как знакомыеAspectJкаркас, ядроajc(编译器)а также织入器(weaver)Чтобы добиться модификации байт-кода без изменения бизнес-логики,ajcНа основе java-компилятора определяются некоторые аор-грамматики, а методы, соответствующие этим грамматикам, перекомпилируются. разделен на"Предварительно скомпилированный (CTW)","Период загрузки (LTW)","Время после компиляции (PTW)"3 способа плетения.

quasarизjvaagentпринадлежит"Период загрузки (LTW)"способ плетения. Таким же образом, не влияя на обычную компиляцию, добавляется обнаружение некоторого кода."метод"нужна поддержка"Пауза"Когда функция выполняется, перекомпилируйте, чтобы получить метод приостановки для сохранения контекста, и возобновите выполнение следующего кода после завершения блокировки.

Например, метод должен запрашивать сеть во время работы.В это время метод заблокирован и должен дождаться возврата сетевого запроса, прежде чем выполнять следующий код. Затем, когда метод заблокирован, его нужно передать в управление сопрограмме, а контекст, в котором метод работает в это время, нужно сохранить. Когда сетевой запрос завершен, в контексте метода выполнения. Общий обзор того, как работают сопрограммы, примерно такой же.

Но реальный сценарий был бы намного сложнее. В реальном блоке кода может быть несколько блоков заблокированного кода, поэтому один из них должен быть вверху.调度中心, только при выполнении внутреннего блока кода блокировки будет выполнен внешний блок кода, иначе в конце он будет перепутан.

Это очень похоже на javaForkJoinPool, большая задача может разветвляться на множество подзадач, только когда子任务После того, как все выполнение будет завершено, оно будет выполнено父任务.quasarТо же самое верно Во время выполнения процесса методы, которые необходимо приостановить, и блоки кода в методах передаются в центр планирования, а центр планирования сохраняет данные между задачами.父子兄弟отношение, а затем выполнить код в соответствии с отношением иерархии задач.

1.1 quasar"ткать"условия

quasar будет плести методы, которые удовлетворяют следующим условиям:

  1. метод с@Suspendableаннотация
  2. метод выдал исключениеSuspendExecution
  3. в пути к классам/META-INF/suspendables,/META-INF/suspendable-supersОпределены некоторые классы или интерфейсы,quasarБудут проанализированы методы этих классов или интерфейсов, и будет выполнен любой из вышеперечисленных методов."ткать"
  4. Метод, вызываемый отражением внутри метода, также будет выполняться до и после"ткать"
  5. MethodHandle.invokeДинамически вызываемые методы выполняются до и после"ткать"
  6. До и после блока кода, выполняемого динамическим прокси JDK"ткать"
  7. Лямбда-выражения Java 8 до и после вызова"ткать"

Мы также можем начать сquasarСм. зависимости в официальной документации веб-сайта, включая ASM — среду обработки и анализа байт-кода Java, разработанную командой ASM, поэтому я хочу знать больше.ткатьподробности, вы можете узнатьASMРамка.CaffeКогда у меня будет время, я также опубликую отдельную статью о научно-популярных вещах, потому что эта часть вещей более предвзята.虚拟机Нижний слой.

1.2 Как превратить бизнес квазара в реальное использование?

Если в бизнесе много сервисов блокировки в одном методе, то поместите эти сервисы блокировки в разныеFiberВыполнить, вы можете увидеть вышеechoа такжеincreasingОн принадлежит двум службам блокировки, которые одновременно взаимно зависят друг от друга, и логическая зависимость передается черезChannelsрешить.

1.3 Проблемы совместимости

либо черезjavaagentвсе ещеAOTПлетение по способу (предварительно скомпилированное плетение) по сути вставляет определенные инструкции до и после байт-кода. Но это может привести к некоторым проблемам с совместимостью, например, многие крупные производителиpt-tracerЭта технология окраски используется для раскрашивания потока Java и выполнения мониторинга вызовов всей ссылки или выделения трафика измерения давления. такcaffeПодумав об этом, я отказался от его использованияquasarЭтот великолепный фреймворк сопрограмм опасается, что такой способ плетения будет несовместим с потоками.染色.

Но я постараюсь решить это позже, в конце концовquasarСпектакль заставляет людей пускать слюни, когда они его видят.

Идея 2 Реактивное программирование

С этим должен быть знаком каждый, самый известный из них — номерReactiveX/RxJava, который вandroidявляется наиболее широко используемым,caffeиспользовать здесьRxJava3Привести пример.

1. Введение в RxJava

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

Перевод представляет собой адаптивную структуру, в которой используется асинхронная реализация, управляемая событиями.事件驱动Я верю, что это было написано前端Знакомо, и это то, чем хвастается сообщество node.js高并发.RxJavaНижняя утилизация发布订阅模式(аналогично базовому режиму node.js) и поддерживает线程切换для поддержки более высокой параллелизма в течение ограниченного времени.

1. Введение в понятия, связанные с RxJava

так как это发布订阅режим, три основных элемента发布者,订阅者,事件类型.

1.1 Типы событий

Существует три основных типа событий:

  1. Next, издатель может опубликовать несколькоNextСобытия, подписчики также могут подписаться на несколькоNextмероприятие;
  2. Complete, абонент получаетCompleteСобытие больше не подписано на событие издателя;
  3. Error, издатель публикуетErrorПосле события событие больше не публикуется. Подписчик принимаетErrorСобытия также не продолжают подписываться на события.
1.2 Публикация, подписка и события в эхо-сервисе

increasingВыступать в роли издателя, отправляяechoНажмите сообщение числового типа.echoСлужба печатает сообщение после его получения.

increasing

        // 发布者发送事件
        Observable increasing = Observable.create((emitter) -> {
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(0);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(1);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(2);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onComplete();
        });

echo

          // 创建订阅者
          Observer<Integer> echo = new Observer<Integer>() {
            private Disposable disposable;
            //
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("观察者开始订阅");
                disposable = d;
            }
            //
            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("观察者接受到消息: " + integer);
            }
            // 
            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("观察者接收到报错: " + e.getMessage());
            }
            //
            @Override
            public void onComplete() {
                System.out.println("观察者订阅完成,不再继续订阅消息");
            }
        };

echoподпискаincreasing

increasing.subscribe(echo);

Вы можете увидеть окончательный результат выполнения:

观察者开始订阅
观察者接受到消息: 0
观察者接受到消息: 1
观察者接受到消息: 2
观察者订阅完成,不再继续订阅消息

Итак, в RxJava,Observableиграть в издательство,Observerподписчик игры,ObservableOnSubscribe.subscribeспособ завершения事件выпуск. Связь между публикацией и подпиской осуществляется черезObservable.subscribeзавершить.

1.3 Издатели и подписчики线程切换

В приведенном выше примере кода весь код издателя и подписчика выполняется в основном потоке. Однако в реальном бизнес-сценарии бизнес издателя и бизнес подписчика должны выполняться в разных потоках, чтобы сократить время, отнимаемое бизнесом.

线程切换Код выглядит следующим образом:

increasing:

            CountDownLatch latch = new CountDownLatch(3);
            // 发布者发送事件
            ObservableOnSubscribe<Integer> onSubscribe = new ObservableOnSubscribe<Integer>() {
            //
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                System.out.println("发布者开始发布事件-Thread.currentThread().getName() = " + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(0);
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(1);
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(2);
                emitter.onComplete();
            }
          };
          Observable<Integer> increasing = Observable.create(onSubscribe);

echo:

            // 订阅者接受事件
            Observer<Integer> echo = new Observer<Integer>() {
            private Disposable disposable;
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("订阅者开始订阅事件-" + Thread.currentThread().getName());
                disposable = d;
            }
            // 
            @Override
            public void onNext(@NonNull Integer integer) {
                latch.countDown();
                System.out.println("订阅者接收到事件-" + Thread.currentThread().getName() + "   " + integer);
            }
            //
            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("订阅者接收到报错,停止接受订阅事件-" + Thread.currentThread().getName());
            }
            //
            @Override
            public void onComplete() {
                System.out.println("订阅者接收到complete事件,停止接受订阅事件-" + Thread.currentThread().getName());
            }
        };

线程切换:

        // 订阅者和发布者切换线程订阅
        increasing
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.trampoline())
                .subscribe(echo);
        //
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Конечный результат:

订阅者开始订阅事件-main
发布者开始发布事件-Thread.currentThread().getName() = RxCachedThreadScheduler-1
订阅者接收到事件-RxCachedThreadScheduler-1   0
订阅者接收到事件-RxCachedThreadScheduler-1   1
订阅者接收到事件-RxCachedThreadScheduler-1   2
订阅者接收到complete事件,停止接受订阅事件-RxCachedThreadScheduler-1

Как видите, издатель и подписчик выполняются в разных потоках. вObservable.subscribeOn(@NonNull Scheduler scheduler)это планировщик, определяющий выполнение метода публикатора,Observable。observeOn(@NonNull Scheduler scheduler)Планировщик с определенными методами подписчика.

а также调度器Есть много категорий, таких какIoScheduler,NewThreadScheduler,SingleScheduler,ComputationSchedulerд., необходимо сделать разумный выбор в соответствии с различными бизнес-сценариями.Scheduler.

Поэтому, если вам нужно более глубокое понимание RxJava, вам нужно перейти наSchedulerКонкретная реализация caffe будет подробно проанализирована в статье после того, как caffe будет приготовлено.

CaffeТрудно писать, пожалуйста, поставьте лайк и подпишитесь на сообщение, большое спасибо за вашу поддержку и поддержку. Увидимся на следующей неделе!