Основы: Подробный принцип разблокировки, базовая реализация синхронизированного, volatile+cas

Java
Основы: Подробный принцип разблокировки, базовая реализация синхронизированного, volatile+cas
  • С появлением многопроцессорности и многопоточности конкуренция за общие ресурсы (устройства, данные и т. д.) часто приводит к тому, что использование ресурсов выглядит случайным и неупорядоченным.
  • Например: поток хочет вывести «Я в порядке» на консоль, сразу после написания «Я», другой поток вытесняет консоль для вывода «непослушный», в результате чего получается «Я непослушный»; для использования вытеснения ресурсов, что Можем мы сделать? Конечно, это не холодный салат, вы можете использовать блокировки для управления синхронизацией, чтобы другие потоки не могли вытеснить использование ресурсов в период блокировки.

1 Классификация замков

  • пессимистический замок
    • Пессимистические блокировки: каждый раз, когда вы запрашиваете данные, вы думаете, что данные будут вытеснены и обновлены (пессимистическое мышление); поэтому каждый раз, когда вы оперируете данными, вы должны сначала добавить блокировку, а другие потоки должны ждать, чтобы получить блокировку при изменении данные. Он подходит для сценария больше писать и меньше читать, синхронизированный — это своего рода пессимистичный замок.
  • оптимистическая блокировка
    • При запросе данных чувствуется, что модификацию никто не упреждает. Когда данные фактически обновляются, оценивается, изменили ли их другие в течение этого периода (заранее прочитайте номер версии или отметку времени обновления и оцените, изменились ли они во время обновления, если изменений нет, никто не будет их изменять). в течение периода); Данные позволяют другим потокам изменять
  • блокировка спина
    • Одним словом, магия ходит по кругу. При попытке заблокировать ресурс, но сначала он заблокирован другими потоками, вместо блокировки и ожидания он снова будет заблокирован в цикле.
    • В сценарии, где блокировка часто удерживается в течение короткого периода времени, блокировка и приостановка потока приводят к частому переключению контекста ЦП, что может быть решено с помощью спин-блокировки; однако она занимает процессор, простаивающий во время вращения, поэтому не подходит для сценария длительного удержания блокировки

2 Основной принцип синхронного

  • В коде используется синхронизированная блокировка.Какой байткод после компиляции?
public class Test {
    public static void main(String[] args){
        synchronized(Test.class){
            System.out.println("hello");
        }
    }
}

Перехватите часть байт-кода следующим образом

    4: monitorenter
    5: getstatic    #9    // Field java/lang/System.out:Ljava/io/PrintStream; 
    8: ldc           #15   // String hello
    10: invokevirtual #17  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
    13: aload_1
    14: monitorexit

Байт-код имеет две инструкции: 4: monitorenter и 14: monitorexit; буквальное понимание состоит в том, чтобы контролировать вход и выход монитора. Под ним можно понимать блокировку перед выполнением блока кода и разблокировку при выходе из синхронизации

  • Что за нашими спинами делают monitorenter и monitorexit?
  • При выполнении инструкции monitorenter поток связывает объект ObjectMonitor с объектом блокировки.
objectMonitor.cpp
  ObjectMonitor() {
    _header       = NULL;
    _count        = 0;   \\用来记录获取该锁的线程数
    _waiters      = 0,
    _recursions   = 0;    \\锁的重入次数
    _object       = NULL;
    _owner        = NULL;  \\当前持有ObjectMonitor的线程
    _WaitSet      = NULL;  \\wait()方法调用后的线程等待队列
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ; \\阻塞等待队列
    FreeNext      = NULL ;
    _EntryList    = NULL ; \\synchronized 进来线程的排队队列
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;  \\自旋计算
    OwnerIsThread = 0 ;
  }
  • Каждый поток имеет два списка объектов ObjectMonitor, а именно списки свободных и используемых.Если текущий список свободных объектов пуст, поток запросит выделение объекта ObjectMonitor в глобальный глобальный список.
  • Свойства владельца ObjectMonitor, WaitSet, Cxq и EntryList более важны. Элементы очереди WaitSet, Cxq и EntryList — это объект после оборачивания потока — ObjectWaiter; а поток, который получает владельца, — это поток, который получает блокировку.
  • Метод выполнения, соответствующий monitorenter
