LockSupport — это базовый примитив блокировки потока, используемый для создания блокировок.Например, метод реализации приостановки потока в AQS — припарковать, а соответствующее пробуждение — разпарковать. Те, которые используются в JDK, следующие:
LockSupport предоставил лицензию, если лицензия существует, поток, вызывающийpark
В это время он вернется сразу, и лицензия также будет израсходована в это время.Если лицензии нет, она будет заблокирована. При вызове unpark лицензия будет доступна, если сама лицензия недоступна.
Лицензия только одна, не накопительная
отслеживание источника в парке
Форма декларации парка имеет следующие два блока
Некоторые из них имеют дополнительный параметр Object, который действует как блокиратор, а другие — нет. Преимущество блокировщика в том, что можно узнать причину паркинга при диагностике проблемыРекомендуется использовать работу парка с объектом
функция парковки
park используется для приостановки текущего потока. Если лицензия доступна, он немедленно вернется и использует лицензию.
- park(Object): Условие восстановления: 1: поток вызвал unpark; 2: другие потоки прервали поток; 3: произошло что-то неожиданное
- parkNanos (блокировщик объектов, long nanos): условие восстановления: 1: поток вызвал unpark; 2: другие потоки прервали поток; 3: произошла непредвиденная вещь; 4: время истечения срока действия истекло.
- parkUntil (блокировщик объектов, длительный срок): условие восстановления: 1: поток вызвал unpark; 2: другие потоки прервали поток; 3: произошло что-то непредвиденное; 4: наступила указанная deadLine
Возьмите исходный код парка в качестве примера
public static void park(Object blocker) { //获取当前线程 Thread t = Thread.currentThread(); //记录当前线程阻塞的原因,底层就是unsafe.putObject,就是把对象存储起来 setBlocker(t, blocker); //执行park unsafe.park(false, 0L); //线程恢复后,去掉阻塞原因 setBlocker(t, null); }
Из исходного кода видно, что реальная реализация небезопасна.
unsafe.park
Основная реализация выглядит следующим образом
JavaThread* thread=JavaThread::thread_from_jni_environment(env);
...
thread->parker()->park(isAbsolute != 0, time);
Это получить объект паркера потока Java, а затем выполнить его метод парка. Определение Паркера выглядит следующим образом.
class Parker : public os::PlatformParker {
private:
//表示许可
volatile int _counter ;
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association
public:
Parker() : PlatformParker() {
//初始化_counter
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
protected:
~Parker() { ShouldNotReachHere(); }
public:
void park(bool isAbsolute, jlong time);
void unpark();
// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
static Parker * volatile FreeList ;
static volatile int ListLock ;
};
Он наследует os::PlatformParker и имеет встроенный _counter of volatitle. PlatformParker имеет разные реализации в разных операционных системах, например Linux.
class PlatformParker : public CHeapObj {
protected:
//互斥变量类型
pthread_mutex_t _mutex [1] ;
//条件变量类型
pthread_cond_t _cond [1] ;
public:
~PlatformParker() { guarantee (0, "invariant") ; }
public:
PlatformParker() {
int status;
//初始化条件变量,使用 pthread_cond_t之前必须先执行初始化
status = pthread_cond_init (_cond, NULL);
assert_status(status == 0, status, "cond_init”);
// 初始化互斥变量,使用 pthread_mutex_t之前必须先执行初始化
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
}
}
Приведенный выше код используется интерфейсом потока POSIX, поэтому pthread ссылается на posixThread.
Паркер реализован следующим образом
void Parker::park(bool isAbsolute, jlong time) {
if (_counter > 0) {
//已经有许可了,用掉当前许可
_counter = 0 ;
//使用内存屏障,确保 _counter赋值为0(写入操作)能够被内存屏障之后的读操作获取内存屏障事前的结果,也就是能够正确的读到0
OrderAccess::fence();
//立即返回
return ;
}
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
if (Thread::is_interrupted(thread, false)) {
// 线程执行了中断,返回
return;
}
if (time < 0 || (isAbsolute && time == 0) ) {
//时间到了,或者是代表绝对时间,同时绝对时间是0(此时也是时间到了),直接返回,java中的parkUtil传的就是绝对时间,其它都不是
return;
}
if (time > 0) {
//传入了时间参数,将其存入absTime,并解析成absTime->tv_sec(秒)和absTime->tv_nsec(纳秒)存储起来,存的是绝对时间
unpackTime(&absTime, isAbsolute, time);
}
//进入safepoint region,更改线程为阻塞状态
ThreadBlockInVM tbivm(jt);
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
//如果线程被中断,或者是在尝试给互斥变量加锁的过程中,加锁失败,比如被其它线程锁住了,直接返回
return;
}
//这里表示线程互斥变量锁成功了
int status ;
if (_counter > 0) {
// 有许可了,返回
_counter = 0;
//对互斥变量解锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
//debug用
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
//将java线程所拥有的操作系统线程设置成 CONDVAR_WAIT状态 ,表示在等待某个条件的发生
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
//将java的_suspend_equivalent参数设置为true
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
if (time == 0) {
//把调用线程放到等待条件的线程列表上,然后对互斥变量解锁,(这两是原子操作),这个时候线程进入等待,当它返回时,互斥变量再次被锁住。
//成功返回0,否则返回错误编号
status = pthread_cond_wait (_cond, _mutex) ;
} else {
//同pthread_cond_wait,只是多了一个超时,如果超时还没有条件出现,那么重新获取胡吃两然后返回错误码 ETIMEDOUT
status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
//WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1
//去除初始化
pthread_cond_destroy (_cond) ;
//重新初始化
pthread_cond_init (_cond, NULL);
}
}
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
//等待结束后,许可被消耗,改为0 _counter = 0 ;
//释放互斥量的锁
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
//加入内存屏障指令
OrderAccess::fence();
}
Как видно из реализации парка
- Независимо от того, какова ситуация возврата, сам метод парковки не будет информировать вызывающую сторону о причине возврата, поэтому при выполнении вызова он обычно оценивает возвращенную сцену и выполняет различную обработку в зависимости от сцены.
- Ожидание и приостановка, пробуждение и т. д. потоков — это используемые API потоков POSIX.
- 'S _count переменная разрешения на использование парка, достигнутая атомом, при потреблении _count равна 0, пока лицензия возвращается немедленно
OrderAccess::fence();
Принцип реализации в linux следующий
inline void OrderAccess::fence() {
if (os::is_MP()) {
#ifdef AMD64
// 没有使用mfence,因为mfence有时候性能差于使用 locked addl
__asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif }
}
ThreadBlockInVM tbivm(jt)
Это относится к синтаксису создания переменной в C++, то есть к вызову конструктора для создания новой переменной, имя переменной — tbivm, а параметр — jt. Класс реализован как
class ThreadBlockInVM : public ThreadStateTransition {
public:
ThreadBlockInVM(JavaThread *thread)
: ThreadStateTransition(thread) {
// Once we are blocked vm expects stack to be walkable
thread->frame_anchor()->make_walkable(thread);
//把线程由运行状态转成阻塞状态
trans_and_fence(_thread_in_vm, _thread_blocked);
}
...
};
_thread_in_vm указывает, что поток в настоящее время выполняется на виртуальной машине, а _thread_blocked указывает, что поток в настоящее время заблокирован.globalDefinitions.hpp
перечисление определено в
//这个枚举是用来追踪线程在代码的那一块执行,用来给 safepoint code使用,有4种重要的类型,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的状态都是中间状态,表示线程正在由一种状态变成另一种状态,这种方式使得 safepoint code在处理线程状态时,不需要对线程进行挂起,使得safe point code运行更快,而给定一个状态,通过+1就可以得到他的转换状态
enum JavaThreadState {
_thread_uninitialized = 0, // should never happen (missing initialization)
_thread_new = 2, // just starting up, i.e., in process of being initialized
_thread_new_trans = 3, // corresponding transition state (not used, included for completeness)
_thread_in_native = 4, // running in native code . This is a safepoint region, since all oops will be in jobject handles
_thread_in_native_trans = 5, // corresponding transition state
_thread_in_vm = 6, // running in VM
_thread_in_vm_trans = 7, // corresponding transition state
_thread_in_Java = 8, // Executing either interpreted or compiled Java code running in Java or in stub code
_thread_in_Java_trans = 9, // corresponding transition state (not used, included for completeness)
_thread_blocked = 10, // blocked in vm
_thread_blocked_trans = 11, // corresponding transition state
_thread_max_state = 12 // maximum thread state+1 - used for statistics allocation
};
Trans_and_fence определяется в родительском классе ThreadStateTransition следующим образом.
void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即构造函数传进来de thread
// transition_and_fence must be used on any thread state transition
// where there might not be a Java call stub on the stack, in
// particular on Windows where the Structured Exception Handler is
// set up in the call stub. os::write_memory_serialize_page() can
// fault and we can't recover from it on Windows without a SEH in
// place.
//transition_and_fence方法必须在任何线程状态转换的时候使用
static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {
assert(thread->thread_state() == from, "coming from wrong thread state");
assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
//标识线程转换中
thread->set_thread_state((JavaThreadState)(from + 1));
// 设置内存屏障,确保新的状态能够被VM 线程看到
if (os::is_MP()) {
if (UseMembar) {
// Force a fence between the write above and read below
OrderAccess::fence();
} else {
// Must use this rather than serialization page in particular on Windows
InterfaceSupport::serialize_memory(thread);
}
}
if (SafepointSynchronize::do_call_back()) {
SafepointSynchronize::block(thread);
}
//线程状态转换成最终的状态,对待这里的场景就是阻塞
thread->set_thread_state(to);
CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
}
Общее значение состояния потока операционной системы
Примерное значение состояния потока операционной системы задается в osThread, которое само зависит от платформы.
enum ThreadState {
ALLOCATED, // Memory has been allocated but not initialized
INITIALIZED, // The thread has been initialized but yet started
RUNNABLE, // Has been started and is runnable, but not necessarily running
MONITOR_WAIT, // Waiting on a contended monitor lock
CONDVAR_WAIT, // Waiting on a condition variable
OBJECT_WAIT, // Waiting on an Object.wait() call
BREAKPOINTED, // Suspended at breakpoint
SLEEPING, // Thread.sleep()
ZOMBIE // All done, but not reclaimed yet
};
отменить отслеживание источника
Реализация выглядит следующим образом
void Parker::unpark() {
int s, status ;
//给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
//park进入wait时,_mutex会被释放
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
//存储旧的_counter
s = _counter;
//许可改为1,每次调用都设置成发放许可
_counter = 1;
if (s < 1) {
//之前没有许可
if (WorkAroundNPTLTimedWaitHang) {
//默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
//释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
//一直有许可,释放掉自己加的锁,有许可park本身就返回了
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
Из исходного кода видно, что unpark сам выдает лицензии и уведомляет ожидающие потоки о том, что ожидание может быть завершено.
Суммировать
- park/unpark может просыпаться и точно ждать потоков.
- Реализация Linux достигается за счет ожидания, пробуждения, взаимного исключения и состояния API потока POSIX.
- Во время процесса исполнения Park предпочитает проверять, есть ли лицензия. Если есть лицензия, она будет возвращена немедленно, и каждый раз, когда нетронут лицензию на да, что означает, что вы можете выполнить unpark, а затем немедленно выполнить парк. Он подходит для производителя быстро, а потребитель не завершил сценуСправочный адрес