существуетНебольшая мысль о синхронизации - далееВ этой статье мы знаем, что glibcpthread_cond_timedwait
Нижний слой реализован с помощью механизма linux futex.
Больше статей смотрите в личном блоге:GitHub.com/farmer Джон Брат…
Идеальным механизмом синхронизации должно быть использование атомарных инструкций в пользовательском режиме для решения проблемы, когда нет конфликта блокировок, и использование системного вызова, предоставленного ядром, для сна и пробуждения, когда ему нужно приостановить и подождать. Другими словами, можно ли приостановить процесс при сбое вращения в пользовательском режиме и разбудить его, когда удерживающий блокировку поток освободит блокировку? Если вы глубоко не задумывались об этой проблеме, вы, вероятно, считаете что-то вроде этого само собой разумеющимся (псевдокод):
void lock(int lockval) {
//trylock是用户级的自旋锁
while(!trylock(lockval)) {
wait();//释放cpu,并将当期线程加入等待队列,是系统调用
}
}
boolean trylock(int lockval){
int i=0;
//localval=1代表上锁成功
while(!compareAndSet(lockval,0,1)){
if(++i>10){
return false;
}
}
return true;
}
void unlock(int lockval) {
compareAndSet(lockval,1,0);
notify();
}
Проблема с приведенным выше кодом заключается в том, что между вызовами trylock и wait есть окно: Если попытка блокировки потока не удалась, и поток, удерживающий блокировку при вызове ожидания, освобождает блокировку, текущий поток все равно будет вызывать ожидание для ожидания, но после этого никто не будет разбудить поток.
Для решения вышеуказанных проблем ядро Linux вводит механизм фьютекса, который в основном включает два метода: ожидание и пробуждение:futex_wait
иfutex_wake
, который определяется следующим образом
//uaddr指向一个地址,val代表这个地址期待的值,当*uaddr==val时,才会进行wait
int futex_wait(int *uaddr, int val);
//唤醒n个在uaddr指向的锁变量上挂起等待的进程
int futex_wake(int *uaddr, int n);
Перед тем, как фьютекс фактически приостановит процесс, он проверит, равно ли значение адреса, на которое указывает addr, значению val.Если оно не равно, он немедленно вернется, и пользовательский режим продолжит блокировку. В противном случае текущий поток помещается в очередь и приостанавливается.
существуетНемного мыслей о синхронизации — часть 1В статье представлены предыстория и основные принципы работы фьютекса, те, кто не знаком с фьютексом, могут сначала прочитать ее.
В этой статье будет подробно проанализирована реализация фьютекса, чтобы читатели могли получить интуитивное представление о реализации блокировок нижнего уровня в сочетании с двумя предыдущими статьями (Немного мыслей о синхронизации — часть 1иНебольшая мысль о синхронизации - далее) может иметь полное представление о механизме синхронизации операционной системы.
Слово позже в этом процессе включает обычные процессы и потоки.
futex_wait
Прежде чем приступить к анализу исходного кода ниже, подумайте над вопросом: как убедиться, что значение val не было изменено другими процессами, когда процесс приостановлен?
Код находится в kernel/futex.c
static int futex_wait(u32 __user *uaddr, int fshared,
u32 val, ktime_t *abs_time, u32 bitset, int clockrt)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q;
int ret;
...
//设置hrtimer定时任务:在一定时间(abs_time)后,如果进程还没被唤醒则唤醒wait的进程
if (abs_time) {
...
hrtimer_init_sleeper(to, current);
...
}
retry:
//该函数中判断uaddr指向的值是否等于val,以及一些初始化操作
ret = futex_wait_setup(uaddr, val, fshared, &q, &hb);
//如果val发生了改变,则直接返回
if (ret)
goto out;
//将当前进程状态改为TASK_INTERRUPTIBLE,并插入到futex等待队列,然后重新调度。
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
//如果unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移出等待队列,所以这里会失败)
if (!unqueue_me(&q))
goto out_put_key;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out_put_key;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current)) {
put_futex_key(fshared, &q.key);
goto retry;
}
ret = -ERESTARTSYS;
if (!abs_time)
goto out_put_key;
...
out_put_key:
put_futex_key(fshared, &q.key);
out:
if (to) {
//取消定时任务
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
Перед блокировкой процесса текущий процесс будет вставлен в очередь ожидания.Следует отметить, что упомянутая здесь очередь ожидания на самом деле представляет собой структуру, аналогичную Java HashMap, которая является глобально уникальной.
struct futex_hash_bucket {
spinlock_t lock;
//双向链表
struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
сконцентрируйсяfutex_wait_setup
и две функцииfutex_wait_queue_me
static int futex_wait_setup(u32 __user *uaddr, u32 val, int fshared,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
retry:
q->key = FUTEX_KEY_INIT;
//初始化futex_q
ret = get_futex_key(uaddr, fshared, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//获得自旋锁
*hb = queue_lock(q);
//原子的将uaddr的值设置到uval中
ret = get_futex_value_locked(&uval, uaddr);
...
//如果当期uaddr指向的值不等于val,即说明其他进程修改了
//uaddr指向的值,等待条件不再成立,不用阻塞直接返回。
if (uval != val) {
//释放锁
queue_unlock(q, *hb);
ret = -EWOULDBLOCK;
}
...
return ret;
}
функцияfutex_wait_setup
В этом методе нужно сделать две основные вещи: одна — получить спин-блокировку, а вторая — определить, является ли *uaddr ожидаемым значением.
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
//设置进程状态为TASK_INTERRUPTIBLE,cpu调度时只会选择
//状态为TASK_RUNNING的进程
set_current_state(TASK_INTERRUPTIBLE);
//将当期进程(q封装)插入到等待队列中去,然后释放自旋锁
queue_me(q, hb);
//启动定时任务
if (timeout) {
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
if (!hrtimer_active(&timeout->timer))
timeout->task = NULL;
}
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
//如果没有设置过期时间 || 设置了过期时间且还没过期
if (!timeout || timeout->task)
//系统重新进行进程调度,这个时候cpu会去执行其他进程,该进程会阻塞在这里
schedule();
}
//走到这里说明又被cpu选中运行了
__set_current_state(TASK_RUNNING);
}
futex_wait_queue_me
В основном сделайте несколько вещей:
- Вставить текущий процесс в очередь ожидания
- Начать запланированное задание
- перепланировать процесс
Как гарантировать атомарность между условием и ожиданием
существуетfutex_wait_setup
К методу будет добавлена спин-блокировка; вfutex_wait_queue_me
установить статус наTASK_INTERRUPTIBLE
,перечислитьqueue_me
Вставьте текущий поток в очередь ожидания перед снятием спин-блокировки. То есть процесс проверки значения uaddr помещается в ту же критическую секцию, что и процесс приостановки процесса. Когда спин-блокировка снимается, уже не имеет значения изменение значения адреса addr, потому что текущий процесс добавлен в очередь ожидания и может быть разбужен пробуждением, так что проблем ни у кого не будет проснуться, как упоминалось в начале этой статьи.
резюме futex_wait
Подводить итогиfutex_wait
Процесс:
- добавить спин-блокировку
- Проверить, равен ли *uaddr val, если нет, немедленно вернуться
- Установите состояние процесса на
TASK_INTERRUPTIBLE
- Вставить текущий процесс в очередь ожидания
- освободить спин-блокировку
- Создайте задачу на время: разбудите процесс, если он не был разбужен по прошествии определенного периода времени.
- Приостановить текущий процесс
futex_wake
futex_wake
static int futex_wake(u32 __user *uaddr, int fshared, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key = FUTEX_KEY_INIT;
int ret;
...
//根据uaddr的值填充&key的内容
ret = get_futex_key(uaddr, fshared, &key, VERIFY_READ);
if (unlikely(ret != 0))
goto out;
//根据&key获得对应uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
//对该hb加自旋锁
spin_lock(&hb->lock);
head = &hb->chain;
//遍历该hb的链表,注意链表中存储的节点是plist_node类型,而而这里的this却是futex_q类型,这种类型转换是通过c中的container_of机制实现的
plist_for_each_entry_safe(this, next, head, list) {
if (match_futex (&this->key, &key)) {
...
//唤醒对应进程
wake_futex(this);
if (++ret >= nr_wake)
break;
}
}
//释放自旋锁
spin_unlock(&hb->lock);
put_futex_key(fshared, &key);
out:
return ret;
}
futex_wake
Процесс выглядит следующим образом:
- Найдите соответствующий uaddr
futex_hash_bucket
, т.е. hb в коде - Добавить спин-блокировку в hb
- Пройдите по связанному списку fb и найдите узел, соответствующий uaddr
- перечислить
wake_futex
вызвать процесс ожидания - освободить спин-блокировку
wake_futex
Установите статус процесса формулировки наTASK_RUNNING
И добавить его в список системного расписания, а заодно убрать процесс из очереди ожидания фьютекса, конкретный код анализироваться не будет, а кому интересно, могут изучить его самостоятельно.
End
Нижний уровень ReentrantLock, Object.wait и Thread.sleep в Java использует фьютекс для синхронизации потоков.Понимание реализации фьютекса может помочь вам лучше понять и использовать эти механизмы синхронизации верхнего уровня. Кроме того, из-за ограниченного места и энергии отсутствует конкретный анализ соответствующего содержания, связанного с планированием процессов, но это не мешает пониманию содержания статьи.