Первое понимание принципа реализации CAS

исходный код
Первое понимание принципа реализации CAS

Отсканируйте QR-код ниже или WeChat, чтобы найти официальную учетную запись.菜鸟飞呀飞, вы можете следить за публичной учетной записью WeChat, читать дальшеSpring源码分析а такжеJava并发编程статья.

微信公众号

В области параллелизма Java всегда упоминаются атомарные операции, а Java, как язык высокого уровня, предоставляет два решения для достижения атомарных операций: 1) блокировка; в пакете JUC Дуг Ли, бог параллельного программирования , предоставляет множество атомарных классов, все из которых реализованы через CAS. Далее в этой статье в основном будут представлены знания, связанные с CAS.

1. Введение в CAS

  • Полное название CAS — Compare And Swap, что переводится как сравнение и обмен. Предположим, что значение данных в памяти — V, старое ожидаемое значение — A, а новое измененное значение — B. Тогда операцию CAS можно разделить на три шага: 1) сравнить старое ожидаемое значение A со значением V в памяти; 2) если значения A и V равны, то установить значение V равным B; 3 ) Возвратите операцию, успешно ли.
  • На многопроцессорной машине, когда несколько потоков выполняют операции CAS над данными в памяти, процессор может гарантировать, что только один поток сможет успешно изменить данные.

2. Принцип реализации CAS

В Java операция CAS может быть реализована через класс Unsafe, а класс Unsafe в итоге вызывает нативный метод, то есть конкретная реализация реализуется методом в JVM. И инструкции в JVM для вызова процессора через C++cmpxchgбыть реализованным.

2.1 Реализация CAS в JAVA

  • на Явеjava.util.concurrent.atomicПакет предоставляет множество атомарных классов. Операции этих классов являются атомарными операциями. Все они реализуют атомарные операции в соответствии с методом CAS класса Unsafe, например:compareAndSwapInt()、compareAndSwapLong()、compareAndSwapObject()Все эти три метода являются методами CAS.
  • Например, в следующем примере потокобезопасная операция накопления count реализуется с помощью операции CAS класса AtomicInteger.
public class Demo {

    // 创建一个Integer类型的原子类
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        List<Thread> threadList = new ArrayList<>(10);
        // 创建10个线程,每个线程中的run()方法将count累加10000次
        for (int i = 0; i < 10; i++) {
            threadList.add(new Thread(()-> {
                // 每个线程累加10000次
                for (int j = 0; j < 10000; j++) {
                    // 实现递增,incrementAndGet()方法等价于非原子操作里面的 count++
                    count.incrementAndGet();
                }
            }));
        }
        // 将10个线程启动
        for (int i = 0; i < 10; i++) {
            threadList.get(i).start();
        }

        // 让主线程休眠10秒钟,休眠10秒钟是为了让前面的10个线程全部执行完成,如果10秒钟不够,可以设置得更加长一点
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 打印count的值
        // 最终发现从控制台打印出来的结果为100000
        System.out.println(count.get());

    }
}
  • Когда указанные выше 10 потоков одновременно накапливают счетчик, для накопления используется метод AtomicInteger.incrementAndGet(), что обеспечивает атомарность. Итак, 10 потоков, каждый поток накапливает 10 000 раз, а окончательный результат подсчета равен 100 000. Далее давайте посмотрим, как класс AtomicInteger реализует атомарные операции через исходный код.
  • Часть исходного кода класса AtomicInteger выглядит следующим образом. Из исходного кода мы обнаружили, что AtomicInteger вызывает метод getAndAddInt() класса Unsafe.
public class AtomicInteger extends Number implements java.io.Serializable {
   
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    public final int incrementAndGet() {
    	return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
	}
}
  • Часть исходного кода класса Unsafe выглядит следующим образом. Из исходного кода можно узнать, что метод getAndAddInt() будет циклически вызывать метод compareAndSwapInt() (то есть метод CAS), и в случае сбоя CAS получит новое значение из памяти, вызвав метод getIntVolatile( ), а затем выполнять операцию CAS до тех пор, пока она не завершится успешно. Метод compareAndSwapInt() является родным методом, и конкретная реализация выполняется JVM.
public final class Unsafe {

