14 параллельных контейнеров Java, сколько вы использовали?

Java
Добавить Автора

предисловие

Без учета многопоточного параллелизма классы-контейнеры обычно используют небезопасные для потоков классы, такие как ArrayList и HashMap, которые более эффективны. В параллельных сценариях часто используются потокобезопасные классы контейнеров, такие как ConcurrentHashMap и ArrayBlockingQueue.Несмотря на некоторую жертву эффективности, достигается безопасность.
Упомянутые выше потокобезопасные контейнеры находятся в пакете java.util.concurrent. В этом пакете много параллельных контейнеров. Сегодня я найду их все.
Будет дано только краткое введение, после чего будут проведены более глубокие исследования.

Введение в параллельные контейнеры

  1. ConcurrentHashMap: Параллельная HashMap
  2. CopyOnWriteArrayList: Параллельная версия ArrayList
  3. CopyOnWriteArraySet: параллельный набор
  4. ConcurrentLinkedQueue: параллельная очередь (на основе связанного списка)
  5. ConcurrentLinkedDeque: параллельная очередь (на основе двусвязного списка)
  6. ConcurrentSkipListMap: параллельная карта на основе списка пропуска
  7. ConcurrentSkipListSet: параллельный набор на основе списка пропуска
  8. ArrayBlockingQueue: очередь блокировки (на основе массива)
  9. LinkedBlockingQueue: очередь блокировки (на основе связанного списка)
  10. LinkedBlockingDeque: очередь блокировки (на основе двусвязного списка)
  11. PriorityBlockingQueue: потокобезопасная приоритетная очередь
  12. SynchronousQueue: чтение и запись парных очередей
  13. LinkedTransferQueue: очередь обмена данными на основе связанного списка
  14. DelayQueue: очередь задержки

1. ConcurrentHashMap Параллельное HashMap

Один из наиболее распространенных параллельных контейнеров, который можно использовать в качестве кэша в параллельных сценариях. Нижний слой по-прежнему представляет собой хэш-таблицу, но она сильно изменилась в JAVA 8, а в JAVA 7 и JAVA 8 используется больше версий, поэтому методы реализации этих двух версий часто сравнивают (например, интервью).
Большая разница заключается в том, что в JAVA 7 сегментированные блокировки используются для снижения конкуренции блокировок, а в JAVA 8 от сегментированных блокировок отказываются и используется CAS (оптимистическая блокировка). в связанные списки (при возникновении конфликта в этом месте будет создан связанный список, и объекты с одинаковым значением хеш-функции будут связаны вместе) и будет преобразован в красно-черное дерево после длины связанного списка достигает порога (8).

2.CopyOnWriteArrayList параллельной версии ArrayList

Параллельная версия ArrayList, базовая структура которого также является массивом, отличается от ArrayList тем, что новый массив создается при добавлении и удалении элементов, указанные объекты добавляются или исключаются из нового массива, и, наконец, исходный массив заменен новым массивом. .
Применимые сценарии: поскольку операции чтения не заблокированы, а операции записи (добавление, удаление, изменение) заблокированы, он подходит для сценариев с большим количеством операций чтения и меньшим количеством операций записи.
Ограничение: Поскольку при чтении нет блокировки (эффективность чтения высокая, как и у обычного ArrayList), текущая копия считывается, поэтому грязные данные могут быть прочитаны. Если вы возражаете, рекомендуется не использовать его.
Взгляните на исходный код и почувствуйте его:
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    final transient ReentrantLock lock = new ReentrantLock();
    private transient volatile Object[] array;
    // 添加元素,有锁
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock(); // 修改时加锁,保证并发安全
        try {
            Object[] elements = getArray(); // 当前数组
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1); // 创建一个新数组,比老的大一个空间
            newElements[len] = e; // 要添加的元素放进新数组
            setArray(newElements); // 用新数组替换原来的数组
            return true;
        } finally {
            lock.unlock(); // 解锁
        }
    }
    // 读元素,不加锁,因此可能读取到旧数据
    public E get(int index) {
        return get(getArray(), index);
    }
}


3. Параллельный набор CopyOnWriteArraySet

На основе реализации CopyOnWriteArrayList (включая переменную-член CopyOnWriteArrayList), то есть нижний слой представляет собой массив, а это означает, что каждое добавление должно пройти через всю коллекцию, чтобы узнать, существует ли она или нет. необходимо вставить (заблокировать).
Применимые сценарии: добавьте один в применимый сценарий CopyOnWriteArrayList, и коллекция не должна быть слишком большой (любой обход не повредит).

4. Параллельная очередь ConcurrentLinkedQueue (на основе связанного списка)

Параллельная очередь, реализованная на основе связанного списка, использует оптимистическую блокировку (CAS) для обеспечения безопасности потоков. Поскольку структура данных представляет собой связанный список, теоретически нет ограничений на размер очереди, а это означает, что добавление данных должно быть успешным.

5. Параллельная очередь ConcurrentLinkedDeque (на основе двусвязного списка)

Параллельная очередь, реализованная на основе двусвязного списка, может работать с началом и хвостом по отдельности, поэтому в дополнение к очереди «первым пришел — первым обслужен» (FIFO) она может быть еще и очередью «первым пришел — последним обслужен» (FILO). Конечно, если он первый, его следует называть стеком.

6. ConcurrentSkipListMap Concurrent Map на основе списка пропуска

SkipList — это список с пропусками.Список с пропусками — это структура данных, которая обменивает пространство на время.Благодаря избыточным данным связанный список индексируется слой за слоем для достижения эффекта, аналогичного бинарному поиску.

