представлять
AQS — это аббревиатура AbstractQueuedSynchronizer в java.util.concurrent.locks.Это серия фреймворков для создания многопоточных блокировок и синхронизаторов с помощью исходного кода Java.Он используется для синхронизации между многопоточными приложениями Java.Его классы и классы структурная схема выглядит следующим образом:
принцип
Очередь FIFO, реализованная с использованием двусвязного списка Node, поддерживается в классе AQS для сохранения ожидающих потоков, а состояние типа int используется для представления состояния.При его использовании он управляется путем наследования класса AQS и реализации его получения. и методы освобождения состояния для достижения синхронизации потоков.
Взяв в качестве примера ReentrantLock, состояние инициализируется равным 0, что указывает на разблокированное состояние. Когда поток A блокирует(), он вызывает функцию tryAcquire(), чтобы монополизировать блокировку и состояние+1. После этого другие потоки будут терпеть неудачу, когда они снова попытаются выполнить попытку Acquire().Пока поток A не разблокирует () до состояния=0 (то есть не снимет блокировку), у других потоков нет шансов получить блокировку. Конечно, прежде чем снять блокировку, поток A может повторно ее получить (состояние будет накапливаться), что является концепцией повторного входа. Но обратите внимание на то, сколько раз вам нужно его отпустить, чтобы убедиться, что состояние может вернуться в нулевое состояние.
Взяв в качестве примера CountDownLatch, задача делится на N подпотоков для выполнения, и состояние также инициализируется равным N (обратите внимание, что N должно соответствовать количеству потоков). N подпотоков выполняются параллельно.После выполнения каждого подпотока countDown() выполняется один раз, и состояние уменьшится на 1 в CAS (Сравнить и поменять местами). После выполнения всех дочерних потоков (т. е. состояние = 0) основным вызывающим потоком будет unpark(), а затем основной вызывающий поток вернется из функции await() и продолжит остальную часть действия.
Использование различных компонентов
CountDownLatch
Он в основном используется для ожидания, пока поток будет ждать выполнения других потоков, прежде чем выполнять его.Реализация заключается в том, чтобы определить, уменьшен ли счетчик до 0. После выполнения каждого другого потока вызовите метод countDown() для уменьшения счетчика. на единицу, и ожидающий поток вызывает метод await() до тех пор, пока счетчик не достигнет 1.
Основной поток демонстрации ожидает завершения выполнения 200 потоков перед выполнением:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* \* Created with IntelliJ IDEA.
* \* @author: guohezuzi
* \* Date: 2019-06-08
* \* Time: 下午4:14
* \* Description: ContDownLatch用法:通过引入CountDownLatch计数器,来等待其他线程执行完毕
* \
*/
@Slf4j
public class CountDownLatchExample {
private static int threadCount = 200;
public static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}",threadNum);
Thread.sleep(100);
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool= Executors.newCachedThreadPool();
final CountDownLatch countDownLatch=new CountDownLatch(200);
for (int i = 0; i < threadCount; i++) {
final int threadNum=i;
pool.execute(()->{
try {
Thread.sleep(1);
test(threadNum);
}catch (Exception e){
log.error("exception",e);
}finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
pool.shutdown();
}
}
CyclicBarrier
Он используется для ожидания готовности нескольких потоков, прежде чем продолжить.После того, как каждый поток готов, счетчик увеличивается на 1, и все начинаются после прибавления к указанному значению.
demo 20 потоков каждый раз ждут 5 потоков
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/**
* \* Created with IntelliJ IDEA.
* \* @author: guohezuzi
* \* Date: 2019-06-08
* \* Time: 下午5:20
* \* Description:
* 用于等待多个线程都准备好
* 每一个线程准备好后 计数器加1 加到指定值后全部开始
* \
*/
public class CyclicBarrierExample {
private static final Logger logger = LoggerFactory.getLogger(CountDownLatchExample.class);
private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);
public static void race(int threadNum) throws InterruptedException{
Thread.sleep(1000);
logger.info("{} is ready",threadNum);
try {
//等待指定数量的其他线程执行 无参一直等待不抛异常 有参数表示等待指定时间若数量还未等到抛出异常
cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
} catch (BrokenBarrierException | TimeoutException e) {
logger.error("exception",e);
}
logger.info("{} is continue");
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService= Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
final int threadNum=i;
executorService.execute(() -> {
try {
race(threadNum);
} catch (InterruptedException e) {
logger.error("exception",e);
}
});
}
executorService.shutdown();
}
}
Semaphore
Семафор перевода на английский язык используется для управления количеством ресурсов, к которым можно получить доступ одновременно. Например, количество одновременных ресурсов базы данных, которыми можно управлять, равно 20.
demo:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/**
* \* Created with IntelliJ IDEA.
* \* @author: guohezuzi
* \* Date: 2019-06-08
* \* Time: 下午3:39
* \* Description: 信号量学习例子 控制某个资源同时可被访问的个数 如控制数据库资源可以同时并发数量为20
* \
*/
public class SemaphoreExample {
private static final Logger logger = LoggerFactory.getLogger(CountDownLatchExample.class);
private static int threadCount = 200;
public static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
logger.info("{}",threadNum);
Thread.sleep(1000);
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool= Executors.newCachedThreadPool();
//定义允许并发的信号量m
final Semaphore semaphore=new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadNum=i;
//该线程的最大并发数为m/n
pool.execute(()->{
try {
//获取n个信号量 无参为一个
semaphore.acquire(4);
test(threadNum);
//释放n个信号量 无参为一个
semaphore.release(4);
}catch (Exception e){
logger.error("exception",e);
}
});
}
pool.shutdown();
}
}
ReentrantReadWriteLock
Блокировки чтения-записи используются для блокировки/разблокировки до и после синхронизации ресурсов. Когда один поток получает блокировку чтения, другие потоки могут продолжать получать блокировку чтения. Когда один поток получает блокировку записи, другие потоки должны ждать Таким образом, это может привести к блокировке записи.Нехватка блокировки означает, что блокировка записи никогда не была получена.
демонстрация: частично потокобезопасная карта на основе блокировки aqs
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* \* Created with IntelliJ IDEA.
* \* @author: guohezuzi
* \* Date: 2019-06-08
* \* Time: 下午11:58
* \* Description: 读写锁 当一个线程获取读锁后其他线程可以继续获取读锁 当一个线程获取写锁后其他线程都需等待
* \
*/
public class ReentrantReadWriteLockExample {
final Map map = new TreeMap<>();
private final static ReentrantLock lock = new ReentrantLock();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set getAllkeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data vlaue) {
writeLock.lock();
try {
return map.put(key, vlaue);
} finally {
writeLock.unlock();
}
}
class Data {
}
}
StampLock
Подобно функции и использованию блокировки чтения-записи, но со следующими двумя отличиями.
-
Каждый раз, когда вы получаете блокировку, вы получаете длинный штамп в качестве возвращаемого значения, и вам нужно вернуть его, чтобы разблокировать.
-
Существует оптимистичная операция чтения, которая подходит для чтения больше и меньше записи.Когда ресурс заблокирован блокировкой чтения, операция чтения будет выполняться в зависимости от того, был ли ресурс изменен, вместо запрещения операции чтения.
demo:
import java.util.concurrent.locks.StampedLock;
/**
* \* Created with IntelliJ IDEA.
* \* @author: guohezuzi
* \* Date: 2019-06-09
* \* Time: 下午1:08
* \* Description:
* 使用是每次获取锁会得到一个long类型的stamp所为返回值,解锁是需要将其回传
* 该类有 写 读 乐观读:指当资源被读锁锁定时,会根据资源是否被变更,进行读取操作
*/
public class StampLockExample {
private int count = 0;
private final StampedLock lock = new StampedLock();
class AddHundredNum extends Thread {
@Override
public void run() {
// synchronized (addHundredNum.class) {
long stamp = lock.writeLock();
try {
for (int i = 0; i < 1000; i++) {
count++;
}
} finally {
lock.unlock(stamp);
}
// }
}
}
public void test() throws InterruptedException {
StampLockExample.AddHundredNum[] addHundredNums = new StampLockExample.AddHundredNum[100];
for (int i = 0; i < addHundredNums.length; i++) {
addHundredNums[i] = new StampLockExample.AddHundredNum();
}
for (StampLockExample.AddHundredNum addHundredNum : addHundredNums) {
addHundredNum.start();
}
for (StampLockExample.AddHundredNum addHundredNum : addHundredNums) {
addHundredNum.join();
}
}
public static void main(String[] args) throws Exception {
StampLockExample example = new StampLockExample();
example.test();
System.out.println(example.count);
}
}
Condition
С механизмом прерывания/ожидания потока, реализованным блокировкой AQS, ожидающий поток перемещается в очередь, поддерживаемую условием, а прерывание/ожидание управляется условием.
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* \* Created with IntelliJ IDEA.
* \* @author: guohezuzi
* \* Date: 2019-06-09
* \* Time: 下午1:26
* \* Description:
* \
*/
@Slf4j
public class ConditionExample {
public static void main(String[] args){
final ReentrantLock lock=new ReentrantLock();
Condition condition=lock.newCondition();
new Thread(()->{
lock.lock();
log.info("wait signal");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal");
lock.unlock();
}).start();
new Thread(() -> {
lock.lock();
log.info("get lock");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
log.info("send signal ~");
lock.unlock();
}).start();
}
}