    private static final Unsafe theUnsafe = new Unsafe();

    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
            // 循环调用compareAndSwapInt(),直到设置成功
            // 如果设置失败,则通过getIntVolatile()方法从内存中获取新的值,然后再进行CAS操作
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }

    // native方法由JVM实现
    public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);
}
  • Чтобы увидеть исходный код JVM, вам необходимо скачать исходный код OpenJDK, который можно скачать с github (github.com/openjdk/jdk), после входа на гитхаб можно выбрать скачивание разных версий OpenJDK под тегом ветка, тут автор скачиваетjdk8-b120версия OpenJDK.
  • Нативный метод в Java вызывается к JVM через JNI. Для класса Unsafe исходный код JVM будет соответствовать файлу unsafe.cpp. Путь к файлу:jdk-jdk8-b120/hotspot/src/share/vm/prims/unsafe.cpp. Ближе к концу файла unsafe.cpp можно увидеть следующий код (перехватывается только часть исходного кода).
static JNINativeMethod methods_18[] = {
    {CC"getObject",        CC"("OBJ"J)"OBJ"",   FN_PTR(Unsafe_GetObject)},
    {CC"putObject",        CC"("OBJ"J"OBJ")V",  FN_PTR(Unsafe_SetObject)},
    {CC"getObjectVolatile",CC"("OBJ"J)"OBJ"",   FN_PTR(Unsafe_GetObjectVolatile)},
    {CC"putObjectVolatile",CC"("OBJ"J"OBJ")V",  FN_PTR(Unsafe_SetObjectVolatile)},

    {CC"getAddress",         CC"("ADR")"ADR,             FN_PTR(Unsafe_GetNativeAddress)},
    {CC"putAddress",         CC"("ADR""ADR")V",          FN_PTR(Unsafe_SetNativeAddress)},

    {CC"allocateMemory",     CC"(J)"ADR,                 FN_PTR(Unsafe_AllocateMemory)},
    {CC"reallocateMemory",   CC"("ADR"J)"ADR,            FN_PTR(Unsafe_ReallocateMemory)},
    {CC"freeMemory",         CC"("ADR")V",               FN_PTR(Unsafe_FreeMemory)},

    {CC"objectFieldOffset",  CC"("FLD")J",               FN_PTR(Unsafe_ObjectFieldOffset)},
    {CC"staticFieldOffset",  CC"("FLD")J",               FN_PTR(Unsafe_StaticFieldOffset)},
    {CC"staticFieldBase",    CC"("FLD")"OBJ,             FN_PTR(Unsafe_StaticFieldBaseFromField)},
    {CC"ensureClassInitialized",CC"("CLS")V",            FN_PTR(Unsafe_EnsureClassInitialized)},
    {CC"arrayBaseOffset",    CC"("CLS")I",               FN_PTR(Unsafe_ArrayBaseOffset)},
    {CC"arrayIndexScale",    CC"("CLS")I",               FN_PTR(Unsafe_ArrayIndexScale)},
    {CC"addressSize",        CC"()I",                    FN_PTR(Unsafe_AddressSize)},
    {CC"pageSize",           CC"()I",                    FN_PTR(Unsafe_PageSize)},

    {CC"defineClass",        CC"("DC_Args")"CLS,         FN_PTR(Unsafe_DefineClass)},
    {CC"allocateInstance",   CC"("CLS")"OBJ,             FN_PTR(Unsafe_AllocateInstance)},
    {CC"monitorEnter",       CC"("OBJ")V",               FN_PTR(Unsafe_MonitorEnter)},
    {CC"monitorExit",        CC"("OBJ")V",               FN_PTR(Unsafe_MonitorExit)},
    {CC"tryMonitorEnter",    CC"("OBJ")Z",               FN_PTR(Unsafe_TryMonitorEnter)},
    {CC"throwException",     CC"("THR")V",               FN_PTR(Unsafe_ThrowException)},
    {CC"compareAndSwapObject", CC"("OBJ"J"OBJ""OBJ")Z",  FN_PTR(Unsafe_CompareAndSwapObject)},
    // 对应Unsafe.java文件中的compareAndSwapInt()方法,Unsafe_CompareAndSwapInt()方法时对应的C++方法
    {CC"compareAndSwapInt",  CC"("OBJ"J""I""I"")Z",      FN_PTR(Unsafe_CompareAndSwapInt)},
    {CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z",      FN_PTR(Unsafe_CompareAndSwapLong)},
    {CC"putOrderedObject",   CC"("OBJ"J"OBJ")V",         FN_PTR(Unsafe_SetOrderedObject)},
    {CC"putOrderedInt",      CC"("OBJ"JI)V",             FN_PTR(Unsafe_SetOrderedInt)},
    {CC"putOrderedLong",     CC"("OBJ"JJ)V",             FN_PTR(Unsafe_SetOrderedLong)},
    {CC"park",               CC"(ZJ)V",                  FN_PTR(Unsafe_Park)},
    {CC"unpark",             CC"("OBJ")V",               FN_PTR(Unsafe_Unpark)}
}
  • Как видно из приведенного выше кода, нативные методы класса Unsafe найдут здесь соответствующие методы C++. Как мы упоминали вышеcompareAndSwapInt()、compareAndSwapLong()、compareAndSwapObject()Эти три метода соответствуют файлам unsafe.cpp вUnsafe_CompareAndSwapInt()、Unsafe_CompareAndSwapLong()、Unsafe_SetOrderedObject(). Итак, когда мы звонимUnsafe.compareAndSwapInt()метод, в конечном итоге он будет вызываться для файла unsafe.cppUnsafe_CompareAndSwapInt()метод. Исходный код Unsafe_CompareAndSwapInt() выглядит следующим образом:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // 核心代码是Atomic::cmpxchg()
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
  • Unsafe_CompareAndSwapInt()Основной кодAtomic::cmpxchg(x, addr, e), его роль в конечном счете состоит в том, чтобы вызвать процессорCMPXCHGинструкция. Для разных операционных систем,Atomic::cmpxchg()Есть разные реализации. Например, для 64-битной системы Linux он, наконец, вызываетjdk-jdk8-b120/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hppв файлеAtomic::cmpxchg()метод; для операционной системы Windows он вызываетjdk-jdk8-b120/hotspot/src/os_cpu/windows_x86/vm/atomic_windows_x86.inline.hppв каталогеAtomic::cmpxchg()метод.
  • Обычно наши серверы представляют собой 64-битные Linux-системы, поэтому в качестве примера ниже приведены 64-битные Linux-системы.Atomic::cmpxchg()Исходный код выглядит следующим образом.
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
  • \__asm__Это означает встроить кусок ассемблерного кода,LOCK_IF_MP(%4)Указывает, что кэш или шина заблокированы,cmpxchglна ассемблере比较并交换, что гарантирует атомарные операции. (Что касается инструкций на аппаратном уровне, я не буду их здесь подробно изучать, а заинтересованные друзья могут о них узнать)
  • Итак, в конце концов, на уровне Java работа CAS реализуется через несколько собственных модифицированных методов CAS, предоставленных в классе Unsafe, а затем эти собственные методы будут вызывать методы в JVM и, наконец, передавать метод CAS в процессор.CMPXCHGинструкции для реализации атомарных операций.

2.2 Реализация атомарных операций в процессорах

Целью операции CAS является достижение атомарных операций.Атомарные операции, реализованные CAS в Java, фактически заканчиваются на процессоре, а процессор реализует атомарные операции. Так как же процессор реализует атомарные операции?

  • Разные типы процессоров имеют разные реализации атомарных операций. Ниже приведен пример процессора архитектуры x86.
  • Для однопроцессорной одноядерной системы для достижения атомарных операций, в дополнение к обеспечению того, чтобы операции чтения и записи памяти требовали выравнивания адресов, также необходимо гарантировать, что последовательность инструкций операции не прерывается. Для простых атомарных операций ЦП предоставит одну инструкцию, такую ​​​​как INC и другие инструкции, но для сложных атомарных операций необходимо выполнить несколько инструкций.В это время, если происходит переключение контекста, например переключение задач, это повлияет атомарная операция. Следовательно, в это время необходимо использовать спин-блокировку, чтобы гарантировать, что инструкция операции не будет прервана.
  • Для многопроцессорных систем, помимо гарантии однопроцессорных атомарных операций, также гарантируется отсутствие влияния других процессоров. Поскольку каждый ЦП имеет свои собственные кэши L1, L2 и L3, когда несколько ЦП одновременно записывают одни и те же данные в память, данные в конечном итоге будут неточными.

i++示意图

  • Как показано выше, дляi++, выполнить две операции i++, когда два процессора одновременноiПри записи в память окончательный результат в памяти равен 2, когда мы должны ожидать 3. Очевидно, что результат этой операции неверен.
  • Как так много процессоров достигают атомарных операций? Для процессоров архитектуры x86 предусмотрена блокировка шины или блокировка кэша для реализации атомарных операций.
  • (1) Блокировка автобуса. В приведенном выше примере операции i++ последней причиной неправильного результата является то, что несколько процессоров одновременно записывают общие переменные в память. Затем, если мы гарантируем, что когда ЦП перезаписывает общие переменные в памяти, другие ЦП не будут перезаписывать общие переменные в памяти, мы можем гарантировать правильность результата. Как мы все знаем, обмен данными между ЦП и памятью осуществляется через шину (шину). Когда CPU1 перезаписывает общую переменную в память, он выполняет инструкцию с префиксом Lock, которая выдает сигнал LOCK#, который блокирует шину.Во время блокировки шины другие CPU не могут получить доступ к шине и не могут получить доступ к шине Это означает, что другие ЦП не могут считывать и записывать данные в память через шину, поэтому в период блокировки шины ЦП, выдающий сигнал LOCK#, может использовать только общую память, тем самым обеспечивая атомарные операции.

总线加锁示意图

  • (2) Используйте блокировки кеша для обеспечения атомарности. Хотя блокировка шины может гарантировать атомарные операции, ее эффективность слишком низка, поскольку в период блокировки шины другие процессоры не могут получить доступ к шине, что сильно снижает производительность мультипроцессоров. На самом деле, когда мы гарантируем атомарные операции, нам нужно гарантировать атомарную операцию только для определенного адреса общей переменной в памяти, поэтому мы используем блокировку кеша для реализации атомарных операций.缓存锁定是指它会锁住这块内存区域的缓存并回写到内存,并使用缓存一致性机制来确保修改的原子性,缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据. (Это предложение относится ко второй главе, раздел 2.1, страница 10 книги «Искусство параллельного программирования на Java».) В настоящее время, поскольку шина не будет заблокирована, атомарные операции и производительность гарантируются. В настоящее время в некоторых сценариях процессоры используют блокировки кэша вместо блокировки шины.

3. Проблемы, возникающие в связи с CAS

Хотя CAS решает атомарность, у него также есть три основные проблемы.

  • (1) Проблема АВА. Во время работы CAS сначала проверяется, изменилось ли значение.Если нет изменений, будет выполнена операция обновления.Если есть изменения, операция обновления не будет выполняться. Предположим, исходное значение равно A, затем оно обновляется до B, а затем обновляется до A. В это время, когда выполняется операция проверки CAS, значение в памяти по-прежнему равно A, и он ошибочно думает, что значение в памяти не изменилось, а затем выполнить операцию обновления, фактически в это время значение в памяти изменилось. Так как же решить проблему ABA? Вы можете добавлять номер версии каждый раз, когда обновляете значение, тогда A->B->A становится 1A->2B->3A, и в это время проблем с ABA не будет. Начиная с JDK1.5, класс AtomicStampedReference предоставляется в составе пакета JUC для решения проблемы ABA. Метод compareAndSet() этого класса сначала проверит, равна ли текущая ссылка ожидаемой ссылке, а затем проверит, равен ли текущий флаг ожидаемому флагу.Если они равны, метод casPair() будет вызван для выполнить операцию обновления.casPair()Наконец, метод вызывает метод CAS в классе Unsafe. Исходный код метода compareAndSet() класса AtomicStampedReference выглядит следующим образом.
public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
    Pair<V> current = pair;
    // 先检查期望的引用是否等于当前引用,期望的标识是否等于当前的标识
    // 然后才执行CAS操作(casPair()也是调用了CAS方法)
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
  • (2) Проблемы с производительностью. CAS будет реализовывать атомарные операции в цикле.Если настройка долгосрочного цикла не удалась, он всегда будет занимать ЦП, что принесет много накладных расходов на выполнение ЦП и снизит производительность приложения.
  • (3) Гарантируется только атомарная операция общей переменной. При выполнении операций CAS над общей общей переменной могут быть гарантированы атомарные операции, но если операции выполняются над несколькими общими переменными одновременно, CAS не может гарантировать атомарность этих нескольких общих переменных одновременно. В настоящее время вы можете инкапсулировать несколько общих переменных в один объект, а затем использовать класс AtomicReference, предоставляемый в пакете JUC, для реализации атомарных операций. Другим решением является использование замков.

4. Резюме

  • Эта статья начинается с демонстрации в сочетании с классом AtomicInteger, постепенно углубляется в исходный код и знакомит с реализацией CAS-операций в Java, а затем знакомит с реализацией атомарных операций в процессоре.
  • Наконец, в этой статье обобщаются три основные проблемы, связанные с работой CAS, и текущие решения этих проблем.
  • Для реализации работы CAS в Java неотъемлемо использование класса Unsafe.Эта статья лишь кратко знакомит с использованием класса Unsafe.В следующей статье будет подробно проанализирован исходный код и сценарии использования класса Unsafe.

Использованная литература:

  1. Искусство параллельного программирования на Java

微信公众号

связанное предложение