Распараллеливание — ваш убийца параллелизма

Java задняя часть

1. Введение

Мои одноклассники, которые любят игры, должно быть, в детстве мечтали, что если бы они могли овладеть несколькими тенями, такими как Наруто, они могли бы играть в игры и посещать уроки одновременно К сожалению, комиксы комиксами, и в Честно ходите на занятия или просто прогуливайте занятия, чтобы поиграть в игры. Хотя мы не можем реализовать технологию множественных теневых клонов в реальности, мы можем реализовать свое желание в компьютерном мире.

2. Клонировать в компьютере

Техника аватара в компьютере не рождается. В 1971 году Intel выпустила первый в мире микропроцессор общего назначения 4004, состоящий из 2300 транзисторов. В то время Гордон Мур, один из соучредителей компании, предложил знаменитый «Закон Мура» — каждые 18 месяцев количество транзисторов, которые можно интегрировать в микросхему, будет удваиваться. Исходная основная частота составляет 740 кГц (740 000 раз в секунду), и вот уже почти 50 лет, когда каждый покупает компьютер, он обнаружит, что текущая основная частота может достигать 4,0 ГГц (4 миллиарда раз в секунду). Однако чем выше частота, тем меньше преимуществ:

  • Подсчитано, что на каждый 1G прирост основной частоты потребляемая мощность будет увеличиваться на 25 Вт, а когда потребляемая мощность чипа превысит 150 Вт, существующая система отвода тепла с воздушным охлаждением не сможет удовлетворить потребности. рассеивания тепла. Некоторые процессоры можно использовать для жарки яиц.
  • Конвейер слишком длинный, что снижает частотную характеристику устройства.На самом деле, общая производительность большей основной частоты не так хороша, как меньшей основной частоты.
  • Гордон Мур считает, что закон Мура не сработает в ближайшие 10-20 лет.

Когда одноядерная частота сталкивается с узким местом, появляются многоядерные процессоры, как того требует время, что не только повышает производительность, но и снижает энергопотребление. Таким образом, многоядерные процессоры постепенно стали основным направлением современного рынка, что упрощает наше многопоточное программирование.

Когда дело доходит до многоядерного процессора, мы должны говорить о графическом процессоре. Вы можете быть незнакомы с этим, но когда дело доходит до видеокарты, вы должны быть знакомы с этим. Автор некоторое время занимался программированием CUDA, и я понял что это и есть настоящие параллельные вычисления.Знайте пиксели картинки, например 1920Изображение 1080 имеет 2,1 миллиона пикселей.Если мы хотим преобразовать каждый пиксель изображения, нам может потребоваться 2,1 миллиона циклов в нашем java. Даже если мы используем многопоточный 8-ядерный процессор, это потребует сотни тысяч циклов. Но при использовании Cuda до 365535*512=100 661 760 (100 миллионов) потоков выполняются параллельно, и изображения этого уровня обрабатываются сразу. Однако Cuda в целом подходит для картинок, там большое количество пикселей, которые нужно обрабатывать одновременно, но поддерживаемых инструкций не так много, поэтому логика не может быть слишком сложной. Графический процессор используется только для расширения введения.Если вам интересно, вы можете связаться с автором.

3. Параллелизм в приложениях

Когда дело доходит до способов сделать ваш сервис высокопроизводительным, то асинхронность и распараллеливание точно впервые всплывут в вашей голове, в предыдущей статье:«Асинхронность, ваш убийца параллелизма»Метод оптимизации асинхронизации был введен в , и заинтересованные друзья могут взглянуть. Распараллеливание можно использовать в сочетании с асинхронизацией или только для оптимизации.

Мы можем подумать о таком спросе.Когда вы размещаете заказ на вынос, может также потребоваться проверка заказа, информация о пользователе, информация о скидке, бизнес-информация, информация о блюде и т. д., и вызывать его синхронно, как показано на следующий рисунок:

Представьте, что эти пять сервисов запросов потребляют в среднем 50 мс каждый раз, тогда этот вызов составляет не менее 250 мс.Давайте подумаем, эти пять сервисов на самом деле не имеют никаких зависимостей, кто их получит первым, тот получит их позже, тогда мы можем думать о том, можно ли использовать технику множественных теней для получения информации этих пяти сервисов одновременно? Оптимизация выглядит следующим образом:

Параллельное обслуживание этих пяти запросов в идеальном случае можно оптимизировать до 50 мс. Конечно, легко сказать, как мы на самом деле приземляемся?

