❝Недавно, когда Caffe попыталась оптимизировать бизнес, было обнаружено, что многие потоки информации о сердцебиении службы находились в нерабочем состоянии.
❞waiting
,Как показано ниже:
thread count="608"
daemon-count="420"
peak-count="611"
total-started-count="13722"
deadlocked="0" new="0" runnable="169" blocked="0"
waiting="314"
❝Затем посмотрите на загрузку процессора слева направо соответственно.
❞CPU的任务等待数/CPU核数
,CPU的执行时间占比总时间(CPU执行时间+CPU空闲时间+ CPU等待时间)
,当前JAVA进程执行时间占比总时间
❝Из рисунка ясно видно, что не вычислительная служба вызывает ожидание потока, а увеличение количества потоков в очереди из-за сетевого ввода-вывода, ожидающего службы для базового запроса данных, поэтому было решено рассмотреть возможность оптимизации. эта часть. Цель оптимизации состоит в том, чтобы гарантировать, что информация пульсации службы и базового хранилища находится в безопасном диапазоне, а также максимально увеличить пропускную способность службы.
❞
идея-сопрограмма
Во время оптимизации я впервые подумал о знаменитомquasar
трехсторонняя библиотека.quasar
Может пониматься как облегченная реализация потока, знакомый с языком go должен знатьgoroutine
, мы знаем, что язык Java не поддерживает сопрограммы, и многие сценарии в бизнесе необходимо оптимизировать с помощью пулов потоков, но стоимость использования пулов потоков также очень высока, как использование памяти, так и переключение между потоками ограничивают потоки приложения. нельзя создавать бесконечно.
К счастью, сообщество открыло исходный кодJava coroutine
Рамкаquasar
, позвольте мне плюнуть, этот фреймворк действительно написан натуралами-программистами-мужчинами (потянулись писать корутины JDK, я очень надеюсь, что JDK сможет поддерживать корутины раньше), документации очень не хватает, из-за чего я начал работать над ним локально. Я ошибся, и стартовый опыт был не очень комфортным.
Конечно, преимущества также очень заметны.quasar
может значительно улучшитьCPU
пропускная способность. Краткое описание заключается в том, что большее количество запросов может быть обработано за меньшее время. не потому что в треде网络IO
заблокировать и оставить нить позадиwaiting
в, когда заблокированCPU
Он не рабочий, поэтому пропускную способность всей системы тянут за промежность.
В документации на официальном сайте предусмотрено два способа использования, в целях экономии места первый способ используется для демонстрации способа использования:
- Запуск инструментального Java-агента (переплетение загрузчика)
- Инструментарий с опережением времени (AOT)
Здесь я используюGradle
Проект используется как 🌰, чтобы подробно объяснить, как его использовать.
один,Gradle
модуль конфигурации
configurations { quasar } // tasks.withType(JavaExec) { jvmArgs "-javaagent:${configurations.quasar.iterator().next()}" } // dependencies { compile "org.antlr:antlr4:4.7.2" compile "co.paralleluniverse:quasar-core:0.7.5" quasar "co.paralleluniverse:quasar-core:0.7.5:jdk8@jar" testCompile group: 'junit', name: 'junit', version: '4.12' }
Во-вторых, добиться привычногоecho
сервер
дваFiber
(эквивалент JavaThread
) общаться друг с другом,increasing
Отправитьint
цифры даютecho
,echo
Вернуться кincreasing
,increasing
полученоecho
Возвращаемое сообщение сначала печатается, а затем выполняется++
операции, а затем распечатайте окончательный результат. Пример кода выглядит следующим образом:
increasing
final IntChannel increasingToEcho = Channels.newIntChannel(0);
final IntChannel echoToIncreasing = Channels.newIntChannel(0);
//
Fiber<Integer> increasing = new Fiber<>("INCREASING", new SuspendableCallable<Integer>() {
@Override
public Integer run() throws SuspendExecution, InterruptedException {
int curr = 0;
for (int i = 0; i < 10; ++i) {
Fiber.sleep(10);
System.out.println("INCREASING sending curr = " + curr);
increasingToEcho.send(curr);
curr = echoToIncreasing.receive();
System.out.println("INCREASING received curr = " + curr);
curr++;
System.out.println("INCREASING now curr = " + curr);
}
//
System.out.println("INCREASING closing channel and exiting");
increasingToEcho.close();
return curr;
}
}).start();
echo
Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
Integer curr;
while(true) {
Fiber.sleep(1000);
curr = increasingToEcho.receive();
System.out.println("ECHO received curr = " + curr);
//
if (curr != null) {
System.out.println("ECHO sending curr = " + curr);
echoToIncreasing.send(curr);
} else {
System.out.println("ECHO 检测到关闭channel,closing and existing");
echoToIncreasing.close();
return;
}
}
}
}).start();
- бегать
increasing
а такжеincreasing
try {
increasing.join();
echo.join();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
Как видите, использование такое же, как и в Java.Thread
относительно похожи,API
Семантика понятна и понятна, что снижает затраты пользователей.
3. Принцип и меры предосторожности при использовании
1. Running the Instrumentation Java Agent
Как следует из названия, путем измененияjavaagent
кстати принцип такойclassloading
Этап динамически изменяет байт-код. такие как знакомыеAspectJ
каркас, ядроajc(编译器)
а также织入器(weaver)
Чтобы добиться модификации байт-кода без изменения бизнес-логики,ajc
На основе java-компилятора определяются некоторые аор-грамматики, а методы, соответствующие этим грамматикам, перекомпилируются. разделен на"Предварительно скомпилированный (CTW)","Период загрузки (LTW)","Время после компиляции (PTW)"3 способа плетения.
quasar
изjvaagent
принадлежит"Период загрузки (LTW)"способ плетения. Таким же образом, не влияя на обычную компиляцию, добавляется обнаружение некоторого кода."метод"нужна поддержка"Пауза"Когда функция выполняется, перекомпилируйте, чтобы получить метод приостановки для сохранения контекста, и возобновите выполнение следующего кода после завершения блокировки.
Например, метод должен запрашивать сеть во время работы.В это время метод заблокирован и должен дождаться возврата сетевого запроса, прежде чем выполнять следующий код. Затем, когда метод заблокирован, его нужно передать в управление сопрограмме, а контекст, в котором метод работает в это время, нужно сохранить. Когда сетевой запрос завершен, в контексте метода выполнения. Общий обзор того, как работают сопрограммы, примерно такой же.
Но реальный сценарий был бы намного сложнее. В реальном блоке кода может быть несколько блоков заблокированного кода, поэтому один из них должен быть вверху.调度中心
, только при выполнении внутреннего блока кода блокировки будет выполнен внешний блок кода, иначе в конце он будет перепутан.
Это очень похоже на javaForkJoinPool
, большая задача может разветвляться на множество подзадач, только когда子任务
После того, как все выполнение будет завершено, оно будет выполнено父任务
.quasar
То же самое верно Во время выполнения процесса методы, которые необходимо приостановить, и блоки кода в методах передаются в центр планирования, а центр планирования сохраняет данные между задачами.父子兄弟
отношение, а затем выполнить код в соответствии с отношением иерархии задач.
1.1 quasar"ткать"условия
quasar будет плести методы, которые удовлетворяют следующим условиям:
- метод с
@Suspendable
аннотация - метод выдал исключение
SuspendExecution
- в пути к классам
/META-INF/suspendables
,/META-INF/suspendable-supers
Определены некоторые классы или интерфейсы,quasar
Будут проанализированы методы этих классов или интерфейсов, и будет выполнен любой из вышеперечисленных методов."ткать" - Метод, вызываемый отражением внутри метода, также будет выполняться до и после"ткать"
-
MethodHandle.invoke
Динамически вызываемые методы выполняются до и после"ткать" - До и после блока кода, выполняемого динамическим прокси JDK"ткать"
- Лямбда-выражения Java 8 до и после вызова"ткать"
Мы также можем начать сquasar
См. зависимости в официальной документации веб-сайта, включая ASM — среду обработки и анализа байт-кода Java, разработанную командой ASM, поэтому я хочу знать больше.ткатьподробности, вы можете узнатьASM
Рамка.Caffe
Когда у меня будет время, я также опубликую отдельную статью о научно-популярных вещах, потому что эта часть вещей более предвзята.虚拟机
Нижний слой.
1.2 Как превратить бизнес квазара в реальное использование?
Если в бизнесе много сервисов блокировки в одном методе, то поместите эти сервисы блокировки в разныеFiber
Выполнить, вы можете увидеть вышеecho
а такжеincreasing
Он принадлежит двум службам блокировки, которые одновременно взаимно зависят друг от друга, и логическая зависимость передается черезChannels
решить.
1.3 Проблемы совместимости
либо черезjavaagent
все ещеAOT
Плетение по способу (предварительно скомпилированное плетение) по сути вставляет определенные инструкции до и после байт-кода. Но это может привести к некоторым проблемам с совместимостью, например, многие крупные производителиpt-tracer
Эта технология окраски используется для раскрашивания потока Java и выполнения мониторинга вызовов всей ссылки или выделения трафика измерения давления. такcaffe
Подумав об этом, я отказался от его использованияquasar
Этот великолепный фреймворк сопрограмм опасается, что такой способ плетения будет несовместим с потоками.染色
.
Но я постараюсь решить это позже, в конце концовquasar
Спектакль заставляет людей пускать слюни, когда они его видят.
Идея 2 Реактивное программирование
❝С этим должен быть знаком каждый, самый известный из них — номер
❞ReactiveX/RxJava
, который вandroid
является наиболее широко используемым,caffe
использовать здесьRxJava3
Привести пример.
1. Введение в RxJava
❝RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
❞
Перевод представляет собой адаптивную структуру, в которой используется асинхронная реализация, управляемая событиями.事件驱动
Я верю, что это было написано前端
Знакомо, и это то, чем хвастается сообщество node.js高并发
.RxJava
Нижняя утилизация发布订阅模式
(аналогично базовому режиму node.js) и поддерживает线程切换
для поддержки более высокой параллелизма в течение ограниченного времени.
1. Введение в понятия, связанные с RxJava
так как это发布订阅
режим, три основных элемента发布者
,订阅者
,事件类型
.
1.1 Типы событий
Существует три основных типа событий:
-
Next
, издатель может опубликовать несколькоNext
События, подписчики также могут подписаться на несколькоNext
мероприятие; -
Complete
, абонент получаетComplete
Событие больше не подписано на событие издателя; -
Error
, издатель публикуетError
После события событие больше не публикуется. Подписчик принимаетError
События также не продолжают подписываться на события.
1.2 Публикация, подписка и события в эхо-сервисе
increasing
Выступать в роли издателя, отправляяecho
Нажмите сообщение числового типа.echo
Служба печатает сообщение после его получения.
increasing
// 发布者发送事件
Observable increasing = Observable.create((emitter) -> {
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(0);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(1);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(2);
Thread.sleep(new Random().nextInt(1000));
emitter.onComplete();
});
echo
// 创建订阅者
Observer<Integer> echo = new Observer<Integer>() {
private Disposable disposable;
//
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("观察者开始订阅");
disposable = d;
}
//
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("观察者接受到消息: " + integer);
}
//
@Override
public void onError(@NonNull Throwable e) {
System.out.println("观察者接收到报错: " + e.getMessage());
}
//
@Override
public void onComplete() {
System.out.println("观察者订阅完成,不再继续订阅消息");
}
};
echo
подпискаincreasing
increasing.subscribe(echo);
Вы можете увидеть окончательный результат выполнения:
观察者开始订阅
观察者接受到消息: 0
观察者接受到消息: 1
观察者接受到消息: 2
观察者订阅完成,不再继续订阅消息
Итак, в RxJava,Observable
играть в издательство,Observer
подписчик игры,ObservableOnSubscribe.subscribe
способ завершения事件
выпуск. Связь между публикацией и подпиской осуществляется черезObservable.subscribe
завершить.
1.3 Издатели и подписчики线程切换
В приведенном выше примере кода весь код издателя и подписчика выполняется в основном потоке. Однако в реальном бизнес-сценарии бизнес издателя и бизнес подписчика должны выполняться в разных потоках, чтобы сократить время, отнимаемое бизнесом.
线程切换
Код выглядит следующим образом:
increasing
:
CountDownLatch latch = new CountDownLatch(3);
// 发布者发送事件
ObservableOnSubscribe<Integer> onSubscribe = new ObservableOnSubscribe<Integer>() {
//
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
System.out.println("发布者开始发布事件-Thread.currentThread().getName() = " + Thread.currentThread().getName());
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(0);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(1);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(2);
emitter.onComplete();
}
};
Observable<Integer> increasing = Observable.create(onSubscribe);
echo
:
// 订阅者接受事件
Observer<Integer> echo = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("订阅者开始订阅事件-" + Thread.currentThread().getName());
disposable = d;
}
//
@Override
public void onNext(@NonNull Integer integer) {
latch.countDown();
System.out.println("订阅者接收到事件-" + Thread.currentThread().getName() + " " + integer);
}
//
@Override
public void onError(@NonNull Throwable e) {
System.out.println("订阅者接收到报错,停止接受订阅事件-" + Thread.currentThread().getName());
}
//
@Override
public void onComplete() {
System.out.println("订阅者接收到complete事件,停止接受订阅事件-" + Thread.currentThread().getName());
}
};
线程切换
:
// 订阅者和发布者切换线程订阅
increasing
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.trampoline())
.subscribe(echo);
//
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Конечный результат:
订阅者开始订阅事件-main
发布者开始发布事件-Thread.currentThread().getName() = RxCachedThreadScheduler-1
订阅者接收到事件-RxCachedThreadScheduler-1 0
订阅者接收到事件-RxCachedThreadScheduler-1 1
订阅者接收到事件-RxCachedThreadScheduler-1 2
订阅者接收到complete事件,停止接受订阅事件-RxCachedThreadScheduler-1
Как видите, издатель и подписчик выполняются в разных потоках. вObservable.subscribeOn(@NonNull Scheduler scheduler)
это планировщик, определяющий выполнение метода публикатора,Observable。observeOn(@NonNull Scheduler scheduler)
Планировщик с определенными методами подписчика.
а также调度器
Есть много категорий, таких какIoScheduler
,NewThreadScheduler
,SingleScheduler
,ComputationScheduler
д., необходимо сделать разумный выбор в соответствии с различными бизнес-сценариями.Scheduler
.
Поэтому, если вам нужно более глубокое понимание RxJava, вам нужно перейти наScheduler
Конкретная реализация caffe будет подробно проанализирована в статье после того, как caffe будет приготовлено.
❝
Caffe
Трудно писать, пожалуйста, поставьте лайк и подпишитесь на сообщение, большое спасибо за вашу поддержку и поддержку. Увидимся на следующей неделе!