Сведения о реализации CopyOnWriteList

Java

Я резюмировал ранее в сборнике JavaСведения об исходном коде ArrayList, в котором также упоминается, что ArrayList небезопасен для потоков, поэтому что такое потокобезопасный список, предоставленный нам jdk, то есть CopyOnWriteList, класс параллельной коллекции, который будет упомянут ниже, который в основном используетcopy-on-writeмысль, мысль, наверное读写分离:读时共享、写时复制(原本的array)更新(且为独占式的加锁), и конкретная реализация разбираемого ниже исходного кода также является воплощением этой идеи

Обзор

Давайте сначала вставим систему наследования CopyOnWriteList. Вы можете видеть, что она реализует интерфейсы Serializable, Cloneable и RandomAccess и имеет характеристики произвольного доступа. Она реализует интерфейс List и имеет характеристики List.

Давайте рассмотрим основные атрибуты CopyOnWriteList и методы, которые будут проанализированы ниже. Из рисунка видно, что:

  • Каждый объект CopyOnWriteList имеет массив массивов для хранения определенных элементов.

  • Используйте эксклюзивную блокировку ReentrantLock, чтобы убедиться, что только поток записи обновляет копию массива. О ReentrantLock вы можете обратиться к другой моей статьеReentrantLock приложения AQS.

  • CopyOnWriteArrayList не будет генерировать ConcurrentModificationException при обходе, и нет необходимости добавлять дополнительные блокировки при обходе

    Следующее в основном предназначено для просмотра реализации CopyOnWriteList.

Свойства члена

//这个就是保证更新数组的时候只有一个线程能够获取lock,然后更新
final transient ReentrantLock lock = new ReentrantLock();
//使用volatile修饰的array,保证写线程更新array之后别的线程能够看到更新后的array.
//但是并不能保证实时性:在数组副本上添加元素之后,还没有更新array指向新地址之前,别的读线程看到的还是旧的array
private transient volatile Object[] array;
//获取数组,非private的,final修饰
final Object[] getArray() {
    return array;
}
//设置数组
final void setArray(Object[] a) {
    array = a;
}

Метод строительства

(1) Без построения параметров, по умолчанию создается массив длины 0

//这里就是构造方法,创建一个新的长度为0的Object数组
//然后调用setArray方法将其设置给CopyOnWriteList的成员变量array
public CopyOnWriteArrayList() {
    setArray(new Object[0]);
}

(2) Конструктор, параметром которого является Коллекция.

//按照集合的迭代器遍历返回的顺序,创建包含传入的collection集合的元素的列表
//如果传递的参数为null,会抛出异常
public CopyOnWriteArrayList(Collection<? extends E> c) {
    Object[] elements; //一个elements数组
    //这里是判断传递的是否就是一个CopyOnWriteArrayList集合
    if (c.getClass() == CopyOnWriteArrayList.class)
        //如果是,直接调用getArray方法,获得传入集合的array然后赋值给elements
        elements = ((CopyOnWriteArrayList<?>)c).getArray();
    else {
        //先将传入的集合转变为数组形式
        elements = c.toArray();
        //c.toArray()可能不会正确地返回一个 Object[]数组,那么使用Arrays.copyOf()方法
        if (elements.getClass() != Object[].class)
            elements = Arrays.copyOf(elements, elements.length, Object[].class);
    }
    //直接调用setArray方法设置array属性
    setArray(elements);
}

(3) Создайте список, содержащий копию данного массива

public CopyOnWriteArrayList(E[] toCopyIn) {
    setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

Выше приведена инициализация CopyOnWriteList. Три метода построения относительно просты для понимания. Позже мы в основном рассмотрим реализацию нескольких основных методов.

добавить элемент

Ниже приведена реализация метода add(E e) и подробные комментарии.

public boolean add(E e) {
    //获得独占锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //获得list底层的数组array
        Object[] elements = getArray();
        //获得数组长度
        int len = elements.length;
        //拷贝到新数组,新数组长度为len+1
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //给新数组末尾元素赋值
        newElements[len] = e;
        //用新的数组替换掉原来的数组
        setArray(newElements);
        return true; 
    } finally {
        lock.unlock();//释放锁
    }
}