3.1 CountDownLatch/Phaser

CountDownLatch и Phaser — это классы инструментов синхронизации, предоставляемые JDK. Phaser — это класс инструментов, предоставляемый после версии 1.7, а CountDownLatch — это класс инструментов, предоставляемый после версии 1.5. Вот краткое введение в CountDownLatch, который можно рассматривать как счетчик.Метод await() может блокироваться до тех пор, пока время ожидания или счетчик не уменьшится до 0. Когда другие потоки завершат свои цели, они могут уменьшиться на 1. Используя этот механизм , мы можем использовать его для параллелизма. Мы можем использовать следующий код для выполнения вышеуказанных требований для размещения заказа:

public class CountDownTask {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));
    public static void main(String[] args) throws InterruptedException {
        // 新建一个为5的计数器
        CountDownLatch countDownLatch = new CountDownLatch(5);
        OrderInfo orderInfo = new OrderInfo();
        THREAD_POOL.execute(() -> {
            System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("当前任务Tenant,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setTenantInfo(new TenantInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("当前任务OtherInfo,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
            countDownLatch.countDown();
        });
        countDownLatch.await(1, TimeUnit.SECONDS);
        System.out.println("主线程:"+ Thread.currentThread().getName());
    }
}

Установите пул потоков (конкретная конфигурация зависит от конкретного бизнеса, конкретной конфигурации машины), одновременно выполняйте наши задачи (создавайте информацию о пользователе, информацию о блюдах и т. д.) и, наконец, используйте метод await для блокировки и ожидания результата. вернуться успешно.

3.2CompletableFuture

Я полагаю, вы обнаружили, что, хотя CountDownLatch может выполнять функции, которые нам нужны, все еще существует проблема, заключающаяся в том, что нам нужно связать код CountDownLatch с нашим бизнес-кодом.Например, после получения информации о пользователе мы выполним countDownLatch. countDown() , очевидно, что наш бизнес-код не должен заботиться об этой части логики, и если она будет упущена в процессе разработки, наш метод await будет разбужен только различными исключениями.

Поэтому в JDK1.8 предусмотрен класс CompletableFuture, который является многофункциональным неблокирующим Future. (Что будет в будущем: он используется для представления асинхронных результатов и предоставляет методы для проверки завершения вычислений, ожидания завершения, получения результатов и т. д.) Это подробно описано в моей предыдущей статье.CompletableFuture асинхронных навыков, вы можете прочитать эту статью, если вам интересно. Мы используем CompletableFuture для представления результатов расчета каждой задачи и используем CompletableFuture.allOf для объединения в большое CompletableFuture, а затем используем метод get() для блокировки.

public class CompletableFutureParallel {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        OrderInfo orderInfo = new OrderInfo();
        //CompletableFuture 的List
        List<CompletableFuture> futures = new ArrayList<>();
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
        }, THREAD_POOL));
        futures.add( CompletableFuture.runAsync(() -> {
            System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("当前任务Other,线程名字为:" + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
        }, THREAD_POOL));
        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        allDoneFuture.get(10, TimeUnit.SECONDS);
        System.out.println(orderInfo);
    }
}

Видно, что мы можем быстро выполнить требования с помощью CompletableFuture, но этого, конечно же, недостаточно.

3.3 Fork/Join

Выше мы использовали CompletableFuture для завершения параллельного выполнения нескольких наборов задач, но он по-прежнему зависит от нашего пула потоков.В нашем пуле потоков мы используем блокирующие очереди, то есть когда один из наших потоков завершает выполнение задач, нам нужно пройти Для этой очереди блокировки обязательно будет конкуренция, поэтому в JDK1.7 предусмотрены ForkJoinTask и ForkJoinPool.

Каждый поток в ForkJoinPool имеет свою собственную рабочую очередь, а алгоритм Work-Steal используется для предотвращения голодания потока. Рабочие потоки используют метод LIFO для извлечения задач, но используют метод FIFO для кражи задач из чужих очередей, что уменьшает конфликты блокировок.

В Интернете есть много примеров этой структуры.Давайте посмотрим, как использовать код для выполнения наших требований к заказу выше:

public class OrderTask extends RecursiveTask<OrderInfo> {
    @Override
    protected OrderInfo compute() {
        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
        // 定义其他五种并行TasK
        CustomerTask customerTask = new CustomerTask();
        TenantTask tenantTask = new TenantTask();
        DiscountTask discountTask = new DiscountTask();
        FoodTask foodTask = new FoodTask();
        OtherTask otherTask = new OtherTask();
        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);
        OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());
        return orderInfo;
    }
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 );
        System.out.println(forkJoinPool.invoke(new OrderTask()));
    }
}
class CustomerTask extends RecursiveTask<CustomerInfo>{

    @Override
    protected CustomerInfo compute() {
        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
        return new CustomerInfo();
    }
}
class TenantTask extends RecursiveTask<TenantInfo>{

    @Override
    protected TenantInfo compute() {
        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
        return new TenantInfo();
    }
}
class DiscountTask extends RecursiveTask<DiscountInfo>{

    @Override
    protected DiscountInfo compute() {
        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
        return new DiscountInfo();
    }
}
class FoodTask extends RecursiveTask<FoodListInfo>{

    @Override
    protected FoodListInfo compute() {
        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
        return new FoodListInfo();
    }
}
class OtherTask extends RecursiveTask<OtherInfo>{

    @Override
    protected OtherInfo compute() {
        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
        return new OtherInfo();
    }
}

Мы определяем OrderTask и определяем пять задач для получения информации, разветвляемся для выполнения этих пяти задач в вычислениях и, наконец, получаем результаты этих пяти задач с помощью Join и, наконец, выполняем наши требования к распараллеливанию.

3.4 parallelStream

API параллельного потока предоставляется в jdk1.8. Когда мы используем коллекции, мы можем очень хорошо выполнять параллельную обработку. Вот простой пример добавления от 1 до 100:

public class ParallelStream {
    public static void main(String[] args) {
        ArrayList<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }
        LongAdder sum = new LongAdder();
        list.parallelStream().forEach(integer -> {
//            System.out.println("当前线程" + Thread.currentThread().getName());
            sum.add(integer);
        });
        System.out.println(sum);
    }
}

Набор, используемый на нижнем уровне в parallelStream, также является набором Fork/Join.Уровнем параллелизма по умолчанию является число доступных ЦП -1.

3.5 Шардинг

Не исключено, что есть такая потребность выдавать купоны пользователям, чьи ID находятся в определенном диапазоне каждый день.Например, пользователей в этом диапазоне миллионы.Если отправить на одну машину, то может занять много времени чтобы отправить их все. , поэтому распределенные структуры планирования, такие как: elastic-job. Оба обеспечивают функцию шардирования, например, если вы используете 50 машин, то id%50=0 будет выдаваться на 0-й машине, а с id%50=0 будет выдаваться на 1-й машине, то наш время выполнения фактически будет выделено на другую машину.

4. Вопросы параллелизации

  • Потокобезопасность: LongAdder используется в коде, который мы указали в parallelStream, а наши Integer и Long не используются напрямую, потому что Integer и Long не являются потокобезопасными в многопоточной среде. Поэтому нам нужно уделить особое внимание безопасности потоков.
  • Разумная конфигурация параметров: видно, что нам нужно настроить множество параметров, таких как размер нашего пула потоков, размер очереди ожидания, размер параллелизма и время ожидания ожидания и т. д. Нам всем нужно постоянно настраивайтесь в соответствии с нашими собственными делами, чтобы предотвратить недостаточную очередь или необоснованное время ожидания и так далее.

5. Наконец

В этой статье рассказывается о том, что такое распараллеливание, различные истории распараллеливания, как оно реализовано в Java и соображения по поводу распараллеливания. Я надеюсь, что у всех есть более полное представление о распараллеливании. Напоследок два небольших вопроса к вам:

  1. В нашем распараллеливании есть задача, что делать, если в задаче есть исключение?
  2. В нашем распараллеливании информация какой-то задачи не сильно зависима, то есть если есть проблема, то эта часть информации нам не нужна.Что делать при распараллеливании, если эта задача нештатная?

Наконец, эта статья была включена в JGrowing, всеобъемлющий и отличный маршрут изучения Java, совместно созданный сообществом.Если вы хотите участвовать в обслуживании проектов с открытым исходным кодом, вы можете создать его вместе.Адрес github:GitHub.com/Java растет…Пожалуйста, дайте мне маленькую звезду.

Если вы считаете, что в этой статье есть статьи для вас, вы можете подписаться на мой технический паблик.За последнее время автор собрал много свежих обучающих материалов,видео и материалов интервью.Вы можете получить их, обратив внимание.Ваше внимание и пересылка самая большая поддержка для меня. , O(∩_∩)O