Параллельные контейнеры и управление параллелизмом — JUC (3)

Java

We are all in the gutter, but some of us are looking at the stars.

содержание

  1. Пул потоков и ThreadLocal — JUC (1)
  2. Замок с CAS-JUC (2)
  3. Параллельные контейнеры и управление параллелизмом — JUC (3)
  4. AQS - JUC (четыре)

Резюме

  • Почему никто не использует егоVectorиHashtableуже?HashMapГде поток небезопасен?
  • ConcurrentHashMapЧто такое эволюция и операции Sao?
  • В чем идея копирования при записи? Какие примеры?
  • Зачем вам параллельные очереди? На какие параллельные очереди мы закрываем глаза?
  • Что нам делать, если мы хотим контролировать потоки в порядке очереди? Давай оправдываться по одному?

параллельный контейнер

Давайте посмотрим, что делают эти параллельные контейнеры.

1. Concurrent*  大部分是通过CAS实现并发
2. CopyOnWrite*  复制一份原数据
3. Blocking*  通过AQS实现

1. Почему Vector и Hashtable остались в длинной реке истории?

Это очень просто, не думайте слишком много, просто потому, что производительность не очень хорошая.

Так почему производительность плохая? Где теряется его производительность?

Давайте взглянемVectorизgetметод:

смотри, смотри, этоsynchronizedОн напрямую изменяется в методе.Если вы посмотрите вверх и вниз, вы обнаружите, что в основном все методы этого классаsynchronizedдекоративный.

Тогда некоторые люди могут спросить, почему метод блокировки имеет низкую производительность? Потому что эта степень детализации блокировки является экземпляром объекта.

Hashtableтоже так.

2. Почему HashMap небезопасен для потоков?

  1. В то же время коллизия приводит к потере данных

    Например, если два потока и помещают данные, и используют один и тот же ключ, вычисляемые ими хэш-значения будут одинаковыми и размещены в одном и том же месте, что неизбежно приведет к потере данных.

  2. В то же время расширение става вызывает потерю данных

    При одновременном расширении сохраняется только один расширенный массив.

  3. Бесконечный цикл вызывает CPU100%

    Одновременное расширение многопоточности может привести к тому, что круговой связанный список вызовет 100% загрузку ЦП. Но исследовать саму эту проблему бессмысленно (так же считают и представители Sun, он считает, что не стоит использовать hashMap для решения параллельных сценариев, это проблема сама по себе, не так ли?). Если вам действительно интересно, взгляните на этоклассная оболочка.cai/articles/96…

3. В чем разница между ConcurrentHashMap 1.7 и 1.8?

  1. структура данных

    Структура версии 1.7 (сегмент + хэш-запись + небезопасно):

    Структура версии 1.8 (удалить сегмент, уменьшить степень детализации блокировки, синхронизированный + CAS + узел + небезопасный):

  2. Хэш-коллизия

    Первоначальный метод застежки меняется на красно-черное дерево, если количество узлов превышает 8.

    Здесь есть небольшая проблема,Почему оно превращается в красно-черное дерево, когда превышает 8? Почему 8? Почему бы не использовать красно-черное дерево в первую очередь?

    1. Каждый узел красно-черного дерева занимает вдвое больше места, чем связный список.
    2. Автор использует расчет вероятности распределения Пуассона, и вероятность крайне мала, когда она достигает 8.
  3. Гарантированная безопасность параллелизма

    1.7 Использовать блокировку сегмента Segment (расширяет ReentreenLock) для обеспечения безопасности 16 сегментов (сегменты не могут быть динамически увеличены); 1.8 ИспользованиеCAS + synchronizedЧтобы обеспечить детализацию для каждого узла.

  4. сложность запроса

    При наличии более 8 конфликтов эффективность запроса также может быть гарантирована.O(n)красно-черное деревоO(logn)

4. Как ConcurrentHashMap реализует комбинированные операции, такие как a++?

Очевидно,ConcurrentHashMapОн гарантирует безопасность параллелизма одновременного ввода, но не говорит, что вы выполняете одновременноa++также является потокобезопасным, когда:

int a = map.get(key);
int b = a+1;
map.put(key, b);

Итак, давайте подумаем, как решить эту проблему?

использовать этоsynchronizedЗаверните?

которые используютConcurrentHashMapВ чем смысл? Давайте рассмотрим такой метод: заменить

// 利用CAS的思想,去循环的判断并进行++的操作,直到成功为止
// replace保证了并发修改的安全性
while (true) {
    int ori = map.get("key");
    int tar = ori + 1;
    boolean isSuccess = map.replace("key", ori, tar);
    if (isSuccess) {
        break;
    }
}

Давайте рассмотрим комбинированную операцию для решения проблемы покрытия put одного и того же ключа:putIfAbsent

