[Практическое упражнение] Напишите яркий запрос на слияние от руки, интерпретируя истину нижнего слоя

Java

  • Зима приходит и уходит, а мне все больше нравятся более мягкие одеяла.a
  • Устав от палящего летнего зноя и стремительности осени, приезжайте в тишину зимы.

1. Думая о сбое сервера

Начальник сказал, что собирается делать живое маркетинговое мероприятие, причем в продвижении должны быть задействованы как онлайн, так и офлайн.Количество участников этого мероприятия может быть очень большим...
Конечно же, поскольку код писал не я, в тот день сервер рухнул, когда он рухнул, было очень тихо, человек, который написал код, ушел один, и когда он ушел, было очень спокойно.

💡:Почему происходит сбой, когда количество запросов достигает миллионов?

В микросервисах пункт назначения проходит через интерфейспоставщики услугЗапускhttpзапрос илиrpc(tcp)Запросы на получение данных, ведь среди большого количества запросов количество запросов, которые сервер может обработать, ограничено, сервис залит большим количеством потоков, а также будут заняты соединения с базой данных, в результате чего все медленнее и медленнее реагируют.

🌱:

  • Имеет ли отзывчивость какое-либо отношение к нашему уровню данных?
  • Можете ли вы добавить сервер сервера?
  • Было бы неплохо, если бы это могло уменьшить количество запросов от клиента к серверу?
  • Текущий лимит? Можно ли ограничить текущую сцену?
  • Не слишком ли расточительно запрашивать данные для каждого потока и каждый раз запрашивать только один результат?
  • Можем ли мы найти способ улучшить производительность вызовов в нашей системе?

2. Кто-то хочет увидеть мерж-реквест, он сегодня здесь

Некоторые из вышеперечисленных идей можно решить, добавив кеш и добавив MQ. Но кеш ограничен, MQ это очередь, эффект ограниченного потока. Итак, как мы можем улучшить вызывающую способность системы, давайте узнаем об этом, слияние запросов и распределение результатов.

  • Обычный запрос — это запрос и поток, который инициирует соответствующие бизнес-требования в фоновом режиме и вызывает бизнес по сбору данных.
  • Когда запросы объединяются, нам нужно объединить несколько запросов и вызывать их пакетами.

Общая идея конструкции такова:

  1. регулярный запрос

    常规请求
    регулярный запрос
  2. запрос на вытягивание

  3. Расскажите нам свои мысли

  • Есть много запросов для решения проблемы, таких как вызов данных о продукте, есть много сервисов, а цепочка вызовов очень длинная, поэтому количество запросов к базе данных очень велико, и пул соединений с базой данных быстро используется. , что приводит к блокировке многих запросов, а также вызывает блокировку приложения в целом. Количество потоков очень велико. Хотя проблему можно смягчить, увеличив размер пула соединений с базой данных и пройдя стресс-тест, это временное решение.
  • При запросе информации о продукте, если есть 100 запросов на один и тот же продукт одновременно, 99 из них являются избыточными, и 100 запросов могут быть объединены в реальный вызов фонового интерфейса, если контролируется безопасность потоков. Моя идея состоит в том, чтобы использовать параллельный счетчик для реализации локального кеша, и счетчик может напрямую использовать `AtomicInteger`, предоставленный JDK, который является потокобезопасным и обеспечивает атомарные операции.
  • В качестве примера возьмем получение информации о продукте, каждый идентификатор продукта соответствует счетчику, а начальное значение счетчика по умолчанию равно 0. Когда приходит запрос, счетчик увеличивается на 1 с помощью `incrementAndGet()`, а увеличенное значение равно вернулся. Когда значение равно 1, это указывает на то, что поток является первым потоком, прибывшим в этот момент времени, а затем он вызывает реальную бизнес-логику и помещает ее в локальный кеш после запроса результата. Когда значение больше 1, это указывает на то, что ранее был поток, вызывающий бизнес-логику, он переходит в состояние ожидания и циклически запрашивает, есть ли данные, доступные в локальном кэше. После получения результата вызывается `decrementAndGet()` для уменьшения счетчика на 1. Когда счетчик уменьшается до 0, он возвращается в исходное состояние, а когда он уменьшается до 0 (представляющего последний поток), кэш очищается.
  • В 1000 запросах запрашиваемый идентификатор данных отличается, но используемый интерфейс службы один и тот же, все запрашивают данные товара с идентификатором от 1 до 1000 в библиотеке товаров, которые запрашиваются из таблицы. queryDataById`(dataId)`, то я также могу объединить эти запросы, перейти к пакетному запросу, а затем вернуть распределение данных. Идея состоит в том, чтобы спроектировать каждый запрос так, чтобы он содержал запрос `unique traceId`, что немного похоже на отслеживание ссылок. Просто вы можете использовать идентификатор запроса для наибольшего идентификатора трассировки, поместить запрос в команду и использовать синхронизированный задач, таких как каждые 10 мс для сканирования очереди, и объединил эти бизнес-запросы на слияние для запроса уровня базы данных.
  • В этой схеме есть задержка данных, которая представляет собой время состояния ожидания в каждом цикле. Так как деловой вызов, который включает в себя несколько поисков в базе данных, занимает десятки миллисекунд или даже сотни миллисекунд, для ожидания ожидания можно установить меньшее значение, например 10 миллисекунд. Таким образом, процессорное время не тратится впустую, а производительность в реальном времени относительно высока, но также можно активно разбудить ожидающий поток, с которым сложнее работать. Вы также можете добавить некоторую обработку исключений, контроль времени ожидания, максимальное количество повторных попыток, максимальное количество параллелизма (если максимальное количество параллелизма истечет, он быстро выйдет из строя) и т. д.

