Оптимизация асинхронных параллельных вызовов Java Microservice

Java Микросервисы
Оптимизация асинхронных параллельных вызовов Java Microservice

Давайте сначала представим сценарий.

Существует http-интерфейс A, который на самом деле представляет собой комбинацию результатов, возвращаемых тремя другими интерфейсами B, C и D. Эти три интерфейса не зависят друг от друга. Наш общий способ записи состоит в том, чтобы выполнять B, C и D синхронно и последовательно, а затем собирать их вместе после получения результатов по очереди. Тогда, если три интерфейса занимают 2 секунды соответственно, то интерфейс A займет 6 секунд. Если B, C и D могут выполняться одновременно, то интерфейс A теоретически занимает всего 2 секунды.

Конечно, реальная ситуация определенно сложнее, если есть трудоемкие вызовы, которые не зависят друг от друга внутри интерфейса, то мы можем сделать такое слияние, и сокращение времени отклика все равно будет очень очевидным. Время отклика всего интерфейса зависит от самого длинного внутреннего интерфейса.

Итак, давайте посмотрим, какие методы существуют в Java для достижения этой цели. Если вы хорошенько об этом подумаете, то обнаружите, что если вы хотите распараллелить, вы можете сделать это только с помощью многопоточности в Java. В реальной ситуации время обработки каждого потока определенно отличается, поэтому как сделать так, чтобы поток, обрабатываемый первым, останавливался и ждал, пока последний завершит обработку. Если вы часто используете многопоточность, вы определенно можете подумать о классе инструментов CountDownLatch. Конечно, есть и простые и жестокие методы, опрашивающие каждый поток в пустом цикле на предмет завершения, но это точно не элегантно.

Тогда код непосредственно написан ниже: Предположим, что есть студенческий сервис, который предоставляет запрос имени учащегося, возраста и информации о семье, и нет никакой взаимозависимости между каждым сервисом. Мы просто моделируем интерфейс для получения информации о студентах.

нормальный метод

    @RequestMapping("/getStudentInfo")
    public Object getStudentInfo() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            resultMap.put("studentName", studentService.getStudentName());
            resultMap.put("studentAge", studentService.getSutdentAge());
            resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }
        resultMap.put("total cost", System.currentTimeMillis() - start);
        return resultMap;
    }

Синхронное выполнение, занимающее по времени 6 секунд.

1. Future

 @RequestMapping("/getStudentInfoWithFuture")
    public Object testWhitCallable() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            CountDownLatch countDownLatch = new CountDownLatch(3);

            Future futureStudentName = es.submit(() -> {
                Object studentName = studentService.getStudentName();
                countDownLatch.countDown();
                return studentName;
            });

            Future futureStudentAge = es.submit(() -> {
                Object studentAge = studentService.getSutdentAge();
                countDownLatch.countDown();
                return studentAge;
            });

            Future futureStudentFamilyInfo = es.submit(() -> {
                Object studentFamilyInfo = studentService.getSutdentFamilyInfo();
                countDownLatch.countDown();
                return studentFamilyInfo;
            });

            //同步等待所有线程执行完之后再继续
            countDownLatch.await();

            resultMap.put("studentName", futureStudentName.get());
            resultMap.put("studentAge", futureStudentAge.get());
            resultMap.put("studentFamilyInfo", futureStudentFamilyInfo.get());
        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }

        resultMap.put("total cost", System.currentTimeMillis() - start);

        return resultMap;
    }

2.RxJava

@RequestMapping("/getStudentInfoWithRxJava")
    public Object testWithRxJava() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);

            Observable studentNameObservable = Observable.create(observableEmitter -> {
                resultMap.put("studentName", studentService.getStudentName());
                observableEmitter.onComplete();
            }).subscribeOn(Schedulers.io());

            Observable studentAgeObservable = Observable.create(observableEmitter -> {
                resultMap.put("studentAge", studentService.getSutdentAge());
                observableEmitter.onComplete();
            }).subscribeOn(Schedulers.io());

            Observable familyInfoObservable = Observable.create(observableEmitter -> {
                resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
                observableEmitter.onComplete();
            }).subscribeOn(Schedulers.io());
            //创建一个下游 Observer
            Observer<Object> observer = new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Object o) {
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    //因为后面用了 merge 操作符,所以会合并后发射,那么只要 countdown 一次就行了。
                    countDownLatch.countDown();
                }
            };
            //建立连接,
            Observable.merge(studentNameObservable, studentAgeObservable, familyInfoObservable).subscribe(observer);
            //等待异步线程完成
            countDownLatch.await();

        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }
        resultMap.put("total cost", System.currentTimeMillis() - start);
        return resultMap;
    }

Я не знаком с RxJava, и я также временно изучаю его, я не знаю, является ли этот способ написания лучшим.

3.CompletableFutures

    @RequestMapping("/getStudentInfoWithCompletableFuture")
    public Object getStudentInfoWithCompletableFuture() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            CompletableFuture<Object> completableFutureStudentName = CompletableFuture.supplyAsync(() -> {
                try {
                    return studentService.getStudentName();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });

            CompletableFuture<Object> completableFutureSutdentAge = CompletableFuture.supplyAsync(() -> {
                try {
                    return studentService.getSutdentAge();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });

            CompletableFuture<Object> completableFutureFamilyInfo = CompletableFuture.supplyAsync(() -> {
                try {
                    return studentService.getSutdentFamilyInfo();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });

            CompletableFuture.allOf(completableFutureStudentName, completableFutureSutdentAge, completableFutureFamilyInfo).join();

            resultMap.put("studentName", completableFutureStudentName.get());
            resultMap.put("studentAge", completableFutureSutdentAge.get());
            resultMap.put("studentFamilyInfo", completableFutureFamilyInfo.get());

        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }

        resultMap.put("total cost", System.currentTimeMillis() - start);

        return resultMap;
    }

Поставляется с последним ожиданием синхронизации, нет необходимости в CountDownLatch. CompletableFuture имеет много других полезных методов.

Если интересно, можете попробовать сами. адрес проекта на гитхабеreactive-programming-sample.

использованная литература

Processing streaming data with Spring WebFluxХотя эта статья посвящена webflux, в ней перечислены многие другие решения.
RxJava Essentials перевод на китайский язык

Этот пост я попросил на v2ex, спасибо пользователям сети за помощь.
Ниже приведены некоторые решения для справки, основанные на ответах пользователей сети:

  • Фьючерсы, предоставляемые Java 5
  • CompletableFutures, предоставляемые Java 8
  • Сторонняя библиотека RxJava
  • Реактивные потоки, предоставляемые Spring 5, реализованные Reactor.
  • Webflux весной 5.

There are no comments on this post.