Обобщите поток выполнения метода add.

  • Поток, вызывающий метод add, сначала получит блокировку, а затем вызовет метод блокировки для блокировки списка (те, кто понимает ReentrantLock, знают, что это эксклюзивная блокировка, поэтому при многопоточности блокировку получит только один поток).
  • Только поток получит блокировку, поэтому только один поток будет обновлять массив.Во время этого процесса другие потоки, вызывающие метод добавления, блокируются в ожидании
  • Поток, получивший блокировку, продолжает выполняться
    • Сначала получите исходный массив и его длину, затем скопируйте элементы в новый массив (длина newArray равна исходной длине + 1)
    • Присвоить указанному индексу массива len+1
    • Замените старый массив новым массивом
  • Наконец, отпустите замок

Итак, подведем итог: только один поток может получить блокировку при многопоточности, а затем добавить элементы, скопировав исходный массив, затем заменить исходный массив новым массивом и, наконец, снять блокировку (другие добавить потоки для выполнения) ) .

Последний момент заключается в том, что длина массива не является фиксированной, и длина массива будет +1 после каждой записи, поэтому CopyOnWriteList не имеет таких атрибутов, как длина или размер, но предоставляет метод size() для получения фактический размер коллекции, методы size(), как показано ниже

public int size() {
    return getArray().length;
}

получить элемент

Используйте get (i), чтобы получить элемент в указанной позиции i. Конечно, если элемент не существует, будет выдано исключение массива за пределами границ.

public E get(int index) {
    return get(getArray(), index);
}
final Object[] getArray() {
    return array;
}
private E get(Object[] a, int index) {
    return (E) a[index];
}

Конечно, здесь также отражен метод get.copy-on-write-listпроблема слабой согласованности. Мы используем следующую диаграмму, чтобы кратко объяснить. Предположение, данное на рисунке, таково: threadA обращается к элементу с индексом = 1.

  • ① Получить массив массивов
  • ②Доступ к элементу, указанному входящим параметром

Потому что мы видим, что процесс get не заблокирован (при условии, что в массиве три элемента, как показано на рисунке). Предположим, что поток A выполняет ① после ② и до ②, поток B выполняет операцию удаления (1), поток B получает эксклюзивную блокировку, а затем выполняет операцию копирования при записи, то есть复制一个新的数组neArray, а затем выполните операцию удаления (1) в newArray, чтобы обновить массив. После выполнения threadB элемент с index=1 в массиве уже является item3.

Затем threadA продолжает выполняться, но поскольку threadA работает с элементами исходного массива, index=1 в это время по-прежнему является item2. Таким образом, последнее явление заключается в том, что хотя поток B удаляет элемент в позиции 1, поток A по-прежнему обращается к элементу исходного массива. Это若一致性问题

изменить элемент

Модификация также принадлежит, поэтому вам нужно получить блокировку, ниже приведена реализация метода set

public E set(int index, E element) {
    //获取锁
    final ReentrantLock lock = this.lock;
    //进行加锁
    lock.lock();
    try {
        //获取数组array
        Object[] elements = getArray();
        //获取index位置的元素
        E oldValue = get(elements, index);
        // 要修改的值和原值不相等
        if (oldValue != element) {
            //获取旧数组的长度
            int len = elements.length;
            //复制到一个新数组中
            Object[] newElements = Arrays.copyOf(elements, len);
            //在新数组中设置元素值
            newElements[index] = element;
            //用新数组替换掉原数组
            setArray(newElements);
        } else {
            // Not quite a no-op; ensures volatile write semantics
            //为了保证volatile 语义,即使没有修改,也要替换成新的数组
            setArray(elements);
        }
        return oldValue; //返回旧值
    } finally {
        lock.unlock();//释放锁
    }
}

Посмотрев на метод set, я обнаружил, что он на самом деле похож на метод add.

  • Получите эксклюзивную блокировку, чтобы гарантировать, что только один поток может изменять массив одновременно.
  • Получите текущий массив, вызовите метод get, чтобы получить элемент массива в указанной позиции
  • Определите значение, полученное с помощью get, и параметры, переданные в
    • Equal, чтобы обеспечить volatile семантику, все же необходимо пересоздать массив
    • Если они не равны, скопируйте элементы исходного массива в новый массив, а затем измените их по индексу нового массива.После модификации замените исходный массив новым массивом.
  • разблокировать замок

