Java Concurrency (9) — от синхронных контейнеров к параллельным контейнерам

Java задняя часть контейнер Безопасность

введение

Контейнеры являются наиболее часто используемой частью библиотеки базовых классов Java. Пакет коллекции Java предоставляет большое количество классов-контейнеров, чтобы помочь нам упростить разработку. В моей предыдущей статье я провел серию анализов ключевых контейнеров в коллекции Java. Однако эти классы-коллекции не являются потокобезопасными, то есть в многопоточной среде необходимы другие дополнительные средства для обеспечения корректности данных. потокобезопасные контейнерные коды. Выполнять синхронно. Хотя этот метод может обеспечить безопасность потоков, существует несколько очевидных проблем: во-первых, существует определенная сложность кодирования, и необходимо добавлять блокировки к соответствующим сегментам кода. Во-вторых, этот универсальный подход плохо работает в ситуациях с высокой степенью параллелизма и в основном эквивалентен последовательному выполнению. JDK1.5 предоставляет нам ряд параллельных контейнеров, которые сосредоточены в пакете java.util.concurrent для решения этих двух проблем, начиная с контейнера синхронизации.

Синхронизируйте контейнер Vector и HashTable

Чтобы упростить процесс разработки кода, ранний JDK предоставлял два контейнера синхронизации, Vector и HashTable в пакете java.util Реализация этих двух контейнеров в основном такая же, как и в ранней реализации кода ArrayList и HashMap. Разница в том, что Vector и HashTable находятся в каждом ключевом слове synchronized, добавленном к каждому методу, чтобы гарантировать, что только один поток может получить доступ к одному и тому же экземпляру в одно и то же время.Часть исходного кода выглядит следующим образом:

//Vector
public synchronized int size() {};
public synchronized E get(int index) {};

//HashTable 
public synchronized V put(K key, V value) {};
public synchronized V remove(Object key) {};

Добавляя synchronized к каждому методу, гарантируется сериализация нескольких операций. Хотя этот метод удобен в использовании, он не решает проблему производительности при высоком параллелизме, он ничем не отличается от ручной блокировки ArrayList и HashMap, будь то чтение или запись, весь контейнер будет заблокирован. Во-вторых, у этого метода есть еще одна проблема: когда несколько потоков выполняют составные операции, он небезопасен для потоков. Это можно проиллюстрировать следующим кодом:

public static void deleteVector(){
    int index = vectors.size() - 1;
    vectors.remove(index);
}

Код выполняет двухшаговые операции над вектором: сначала получается размер, а затем удаляется последний элемент, в случае многопоточности, если два потока выполняются крест-накрест, после вызова размера потоком А поток Б удаляет последний элемент. В это время поток A удаляет последний элемент. Продолжение удаления вызовет ошибку индекса вне диапазона.

Итак, как решить эту проблему? Самая простая модификация — заблокировать блок кода, чтобы предотвратить одновременное выполнение нескольких потоков:

public static void deleteVector(){
    synchronized (vectors) {
        int index = vectors.size() - 1;
        vectors.remove(index);
    }
}

Если вышеуказанная проблема не слишком интуитивно понятна для решения с помощью блокировки, посмотрите на итерацию по векторам:

public static void foreachVector(){
    synchronized (vectors) {
        for (int i = 0; i < vectors.size(); i++) {
            System.out.println(vectors.get(i).toString());
        }
    }
}

Во избежание модификации векторов другими потоками во время процесса итерации в случае многопоточности весь процесс итерации должен быть заблокирован.Представьте себе такой сценарий, если операция итерации очень частая, или элементы векторы очень велики, то все модификации и операции чтения будут ждать вне блокировки, что окажет огромное влияние на многопоточную производительность. Итак, есть ли способ хорошо разделить итеративную операцию и операцию модификации контейнера, чтобы во время модификации не затрагивалась итеративная операция контейнера? Это требует появления различных concurrent-контейнеров в пакете java.util.concurrent.

Параллельный контейнер CopyOnWrite

CopyOnWrite — контейнер с копированием при записи — часто используемый параллельный контейнер. Он позволяет повысить производительность одновременного выполнения за счет разделения операций чтения и записи в многопоточном режиме. Операция записи должна быть заблокирована. Отличие в том, что в CopyOnWrite после блокировки операции модификации контейнера измените его, скопировав новый контейнер, а затем замените контейнер новым контейнером после модификации.

Преимущества этого метода очевидны: при копировании нового контейнера для модификации операция чтения не нуждается в блокировке и может читаться параллельно, поскольку в процессе чтения используется старый контейнер, даже если новый контейнер модифицируется. к старому Контейнер не имеет никакого эффекта, а также решает проблему параллелизма, вызванную модификациями других потоков в процессе итерации.