// 核心思想如下表示
if (!map.contains("key")){
    return map.put("key", "value");
}else {
    return map.get("key");
}

5. Каковы применимые сценарии CopyOnWriteArrayList?

Давайте сначала подумаем, почему существует такая вещь?

  1. заменитьVectorиSynchronizedList, какConcurrentHashMapзаменятьSynchronizedMapта же причина
  2. VectorиSynchronizedListСтепень детализации блокировки слишком велика, эффективность параллелизма относительно низка, и ее нельзя изменить во время обхода.
  3. Контейнеры Copy-On-Write также включаютCopyOnWriteArraySet, который используется вместо синхронногоSet

Хорошо, теперь поговорим о применимых сценариях

  1. Больше читайте и меньше пишите

    Операции чтения должны выполняться как можно быстрее, а операции записи могут выполняться медленнее, например, в сценариях с черным списком.

  2. Требования к записи в реальном времени невысокие

    То есть, моя модификация не означает, что она должна вступить в силу немедленно, пусть даже ненадолго, или она все еще читает предыдущие данные, это не имеет значения.

Каковы правила чтения и записи для CopyOnWriteArrayList?

  • Синхронизация чтения-чтения, синхронизация чтения-записи, взаимное исключение записи-записи

  • Это похоже на правила для блокировок чтения-записи? Но это другое. Апгрейд здесь в том, что даже если вы пишете, я могу это прочитать.

  • Посмотрим, когда мы напишем

    Придумайте и сначала заблокируйте его, затем выйдите с новым массивом, вставьте новый элемент, затем измените исходный адрес, разблокируйте его и сделайте это за один раз.
  • при чтении?

    Никакой защиты нет, просто прочитайте и все.

Каков принцип реализации?

  • Принцип CopyOnWrite: При записи сначала будет сделана копия, а адрес будет изменен после записи
  • Принцип «неизменяемости»: каждый список эквивалентен неизменяемому множеству, и над ним не выполняются никакие операции модификации

В чем недостаток?

  • Непротиворечивость данных: может быть гарантирована только окончательная согласованность данных, непротиворечивость в реальном времени не может быть гарантирована.
  • Использование памяти: копия занимает в два раза больше памяти.

6. Что такое параллельная очередь?

Это очередь, обеспечивающая безопасность параллелизма.Она делится на блокирующие очереди (как показано на рисунке ниже) и неблокирующие очереди.Разница между блокирующими и неблокирующими заключается в том, что когда вы кладете (или вынимаете) элементы, очередь заполнена (или пуста), будет ли она заблокирована.

Наиболее типичным применением очереди блокировки является производитель-потребитель.

Каковы характеристики этих очередей на картинке выше? мы собрались вместе

  1. ArrayBlockingQueue: Ограничено, можно указать емкость, нижний слой реализован массивом, справедливость можно установить

    Мы видим, что на шаге размещения элемента используется запирающий (ломаемый) метод реентерабельной блокировки.
  2. PriorityBlockingQueue: поддержка настройки приоритета (в соответствии с естественным порядком сортировки объекта или компаратором, введенным конструктором), порядок входа и выхода из очереди - установленный порядок (не первый вошел первым вышел), неограниченная очередь,PriorityQueueПоточно-безопасная версия .

  3. LinkedBlockingQueue: По умолчанию неограничен (мощность Integer.MAX_VALUE, емкость можно указать), внутренняя структура представляет собой массив узлов + две блокировки, производительность в целом лучше, чемArrayBlockingQueue(меньшая степень детализации блокировки)

    используется здесьCondition + ReentrantLock, что на самом деле эквивалентноsynchronized + Object.wait() + Object.notify(), которая является отличной очередью для производителей-потребителей.Если вы хотите увидеть нативную реализацию, вы можете взглянуть на производителя-потребителя, который я реализовал, используя ожидание + уведомление в принципе параллелизма.
    Можем понять, посмотрев на такой способ выставления элементов: если очередь заполнена, блокируем, иначе ставим элемент, если можно продолжать ставить, пробуждаем поток, ожидающий в notFull, если перед выставлением очередь пуста, то будит потоки, ожидающие notEmpty.
  4. SynchronousQueue: Емкость равна 0, а не 1, пул потоковExecutors.newCacheThreadPool()Используемая очередь блокировки

    Он не имеет фактической емкости.Любой поток (поток-производитель или поток-потребитель, операции производственного типа, такие как размещение, предложение, операции типа потребления, такие как опрос, получение) будет ждать, пока данные не будут получены или доставка не будет завершена, прежде чем вернуться. Задача потока-производителя состоит в том, чтобы доставить данные, прикрепленные к потоку, в поток-потребитель, а поток-потребитель ожидает данных от потока-производителя.
  5. ConcurrentLinkedQueue: Единственная неблокирующая очередь в параллельном пакете, которая реализована внутри посредством связанного списка, и основной идеей также является CAS.

  6. DelayQueue: Очередь задержки (отсортирована по времени задержки), элементы должны быть реализованыDelayedинтерфейс (указать сортировку)

