Я резюмировал ранее в сборнике JavaСведения об исходном коде ArrayList, в котором также упоминается, что ArrayList небезопасен для потоков, поэтому что такое потокобезопасный список, предоставленный нам jdk, то есть CopyOnWriteList, класс параллельной коллекции, который будет упомянут ниже, который в основном используетcopy-on-write
мысль, мысль, наверное读写分离:读时共享、写时复制(原本的array)更新(且为独占式的加锁)
, и конкретная реализация разбираемого ниже исходного кода также является воплощением этой идеи
Обзор
Давайте сначала вставим систему наследования CopyOnWriteList. Вы можете видеть, что она реализует интерфейсы Serializable, Cloneable и RandomAccess и имеет характеристики произвольного доступа. Она реализует интерфейс List и имеет характеристики List.
Давайте рассмотрим основные атрибуты CopyOnWriteList и методы, которые будут проанализированы ниже. Из рисунка видно, что:
-
Каждый объект CopyOnWriteList имеет массив массивов для хранения определенных элементов.
-
Используйте эксклюзивную блокировку ReentrantLock, чтобы убедиться, что только поток записи обновляет копию массива. О ReentrantLock вы можете обратиться к другой моей статьеReentrantLock приложения AQS.
-
CopyOnWriteArrayList не будет генерировать ConcurrentModificationException при обходе, и нет необходимости добавлять дополнительные блокировки при обходе
Следующее в основном предназначено для просмотра реализации CopyOnWriteList.
Свойства члена
//这个就是保证更新数组的时候只有一个线程能够获取lock,然后更新
final transient ReentrantLock lock = new ReentrantLock();
//使用volatile修饰的array,保证写线程更新array之后别的线程能够看到更新后的array.
//但是并不能保证实时性:在数组副本上添加元素之后,还没有更新array指向新地址之前,别的读线程看到的还是旧的array
private transient volatile Object[] array;
//获取数组,非private的,final修饰
final Object[] getArray() {
return array;
}
//设置数组
final void setArray(Object[] a) {
array = a;
}
Метод строительства
(1) Без построения параметров, по умолчанию создается массив длины 0
//这里就是构造方法,创建一个新的长度为0的Object数组
//然后调用setArray方法将其设置给CopyOnWriteList的成员变量array
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}
(2) Конструктор, параметром которого является Коллекция.
//按照集合的迭代器遍历返回的顺序,创建包含传入的collection集合的元素的列表
//如果传递的参数为null,会抛出异常
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements; //一个elements数组
//这里是判断传递的是否就是一个CopyOnWriteArrayList集合
if (c.getClass() == CopyOnWriteArrayList.class)
//如果是,直接调用getArray方法,获得传入集合的array然后赋值给elements
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
//先将传入的集合转变为数组形式
elements = c.toArray();
//c.toArray()可能不会正确地返回一个 Object[]数组,那么使用Arrays.copyOf()方法
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
//直接调用setArray方法设置array属性
setArray(elements);
}
(3) Создайте список, содержащий копию данного массива
public CopyOnWriteArrayList(E[] toCopyIn) {
setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}
Выше приведена инициализация CopyOnWriteList. Три метода построения относительно просты для понимания. Позже мы в основном рассмотрим реализацию нескольких основных методов.
добавить элемент
Ниже приведена реализация метода add(E e) и подробные комментарии.
public boolean add(E e) {
//获得独占锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//获得list底层的数组array
Object[] elements = getArray();
//获得数组长度
int len = elements.length;
//拷贝到新数组,新数组长度为len+1
Object[] newElements = Arrays.copyOf(elements, len + 1);
//给新数组末尾元素赋值
newElements[len] = e;
//用新的数组替换掉原来的数组
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}
Обобщите поток выполнения метода add.
- Поток, вызывающий метод add, сначала получит блокировку, а затем вызовет метод блокировки для блокировки списка (те, кто понимает ReentrantLock, знают, что это эксклюзивная блокировка, поэтому при многопоточности блокировку получит только один поток).
- Только поток получит блокировку, поэтому только один поток будет обновлять массив.Во время этого процесса другие потоки, вызывающие метод добавления, блокируются в ожидании
- Поток, получивший блокировку, продолжает выполняться
- Сначала получите исходный массив и его длину, затем скопируйте элементы в новый массив (длина newArray равна исходной длине + 1)
- Присвоить указанному индексу массива len+1
- Замените старый массив новым массивом
- Наконец, отпустите замок
Итак, подведем итог: только один поток может получить блокировку при многопоточности, а затем добавить элементы, скопировав исходный массив, затем заменить исходный массив новым массивом и, наконец, снять блокировку (другие добавить потоки для выполнения) ) .
Последний момент заключается в том, что длина массива не является фиксированной, и длина массива будет +1 после каждой записи, поэтому CopyOnWriteList не имеет таких атрибутов, как длина или размер, но предоставляет метод size() для получения фактический размер коллекции, методы size(), как показано ниже
public int size() {
return getArray().length;
}
получить элемент
Используйте get (i), чтобы получить элемент в указанной позиции i. Конечно, если элемент не существует, будет выдано исключение массива за пределами границ.
public E get(int index) {
return get(getArray(), index);
}
final Object[] getArray() {
return array;
}
private E get(Object[] a, int index) {
return (E) a[index];
}
Конечно, здесь также отражен метод get.copy-on-write-list
проблема слабой согласованности. Мы используем следующую диаграмму, чтобы кратко объяснить. Предположение, данное на рисунке, таково: threadA обращается к элементу с индексом = 1.
- ① Получить массив массивов
- ②Доступ к элементу, указанному входящим параметром
Потому что мы видим, что процесс get не заблокирован (при условии, что в массиве три элемента, как показано на рисунке). Предположим, что поток A выполняет ① после ② и до ②, поток B выполняет операцию удаления (1), поток B получает эксклюзивную блокировку, а затем выполняет операцию копирования при записи, то есть复制一个新的数组neArray
, а затем выполните операцию удаления (1) в newArray, чтобы обновить массив. После выполнения threadB элемент с index=1 в массиве уже является item3.
Затем threadA продолжает выполняться, но поскольку threadA работает с элементами исходного массива, index=1 в это время по-прежнему является item2. Таким образом, последнее явление заключается в том, что хотя поток B удаляет элемент в позиции 1, поток A по-прежнему обращается к элементу исходного массива. Это若一致性问题
изменить элемент
Модификация также принадлежит写
, поэтому вам нужно получить блокировку, ниже приведена реализация метода set
public E set(int index, E element) {
//获取锁
final ReentrantLock lock = this.lock;
//进行加锁
lock.lock();
try {
//获取数组array
Object[] elements = getArray();
//获取index位置的元素
E oldValue = get(elements, index);
// 要修改的值和原值不相等
if (oldValue != element) {
//获取旧数组的长度
int len = elements.length;
//复制到一个新数组中
Object[] newElements = Arrays.copyOf(elements, len);
//在新数组中设置元素值
newElements[index] = element;
//用新数组替换掉原数组
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
//为了保证volatile 语义,即使没有修改,也要替换成新的数组
setArray(elements);
}
return oldValue; //返回旧值
} finally {
lock.unlock();//释放锁
}
}
Посмотрев на метод set, я обнаружил, что он на самом деле похож на метод add.
- Получите эксклюзивную блокировку, чтобы гарантировать, что только один поток может изменять массив одновременно.
- Получите текущий массив, вызовите метод get, чтобы получить элемент массива в указанной позиции
- Определите значение, полученное с помощью get, и параметры, переданные в
- Equal, чтобы обеспечить volatile семантику, все же необходимо пересоздать массив
- Если они не равны, скопируйте элементы исходного массива в новый массив, а затем измените их по индексу нового массива.После модификации замените исходный массив новым массивом.
- разблокировать замок
удалить элемент
Ниже приведена реализация метода удаления.
- Получите эксклюзивную блокировку, чтобы гарантировать, что только один поток может удалить элемент
- Подсчитайте количество элементов массива для перемещения
- Если последний элемент удален, то приведенный выше результат вычисления равен 0, а первые элементы len-1 исходного массива напрямую заменяются новым массивом.
- Удаляемый элемент не является последним элементом, затем он делится на две части по индексу, соответственно копируется в новый массив, а затем заменяется.
- разблокировать замок
public E remove(int index) {
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//获取原数组
Object[] elements = getArray();
//获取原数组长度
int len = elements.length;
//获取原数组index处的值
E oldValue = get(elements, index);
//因为数组删除元素需要移动,所以这里就是计算需要移动的个数
int numMoved = len - index - 1;
//计算的numMoved=0,表示要删除的是最后一个元素,
//那么旧直接将原数组的前len-1个复制到新数组中,替换旧数组即可
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
//要删除的不是最后一个元素
else {
//创建一个长度为len-1的数组
Object[] newElements = new Object[len - 1];
//将原数组中index之前的元素复制到新数组
System.arraycopy(elements, 0, newElements, 0, index);
//将原数组中index之后的元素复制到新数组
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//用新数组替换原数组
setArray(newElements);
}
return oldValue;//返回旧值
} finally {
lock.unlock();//释放锁
}
}
итератор
Основное использование итератора заключается в следующем: метод hashNext() используется для определения наличия еще элементов, а метод next возвращает конкретный элемент.
CopyOnWriteArrayList list = new CopyOnWriteArrayList();
Iterator<?> itr = list.iterator();
while(itr.hashNext()) {
//do sth
itr.next();
}
Так как же реализован итератор в CopyOnWriteArrayList и почему у него слабая согласованность? (先获取迭代器的,但是如果在获取迭代器之后别的线程对list进行了修改,这对于迭代器是不可见的
), поговорим о реализации в CopyOnWriteArrayList
//Iterator<?> itr = list.iterator();
public Iterator<E> iterator() {
//这里可以看到,是先获取到原数组getArray(),这里记为oldArray
//然后调用COWIterator构造器将oldArray作为参数,创建一个迭代器对象
//从下面的COWIterator类中也能看到,其中有一个成员存储的就是oldArray的副本
return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
//array的快照版本
private final Object[] snapshot;
//后续调用next返回的元素索引(数组下标)
private int cursor;
//构造器
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
//变量是否结束:下标小于数组长度
public boolean hasNext() {
return cursor < snapshot.length;
}
//是否有前驱元素
public boolean hasPrevious() {
return cursor > 0;
}
//获取元素
//hasNext()返回true,直接通过cursor记录的下标获取值
//hasNext()返回false,抛出异常
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
//other method...
}
В приведенном выше коде мы видим, что метод списка iterator() фактически возвращает объект COWIterator, а переменная-член моментального снимка объекта COWIterator сохраняет当前
Содержимое хранится в массиве в списке, но можно сказать, что снимок является снимком массива, почему вы так говорите?
То, что мы проходим, хотя текущий
array
, но могут быть и другие темыarray
измененный, а затем оригинальныйarray
Заменил, то в это время в спискеarray
а такжеsnapshot
цитируетсяarray
не один, как оригиналarray
Если снимок существует, итератор не будет обращаться к обновленному массиву. Это проявление слабой консистенции
Давайте посмотрим на следующий пример
public class TestCOW {
private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList();
public static void main(String[] args) throws InterruptedException {
list.add("item1");
list.add("item2");
list.add("item3");
Thread thread = new Thread() {
@Override
public void run() {
list.set(1, "modify-item1");
list.remove("item2");
}
};
//main线程先获得迭代器
Iterator<String> itr = list.iterator();
thread.start();//启动thread线程
thread.join();//这里让main线程等待thread线程执行完,然后再遍历看看输出的结果是不是修改后的结果
while (itr.hasNext()) {
System.out.println(Thread.currentThread().getName() + "线程中的list的元素:" + itr.next());
}
}
}
Результат работы следующий. На самом деле, в приведенной выше программе мы сначала добавляем несколько элементов в список, а затем модифицируем список в потоке, при этом позволяяmain线程先获得list的迭代器
, и подождите, пока поток завершит выполнение, а затем распечатайте элементы в списке.Обнаружено, что основной поток не нашел изменение массива в списке, и на выходе остается исходный список, который является воплощением слабой консистенции.
Элемент списка в основном потоке: item1 Элемент списка в основном потоке: item2 Элемент списка в основном потоке: item3
Суммировать
- Как гарантируется CopyOnWriteArrayList
写
Потокобезопасность: используйте ReentrantLock в качестве эксклюзивной блокировки, чтобы гарантировать, что только один поток одновременно имеет доступ к коллекции.写
действовать - Данные хранятся в массиве array в CopyOnWriteArrayList, а длина массива изменяется динамически (
写
операция обновит массив) - При изменении массива вместо непосредственного управления массивом копируется новый массив, модификация завершается, а старый массив заменяется новым.
- Нет необходимости в блокировке при использовании итератора для обхода, и ConcurrentModificationException не будет выброшено, потому что итератор используется для обхода копии массива (конечно, это тот случай, когда другие потоки изменяют список)
установить детали метода
Обратите внимание, что в методе set есть фрагмент кода, который выглядит следующим образом:
else { //oldValue = element(element是传入的参数)
// Not quite a no-op; ensures volatile write semantics
//为了保证volatile 语义,即使没有修改,也要替换成新的数组
setArray(elements);
}
На самом деле это означает, что изменяемое значение в указанной позиции совпадает со значением этой позиции в массиве, но для обновления массива все же необходимо вызвать метод set.Почему так? этосообщение, вывод такой维护happens-before原则
. Сначала посмотрите на это предложение
Методы всех классов в java.util.concurrent и его подпакетах расширяют эти гарантии для синхронизации более высокого уровня. В частности: операции в потоке перед помещением объекта в любую параллельную коллекцию — до доступа или удаления элемента из коллекции в другом потоке**
后续操作
**.
Можно понять, что это делается для того, чтобы последовательность операций до операции set происходила до того, как другие потоки получили доступ к массиву (без блокировки).后续操作
, обратитесь к следующему примеру
// 这是两个线程的初始情况
int nonVolatileField = 0; //一个不被volatile修饰的变量
//伪代码
CopyOnWriteArrayList<String> list = {"x","y","z"}
// Thread 1
// (1)这里更新了nonVolatileField
nonVolatileField = 1;
// (2)这里是set()修改(写)操作,注意这里会对volatile修饰的array进行写操作
list.set(0, "x");
// Thread 2
// (3)这里是访问(读)操作
String s = list.get(0);
// (4)使用nonVolatileField
if (s == "x") {
int localVar = nonVolatileField;
}
Предполагая, что описанный выше сценарий существует, если можно гарантировать, что будет только такая траектория: (1) -> (2) -> (3) -> (4).Согласно соглашению в приведенном выше документе API Java, Существуют
(2) произойдет до и (3), операции в потоке: (1) произойдет до и (2), (3) произойдет до и (4), чтение и запись nonVolatileField в соответствии с транзитивностью события -до того, как переменные имеют (1) событие-до и (4)
Таким образом, операция записи потока 1 в nonVolatileField видна операции чтения потока 2. Если нет setArray(elements) в другом наборе CopyOnWriteArrayList
对volatile变量的写
Если это так, (2) произошло раньше и (3) больше не доступны, и указанная выше видимость не может быть гарантирована.
Таким образом, необходимо обеспечить, чтобы серия операций до операции set происходила до того, как другие потоки получили доступ к массиву (без блокировки).后续操作
,