В этой статье подробно рассматривается принцип реализации CAS и подробно рассматриваются инструкции ЦП.

распределенный

Введение в эту статью:

  • предисловие
  • Как обеспечить безопасность потоков
  • Анализ принципа CAS
  • Как ЦП гарантирует атомарные операции
  • Расшифровать низкоуровневые инструкции CAS
  • резюме

---

Друзья, статья сначала будет опубликована в паблике, если хотите, можете отсканировать QR-код в конце статьи и перейти по нему?

предисловие

В ежедневном процессе кодирования операции CAS в основном не используются напрямую, а используются через некоторые классы инструментов параллелизма, упакованные JDK в пакете java.util.concurrent.

Тем не менее, CAS по-прежнему является высокочастотной тестовой площадкой во время собеседования, поэтому вы должны стиснуть зубы и разбить этот навык.Это лучше, чем задавать три вопроса, не так ли?

Как правило, сначала их спрашивают о некоторых простых знаниях о параллелизме, и некоторые интервьюеры говорят более прямо:

biaoqing

Конечно, если вы действительно не понимаете CAS, вы можете подсказать интервьюеру технические моменты, в которых вы хороши, и использовать другие свои навыки, чтобы оттянуть игру.

Далее давайте рассмотрим пример кода:

// 类的成员变量
static int data = 0;
// main方法内代码
IntStream.range(0, 2).forEach((i) -> {
        new Thread(() -> {
                try {
                        Thread.sleep(20);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                IntStream.range(0, 100).forEach(y -> {
                        data++;
                });
        }).start();
});

    try {
            Thread.sleep(2000);
    } catch (InterruptedException e) {
            e.printStackTrace();
    }

System.out.println(data);
}

Разбирайтесь со схемой:

线程不安全

Код выше, проблема очевидна, данные — это переменная-член в классе, тип int, то есть общий ресурс. Когда несколько потоков выполняются одновременноdata++Во время работы результат может быть не равен 200. Чтобы имитировать эффект, поток спит 20 миллисекунд, чтобы сделать поток готовым, и код выполняется много раз, но результат не 200.

Как обеспечить безопасность потоков

Результат выполнения примера кода показывает, что одновременная работа с общими переменными несколькими потоками приводит к неточным результатам, а потоки небезопасны. Как это решить?

Вариант 1. Используйте ключевое слово synchronized.

При использовании ключевого слова synchronized в потоке используется синхронизированный блок кода, а безопасность потока гарантируется собственным механизмом JVM.

Код синхронизированного ключа:

// 类中定义的Object锁对象
Object lock = new Object();
 
 // synchronized 同步块 () 中使用 lock 对象锁定资源
IntStream.range(0, 100).forEach(y -> {
        synchronized (lock.getClass()) {
                data++;
        }
});

synchronized保障线程安全

Вариант 2: использовать блокировку

В сценариях с высокой степенью параллелизма использование блокировки Lock значительно повысит производительность по сравнению с использованием ключевого слова synchronized. Потому что нижний слой Lock реализован через механизм AQS+CAS. Чтобы узнать о механизме AQS, обратитесь к предыдущим статьям >. Механизм CAS будет рассмотрен ниже в статье.

Код ключа с помощью блокировки:

// 类中定义成员变量  
Lock lock = new ReentrantLock();

// 执行 lock() 方法加锁,执行 unlock() 方法解锁
IntStream.range(0, 100).forEach(y -> {
        lock.lock();
        data++;
        lock.unlock();
});

Разбирайтесь со схемой:

Lock锁保障线程安全

Вариант 3. Используйте атомарные атомарные классы

В дополнение к двум вышеупомянутым решениям, есть ли более элегантное решение? Использование synchronized было оптимизировано после JDK 1.6.Если объем параллелизма невелик, он безопаснее, чем Lock, и имеет приемлемую производительность, поскольку гарантируется базовым механизмом JVM, а блокировки автоматически снимаются без жесткого кодирования. , способ снять блокировку. При использовании метода Lock нерегулярное использование метода unlock() может привести к тупиковой ситуации.