Контролируйте параллельные процессы

Давайте посмотрим, что может управлять параллельным процессом

7. Каковы типичные применимые сценарии CountDownLatch?

  1. Один поток ожидает завершения выполнения нескольких потоков.

    При обработке мультимедиа команду сплайсинга необходимо выполнять после того, как будут загружены все задачи для сплайсинга.

  2. Несколько потоков ждут завершения выполнения одного потока

    В сценарии стресс-теста все потоки ждут приказа немедленно оказать давление на сервер, вместо того, чтобы медленно создавать потоки.

  3. ждать много

Возьмем небольшой каштан, чтобы имитировать сцену бега спортсменов, все спортсмены начинают бежать после выстрелов, а судья объявляет об окончании игры после того, как спортсмены достигли финиша.

CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
    final int no = i + 1;
    Runnable runnable = () -> {
        System.out.println("No." + no + "准备完毕,等待枪响");
        try {
            begin.await();
            System.out.println("No." + no + "开始冲刺了");
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("No." + no + "跑到终点了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            end.countDown();
        }
    };
    service.submit(runnable);
}
//裁判员搞一些准备工作...
Thread.sleep(5000);
System.out.println("枪响,比赛开始!");
begin.countDown();

end.await();
System.out.println("所有人到达终点,比赛结束!");

Этот класс нельзя использовать повторно, т.е. нельзя пересчитывать. Если требуется повторное использование, рассмотрите вариант CyclicBarrier и создайте новый экземпляр.

8. Каковы применимые сценарии Semaphore?

Когда вам нужно контролировать количество одновременных потоков.

Например, в сценарии обработки мультимедиа производительность ЦП машины ограничена, и мы можем разрешить одновременное выполнение до 3 задач транскодирования.В это время нам нужно контролировать количество потоков, которые выполняют транскодирование.

Давайте разберемся с картинкой:

Далее смотрите небольшую демонстрацию
static Semaphore semaphore = new Semaphore(3, true);

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(50);
    for (int i = 0; i < 100; i++) {
        service.submit(new Task());
    }
    service.shutdown();
}

static class Task implements Runnable {

    @Override
    public void run() {
        try {
            semaphore.acquire();
            // 这里需要设置请求的数量
            //semaphore.acquire(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "拿到了许可证");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "释放了许可证");
        semaphore.release();
        // 也可以设置释放的数量
        //semaphore.release(2);
    }
}

будь осторожен:

  1. Количество приобретения и выпуска должно быть одинаковым (например, вы запросили 3, а выпускаете только 2 за раз, поэтому эта лицензия будет использоваться все реже и реже, эта вещь не ограничена на уровне класса, поэтому вам нужно обращать внимание при кодировании).
  2. Честность, как правило, разумнее установить в true (поскольку семафоры обычно используются для управления медленными сервисами, если вы также установите несправедливость, легко вызвать голодание).
  3. Он не обязательно должен быть освобожден запрашивающим потоком, но может быть освобожден и другими потоками (может случиться так, что поток A получил 2, а поток B не получил, а только освободил 2).

9. В чем разница между CyclicBarrier и CountDownLatch?

  1. Он работает иначе:CyclicBarrierнеобходимо дождаться, пока фиксированное количество потоков достигнет ограждения, прежде чем продолжить, в то время какCountDownLatchПросто посчитай до 0. Это,CountDownLatchдля мероприятий,CyclicBarrierдля нитей.

    увидеть одинCyclicBarrierДемо:

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("所有人都到场了, 大家统一出发!"));
    for (int i = 0; i < 10; i++) {
        new Thread(new Task(i, cyclicBarrier)).start();
    }
}

static class Task implements Runnable{
    private int id;
    private CyclicBarrier cyclicBarrier;

    public Task(int id, CyclicBarrier cyclicBarrier) {
        this.id = id;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("线程" + id + "现在前往集合地点");
        try {
            Thread.sleep((long) (Math.random()*10000));
            System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
            cyclicBarrier.await();
            System.out.println("线程"+id+"出发了");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
  1. Повторное использование отличается:new CyclicBarrier(5)Выполнить после получения 5 потоковrun, вы можете продолжать ждать прибытия следующих 5 потоков.

Суммировать

Угу, я наконец закончил говорить о причудливых параллельных контейнерах Теперь я знаю, что CopyOnWrite можно использовать для решения сценария больше читать и меньше писать, использовать блокирующие очереди для реализации модели производитель-потребитель, использоватьCountDownLatchЧтобы укротить нити, которые быстро ускользают.

В следующей главе мы совершим большое дело и отправим президента мира:AQS. попробуй использоватьAQSЧтобы реализовать собственный инструмент параллелизма.