void ATTR ObjectMonitor::enter(TRAPS)  {
    ...
    //获取锁:cmpxchg_ptr原子操作,尝试将_owner替换为自己,并返回旧值
    cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
    ...
    // 重复获取锁,次数加1,返回
    if (cur == Self) {
        _recursions ++ ;
        return ;
    }
    //首次获取锁情况处理
    if (Self->is_lock_owned ((address)cur)) {
        assert (_recursions == 0, "internal state error");
        _recursions = 1 ;
        _owner = Self ;
        OwnerIsThread = 1 ;
        return ;
    }
    ...
    //尝试自旋获取锁
    if (Knob_SpinEarly && TrySpin (Self) > 0) {
    ...
  • Исполнитель, соответствующий monitorexitЗаконvoid ATTR ObjectMonitor::exit(TRAPS)...Код слишком длинный, поэтому я не буду его публиковать. В основном рекурсии уменьшаются на 1, количество уменьшается на 1, или, если поток больше не удерживает владельца (блокировка без повторного входа), установить для владельца значение null, освободить состояние удержания блокировки и разбудить поток очередь Cxq

Суммировать

  • Когда поток встречает синхронизированную синхронизацию, он сначала попадает в очередь EntryList, а затем пытается установить переменную владельца в текущий поток, и в то же время счетчик счетчика в мониторе увеличивается на 1, то есть объект замок получается. в противном случае пройтиПопытаться повернуть определенное количество раз, чтобы заблокировать, в случае неудачи войти в очередь Cxq для блокировки и ждать
  • После выполнения потока удерживаемый владелец будет освобожден, переменная владельца будет восстановлена ​​до нуля, а счетчик уменьшится на 1, чтобы другие потоки могли войти, чтобы получить блокировку.
  • Принцип метода синхронной модификации аналогичен. Просто инструкция монитора не используется, а используется синхронизация метода идентификации ACC_SYNCHRONIZED
    public synchronized void lock(){
        System.out.println("world");
    }
....
  public synchronized void lock();
    descriptor: ()V
    flags: (0x0029) ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=0, args_size=0
         0: getstatic     #20                 // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #26                 // String world
         5: invokevirtual #28                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V

  • Synchronized — это повторно входящая, несправедливая блокировка, потому что поток entryList будет вращаться, чтобы попытаться заблокировать первым, вместо того, чтобы присоединиться к очереди cxq для ожидания, несправедливо

3 Принцип методов ожидания и уведомления объекта

  • ожидание, уведомление должно вызываться потоком, удерживающим текущую блокировку объекта. Монитор (блокировка объекта относится к ObjectMonitor/Monitor, объект блокировки относится к объекту)
  • Как упоминалось выше, когда объект блокировки Object вызывает ожидание в синхронизированном режиме, он присоединится к очереди waitSet, а объектом-элементом WaitSet является ObjectWaiter.
class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
  enum Sorted  { PREPEND, APPEND, SORTED } ;
  ObjectWaiter * volatile _next;
  ObjectWaiter * volatile _prev;
  Thread*       _thread;
  ParkEvent *   _event;
  volatile int  _notified ;
  volatile TStates TState ;
  Sorted        _Sorted ;           // List placement disposition
  bool          _active ;           // Contention monitoring is enabled
 public:
  ObjectWaiter(Thread* thread);
  void wait_reenter_begin(ObjectMonitor *mon);
  void wait_reenter_end(ObjectMonitor *mon);
};

При вызове метода wait() блокировки объекта поток будет инкапсулирован как ObjectWaiter и, наконец, приостановлен с использованием метода park.

//objectMonitor.cpp
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS){
    ...
    //线程封装成 ObjectWaiter对象
    ObjectWaiter node(Self);
    node.TState = ObjectWaiter::TS_WAIT ;
    ...
    //一系列判断操作,当线程确实加入WaitSet时,则使用park方法挂起
    if (node._notified == 0) {
        if (millis <= 0) {
            Self->_ParkEvent->park () ;
        } else {
            ret = Self->_ParkEvent->park (millis) ;
        }
    }

И когда блокировка объекта использует notify()

  • Если waitSet пуст, вернитесь напрямую
  • waitSet не пуст Получить ObjectWaiter из waitSet, а затем добавить его в EntryList в соответствии с другими политиками или передатьAtomic::cmpxchg_ptrДобавлена ​​операция отжима по инструкцииочередь cxqИли сразу разблокировать проснуться
void ObjectMonitor::notify(TRAPS){
    CHECK_OWNER();
    //waitSet为空,则直接返回
    if (_WaitSet == NULL) {
        TEVENT (Empty-Notify) ;
        return ;
    }
    ...
    //通过DequeueWaiter获取_WaitSet列表中的第一个ObjectWaiter
    Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
    ObjectWaiter * iterator = DequeueWaiter() ;
    if (iterator != NULL) {
    ....
    if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     }
  • Метод notifyAll объекта соответствуетvoidObjectMonitor::notifyAll(TRAPS), процесс аналогичен уведомлению. Однако узел ObjectWaiter из WaitSet будет выведен из цикла for, а затем по очереди будут разбужены все потоки.

4 JVM-оптимизация синхронизированного

  • Сначала представим структуру заголовка объекта JAVA в 32-битной JVM.

  • Блокировка смещения

    • При разблокировке флаг блокировки равен 01, включая значение хеш-функции, генерацию возраста и смещенный флаг блокировки (0).
    • При применении предвзятой блокировки хеш-значение и часть бесполезной памяти будут преобразованы в информацию о потоке владельца блокировки, а также временную метку эпохи применения блокировки.В это время флаг блокировки не изменяется. , а флаг блокировки смещения изменяется на 1.
    • При блокировке сначала определите, согласуется ли текущий идентификатор потока с идентификатором потока MarkWord, и выполните код синхронизации, если они согласуются; если они несовместимы, проверьте, смещен ли флаг смещения, и используйте CAS для блокировки, если он не смещен. ;Сбой беспристрастной блокировки CASисмещенный замокПриводит к тому, что смещенный замок расширяется в облегченный замок или повторно смещается
    • Смещенная блокировка снимет блокировку только тогда, когда встретится с другими потоками, конкурирующими за смещенную блокировку, и поток не возьмет на себя инициативу снять смещенную блокировку.
  • Легкий замок

    • Когда несколько потоков конкурируют, смещенная блокировка становится облегченной блокировкой, а флаг блокировки равен 00.
    • Поток, который получает блокировку, сначала аннулирует предвзятую блокировку (в безопасной точке) и создаст запись блокировки LockRecord в кадре стека.MarkWord объекта копируется во вновь созданную LockRecord, а затем CAS пытается указать владелец записи LockRecord на объект блокировки, а затем укажите MarkWord объекта блокировки на замок, и блокировка будет успешной
    • Если блокировка CAS не работает, потокВращайте определенное количество раз, чтобы заблокировать, если он снова выйдет из строя, он будет модернизирован до тяжелого замка
  • тяжелый замок

    • Тяжелая блокировка — это механизм блокировки, введенный синхронизацией с использованием монитора Monitor.
    • Конкурирующие потоки яростны, и блокировка продолжает расширяться, становясь тяжеловесной блокировкой, которая также является блокировкой взаимного исключения.Флаг блокировки равен 10, а остальная часть содержимого MarkWord заменяется указателем на монитор блокировки объекта.
  • блокировка спина

    • Уменьшите ненужное переключение контекста ЦП; спин-блокировка используется, когда облегченная блокировка обновляется до тяжелой блокировки.
  • замок огрубление

    • Множественные операции блокировки также являются своего рода потреблением внутри JVM.Если несколько блокировок можно объединить в одну блокировку, можно уменьшить ненужные накладные расходы.
Test.class
//编译器会考虑将两次加锁合并
public void test(){
    synchronized(this){
        System.out.println("hello");   
    }
    synchronized(this){
        System.out.println("world");   
    }
}
  • снятие блокировки
    • Удалите ненужные операции блокировки. Если переменная является переменной стека, принадлежащей потоку, ее можно безопасно добавлять или не блокировать. Компилятор попытается устранить блокировку.
    • Включение устранения блокировки необходимо установить в параметре JVM.-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks
//StringBuffer的append操作会加上synchronized,
//但是变量buf不加锁也安全的,编译器会把锁消除
public void test() {
    StringBuffer buf = new StringBuffer();
    buf.append("hello").append("world");
}
  • Другие методы оптимизации блокировки
    • Блокировка сегмента, блокировка сегмента — это не фактическая блокировка, а идея; ConcurrentHashMap — лучшая практика для изучения блокировки сегмента. В основном это разделение большого объекта на мелкие объекты, а затем операция блокировки большого объекта становится блокировкой маленького объекта, что увеличивает степень параллелизма.

5 Основной принцип CAS

  • существуетvolatile int i = 0; i++В , чтение и запись типа volatile атомарно синхронизированы, но i++ не может гарантировать синхронизацию, что нам делать?
  • Вы можете использовать синхронизированный для блокировки, также есть использование CAS (сравнение и обмен), идея использования оптимистической блокировки для синхронизации, сначала определить, изменилась ли общая переменная, и обновить, если нет. Давайте взглянем на несинхронизированную версию CAS.
int expectedValue = 1;
public boolean compareAndSet(int newValue) {
    if(expectedValue == 1){
        expectedValue = newValue;
        return ture;
    }
    return false;
}

В jdk есть синхронная версия решения CAS, в которой используется базовый метод UnSafe.java.

//UnSafe.java
    @HotSpotIntrinsicCandidate
    public final native boolean compareAndSetInt(Object o, long offset, int expected, int x) ..
    @HotSpotIntrinsicCandidate
    public final native int compareAndExchangeInt(Object o, long offset, int expected, int x)...

Давайте посмотрим на собственный метод, сравним AndSwapInt в Unsafe.cpp

//unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

В Linux x86 метод Atomic::cmpxchg реализован следующим образом.

/**
    1 __asm__表示汇编的开始;
    2 volatile表示禁止编译器优化;//禁止指令重排
    3 LOCK_IF_MP是个内联函数,
      根据当前系统是否为多核处理器,
      决定是否为cmpxchg指令添加lock前缀 //内存屏障
*/
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

На данный момент можно сделать вывод, что:Механизм CAS, предоставляемый jdk, на уровне сборки запрещает оптимизацию инструкций с обеих сторон переменной, а затем использует инструкцию cmpxchg для сравнения и обновления значения переменной (атомарность), если она многоядерная, используйте блокировку блокировки ( блокировка кеша, МЭСИ)

6 Проблемы с работой синхронизации CAS

  • АВА-проблема
    • Поток X собирается изменить значение переменной с A на B, но в этот период поток Y меняет значение переменной с A на C, а затем на A; наконец, поток X обнаруживает, что значение переменной равно A и заменяет его на Б. Но на самом деле А больше не является первоначальным А.
    • Решение состоит в том, чтобы сделать переменную уникальным типом. К значению можно добавить номер версии или отметку времени. Если номер версии добавляется, модификация потока Y становится A1-> B2-> A3. В это время, если поток X снова обновляется, можно судить, что A1 не равен A3.
  • Только гарантированные атомарные операции над общей переменной
    • Атомарная работа только одной общей переменной гарантируется.При синхронизации нескольких общих переменных CAS цикла не может гарантировать атомарную операцию операции.

7 Принципов синхронизации блокировок на основе volatile + CAS

  • CAS может синхронизировать изменение только одной переменной, как мы должны использовать ее для блокировки блока кода?
  • Давайте сначала поговорим об элементах реализации блокировки
    • 1 Блок синхронизированного кода может выполняться только одним потоком за раз
    • 2 Операция блокировки должна происходить до операции в блоке кода синхронизации, а операция в блоке кода должна происходить до операции разблокировки
    • 3 После окончания синхронизированного блока кода измененные переменные видны другим потокам (видимость памяти)
  • Элемент 1: его можно реализовать с помощью атомарности CAS, только один поток может успешно манипулировать переменными в любой момент времени.
    • Сначала представьте, что общая переменная операции CAS является переменной состояния синхронизации связанного блока кода, и CAS обновляется до начала синхронизации.Переменные состоянияВ заблокированном состоянии после завершения синхронизации CASПеременные состояниябез замков
    • Если есть второй поток для блокировки в течение периода, будет обнаружено, что переменная состояния заблокирована, и выполнение блока синхронизированного кода будет прекращено.
  • Элемент 2: Используйте volatile для изменения переменных состояния, чтобы запретить перестановку инструкций
    • Volatile гарантирует, что операции в синхронизированном коде выполняются до операций разблокировки, а операции блокировки выполняются до операций в блоках кода.
  • Элемент 3: по-прежнему используйте volatile, барьеры памяти будут вставлены до и после инструкции записи volatile переменной.
    • Перед тем, как переменная состояния, модифицированная volatile, будет освобождена CAS от блокировки, грязные данные блока синхронизированного кода будут обновлены и видны каждому потоку.
//伪代码
volatile state = 0 ;   // 0-无锁 1-加锁;volatile禁止指令重排,加入内存屏障
...
if(cas(state, 0 , 1)){ // 1 加锁成功,只有一个线程能成功加锁
    ...                // 2 同步代码块
    cas(state, 1, 0);  // 3 解锁时2的操作具有可见性
}

8 LockSupport, чтобы узнать

  • LockSupport — это класс инструментов для работы с потоками, предоставляемый JDK на основе класса Unsafe.приостановить поток, разбудить поток. Unsafe.park, при выполнении операции unpark будет вызываться для выполнения переменный парковочный агент текущего потока. Код Паркера
JavaThread* thread=JavaThread::thread_from_jni_environment(env);
...
thread->parker()->park(isAbsolute != 0, time);
class PlatformParker : public CHeapObj {
  protected:
    //互斥变量类型
    pthread_mutex_t _mutex [1] ; 
   //条件变量类型
    pthread_cond_t  _cond  [1] ;
    ...
}

class Parker : public os::PlatformParker {  
private:  
  volatile int _counter ;  
  ...  
public:  
  void park(bool isAbsolute, jlong time);  
  void unpark();  
  ...  
}
  • В системе Linux мьютекс (mutex) и условие в библиотеке потоков POSIX pthread используются для приостановки и пробуждения потока.
  • Примечание: при парковке переменная счетчика устанавливается в 0, при разпарковке эта переменная устанавливается в 1
  • Когда порядок выполнения unpark и park отличается, изменения состояния counter и cond следующие:
    • Сначала парковать, потом разпарковывать; парковать: значение счетчика остается неизменным, но будет установлен конд; разпарковать: сначала счетчик увеличивается на 1, проверяется наличие конда, и счетчик уменьшается до 0
    • Сначала разпарковать, а затем припарковать; парковать: счетчик становится равным 1, но cond не устанавливается; разпарковать: счетчик уменьшается до 0 (поток не будет приостановлен из-за парковки)
    • Сначала отмените парковку несколько раз; счетчик установлен только на 1

9 Разница между LockSupport.park и Object.wait

  • Оба способа могут иметь приостановленные потоки
  • Поток должен ждать, пока Object.notify после Object.wait проснется
  • LockSupport может сначала разпарковать поток и дождаться выполнения потока. LockSupport.park не будет приостановлен, вы можете продолжить выполнение
  • Следует отметить, что даже если поток не припаркован несколько раз, это может привести к парковке потока только в первый раз, а не к зависанию.

10 AbstractQueuedSynchronizer(AQS)

  • AQS на самом деле является шаблоном блокировки, основанным на volatile+cas; если вам нужен блокирующий ожидание потока, механизм пробуждения, используйте LockSupport для приостановки и пробуждения потока.
//AbstractQueuedSynchronizer.java
public class AbstractQueuedSynchronizer{
    //线程节点
    static final class Node {
        ...
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        ...
    }    
    ....
    //head 等待队列头尾节点
    private transient volatile Node head;
    private transient volatile Node tail;
    // The synchronization state. 同步状态
    private volatile int state;  
    ...
    //提供CAS操作,状态具体的修改由子类实现
    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }
}
  • AQS поддерживает внутреннюю очередь синхронизации, а элемент — это узел, обертывающий поток.
  • Первый узел в очереди синхронизации — это узел, который получает блокировку. Когда он освобождает блокировку, он пробуждает узел-преемник. Когда узел-преемник получает блокировку, он устанавливает себя в качестве первого узла.
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
  • Поток сначала попытается получить блокировку, и если это не удастся, он будет инкапсулирован как Node, а CAS присоединится к хвосту очереди синхронизации. При присоединении к хвосту очереди синхронизации он определит, является ли узел-предшественник головным узлом, и попытается заблокировать (возможно, узел-предшественник только что снял блокировку), в противном случае поток войдет в режим ожидания блокировки.

В AQS также есть внутренний класс ConditionObject, и его механизм использования аналогичен Object.wait и notify.

//AbstractQueuedSynchronizer.java
public class ConditionObject implements Condition, java.io.Serializable {
    //条件队列;Node 复用了AQS中定义的Node
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    ...
  • Каждый объект Condition содержит очередь условий FIFO элементов Node внутри.
  • Когда поток вызывает метод Condition.await(), поток снимает блокировку, создает узел для присоединения к очереди условий и переходит в состояние ожидания.
//类似Object.wait
public final void await() throws InterruptedException{
    ...
    Node node = addConditionWaiter(); //构造Node,加入条件队列
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //挂起线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //notify唤醒线程后,加入同步队列继续竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
  • При вызове Condition.signal получите головной узел очереди условий, переместите его в очередь синхронизации и используйте LockSupport для пробуждения потока в узле. Затем продолжайте выполнять состояние до того, как ожидание будет приостановлено, и вызовите acquireQueued(node, saveState), чтобы конкурировать за состояние синхронизации.
    //类似Object.notify
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
  • Механизм volatile+cas обеспечивает синхронизацию и видимость кода, а AQS инкапсулирует блокировку потока и ожидание приостановки, разблокируя логику пробуждения других потоков.. Подкласс AQS должен только оценить, может ли блокировка быть получена и может ли блокировка быть успешно снята в соответствии с переменной состояния.
  • Наследование AQS требует дополнительной перезаписи следующих интерфейсов.
protected boolean tryAcquire(int arg);//尝试独占性加锁
protected boolean tryRelease(int arg);//对应tryAcquire释放锁
protected int tryAcquireShared(int arg);//尝试共享性加锁
protected boolean tryReleaseShared(int arg);//对应tryAcquireShared释放锁
protected boolean isHeldExclusively();//该线程是否正在独占资源,只有用到condition才需要取实现它

11 Принцип ReentrantLock

  • ReentrantLock реализует интерфейс Lock и использует внутренний класс Sync (Sync наследует AbstractQueuedSynchronizer) для реализации операций синхронизации.
  • Синхронизация внутреннего класса ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer{
    .... 
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //直接CAS状态加锁,非公平操作
                if (compareAndSetState(0, acquires)) { 
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    ...
    //重写了tryRelease
    protected final boolean tryRelease(int releases) {
        c = state - releases; //改变同步状态
        ...
        //修改volatile 修饰的状态变量
        setState(c); 
        return free;
    }
}
  • Подклассы синхронизации NonfairSync и FairSync переопределяют метод tryAcquire.
  • Среди них tryAcquire из NonfairSync вызывает метод nonfairTryAcquire родительского класса, а FairSync переписывает логику tryAcquire самостоятельно. Который вызывает hasQueuedPredecessors(), чтобы определить, есть ли узел в очереди, и возвращает false, если он существует (false заставит текущий поток стоять в очереди в ожидании блокировки)
    static final class NonfairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    ....
    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&   
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    ....    

12 Демонстрационный пример монопольной блокировки AQS

public class TwinsLock implements Lock {

    private final Sync sync = new Sync(2);
    @Override
    public void lockInterruptibly() throws InterruptedException {  throw new RuntimeException(""); }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new RuntimeException("");}
    @Override
    public Condition newCondition() {  return sync.newCondition(); }
    @Override
    public void lock() {  sync.acquireShared(1); }
    @Override
    public void unlock() {  sync.releaseShared(1); } }
    @Override
    public boolean tryLock() { return sync.tryAcquireShared(1) > -1;  }
}

Давайте еще раз посмотрим на код Sync

class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero");
            }
            setState(count);
        }
        @Override
        public int tryAcquireShared(int reduceCount) {
            for (; ; ) {
                int current = getState(); 
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }
        @Override
        public boolean tryReleaseShared(int returnCount) {
            for (; ; ) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
        public Condition newCondition() {
            return new AbstractQueuedSynchronizer.ConditionObject();
        }
    }

13. Могут ли потоки предотвратить бесконечные циклы с помощью блокировок?

  • Ответ не обязательно; это можно сделать для одного ресурса; но будут взаимоблокировки для нескольких ресурсов, например, поток A удерживает ресурс X и ждет ресурс Y, а поток B удерживает ресурс Y и ждет ресурс X
  • С блокировками к ресурсам можно добавить контроль состояния, но нам также необходимо предотвратить возникновение взаимоблокировок, нарушив одно из четырех условий, вызывающих взаимоблокировки.
  • 1 Ресурс не может быть занят двумя или более пользователями повторно
  • 2 Пользователь удерживает ресурс и ждет другого ресурса
  • 3 Ресурсы не могут быть вытеснены
  • 4 Несколько пользователей образуют цикл, ожидая ресурсов друг друга.

14 Может ли ThreadLocal гарантировать синхронизацию ресурсов?

  • Когда переменная объявляется с помощью ThreadLocal, ThreadLocal предоставляет независимую копию переменной для каждого потока, использующего эту переменную, и каждый поток может изменять свою собственную копию независимо, не затрагивая копии, соответствующие другим потокам.
  • Как видно из вышеизложенных концепций, ThreadLocal фактически не гарантирует синхронизацию переменных, а лишь присваивает каждому потоку копию переменной

Обратите внимание на публичный номер и общайтесь со всеми

Справочная статья