3. Начните упражнение

  • Имитация интерфейса удаленного вызова
 1import org.springframework.stereotype.Service;
2
3import java.util.*;
4
5/**
6 * 模拟远程调用ShopData接口
7 * @author Lijing
8 */
9@Service
10public class QueryServiceRemoteCall {
11
12    /**
13     * 调用远程的商品信息查询接口
14     *
15     * @param code 商品编码
16     * @return 返回商品信息,map格式
17     */
18    public HashMap<String, Object> queryShopDataInfoByCode(String code) {
19        try {
20            Thread.sleep(50L);
21        } catch (InterruptedException e) {
22            e.printStackTrace();
23        }
24        HashMap<String, Object> hashMap = new HashMap<>();
25        hashMap.put("shopDataId", new Random().nextInt(999999999));
26        hashMap.put("code", code);
27        hashMap.put("name", "小玩具");
28        hashMap.put("isOk", "true");
29        hashMap.put("price","3000");
30        return hashMap;
31    }
32
33    /**
34     * 批量查询 - 调用远程的商品信息查询接口
35     *
36     * @param codes 多个商品编码
37     * @return 返回多个商品信息
38     */
39    public List<Map<String, Object>> queryShopDataInfoByCodeBatch(List<String> codes) {
40        List<Map<String, Object>> result = new ArrayList<>();
41        for (String code : codes) {
42            HashMap<String, Object> hashMap = new HashMap<>();
43            hashMap.put("shopDataId", new Random().nextInt(999999999));
44            hashMap.put("code", code);
45            hashMap.put("name", "棉花糖");
46            hashMap.put("isOk", "true");
47            hashMap.put("price","6000");
48            result.add(hashMap);
49        }
50        return result;
51    }
52}
  • использоватьCountDownLatchОбщий тестовый класс для имитации одновременных запросов
 1@RunWith(SpringRunner.class)