удалить элемент

Ниже приведена реализация метода удаления.

  • Получите эксклюзивную блокировку, чтобы гарантировать, что только один поток может удалить элемент
  • Подсчитайте количество элементов массива для перемещения
    • Если последний элемент удален, то приведенный выше результат вычисления равен 0, а первые элементы len-1 исходного массива напрямую заменяются новым массивом.
    • Удаляемый элемент не является последним элементом, затем он делится на две части по индексу, соответственно копируется в новый массив, а затем заменяется.
  • разблокировать замок
public E remove(int index) {
    //获取锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //获取原数组
        Object[] elements = getArray();
        //获取原数组长度
        int len = elements.length;
        //获取原数组index处的值
        E oldValue = get(elements, index);
        //因为数组删除元素需要移动,所以这里就是计算需要移动的个数
        int numMoved = len - index - 1;
        //计算的numMoved=0,表示要删除的是最后一个元素,
        //那么旧直接将原数组的前len-1个复制到新数组中,替换旧数组即可
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, len - 1));
        //要删除的不是最后一个元素
        else {
            //创建一个长度为len-1的数组
            Object[] newElements = new Object[len - 1];
            //将原数组中index之前的元素复制到新数组
            System.arraycopy(elements, 0, newElements, 0, index);
            //将原数组中index之后的元素复制到新数组
            System.arraycopy(elements, index + 1, newElements, index,
                             numMoved);
            //用新数组替换原数组
            setArray(newElements);
        }
        return oldValue;//返回旧值
    } finally {
        lock.unlock();//释放锁
    }
}

итератор

Основное использование итератора заключается в следующем: метод hashNext() используется для определения наличия еще элементов, а метод next возвращает конкретный элемент.

CopyOnWriteArrayList list = new CopyOnWriteArrayList();
Iterator<?> itr = list.iterator();
while(itr.hashNext()) {
    //do sth
    itr.next();
}

Так как же реализован итератор в CopyOnWriteArrayList и почему у него слабая согласованность? (先获取迭代器的,但是如果在获取迭代器之后别的线程对list进行了修改,这对于迭代器是不可见的), поговорим о реализации в CopyOnWriteArrayList

//Iterator<?> itr = list.iterator();
public Iterator<E> iterator() {
    //这里可以看到,是先获取到原数组getArray(),这里记为oldArray
    //然后调用COWIterator构造器将oldArray作为参数,创建一个迭代器对象
    //从下面的COWIterator类中也能看到,其中有一个成员存储的就是oldArray的副本
    return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
    //array的快照版本
    private final Object[] snapshot;
    //后续调用next返回的元素索引(数组下标)
    private int cursor;
    //构造器
    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }
    //变量是否结束:下标小于数组长度
    public boolean hasNext() {
        return cursor < snapshot.length;
    }
    //是否有前驱元素
    public boolean hasPrevious() {
        return cursor > 0;
    }
    //获取元素
    //hasNext()返回true,直接通过cursor记录的下标获取值
    //hasNext()返回false,抛出异常
    public E next() {
        if (! hasNext())
            throw new NoSuchElementException();
        return (E) snapshot[cursor++];
    }
    //other method...
}

В приведенном выше коде мы видим, что метод списка iterator() фактически возвращает объект COWIterator, а переменная-член моментального снимка объекта COWIterator сохраняет当前Содержимое хранится в массиве в списке, но можно сказать, что снимок является снимком массива, почему вы так говорите?

То, что мы проходим, хотя текущийarray, но могут быть и другие темыarrayизмененный, а затем оригинальныйarrayЗаменил, то в это время в спискеarrayа такжеsnapshotцитируетсяarrayне один, как оригиналarrayЕсли снимок существует, итератор не будет обращаться к обновленному массиву. Это проявление слабой консистенции

Давайте посмотрим на следующий пример

public class TestCOW {

    private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList();