7. ConcurrentSkipListSet Concurrent Set на основе списка пропуска

Подобно отношениям между HashSet и HashMap, в ConcurrentSkipListSet есть ConcurrentSkipListMap, поэтому я не буду вдаваться в подробности.

8.ArrayBlockingQueue очередь блокировки (на основе массива)

Блокируемая очередь реализована на основе массива, размер массива нужно указывать при построении, а если массив заполнен, то он будет блокироваться до тех пор, пока не найдется позиция (также поддерживается прямой возврат и ожидание тайм-аута), а потокобезопасность гарантируется блокировкой ReentrantLock.
В качестве примера возьмем операцию offer:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /**
     * 读写共用此锁,线程间通过下面两个Condition通信
     * 这两个Condition和lock有紧密联系(就是lock的方法生成的)
     * 类似Object的wait/notify
     */
    final ReentrantLock lock;
    /** 队列不为空的信号,取数据的线程需要关注 */
    private final Condition notEmpty;
    /** 队列没满的信号,写数据的线程需要关注 */
    private final Condition notFull;
    // 一直阻塞直到有东西可以拿出来
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    // 在尾部插入一个元素,队列已满时等待指定时间,如果还是不能插入则返回
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 锁住
        try {
            // 循环等待直到队列有空闲
            while (count == items.length) {
                if (nanos <= 0)
                    return false;// 等待超时,返回
                // 暂时放出锁,等待一段时间(可能被提前唤醒并抢到锁,所以需要循环判断条件)
                // 这段时间可能其他线程取走了元素,这样就有机会插入了
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);//插入一个元素
            return true;
        } finally {
            lock.unlock(); //解锁
        }
    }

На первый взгляд будет немного запутанно.Чтение и запись это один и тот же замок.Если он пустой то придет просто читающий поток и не будет ли он постоянно блокироваться?
Ответ кроется в notEmpty и notFull.Эти две мелочи от блокировки заставляют блокировку иметь функции похожие на synchronized+wait+notify.

9.Очередь блокировки LinkedBlockingQueue (на основе связанного списка)

Блокирующая очередь, реализованная на основе связанного списка, имеет дополнительное ограничение емкости, чем неблокирующая ConcurrentLinkedQueue.Если она не задана, по умолчанию используется максимальное значение int.

10. Очередь блокировки LinkedBlockingDeque (на основе двусвязного списка)

Аналогичен LinkedBlockingQueue, но предоставляет операции, специфичные для двусвязных списков.

11.PriorityBlockingQueue потокобезопасная приоритетная очередь

При построении вы можете передать компаратор, что видно по тому, как вставленные элементы будут отсортированы, а затем использованы по порядку при чтении. Некоторые элементы с низким приоритетом могут не потребляться в течение длительного времени, потому что элементы с более высоким приоритетом продолжают поступать.

12.SynchronousQueue очередь обмена синхронизацией данных

Поддельная очередь, поскольку в ней нет реального места для хранения элементов, каждая операция вставки должна иметь соответствующую операцию выборки, и она не может продолжать вставляться, если она не извлечена.
Простой пример, чтобы почувствовать это:
import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                // 没有休息,疯狂写入
                for (int i = 0; ; i++) {
                    System.out.println("放入: " + i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                // 咸鱼模式取数据
                while (true) {
                    System.out.println("取出: " + queue.take());
                    Thread.sleep((long) (Math.random() * 2000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
/* 输出:
放入: 0
取出: 0
放入: 1
取出: 1
放入: 2
取出: 2
放入: 3
取出: 3
*/

Видно, что пишущий поток не спит, можно сказать, что он ставит все в очередь, тогда как читающий поток очень неактивен, читает один и некоторое время спит. Результатом вывода является то, что операции чтения и записи появляются парами.
Сценарий использования в JAVA — Executors.newCachedThreadPool(), который создает пул потоков кэша.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0, // 核心线程为0,没用的线程都被无情抛弃
        Integer.MAX_VALUE, // 最大线程数理论上是无限了,还没到这个值机器资源就被掏空了
        60L, TimeUnit.SECONDS, // 闲置线程60秒后销毁
        new SynchronousQueue<Runnable>()); // offer时如果没有空闲线程取出任务,则会失败,线程池就会新建一个线程
}


13.LinkedTransferQueue — очередь обмена данными на основе связанного списка.

Реализован интерфейс TransferQueue, и при размещении элемента через метод переноса, если обнаруживается, что поток заблокирован при извлечении элемента, элемент будет напрямую передан ожидающему потоку. Если никто не ожидает использования, элемент помещается в конец очереди, и метод блокируется до тех пор, пока кто-нибудь не прочитает этот элемент. Аналогичен SynchronousQueue, но более мощный.

14.DelayQueue очередь задержки

Элементы, помещенные в очередь, могут быть извлечены потребителями после указанной задержки, и элементы должны реализовать интерфейс Delayed.

Суммировать

Вышеприведенное кратко представляет некоторые классы-контейнеры в параллельном пакете JAVA Зная, что есть эти вещи, когда вы сталкиваетесь с подходящей сценой, вы можете подумать о готовой вещи, которую вы можете использовать. Если вы хотите знать, почему, вам придется исследовать дальше в будущем.

Наконец

Прошу всех обратить внимание на мой официальный аккаунт [Программист в погоне за ветром], в нем будут обновляться статьи, а также размещаться отсортированная информация.