2@SpringBootTest(classes = MyBotApplication.class)
3public class MergerApplicationTests {
4
5    long timed = 0L;
6
7    @Before
8    public void start() {
9        System.out.println("开始测试");
10        timed = System.currentTimeMillis();
11    }
12
13    @After
14    public void end() {
15        System.out.println("结束测试,执行时长:" + (System.currentTimeMillis() - timed));
16    }
17
18    // 模拟的请求数量
19    private static final int THREAD_NUM = 1000;
20
21    // 倒计数器 juc包中常用工具类
22    private CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
23
24    @Autowired
25    private ShopDataService shopDataService;
26
27    @Test
28    public void simulateCall() throws IOException {
29        // 创建 并不是马上发起请求
30        for (int i = 0; i < THREAD_NUM; i++) {
31            final String code = "code-" + (i + 1);
32            // 多线程模拟用户查询请求
33            Thread thread = new Thread(() -> {
34                try {
35                    // 代码在这里等待,等待countDownLatch为0,代表所有线程都start,再运行后续的代码
36                    countDownLatch.await();
37                    // 模拟 http请求,实际上就是多线程调用这个方法
38                    Map<String, Object> result = shopDataService.queryData(code);
39                    System.out.println(Thread.currentThread().getName() + " 查询结束,结果是:" + result);
40                } catch (Exception e) {
41                    System.out.println(Thread.currentThread().getName() + " 线程执行出现异常:" + e.getMessage());
42                }
43            });
44            thread.setName("price-thread-" + code);
45            thread.start();
46            // 启动后,倒计时器倒计数 减一,代表又有一个线程就绪了
47            countDownLatch.countDown();
48        }
49
50        System.in.read();
51    }
52
53}
  • Начнем с демонстрации обычного звонка
 1/**
2 * 商品数据服务类
3 * @author lijing
4 */
5@Service
6public class ShopDataService {
7    @Autowired
8    QueryServiceRemoteCall queryServiceRemoteCall;
9
10    // 1000 用户请求,1000个线程
11    public Map<String, Object> queryData(String shopDataId) throws ExecutionException, InterruptedException {
12         return queryServiceRemoteCall.queryShopDataInfoByCode(shopDataId);
13    }
14}
  • Отображение результатов запроса
 1开始测试
2price-thread-code-3 查询结束,结果是:{code=code-3, shopDataId=165800794, price=3000, isOk=true, name=小玩具}
3price-thread-code-994 查询结束,结果是:{code=code-994, shopDataId=735455508, price=3000, isOk=true, name=小玩具}
4price-thread-code-36 查询结束,结果是:{code=code-36, shopDataId=781610507, price=3000, isOk=true, name=小玩具}
5price-thread-code-993 查询结束,结果是:{code=code-993, shopDataId=231087525, price=3000, isOk=true, name=小玩具}
6
7....... 省略代码中。。。。
8
9price-thread-code-25 查询结束,结果是:{code=code-25, shopDataId=149193873, price=3000, isOk=true, name=小玩具}
10price-thread-code-2 查询结束,结果是:{code=code-2, shopDataId=324877405, price=3000, isOk=true, name=小玩具}
11
12.......共计1000次的查询结果
13
14结束测试,执行时长:150
  • Затем мы обнаруживаем, что можем использоватьcodeкак следtraceId, затем используйтеScheduledExecutorService,CompletableFuture,LinkedBlockingQueueВ ожидании некоторой многопоточной технологии вы можете реализовать это слияние запросов и простую демонстрацию реализации распределения запросов.
 1import javax.annotation.PostConstruct;