    public static void main(String[] args) throws InterruptedException {
        list.add("item1");
        list.add("item2");
        list.add("item3");

        Thread thread = new Thread() {
            @Override
            public void run() {
                list.set(1, "modify-item1");
                list.remove("item2");
            }
        };
        //main线程先获得迭代器
        Iterator<String> itr = list.iterator();
        thread.start();//启动thread线程
        thread.join();//这里让main线程等待thread线程执行完,然后再遍历看看输出的结果是不是修改后的结果
        while (itr.hasNext()) {
            System.out.println(Thread.currentThread().getName() + "线程中的list的元素:" + itr.next());
        }
    }
}

Результат работы следующий. На самом деле, в приведенной выше программе мы сначала добавляем несколько элементов в список, а затем модифицируем список в потоке, при этом позволяяmain线程先获得list的迭代器, и подождите, пока поток завершит выполнение, а затем распечатайте элементы в списке.Обнаружено, что основной поток не нашел изменение массива в списке, и на выходе остается исходный список, который является воплощением слабой консистенции.

Элемент списка в основном потоке: item1 Элемент списка в основном потоке: item2 Элемент списка в основном потоке: item3

Суммировать

  • Как гарантируется CopyOnWriteArrayListПотокобезопасность: используйте ReentrantLock в качестве эксклюзивной блокировки, чтобы гарантировать, что только один поток одновременно имеет доступ к коллекции.действовать
  • Данные хранятся в массиве array в CopyOnWriteArrayList, а длина массива изменяется динамически (операция обновит массив)
  • При изменении массива вместо непосредственного управления массивом копируется новый массив, модификация завершается, а старый массив заменяется новым.
  • Нет необходимости в блокировке при использовании итератора для обхода, и ConcurrentModificationException не будет выброшено, потому что итератор используется для обхода копии массива (конечно, это тот случай, когда другие потоки изменяют список)

установить детали метода

Обратите внимание, что в методе set есть фрагмент кода, который выглядит следующим образом:

else { //oldValue = element(element是传入的参数)
    // Not quite a no-op; ensures volatile write semantics
    //为了保证volatile 语义,即使没有修改,也要替换成新的数组
    setArray(elements);
}

На самом деле это означает, что изменяемое значение в указанной позиции совпадает со значением этой позиции в массиве, но для обновления массива все же необходимо вызвать метод set.Почему так? этосообщение, вывод такой维护happens-before原则. Сначала посмотрите на это предложение

Методы всех классов в java.util.concurrent и его подпакетах расширяют эти гарантии для синхронизации более высокого уровня. В частности: операции в потоке перед помещением объекта в любую параллельную коллекцию — до доступа или удаления элемента из коллекции в другом потоке**后续操作**.

Можно понять, что это делается для того, чтобы последовательность операций до операции set происходила до того, как другие потоки получили доступ к массиву (без блокировки).后续操作, обратитесь к следующему примеру

// 这是两个线程的初始情况
int nonVolatileField = 0; //一个不被volatile修饰的变量
//伪代码
CopyOnWriteArrayList<String> list = {"x","y","z"}

// Thread 1
// (1)这里更新了nonVolatileField
nonVolatileField = 1;
// (2)这里是set()修改(写)操作,注意这里会对volatile修饰的array进行写操作
list.set(0, "x");

// Thread 2
// (3)这里是访问(读)操作
String s = list.get(0);
// (4)使用nonVolatileField
if (s == "x") {
    int localVar = nonVolatileField;
}

Предполагая, что описанный выше сценарий существует, если можно гарантировать, что будет только такая траектория: (1) -> (2) -> (3) -> (4).Согласно соглашению в приведенном выше документе API Java, Существуют

(2) произойдет до и (3), операции в потоке: (1) произойдет до и (2), (3) произойдет до и (4), чтение и запись nonVolatileField в соответствии с транзитивностью события -до того, как переменные имеют (1) событие-до и (4)

Таким образом, операция записи потока 1 в nonVolatileField видна операции чтения потока 2. Если нет setArray(elements) в другом наборе CopyOnWriteArrayList对volatile变量的写Если это так, (2) произошло раньше и (3) больше не доступны, и указанная выше видимость не может быть гарантирована.

Таким образом, необходимо обеспечить, чтобы серия операций до операции set происходила до того, как другие потоки получили доступ к массиву (без блокировки).后续操作,