С распространением микросервисов все больше и больше компаний начали использовать SpringCloud для своей внутренней инфраструктуры микросервисов.
Согласно концепции микросервисов, функции каждого отдельного приложения должны быть разделены на микросервисы (модули) с независимыми функциями по принципу функциональной ортогональности, то есть принципу независимости функций, а затем предоставляться внешнему миру через агрегация интерфейсов.
Однако с увеличением количества микросервисных модулей появляется все больше и больше интерфейсов, которые необходимо агрегировать для служб среднего уровня, предоставляющих услуги посредством агрегации интерфейсов! Постепенно агрегация интерфейсов становится очень сложным узким местом производительности в распределенной микросервисной архитектуре!
Например, существует служба агрегации, которая должна агрегировать данные трех служб, Службы, Маршрута и Плагина, для предоставления внешних служб:
@Headers({ "Accept: application/json" })
public interface ServiceClient {
@RequestLine("GET /")
List<Service> list();
}
@Headers({ "Accept: application/json" })
public interface RouteClient {
@RequestLine("GET /")
List<Route> list();
}
@Headers({ "Accept: application/json" })
public interface PluginClient {
@RequestLine("GET /")
List<Plugin> list();
}
Используйте декларативный OpenFeign вместо HTTP-клиента для сетевых запросов
Пишите модульные тесты
public class SyncFeignClientTest {
public static final String SERVER = "http://devops2:8001";
private ServiceClient serviceClient;
private RouteClient routeClient;
private PluginClient pluginClient;
@Before
public void setup(){
BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.INFO);
String service = SERVER + "/services";
serviceClient = Feign.builder()
.target(ServiceClient.class, service);
String route = SERVER + "/routes";
routeClient = Feign.builder()
.target(RouteClient.class, route);
String plugin = SERVER + "/plugins";
pluginClient = Feign.builder()
.target(PluginClient.class, plugin);
}
@Test
public void aggressionTest() {
long current = System.currentTimeMillis();
System.out.println("开始调用聚合查询");
serviceTest();
routeTest();
pluginTest();
System.out.println("调用聚合查询结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
}
@Test
public void serviceTest(){
long current = System.currentTimeMillis();
System.out.println("开始获取Service");
String service = serviceClient.list();
System.out.println(service);
System.out.println("获取Service结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
}
@Test
public void routeTest(){
long current = System.currentTimeMillis();
System.out.println("开始获取Route");
String route = routeClient.list();
System.out.println(route);
System.out.println("获取Route结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
}
@Test
public void pluginTest(){
long current = System.currentTimeMillis();
System.out.println("开始获取Plugin");
String plugin = pluginClient.list();
System.out.println(plugin);
System.out.println("获取Plugin结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
}
}
Результаты теста:
开始调用聚合查询
开始获取Service
{"next":null,"data":[]}
获取Service结束!耗时:134毫秒
开始获取Route
{"next":null,"data":[]}
获取Route结束!耗时:44毫秒
开始获取Plugin
{"next":null,"data":[]}
获取Plugin结束!耗时:45毫秒
调用聚合查询结束!耗时:223毫秒
Process finished with exit code 0
Хорошо видно, что время, затрачиваемое на совокупный запрос, составляет 223 мс = 134 мс + 44 мс + 45 мс.
То есть время запроса сервиса агрегации пропорционально количеству интерфейсов, что явно недопустимо!
Самый распространенный способ решить эту проблему — заранее создать пул потоков и выполнить агрегацию интерфейсов через многопоточный интерфейс параллельных запросов!
Вы можете найти много такого решения в Интернете с быстрым поиском Baidu, и сегодня я больше не буду публиковать свой код! Но говорить о недостатках этого метода:
Первоначальное решение, принятое основным контейнером сервлетов JavaWeb, заключается в использовании одного потока и одного сервлета для обработки HTTP-запроса! Такой подход не представляет большой проблемы при невысоком параллелизме, но из-за невыполнения закона Мура количество потоков на одной машине по-прежнему составляет около 10 000. С параллелизмом в десятки миллионов справиться просто невозможно!
Чтобы решить трудоемкую проблему агрегации интерфейсов, использование многопоточных одновременных сетевых запросов из пула потоков еще больше подпитывает! Запрос, для обработки которого изначально требуется только один поток, агрегация интерфейсов посредством многопоточной параллельной обработки увеличивает количество потоков, необходимых для обработки каждого запроса, быстро сокращает количество доступных потоков в системе и, естественно, уменьшает количество одновременных операций. системы в системе!
В настоящее время люди думают о NIO и его среде с открытым исходным кодом Netty, которая поддерживается начиная с Java5! На основе режима Netty и Reactor в экосистеме Java появились асинхронные неблокирующие фреймворки JavaWeb, такие как SpringWebFlux! Spring5 также разработан на основе SpringWebFlux! С асинхронным неблокирующим сервером также есть асинхронный неблокирующий клиент сетевых запросов WebClient!
Сегодня я буду использовать WebClient и ReactiveFeign для создания туториала по асинхронной неблокирующей агрегации интерфейса:
Во-первых, ввести зависимости
<dependency>
<groupId>com.playtika.reactivefeign</groupId>
<artifactId>feign-reactor-core</artifactId>
<version>1.0.30</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.playtika.reactivefeign</groupId>
<artifactId>feign-reactor-webclient</artifactId>
<version>1.0.30</version>
<scope>test</scope>
</dependency>
Однако переписывание клиента Feign на базе Reactor Core заключается в изменении возвращаемого значения исходного интерфейса: List
@Headers({ "Accept: application/json" })
public interface ServiceClient {
@RequestLine("GET /")
Flux<Service> list();
}
@Headers({ "Accept: application/json" })
public interface RouteClient {
@RequestLine("GET /")
Flux<Service> list();
}
@Headers({ "Accept: application/json" })
public interface PluginClient {
@RequestLine("GET /")
Flux<Service> list();
}
затем напишите модульные тесты
public class AsyncFeignClientTest {
public static final String SERVER = "http://devops2:8001";
private CountDownLatch latch;
private ServiceClient serviceClient;
private RouteClient routeClient;
private PluginClient pluginClient;
@Before
public void setup(){
BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.INFO);
latch= new CountDownLatch(3);
String service= SERVER + "/services";
serviceClient= WebReactiveFeign
.<ServiceClient>builder()
.target(ServiceClient.class, service);
String route= SERVER + "/routes";
routeClient= WebReactiveFeign
.<RouteClient>builder()
.target(RouteClient.class, route);
String plugin= SERVER + "/plugins";
pluginClient= WebReactiveFeign
.<PluginClient>builder()
.target(PluginClient.class, plugin);
}
@Test
public void aggressionTest() throws InterruptedException {
long current= System.currentTimeMillis();
System.out.println("开始调用聚合查询");
serviceTest();
routeTest();
pluginTest();
latch.await();
System.out.println("调用聚合查询结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
}
@Test
public void serviceTest(){
long current= System.currentTimeMillis();
System.out.println("开始获取Service");
serviceClient.list()
.subscribe(result ->{
System.out.println(result);
latch.countDown();
System.out.println("获取Service结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
});
}
@Test
public void routeTest(){
long current= System.currentTimeMillis();
System.out.println("开始获取Route");
routeClient.list()
.subscribe(result ->{
System.out.println(result);
latch.countDown();
System.out.println("获取Route结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
});
}
@Test
public void pluginTest(){
long current= System.currentTimeMillis();
System.out.println("开始获取Plugin");
pluginClient.list()
.subscribe(result ->{
System.out.println(result);
latch.countDown();
System.out.println("获取Plugin结束!耗时:" + (System.currentTimeMillis() - current) + "毫秒");
});
}
}
Ключевым моментом здесь является то, что исходный запрос синхронной блокировки теперь является асинхронным и неблокирующим, поэтому вам нужно использовать CountDownLatch для синхронизации, вызывать CountDownLatch.coutdown() после получения интерфейса и вызывать CountDownLatch.await( после вызова всех запросов интерфейса ) Подождите, пока все интерфейсы вернут результаты, прежде чем переходить к следующему шагу!
Результаты теста:
开始调用聚合查询
开始获取Service
开始获取Route
开始获取Plugin
{"next":null,"data":[]}
{"next":null,"data":[]}
获取Plugin结束!耗时:215毫秒
{"next":null,"data":[]}
获取Route结束!耗时:216毫秒
获取Service结束!耗时:1000毫秒
调用聚合查询结束!耗时:1000毫秒
Process finished with exit code 0
Очевидно, что время, затраченное на запрос агрегации, больше не равно сумме времени всех запросов интерфейса, а максимальному значению времени запроса интерфейса!
Приступим к тесту производительности:
Общий тест агрегации интерфейсов Feign вызывает 1000 раз:
开始调用聚合查询
开始获取Service
{"next":null,"data":[]}
获取Service结束!耗时:169毫秒
开始获取Route
{"next":null,"data":[]}
获取Route结束!耗时:81毫秒
开始获取Plugin
{"next":null,"data":[]}
获取Plugin结束!耗时:93毫秒
调用聚合查询结束!耗时:343毫秒
summary: 238515, average: 238
Используйте WebClient для выполнения запроса агрегации интерфейса 1000 раз:
开始调用聚合查询
开始获取Service
开始获取Route
开始获取Plugin
{"next":null,"data":[]}
{"next":null,"data":[]}
获取Route结束!耗时:122毫秒
{"next":null,"data":[]}
获取Service结束!耗时:122毫秒
获取Plugin结束!耗时:121毫秒
调用聚合查询结束!耗时:123毫秒
summary: 89081, average: 89
В результатах тестирования результаты тестирования WebClient ровно на треть меньше, чем у обычного FeignClient! Как и ожидалось!