Давайте сначала представим сценарий.
Существует http-интерфейс A, который на самом деле представляет собой комбинацию результатов, возвращаемых тремя другими интерфейсами B, C и D. Эти три интерфейса не зависят друг от друга. Наш общий способ записи состоит в том, чтобы выполнять B, C и D синхронно и последовательно, а затем собирать их вместе после получения результатов по очереди. Тогда, если три интерфейса занимают 2 секунды соответственно, то интерфейс A займет 6 секунд. Если B, C и D могут выполняться одновременно, то интерфейс A теоретически занимает всего 2 секунды.
Конечно, реальная ситуация определенно сложнее, если есть трудоемкие вызовы, которые не зависят друг от друга внутри интерфейса, то мы можем сделать такое слияние, и сокращение времени отклика все равно будет очень очевидным. Время отклика всего интерфейса зависит от самого длинного внутреннего интерфейса.
Итак, давайте посмотрим, какие методы существуют в Java для достижения этой цели. Если вы хорошенько об этом подумаете, то обнаружите, что если вы хотите распараллелить, вы можете сделать это только с помощью многопоточности в Java. В реальной ситуации время обработки каждого потока определенно отличается, поэтому как сделать так, чтобы поток, обрабатываемый первым, останавливался и ждал, пока последний завершит обработку. Если вы часто используете многопоточность, вы определенно можете подумать о классе инструментов CountDownLatch. Конечно, есть и простые и жестокие методы, опрашивающие каждый поток в пустом цикле на предмет завершения, но это точно не элегантно.
Тогда код непосредственно написан ниже: Предположим, что есть студенческий сервис, который предоставляет запрос имени учащегося, возраста и информации о семье, и нет никакой взаимозависимости между каждым сервисом. Мы просто моделируем интерфейс для получения информации о студентах.
нормальный метод
@RequestMapping("/getStudentInfo")
public Object getStudentInfo() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
resultMap.put("studentName", studentService.getStudentName());
resultMap.put("studentAge", studentService.getSutdentAge());
resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
Синхронное выполнение, занимающее по времени 6 секунд.
1. Future
@RequestMapping("/getStudentInfoWithFuture")
public Object testWhitCallable() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
CountDownLatch countDownLatch = new CountDownLatch(3);
Future futureStudentName = es.submit(() -> {
Object studentName = studentService.getStudentName();
countDownLatch.countDown();
return studentName;
});
Future futureStudentAge = es.submit(() -> {
Object studentAge = studentService.getSutdentAge();
countDownLatch.countDown();
return studentAge;
});
Future futureStudentFamilyInfo = es.submit(() -> {
Object studentFamilyInfo = studentService.getSutdentFamilyInfo();
countDownLatch.countDown();
return studentFamilyInfo;
});
//同步等待所有线程执行完之后再继续
countDownLatch.await();
resultMap.put("studentName", futureStudentName.get());
resultMap.put("studentAge", futureStudentAge.get());
resultMap.put("studentFamilyInfo", futureStudentFamilyInfo.get());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
2.RxJava
@RequestMapping("/getStudentInfoWithRxJava")
public Object testWithRxJava() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
Observable studentNameObservable = Observable.create(observableEmitter -> {
resultMap.put("studentName", studentService.getStudentName());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable studentAgeObservable = Observable.create(observableEmitter -> {
resultMap.put("studentAge", studentService.getSutdentAge());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable familyInfoObservable = Observable.create(observableEmitter -> {
resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
//创建一个下游 Observer
Observer<Object> observer = new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
//因为后面用了 merge 操作符,所以会合并后发射,那么只要 countdown 一次就行了。
countDownLatch.countDown();
}
};
//建立连接,
Observable.merge(studentNameObservable, studentAgeObservable, familyInfoObservable).subscribe(observer);
//等待异步线程完成
countDownLatch.await();
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
Я не знаком с RxJava, и я также временно изучаю его, я не знаю, является ли этот способ написания лучшим.
3.CompletableFutures
@RequestMapping("/getStudentInfoWithCompletableFuture")
public Object getStudentInfoWithCompletableFuture() {
long start = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>(10);
try {
CompletableFuture<Object> completableFutureStudentName = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getStudentName();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture<Object> completableFutureSutdentAge = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getSutdentAge();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture<Object> completableFutureFamilyInfo = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getSutdentFamilyInfo();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture.allOf(completableFutureStudentName, completableFutureSutdentAge, completableFutureFamilyInfo).join();
resultMap.put("studentName", completableFutureStudentName.get());
resultMap.put("studentAge", completableFutureSutdentAge.get());
resultMap.put("studentFamilyInfo", completableFutureFamilyInfo.get());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
Поставляется с последним ожиданием синхронизации, нет необходимости в CountDownLatch. CompletableFuture имеет много других полезных методов.
Если интересно, можете попробовать сами. адрес проекта на гитхабеreactive-programming-sample.
использованная литература
Processing streaming data with Spring WebFluxХотя эта статья посвящена webflux, в ней перечислены многие другие решения.
RxJava Essentials перевод на китайский язык
Этот пост я попросил на v2ex, спасибо пользователям сети за помощь.
Ниже приведены некоторые решения для справки, основанные на ответах пользователей сети:
- Фьючерсы, предоставляемые Java 5
- CompletableFutures, предоставляемые Java 8
- Сторонняя библиотека RxJava
- Реактивные потоки, предоставляемые Spring 5, реализованные Reactor.
- Webflux весной 5.
There are no comments on this post.