2import java.util.ArrayList;
3import java.util.HashMap;
4import java.util.List;
5import java.util.Map;
6import java.util.concurrent.*;
7
8/**
9 * 商品数据服务类
10 *
11 * @author lijing
12 */
13@Service
14public class ShopDataService {
15
16    class Request {
17        String shopDataId;
18        CompletableFuture<Map<String, Object>> completableFuture;
19    }
20
21    // 集合,积攒请求,每N毫秒处理
22    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
23
24    @PostConstruct
25    public void init() {
26        ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(5);
27        scheduledExecutorPool.scheduleAtFixedRate(() -> {
28            // TODO 取出所有queue的请求,生成一次批量查询
29            int size = queue.size();
30            if (size == 0) {
31                return;
32            }
33            System.out.println("此次合并了多少请求:" + size);
34            // 1、 取出
35            ArrayList<Request> requests = new ArrayList<>();
36            ArrayList<String> shopDataIds = new ArrayList<>();
37            for (int i = 0; i < size; i++) {
38                Request request = queue.poll();
39                requests.add(request);
40                shopDataIds.add(request.shopDataId);
41            }
42            // 2、 组装一个批量查询 (不会比单次查询慢很多)
43            List<Map<String, Object>> mapList = queryServiceRemoteCall.queryShopDataInfoByCodeBatch(shopDataIds);
44
45            // 3、 分发响应结果,给每一个request用户请求 (多线程 之间的通信)
46            HashMap<String, Map<String, Object>> resultMap = new HashMap<>(); //  1000---- 007
47            for (Map<String, Object> map : mapList) {
48                String code = map.get("code").toString();
49                resultMap.put(code, map);
50            }
51
52            // 1000个请求
53            for (Request req : requests) { 
54                Map<String, Object> result = resultMap.get(req.shopDataId);
55                // 怎么通知对应的1000多个线程,取结果呢?
56                req.completableFuture.complete(result);
57            }
58        }, 0, 10, TimeUnit.MILLISECONDS);
59    }
60
61
62    @Autowired
63    QueryServiceRemoteCall queryServiceRemoteCall;
64
65    /**
66     * 1000 用户请求,1000个线程
67     *
68     * @param shopDataId
69     * @return
70     * @throws ExecutionException
71     * @throws InterruptedException
72     */
73    public Map<String, Object> queryData(String shopDataId) throws ExecutionException, InterruptedException {
74        Request request = new Request();
75        request.shopDataId = shopDataId;
76        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
77        request.completableFuture = future;
78        queue.add(request);
79        // 等待其他线程通知拿结果
80        return future.get();
81    }
82}
  • Результаты теста
 1开始测试
2结束测试,执行时长:164
3
4此次合并了多少请求:63
5此次合并了多少请求:227
6此次合并了多少请求:32
7此次合并了多少请求:298
8此次合并了多少请求:68
9此次合并了多少请求:261
10此次合并了多少请求:51
11
12price-thread-code-747 查询结束,结果是:{code=code-747, shopDataId=113980125, price=6000, isOk=true, name=棉花糖}
13price-thread-code-821 查询结束,结果是:{code=code-821, shopDataId=568038265, price=6000, isOk=true, name=棉花糖}
14price-thread-code-745 查询结束,结果是:{code=code-745, shopDataId=998247608, price=6000, isOk=true, name=棉花糖}
15
16....... 省略代码中。。。。
17
18price-thread-code-809 查询结束,结果是:{code=code-809, shopDataId=479029433, price=6000, isOk=true, name=棉花糖}
19price-thread-code-806 查询结束,结果是:{code=code-806, shopDataId=929748878, price=6000, isOk=true, name=棉花糖}

Видно, что мы объединили 1000 запросов, и данные моделируются нормально.

4. Резюме

Недостатки:

  • Платой за включение запросов является дополнительная задержка перед выполнением фактической логики.
  • Если среднее время выполнения составляет всего 5 мс, в сценарии пакетного слияния 10 мс в худшем случае время выполнения может стать 15 мс. (Он не должен подходить для сценариев RPC с малой задержкой и не должен подходить для сценариев с низким параллелизмом.)

Сцены:

  • В этом нет необходимости, если редко выполняется более 1 или 2 запросов одновременно.
  • Конкретный запрос используется много раз одновременно, и несколько или даже сотни из них могут быть объединены вместе, поэтому это того стоит, если вы можете согласиться на немного более длительное время обработки, чтобы уменьшить потребность в сетевых подключениях. (Типичный пример: база данных, Http-интерфейс)

расширение:

  • Мы не изобретаем велосипед, компоненты в SpringCloudspring-cloud-starter-netflix-hystrixЕсть уже упакованные колеса вHystrixизHystrixCollapser来实现请求的合并,以减少通信消耗和线程数的占用.
  • Конечно, его компоненты более сложные и всеобъемлющие, поддерживающие асинхронные, синхронные, тайм-аут, исключения и другие механизмы обработки.
  • Однако, с точки зрения лежащей в основе идеи, это не что иное, как связь между потоками, переключение потоков, очереди и другие технологии, связанные с параллельным программированием. рука.

Это конец сегодняшнего объяснения, пожалуйста, перейдите к моему конкретному кодуВетка Mybot Project Master на GithubПроверьте это, разветвите, чтобы испытать это, или оставьте сообщение в области комментариев, чтобы обсудить, письмо не очень хорошее, пожалуйста, дайте мне еще совет~~