Все атомарные классы в параллельном пакете JDK следующие:

并发包原子类

Реализуйте код с помощью служебного класса AtomicInteger:

// 类中成员变量定义原子类
AtomicInteger atomicData = new AtomicInteger();

// 代码中原子类的使用方式
IntStream.range(0, 2).forEach((i) -> {
    new Thread(() -> {
            try {
                    Thread.sleep(20);
            } catch (InterruptedException e) {
                    e.printStackTrace();
            }
            IntStream.range(0, 100).forEach(y -> {
                  // 原子类自增
                    atomicData.incrementAndGet();
            });
    }).start();
});

try {
        Thread.sleep(2000);
} catch (InterruptedException e) {
        e.printStackTrace();
}

// 通过 get () 方法获取结果
System.out.println(atomicData.get());

Разбирайтесь со схемой:

AtomicInteger实现

Рекомендуется использовать атомарный класс Atomic, поскольку его базовый уровень реализован на основе оптимистической блокировки CAS, которая будет подробно проанализирована ниже.

Вариант 4. Используйте атомарный класс LongAdder.

Новый атомарный класс LongAdder в JDK1.8 аналогичен классу AtomicInteger, упомянутому на схеме 3, и находится в параллельном пакете java.util.concurrent.atomic.

LongAdder подходит для сценариев с высокой степенью параллелизма, особенно для сценариев, в которых запись больше, чем чтение.По сравнению с AtomicInteger и AtomicLong, LongAdder имеет лучшую производительность за счет использования большего пространства и обмена местами на время.

Реализуйте код с помощью служебного класса LongAdder:

// 类中成员变量定义的LongAdder
LongAdder longAdderData = new LongAdder();