Параллельные контейнеры, предоставляемые в JDK, включают CopyOnWriteArrayList и CopyOnWriteArraySet. Давайте разберемся с этой идеей через частичный исходный код CopyOnWriteArrayList:

//添加元素
public boolean add(E e) {
    //独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //复制一个新的数组newElements
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        //修改后指向新的数组
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public E get(int index) {
    //未加锁,直接获取
    return get(getArray(), index);
}

Код очень прост: в операции добавления для получения блокировки используется общий ReentrantLock, который может предотвратить одновременное изменение содержимого контейнера несколькими потоками в условиях многопоточности. После получения блокировки новый контейнер копируется через Arrays.copyOf, затем новый контейнер модифицируется и, наконец, исходная ссылка на массив напрямую указывает на новый массив через setArray, что позволяет избежать ошибок при повторении данных в процессе модификации. Поскольку операция get является операцией чтения, она не заблокирована и может быть прочитана напрямую. CopyOnWriteArraySet аналогичен и не будет подробно объясняться здесь.

Хотя контейнер CopyOnWrite безопасен для использования в условиях многопоточности и по сравнению с Vector, он также значительно повышает производительность чтения и записи, но и у него есть свои проблемы.

Во-первых, это производительность.Как упоминалось в статье, объясняющей ArrayList, расширение ArrayList оказывает определенное влияние на производительность из-за использования Arrays.copyOf для применения большего пространства и копирования существующих элементов в новый массив каждый раз. Контейнер CopyOnWrite не является исключением: каждая операция модификации будет применена к новому пространству массива, а затем заменит его. Таким образом, в случае высокой параллелизма и частого изменения контейнера будет постоянно применяться новое пространство, и в то же время будет вызываться частый GC.В настоящее время использование контейнера CopyOnWrite не является хорошим выбором.

Во-вторых, существует проблема непротиворечивости данных, так как новый массив копируется и заменяется во время модификации, и если старый массив все еще используется, новые данные не могут быть вовремя прочитаны, что приводит к несогласованности данных. требуется, а контейнер CopyOnWrite тоже не подходит.

Параллельный контейнер ConcurrentHashMap

По сравнению с контейнером CopyOnWrite контейнер ConcurrentHashMap имеет дополнительную оптимизацию в отношении детализации блокировки параллелизма.Он обеспечивает более детальное управление параллелизмом за счет изменения блокировки на одном элементе хэш-контейнера. Прежде чем разбираться в контейнере ConcurrentHashMap, я рекомендую вам прочитать мою предыдущую статью об анализе исходного кода HashMap:Коллекция Java (5) Один HashMap и HashSet, потому что в базовой структуре данных и ConcurrentHashMap, и HashMap используют метод массив + связанный список + красно-черное дерево, но добавляют только некоторые элементы управления, связанные с параллелизмом, на основе HashMap, поэтому здесь я делаю только некоторый анализ связанных с параллелизмом код в ConcurrentHashMap.

Или начните с операции записи ConcurrentHashMap, вот метод put:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); //计算桶的hash值
    int binCount = 0;
    //循环插入元素,避免并发插入失败
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果当前桶无元素,则通过cas操作插入新节点
            if (casTabAt(tab, i, null,
                            new Node<K,V>(hash, key, value, null)))
                break;                   
        }
        //如果当前桶正在扩容,则协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //hash冲突时锁住当前需要添加节点的头元素,可能是链表头节点或者红黑树的根节点
            synchronized (f) { 
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                            value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

В процессе размещения элементов есть несколько ключевых моментов для параллельной обработки:

  • Если узел, соответствующий текущему сегменту, еще не вставил элемент, попробуйте вставить новый узел с помощью типичной операции cas без блокировки, чтобы уменьшить вероятность блокировки.Если вставка не удалась при параллельных условиях, легко подумать спина, то естьfor (Node<K,V>[] tab = table;;).
  • Если текущий сегмент расширяется, помогите расширить((fh = f.hash) == MOVED). Вот ключевой момент.Расширение ConcurrentHashMap отличается от расширения HashMap.Он расширяет емкость в многопоточных условиях или с использованием нескольких потоков одновременно.Каждый поток расширяет указанную часть хеш-ковша.После расширения текущего потока указанный сегмент, он будет продолжать получать следующую задачу расширения, пока расширение не будет завершено. Размер расширения такой же, как и у HashMap, который удваивается, что может эффективно уменьшить количество перемещаемых элементов, то есть причина использования степени 2, что также и в HashMap.
  • Только когда возникает конфликт хэшей, могут быть только элементы заголовка, необходимые в данный момент для добавления узлов, которые могут быть корневым узлом связанного узла заголовка или красной стрижкой, а другие узлы ведра не нужно блокировать, что значительно уменьшает размер замка.

В процессе добавления элементов в ConcurrentHashMap мы знаем, что контейнер ConcurrentHashMap реализует управление параллелизмом через CAS + synchronized. Вот дополнительный вопрос: зачем использовать синхронизированный вместо ReentrantLock? В моей предыдущей статье также анализировалась реализация и производительность synchronized и ReentrantLock, насколько я понимаю, у synchronized больше места для последующей оптимизации, чем у ReentrantLock.

Параллельный контейнер ConcurrentSkipListMap

Соответствующие контейнеры в java.util могут в основном найти соответствующие concurrent-контейнеры в пакете java.util.concurrent: List и Set имеют соответствующие CopyOnWriteArrayList и CopyOnWriteArraySet, HashMap имеет соответствующие ConcurrentHashMap, но упорядоченное TreeMap или нет соответствующего ConcurrentTreeMap.

Почему нет ConcurrentTreeMap? Это связано с тем, что внутри TreeMap используется красно-черное дерево. Красно-черное дерево является самобалансирующимся двоичным деревом. Когда дерево изменяется, его необходимо перебалансировать. Операция перебалансировки может повлиять на большинство узлов в дереве. Если параллелизм очень высок В больших случаях это требует добавления блокировок мьютексов на многих узлах дерева, а параллелизм не имеет смысла. Поэтому предоставляется другая реализация упорядоченной карты в условиях параллелизма: ConcurrentSkipListMap.

ConcurrentSkipListMap реализуется внутри структуры данных SkipList. Его структура очень проста для понимания по сравнению с красно-черным деревом, и его относительно просто реализовать. Теоретически его сложность времени поиска, вставки и удаления составляет log (n) . Что касается параллелизма, ConcurrentSkipListMap использует CAS+spin без блокировки для управления.

Пропущенный список — это просто многоуровневый связанный список.Нижний слой — это обычный связанный список, который затем сокращается слой за слоем.Обычно используется простой алгоритм, чтобы понять, что элементы каждого слоя составляют половину элементы следующего слоя, так что при поиске поиск начинается с самого верхнего элемента, что можно назвать другой формой бинарного поиска.

Простой алгоритм вероятности доложимости слоя приобретения выполнен следующим образом:

int random_level()  {  
    K = 1;  
    while (random(0,1))  
        K++;  
  
    return K;  
}  

Получение вероятности и вероятность простого 0, слой 1 составляет 50%, вероятность слоя 2 составляет 25%, вероятность слоя 3 составляет 12,5%, так что постепенно снижается.

Процесс добавления элементов в трехуровневый список пропуска выглядит следующим образом:

Вставьте узел со значением 15:
После вставки:
В Википедии есть движущийся граф для добавления узлов, который также размещен здесь для простоты понимания:

Изучите список пропусков и управление параллелизмом CAS, проанализировав метод put ConcurrentSkipListMap:

 private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前继节点
            if (n != null) { //查找到前继节点
                Object v; int c;
                Node<K,V> f = n.next; //获取后继节点的后继节点
                if (n != b.next)  //发生竞争,两次节点获取不一致,并发导致
                    break;
                if ((v = n.value) == null) {  // 节点已经被删除
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) 
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) { //进行下一轮查找,比当前key大
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) { //相等时直接cas修改值
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
            if (!b.casNext(n, z)) //cas修改值 
                break;         // restart if lost race to append to b
            break outer;
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取随机数
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0) // 获取跳表层级
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) { //如果获取的调表层级小于等于当前最大层级,则直接添加,并将它们组成一个上下的链表
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else { // try to grow by one level //否则增加一层level,在这里体现为Index<K,V>数组
            level = max + 1; // hold in array and later pick the one to use
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j) //新添加的level层的具体数据
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // 逐层插入数据过程
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

Метод вставки здесь очень сложен и может состоять из трех шагов: первый шаг — получить узел-предшественник, а затем вставить узел через CAS; второй шаг — оценить количество уровней, и если оно больше, чем максимальное количество слоев, вставить слой; Третий шаг вставляет данные соответствующего слоя. Весь процесс вставки использует спин CAS для обеспечения правильности данных в параллельных условиях.

Суммировать

JDK предоставляет нам для использования множество параллельных контейнеров, и введение в этой статье не является исчерпывающим.Смысл в том, чтобы понять их уникальные сценарии использования, поняв принципы работы различных параллельных контейнеров. Вот краткое резюме: когда вам нужно использовать List и Set в сценарии, в котором количество одновременных операций чтения гораздо больше, чем изменений, вы можете рассмотреть возможность использования CopyOnWriteArrayList и CopyOnWriteArraySet; когда вам нужно использовать пары ключ-значение для доступ к данным одновременно, вы можете использовать ConcurrentHashMap ; ConcurrentSkipListMap можно использовать, когда гарантированно упорядочены параллельные пары ключ-значение .