Поскольку wait() и notify() используются в сочетании с synchronized, если используется блокировка дисплея Lock, ее нельзя использовать. Следовательно, блокировка дисплея должна обеспечивать собственный механизм ожидания/уведомления, и появилось условие.
Состояние в
await()
метод эквивалентен методу объектаwait()
метод, в состоянииsignal()
метод эквивалентен методу объектаnotify()
метод, в состоянииsignalAll()
Эквивалент объектаnotifyAll()
метод. Разница в том, что в Objectwait(),notify(),notifyAll()
метод и"同步锁"
(синхронизированное ключевое слово) входит в комплект; Условие необходимо сочетать с"互斥锁"/"共享锁"
используется в связках.
1. Список функций
-
void await() throws InterruptedException
Текущий поток входит в состояние ожидания, пока он не получит сигнал или не будет прерван, текущий поток входит в состояние выполнения и возвращается из await(); -
void awaitUninterruptibly()
Текущий поток переходит в состояние ожидания до тех пор, пока не получит уведомление и не ответит на прерывание; -
long awaitNanos(long nanosTimeout) throws InterruptedException
Ответ тайм-аута добавляется на основе возвращаемого условия интерфейса 1. Возвращаемое значение указывает текущее оставшееся время.Если он проснулся до nanosTimeout, возвращаемое значение = nanosTimeout - фактическое затраченное время, а возвращаемое значение -
boolean await(long time, TimeUnit unit) throws InterruptedException
Ответ тайм-аута также добавляется на основе условия возврата интерфейса 1. Отличие от интерфейса 3: Единицу тайм-аута можно настроить; Возвращаемое значение возвращает значение true/false, если оно проснулось раньше времени, оно возвращает значение true, а если истекло время ожидания, оно возвращает значение false. -
boolean awaitUntil(Date deadline) throws InterruptedException
Текущий поток переходит в состояние ожидания до тех пор, пока он не будет уведомлен в указанное время в будущем.Если он не будет уведомлен в указанное время, вернуть true, иначе, по достижении указанного времени, вернуть false; -
void signal()
Разбудить поток, ожидающий условия; -
void signalAll()
Разбудите все потоки, ожидающие условия.
2. Конкретная реализация
Когда я увидел, что кто-то задал вопрос на компьютере, я использовал его как пример для демонстрации:
Написание Java-приложения требует трех процессов: ученик1, ученик2, учитель, где поток ученик1 готовится «заснуть» в течение 1 минуты перед началом занятия, а поток ученик2 готовится «заснуть» за 5 минут до начала занятия. После того, как учитель выведет 4 предложения слова «класс», он «будит» спящую нить ученик1, после того как нить ученик1 «пробуждается», он отвечает за «пробуждение» спящей нити ученик2.
2.1 Реализация первая:
На основе объекта и синхронизированной реализации.
package com.fantJ.bigdata;
/**
* Created by Fant.J.
* 2018/7/2 16:36
*/
public class Ten {
static class Student1{
private boolean student1Flag = false;
public synchronized boolean isStudent1Flag() {
System.out.println("学生1开始睡觉1min");
if (!this.student1Flag){
try {
System.out.println("学生1睡着了");
wait(1*1000*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生1被唤醒");
return student1Flag;
}
public synchronized void setStudent1Flag(boolean student1Flag) {
this.student1Flag = student1Flag;
notify();
}
}
static class Student2{
private boolean student2Flag = false;
public synchronized boolean isStudent2Flag() {
System.out.println("学生2开始睡觉5min");
if (!this.student2Flag){
try {
System.out.println("学生2睡着了");
wait(5*1000*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生2被唤醒");
return student2Flag;
}
public synchronized void setStudent2Flag(boolean student2Flag) {
notify();
this.student2Flag = student2Flag;
}
}
static class Teacher{
private boolean teacherFlag = true;
public synchronized boolean isTeacherFlag() {
if (!this.teacherFlag){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("老师准备吼着要上课");
return teacherFlag;
}
public synchronized void setTeacherFlag(boolean teacherFlag) {
this.teacherFlag = teacherFlag;
notify();
}
}
public static void main(String[] args) {
Student1 student1 = new Student1();
Student2 student2 = new Student2();
Teacher teacher = new Teacher();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0;i<4;i++){
System.out.println("上课");
}
teacher.isTeacherFlag();
System.out.println("学生1被吵醒了,1s后反应过来");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
student1.setStudent1Flag(true);
}
});
Thread s1 = new Thread(new Runnable() {
@Override
public void run() {
student1.isStudent1Flag();
System.out.println("准备唤醒学生2,唤醒需要1s");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
student2.setStudent2Flag(true);
}
});
Thread s2 = new Thread(new Runnable() {
@Override
public void run() {
student2.isStudent2Flag();
}
});
s1.start();
s2.start();
t.start();
}
}
Конечно, сnotifyAll
Может использовать меньше кода, хотя эта реализация сложна, одиночная производительность будет лучше, чем при использованииnotifyAll()
Гораздо надежнее, потому что нет потерь ресурсов, вызванных конфликтом блокировок. Но, как видите, код очень сложный, и между экземплярами нужно обеспечить хорошую изоляцию.
2.2 Реализация вторая:
На основе условия, реализации ReentrantLock.
public class xxx{
private int signal = 0;
public Lock lock = new ReentrantLock();
Condition teacher = lock.newCondition();
Condition student1 = lock.newCondition();
Condition student2 = lock.newCondition();
public void teacher(){
lock.lock();
while (signal != 0){
try {
teacher.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("老师叫上课");
signal++;
student1.signal();
lock.unlock();
}
public void student1(){
lock.lock();
while (signal != 1){
try {
student1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生1醒了,准备叫醒学生2");
signal++;
student2.signal();
lock.unlock();
}
public void student2(){
lock.lock();
while (signal != 2){
try {
student2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生2醒了");
signal=0;
teacher.signal();
lock.unlock();
}
public static void main(String[] args) {
ThreadCommunicate2 ten = new ThreadCommunicate2();
new Thread(() -> ten.teacher()).start();
new Thread(() -> ten.student1()).start();
new Thread(() -> ten.student2()).start();
}
}
Condition
зависит отLock
Интерфейс, основной код для создания условия:lock.newCondition()
перечислитьCondition
изawait()
иsignal()
метод, должен бытьlock
в пределах защиты, то есть он должен находиться в пределахlock.lock()
иlock.unlock
можно использовать между ними.
Как можно заметить, я отменилsynchronized
Ключевые слова метода, которые добавляются до и после каждого заблокированного метода.lock.lock(); lock.unlock();
для получения/разыгрывания блокировки и разыгрывания желаемого приведения перед снятием блокировкиCondition
объект. Так же мы используемsignal
для завершения связи между потоками.
3. Условие реализует ограниченную очередь
Зачем использовать его для реализации ограниченной очереди, потому что мы можем использовать Condition для достижения блокировки (когда очередь пуста или заполнена). Это избавляет нас от многих проблем.
public class MyQueue<E> {
private Object[] objects;
private Lock lock = new ReentrantLock();
private Condition addCDT = lock.newCondition();
private Condition rmCDT = lock.newCondition();
private int addIndex;
private int rmIndex;
private int queueSize;
MyQueue(int size){
objects = new Object[size];
}
public void add(E e){
lock.lock();
while (queueSize == objects.length){
try {
addCDT.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
objects[addIndex] = e;
System.out.println("添加了数据"+"Objects["+addIndex+"] = "+e);
if (++addIndex == objects.length){
addIndex = 0;
}
queueSize++;
rmCDT.signal();
lock.unlock();
}
public Object remove(){
lock.lock();
while (queueSize == 0){
try {
System.out.println("队列为空");
rmCDT.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object temp = objects[rmIndex];
objects[rmIndex] = null;
System.out.println("移除了数据"+"Objects["+rmIndex+"] = null");
if (++rmIndex == objects.length){
rmIndex = 0;
}
queueSize--;
addCDT.signal();
lock.unlock();
return temp;
}
public void foreach(E e){
if (e instanceof String){
Arrays.stream(objects).map(obj->{
if (obj == null){
obj = " ";
}
return obj;
}).map(Object::toString).forEach(System.out::println);
}
if (e instanceof Integer){
Arrays.stream(objects).map(obj -> {
if (obj == null ){
obj = 0;
}
return obj;
}).map(object -> Integer.valueOf(object.toString())).forEach(System.out::println);
}
}
}
add
Метод заключается в добавлении данных в очередь.remove
заключается в удалении данных из очереди по FIFO.foreach
метод является служебным методом для наблюдения за содержимым очереди, легко увидеть, что он используется для обхода.
public static void main(String[] args) {
MyQueue<Integer> myQueue = new MyQueue<>(5);
myQueue.add(5);
myQueue.add(4);
myQueue.add(3);
// myQueue.add(2);
// myQueue.add(1);
myQueue.remove();
myQueue.foreach(5);
}
添加了数据Objects[0] = 5
添加了数据Objects[1] = 4
添加了数据Objects[2] = 3
移除了数据Objects[0] = null
0
4
3
0
0
4. Анализ исходного кода
ReentrantLock.class
public Condition newCondition() {
return sync.newCondition();
}
прослеживаемость синхронизации:
private final Sync sync;
В классе Sync есть метод newCondition():
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
await源码:
public final void await() throws InterruptedException {
// 1.如果当前线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 2.将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。
Node node = addConditionWaiter();
// 3.调用tryRelease,释放当前线程的锁
long savedState = fullyRelease(node);
int interruptMode = 0;
// 4.为什么会有在AQS的等待队列的判断?
// 解答:signal操作会将Node从Condition队列中拿出并且放入到等待队列中去,在不在AQS等待队列就看signal是否执行了
// 如果不在AQS等待队列中,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 5.这个时候线程已经被signal()或者signalAll()操作给唤醒了,退出了4中的while循环
// 自旋等待尝试再次获取锁,调用acquireQueued方法
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
-
присоединиться к текущей теме
Condition
очередь блокировки. В частности, это отличается отAQS
, запись здесьCondition
изFIFO
очередь. -
Освободите замок. Здесь видно, что блокировка снята, иначе другие потоки не смогут получить блокировку и возникнет взаимоблокировка.
-
вращение
(while
) приостановить до пробуждения (signal
Поместите его обратно в очередь ожидания AQS) или тайм-аут или CACELLED и т. д. -
получить замок
(acquireQueued
). и удалить себя изCondition
изFIFO
Отпустили в очередь, указав, что блокировка мне больше не нужна (я уже получил блокировку).
signal()源码
public final void signal() {
if (!isHeldExclusively())
//如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//通知等待队列队首的节点。
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//如果当前节点状态为CONDITION,则将状态改为0准备加入同步队列;如果当前状态不为CONDITION,说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点
int ws = p.waitStatus;
//如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,线程被唤醒后会执行acquireQueued方法,该方法会重新尝试将节点的先驱状态设为SIGNAL并再次park线程;如果当前设置前驱节点状态为SIGNAL成功,那么就不需要马上唤醒线程了,当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。
//其实笔者认为这里不加这个判断条件应该也是可以的。只是对于CAS修改前驱节点状态为SIGNAL成功这种情况来说,如果不加这个判断条件,提前唤醒了线程,等进入acquireQueued方法了节点发现自己的前驱不是首节点,还要再阻塞,等到其前驱节点成为首节点并释放锁时再唤醒一次;而如果加了这个条件,线程被唤醒的时候它的前驱节点肯定是首节点了,线程就有机会直接获取同步状态从而避免二次阻塞,节省了硬件资源。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal
только что проснулсяCondition
Первый неОТМЕНЕННЫЙ поток узла в очереди, и signalAll должен разбудить все неОТМЕНЕННЫЕ потоки узла.CANCELLED
Поток узла, по сути, переводит узел изCondition
Убирается ли один из очереди или все узлы помещаются вAQS
очередь ожидания. Хотя всеNode
Они все могут быть разбужены, но вам нужно знать, что только один поток может получить блокировку, а другие потоки, которые не получают блокировку, все еще должны вращаться и ждать, просто перейдите к шагу 4, упомянутому выше (acquireQueued
).
Обзор процесса внедрения
мы знаем
Lock的本质是AQS
,AQS
Очередь, поддерживаемая сама по себе, — это очередь, ожидающая в данный момент ресурсов.AQS
После того, как ресурс будет освобожден, он по очереди разбудит все узлы в очереди от начала до конца, так что их соответствующие потоки возобновят выполнение до тех пор, пока очередь не станет пустой. иCondition
Я также поддерживаю очередь, роль очереди состоит в том, чтобы поддерживать ожиданиеsignal
Очередь сигналов. Однако роли двух очередей различны.На самом деле каждый поток будет существовать только в одной из двух вышеуказанных очередей одновременно.Процесс выглядит следующим образом:
- Поток 1 вызывает
reentrantLock.lock
, попробуйте получить замок. Возвращает в случае успеха изAQS
удалить поток из очереди, иначе заблокировать, оставитьAQS
в очереди ожидания. - Поток 1 вызывает
await
При вызове метода соответствующая операция добавляется вCondition
В очереди ожидания дождитесь сигнального сигнала, одновременно снимите блокировку. - После снятия блокировки головной узел в очереди AQS будет активирован, поэтому блокировку получит поток 2.
- Звонки потока 2
signal
метод, на этот разCondition
В очереди ожидания есть только один узел потока 1, поэтому он удаляется и добавляется в очередь ожидания AQS. Обратите внимание, что в это время поток 1 не пробуждается, а только добавляется в очередь ожидания AQS. -
signal
После выполнения метода поток 2 вызываетunLock()
метод снятия блокировки. В это время, поскольку в AQS есть только поток 1, поток 1 пробуждается, и поток 1 возобновляет выполнение. так: Отправитьsignal
сигнал будет простоCondition
Потоки в очереди добавляются кAQS
в очереди ожидания. только для отправкиsignal
сигнальный вызов потокаreentrantLock.unlock()
Эти потоки не будут разбужены до тех пор, пока блокировка не будет снята.
Видно, что весь процесс сотрудничестваAQS
очередь ожидания иCondition
Очередь ожидания реализована путем перемещения вперед и назад,Condition
В качестве условного класса очень хорошо поддерживать очередь, ожидающую сигналов, и добавлять узлы вAQS
Операция пробуждения реализована в очереди ожидания.
Суть состояния ожидания уведомления см.:woo woo woo.cn blog on.com/she Eva/afraid/64…