// 代码中原子类的使用方式
IntStream.range(0, 2).forEach((i) -> {
        new Thread(() -> {
                try {
                        Thread.sleep(20);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                IntStream.range(0, 100).forEach(y -> {
                      // 使用 increment() 方法自增
                        longAdderData.increment();
                });
        }).start();
});

try {
        Thread.sleep(2000);
} catch (InterruptedException e) {
        e.printStackTrace();
}
// 使用 sum() 获取结果
System.out.println(longAdderData.sum());

Разбирайтесь со схемой:

LongAdder实现

Однако если используется атомарный класс LongAdder, конечно, его нижний слой также реализуется на основе механизма CAS. LongAdder внутренне поддерживает базовую переменную и массив Cell[]. Когда несколько потоков выполняют одновременную запись, каждый поток записывает в свою собственную ячейку. LongAdder возвращает приблизительно точное значение после операции и в конечном итоге вернет точное значение.

Другими словами, результаты, полученные после использования LongAdder, не являются реальными, и рекомендуется использовать другие атомарные классы, такие как AtomicInteger, для высоких требований к реальному времени.

изменчивая схема ключевых слов?

Может быть, некоторые друзья скажут и придумают другое решение: использовать volatileключевое слово.

volatile不能保障原子性

После проверки это невыполнимо. Вы можете попробовать. Просто выполните пример кода, приведенный в этой статье. Результат не равен 200, что указывает на то, что поток все еще небезопасен.

Назначение автоинкремента data++ не является атомарным, оно связано с моделью памяти Java.

На не потокобезопасной диаграмме есть аннотация для локального выполнения потока, и будет копия памяти, то есть локальная рабочая память, Фактический процесс выполнения будет проходить через следующие шаги:

(1) Поток выполнения считывает данные из локальной рабочей памяти.Если есть значение, оно будет получено напрямую.Если значения нет, оно будет считано из основной памяти, а затем помещено в локальную рабочую память .

(2) Исполняющий поток выполняет операцию +1 в локальной рабочей памяти.

(3) Запишите значение данных в основную память.

Вывод: Помните!

Простые операции чтения и присваивания переменной являются атомарными, а присваивание одной переменной другой — нет.

Модель памяти Java (JMM) гарантирует только то, что основные операции чтения и присваивания переменных являются атомарными, и ничего больше не гарантируется. Если вы хотите сделать определенный блок кода атомарным, вам нужно использовать ключевое слово synchronized, блокировку Lock в параллельном пакете и различные типы атомарных классов в Atomic в параллельном пакете.Возможны все четыре варианта.

иvolatileПеременные, модифицированные ключевыми словами, не могут гарантировать атомарность, а только обеспечивают видимость и порядок.

Анализ принципа CAS

CAS считается оптимистичной блокировкой, с оптимистичными блокировками и пессимистическими блокировками.

В приведенном выше примере мы используем синхронизированный. Если давление конкуренции потоков велико, синхронизированный будет обновлен до тяжеловесной блокировки. В это время только один поток может войти в блок кода для выполнения. Если блокировку нельзя снять, другой Поток заблокируется и будет ждать вечно. На данный момент его можно считать пессимистичным замком.

Пессимистическая блокировка вызовет переключение системного контекста из-за блокировки потоков, что приведет к высоким издержкам производительности системы.

Затем мы можем использовать оптимистическую блокировку для решения проблемы.Так называемая оптимистическая блокировка на самом деле является своего рода мыслью.

Оптимистичные замки будут относиться к вещам более оптимистично и думать, что они могут успешно работать. Когда несколько потоков работают с одним и тем же общим ресурсом, только один поток может успешно получить блокировку одновременно.При оптимистической блокировке другие потоки обнаруживают, что не могут успешно получить блокировку, и не блокируют поток, как пессимистическая блокировка, а возвращают напрямую, вы можете повторить попытку, чтобы снова получить блокировку, или вы можете выйти напрямую.

CAS — это основная реализация алгоритма оптимистической блокировки.

Нижний уровень AtomicInteger, LongAdder и Lock упоминается в схеме примера кода Кроме того, конечно, все атомарные классы в concurrent-пакете java.util.concurrent.atomic реализованы на основе CAS.

Возьмите атомарный целочисленный класс AtomicInteger в качестве примера для анализа базового механизма реализации CAS.

atomicData.incrementAndGet()

Исходный код выглядит следующим образом:

// 提供自增易用的方法,返回增加1后的值
public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// 额外提供的compareAndSet方法
public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// Unsafe 类的提供的方法
public final int getAndAddInt (Object o,long offset, int delta){
        int v;
        do {
                v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        return v;
}

Мы видели, что внутренние методы AtomicInteger реализованы на основе класса Unsafe, который представляет собой класс инструмента репликации, взаимодействующий с базовыми аппаратными инструкциями ЦП.

Видно по этому коду:

unsafe.compareAndSwapInt(this, valueOffset, expect, update)

Так называемый CAS на самом деле является аббревиатурой, полное название Compare And Swap, а обмен данными происходит после сравнения. Вышеупомянутый метод имеет несколько важных параметров:

(1) это, сам объект Unsafe, должен получить адрес смещения памяти значения через этот класс.

(2) valueOffset, адрес смещения в памяти переменной значения.

(3) ожидать, ожидать обновленное значение.

(4) обновление, последнее значение для обновления.

Если значение value в атомарной переменной равно ожидаемому, обновите значение значением обновления и верните true, в противном случае верните false.

Давайте посмотрим, как получить valueOffset:

// Unsafe实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
        try {
              // 获得value在AtomicInteger中的偏移量
                valueOffset = unsafe.objectFieldOffset
                        (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
}
// 实际变量的值
private volatile int value;

Здесь мы видим фактическое значение переменной, которое создаетсяvolatileКлючевое слово изменено, чтобы гарантировать, что в условиях многопоточностивидимость памяти.

Почему я могу получить экземпляр класса Unsafe через метод Unsafe.getUnsafe()? На самом деле, поскольку класс AtomicInteger такжеrt.jar Ниже пакета, поэтому передается класс AtomicIntegerBootstrap корневой загрузчик классовзагружен.

Исходный код выглядит следующим образом:

@CallerSensitive
public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        // Bootstrap 类加载器是C++的,正常返回null,否则就抛异常。
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
                throw new SecurityException("Unsafe");
        } else {
                return theUnsafe;
        }
}

Отношения делегирования загрузчика классов:

类加载器

Как ЦП реализует атомарные операции

Скорость процессора процессора намного больше, чем в основной памяти.Чтобы решить разницу в скорости, между ними устанавливаются многоуровневые кеши, такие как кеши уровня L1, L2 и L3.Чем ближе эти кеши к ЦП, тем быстрее они будут использоваться чаще. Данные операции кэшируются здесь для увеличения скорости доступа, как показано на следующем рисунке:

CPU架构

Теперь все они являются многоядерными процессорами ЦП.Каждый процессор ЦП поддерживает байт памяти, а каждое ядро ​​поддерживает байт кэш-памяти.При одновременном чтении и записи нескольких потоков возникает несогласованность данных кэша.

В этот момент процессор обеспечивает:

  • автобусный замок

Когда процессор хочет работать с общей переменной, он выдает сигнал блокировки на шине BUS, и другие процессоры не могут работать с общей переменной.

Недостаток очевиден: когда блокировка шины блокирует запросы других процессоров на получение общей переменной, это также может привести к большому количеству блокировок, тем самым увеличивая нагрузку на систему.

  • блокировка кеша

Более поздние процессоры предоставляют механизм блокировки кеша, то есть, когда процессор работает с общей переменной в кеше, другие процессоры будут иметь механизм перехвата для аннулирования кеша общей переменной других процессоров, когда другие потоки считывают последние данные. из основной памяти реализован на основе протокола когерентности кэш-памяти MESI.

Современные процессоры в основном поддерживают и используют механизм блокировки кэша.

Уведомление:

Есть две ситуации, в которых процессор не будет использовать блокировку кеша:

(1) Когда данные операции занимают несколько строк кэша или не кэшируются внутри процессора, процессор будет использовать блокировку шины.

(2) Некоторые процессоры не поддерживают блокировку кэша, например: Процессоры Intel 486 и Pentium также вызывают блокировку шины.

Расшифровать низкоуровневые инструкции CAS

На самом деле, освоив вышеизложенное, понимание механизма CAS становится относительно ясным.

Конечно, если вам интересно, вы можете продолжить подробное изучение того, какие аппаратные инструкции ЦП используются.

Базовое оборудование семантически реализует несколько операций в CAS на аппаратном уровне и обеспечивает атомарные операции с помощью инструкции процессора. Эти инструкции заключаются в следующем:

(1) Тест и установка (Tetst-and-Set)

(2) Выборка и увеличение

(3) Обмен

(4) Сравнить и поменять местами

(5) Связанное с загрузкой/условное сохранение

Большая часть первых трех процессоров реализована, а последние два являются новыми дополнениями к современным процессорам. И по разным архитектурам есть существенные отличия в инструкциях.

В IA64 набор инструкций x86 имеетcmpxchgИнструкция дополняет функцию CAS, которая также доступна в sparc-TSO.casaреализации инструкций, в то время как на архитектурах ARM и PowerPC параldrex/strexинструкция по завершению функции LL/SC. В архитектуре с сокращенным набором команд обычно используется пара инструкций, например:load and reserveиstore conditional Реализованная CAS является очень легковесной операцией на большинстве процессоров, что также является ее преимуществом.

Основные методы CAS в sun.misc.Unsafe:

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

Эти три метода могут соответствовать просмотру исходного кода точки доступа openjdk:

Местоположение источника:hotspot/src/share/vm/prims/unsafe.cpp

#define FN_PTR(f) CAST_FROM_FN_PTR(void*, &f)

{CC"compareAndSwapObject", CC"("OBJ"J"OBJ""OBJ")Z",  FN_PTR(Unsafe_CompareAndSwapObject)},

{CC"compareAndSwapInt",  CC"("OBJ"J""I""I"")Z",      FN_PTR(Unsafe_CompareAndSwapInt)},

{CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z",      FN_PTR(Unsafe_CompareAndSwapLong)},

Вышеупомянутые три метода в конечном итоге вызовут унифицированную функцию cmpxchg в реализации исходного кода точки доступа, а основной код можно найти в исходном коде точки доступа.

Адрес источника:hotspot/src/share/vm/runtime/Atomic.cpp

Исходный код функции cmpxchg:

jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte*dest, jbyte compare_value) {
         assert (sizeof(jbyte) == 1,"assumption.");
         uintptr_t dest_addr = (uintptr_t) dest;
         uintptr_t offset = dest_addr % sizeof(jint);
         volatile jint*dest_int = ( volatile jint*)(dest_addr - offset);
         // 对象当前值
         jint cur = *dest_int;
         // 当前值cur的地址
         jbyte * cur_as_bytes = (jbyte *) ( & cur);
         // new_val地址
         jint new_val = cur;
         jbyte * new_val_as_bytes = (jbyte *) ( & new_val);
          // new_val存exchange_value,后面修改则直接从new_val中取值
         new_val_as_bytes[offset] = exchange_value;
         // 比较当前值与期望值,如果相同则更新,不同则直接返回
         while (cur_as_bytes[offset] == compare_value) {
          // 调用汇编指令cmpxchg执行CAS操作,期望值为cur,更新值为new_val
             jint res = cmpxchg(new_val, dest_int, cur);
             if (res == cur) break;
             cur = res;
             new_val = cur;
             new_val_as_bytes[offset] = exchange_value;
         }
         // 返回当前值
         return cur_as_bytes[offset];
}

Комментарии добавляются к определенным переменным в исходном коде, потому что все они представляют собой код C++, так что вы можете их понять ~

jint res = cmpxchg(new_val, dest_int, cur);

Здесь вызывается ассемблерная инструкция cmpxchg, которая также содержит три параметра, которые могут соответствовать параметрам на CAS.

Суммировать

Ни одна технология не является панацеей, чтобы найти подходящий сценарий, то же самое верно и для механизма CAS, который также имеет побочные эффекты.

Вопрос 1:

В качестве реализации оптимистической блокировки, когда многопоточная конкуренция за ресурсы является жесткой, а обработка заблокированных ресурсов требует много времени, другие потоки должны учитывать ограничение на количество вращений, чтобы избежать чрезмерного потребления ресурсов ЦП.

Кроме того, вы можете рассмотреть LongAdder, упомянутый в примере кода выше, для решения проблемы, LongAdder использует пространство за время для решения проблемы занятых ресурсов ЦП в течение длительного времени после большого количества сбоев CAS, что увеличивает накладные расходы на производительность системы.

Вопрос 2:

A-->B--->AПроблема, предположим, что есть переменная A, измененная на B, а затем измененная на A. Она действительно была изменена, но CAS может этого не заметить, что приводит к необоснованной операции изменения значения.

С целочисленным типом все в порядке, но что, если это ссылочный тип объекта, который содержит несколько переменных? Добавьте номер версии или отметку времени, не проблема!

В пакете параллелизма java.util.concurrent.atomic в JDK он предоставляетAtomicStampedReference, создав номер версии в виде штампа для ссылки, чтобы обеспечить правильность работы CAS.

Я надеюсь, что все соберут и переварят эту статью.CAS — очень важный алгоритм в базовой реализации параллельных пакетов JDK.

Написать статью непросто, поправьте меня, если в статье есть какие-то проблемы!

Добро пожаловать, чтобы обратить внимание на мою официальную учетную запись, отсканируйте QR-код, чтобы следовать, чтобы разблокировать больше интересных статей и расти вместе с вами ~Java爱好者社区