Механизм синхронизации на уровне ядра Linux --futex

Java задняя часть GitHub Linux

существуетНебольшая мысль о синхронизации - далееВ этой статье мы знаем, что glibcpthread_cond_timedwaitНижний слой реализован с помощью механизма linux futex.

Больше статей смотрите в личном блоге:GitHub.com/farmer Джон Брат…

Идеальным механизмом синхронизации должно быть использование атомарных инструкций в пользовательском режиме для решения проблемы, когда нет конфликта блокировок, и использование системного вызова, предоставленного ядром, для сна и пробуждения, когда ему нужно приостановить и подождать. Другими словами, можно ли приостановить процесс при сбое вращения в пользовательском режиме и разбудить его, когда удерживающий блокировку поток освободит блокировку? Если вы глубоко не задумывались об этой проблеме, вы, вероятно, считаете что-то вроде этого само собой разумеющимся (псевдокод):

void lock(int lockval) {
	//trylock是用户级的自旋锁
	while(!trylock(lockval)) {
		wait();//释放cpu,并将当期线程加入等待队列,是系统调用
	}
}

boolean trylock(int lockval){
	int i=0; 
	//localval=1代表上锁成功
	while(!compareAndSet(lockval,0,1)){
		if(++i>10){
			return false;
		}
	}
	return true;
}

void unlock(int lockval) {
	 compareAndSet(lockval,1,0);
	 notify();
}

Проблема с приведенным выше кодом заключается в том, что между вызовами trylock и wait есть окно: Если попытка блокировки потока не удалась, и поток, удерживающий блокировку при вызове ожидания, освобождает блокировку, текущий поток все равно будет вызывать ожидание для ожидания, но после этого никто не будет разбудить поток.

Для решения вышеуказанных проблем ядро ​​Linux вводит механизм фьютекса, который в основном включает два метода: ожидание и пробуждение:futex_waitиfutex_wake, который определяется следующим образом

//uaddr指向一个地址,val代表这个地址期待的值,当*uaddr==val时,才会进行wait
int futex_wait(int *uaddr, int val);
//唤醒n个在uaddr指向的锁变量上挂起等待的进程
int futex_wake(int *uaddr, int n);

Перед тем, как фьютекс фактически приостановит процесс, он проверит, равно ли значение адреса, на которое указывает addr, значению val.Если оно не равно, он немедленно вернется, и пользовательский режим продолжит блокировку. В противном случае текущий поток помещается в очередь и приостанавливается.

существуетНемного мыслей о синхронизации — часть 1В статье представлены предыстория и основные принципы работы фьютекса, те, кто не знаком с фьютексом, могут сначала прочитать ее.

В этой статье будет подробно проанализирована реализация фьютекса, чтобы читатели могли получить интуитивное представление о реализации блокировок нижнего уровня в сочетании с двумя предыдущими статьями (Немного мыслей о синхронизации — часть 1иНебольшая мысль о синхронизации - далее) может иметь полное представление о механизме синхронизации операционной системы.

Слово позже в этом процессе включает обычные процессы и потоки.

futex_wait

Прежде чем приступить к анализу исходного кода ниже, подумайте над вопросом: как убедиться, что значение val не было изменено другими процессами, когда процесс приостановлен?

Код находится в kernel/futex.c

static int futex_wait(u32 __user *uaddr, int fshared,
		      u32 val, ktime_t *abs_time, u32 bitset, int clockrt)
{
	struct hrtimer_sleeper timeout, *to = NULL;
	struct restart_block *restart;
	struct futex_hash_bucket *hb;
	struct futex_q q;
	int ret;

	...

	//设置hrtimer定时任务:在一定时间(abs_time)后,如果进程还没被唤醒则唤醒wait的进程
	if (abs_time) {
	    ...
		hrtimer_init_sleeper(to, current);
		...
	}

retry:
	//该函数中判断uaddr指向的值是否等于val,以及一些初始化操作
	ret = futex_wait_setup(uaddr, val, fshared, &q, &hb);
	//如果val发生了改变,则直接返回
	if (ret)
		goto out;

	//将当前进程状态改为TASK_INTERRUPTIBLE,并插入到futex等待队列,然后重新调度。
	futex_wait_queue_me(hb, &q, to);

	/* If we were woken (and unqueued), we succeeded, whatever. */
	ret = 0;
	//如果unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移出等待队列,所以这里会失败)
	if (!unqueue_me(&q))
		goto out_put_key;
	ret = -ETIMEDOUT;
	if (to && !to->task)
		goto out_put_key;

	/*
	 * We expect signal_pending(current), but we might be the
	 * victim of a spurious wakeup as well.
	 */
	if (!signal_pending(current)) {
		put_futex_key(fshared, &q.key);
		goto retry;
	}

	ret = -ERESTARTSYS;
	if (!abs_time)
		goto out_put_key;

	...

out_put_key:
	put_futex_key(fshared, &q.key);
out:
	if (to) {
		//取消定时任务
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}
	return ret;
}

Перед блокировкой процесса текущий процесс будет вставлен в очередь ожидания.Следует отметить, что упомянутая здесь очередь ожидания на самом деле представляет собой структуру, аналогичную Java HashMap, которая является глобально уникальной.

struct futex_hash_bucket {
	spinlock_t lock;
	//双向链表
	struct plist_head chain;
};

static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];

сконцентрируйсяfutex_wait_setupи две функцииfutex_wait_queue_me

static int futex_wait_setup(u32 __user *uaddr, u32 val, int fshared,
			   struct futex_q *q, struct futex_hash_bucket **hb)
{
	u32 uval;
	int ret;
retry:
	q->key = FUTEX_KEY_INIT;
	//初始化futex_q
	ret = get_futex_key(uaddr, fshared, &q->key, VERIFY_READ);
	if (unlikely(ret != 0))
		return ret;

retry_private:
	//获得自旋锁
	*hb = queue_lock(q);
	//原子的将uaddr的值设置到uval中
	ret = get_futex_value_locked(&uval, uaddr);

   ... 
	//如果当期uaddr指向的值不等于val,即说明其他进程修改了
	//uaddr指向的值,等待条件不再成立,不用阻塞直接返回。
	if (uval != val) {
		//释放锁
		queue_unlock(q, *hb);
		ret = -EWOULDBLOCK;
	}

   ...
	return ret;
}

функцияfutex_wait_setupВ этом методе нужно сделать две основные вещи: одна — получить спин-блокировку, а вторая — определить, является ли *uaddr ожидаемым значением.

static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
				struct hrtimer_sleeper *timeout)
{
	//设置进程状态为TASK_INTERRUPTIBLE,cpu调度时只会选择
	//状态为TASK_RUNNING的进程
	set_current_state(TASK_INTERRUPTIBLE);
	//将当期进程(q封装)插入到等待队列中去,然后释放自旋锁
	queue_me(q, hb);

	//启动定时任务
	if (timeout) {
		hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
		if (!hrtimer_active(&timeout->timer))
			timeout->task = NULL;
	}

	/*
	 * If we have been removed from the hash list, then another task
	 * has tried to wake us, and we can skip the call to schedule().
	 */
	if (likely(!plist_node_empty(&q->list))) {
		 
		 //如果没有设置过期时间 || 设置了过期时间且还没过期
		if (!timeout || timeout->task)
			//系统重新进行进程调度,这个时候cpu会去执行其他进程,该进程会阻塞在这里
			schedule();
	}
	//走到这里说明又被cpu选中运行了
	__set_current_state(TASK_RUNNING);
}

futex_wait_queue_meВ основном сделайте несколько вещей:

  1. Вставить текущий процесс в очередь ожидания
  2. Начать запланированное задание
  3. перепланировать процесс

Как гарантировать атомарность между условием и ожиданием

существуетfutex_wait_setupК методу будет добавлена ​​спин-блокировка; вfutex_wait_queue_meустановить статус наTASK_INTERRUPTIBLE,перечислитьqueue_meВставьте текущий поток в очередь ожидания перед снятием спин-блокировки. То есть процесс проверки значения uaddr помещается в ту же критическую секцию, что и процесс приостановки процесса. Когда спин-блокировка снимается, уже не имеет значения изменение значения адреса addr, потому что текущий процесс добавлен в очередь ожидания и может быть разбужен пробуждением, так что проблем ни у кого не будет проснуться, как упоминалось в начале этой статьи.

резюме futex_wait

Подводить итогиfutex_waitПроцесс:

  1. добавить спин-блокировку
  2. Проверить, равен ли *uaddr val, если нет, немедленно вернуться
  3. Установите состояние процесса наTASK_INTERRUPTIBLE
  4. Вставить текущий процесс в очередь ожидания
  5. освободить спин-блокировку
  6. Создайте задачу на время: разбудите процесс, если он не был разбужен по прошествии определенного периода времени.
  7. Приостановить текущий процесс

futex_wake

futex_wake

static int futex_wake(u32 __user *uaddr, int fshared, int nr_wake, u32 bitset)
{
	struct futex_hash_bucket *hb;
	struct futex_q *this, *next;
	struct plist_head *head;
	union futex_key key = FUTEX_KEY_INIT;
	int ret;

	...
	//根据uaddr的值填充&key的内容
	ret = get_futex_key(uaddr, fshared, &key, VERIFY_READ);
	if (unlikely(ret != 0))
		goto out;
	//根据&key获得对应uaddr所在的futex_hash_bucket
	hb = hash_futex(&key);
	//对该hb加自旋锁
	spin_lock(&hb->lock);
	head = &hb->chain;
	//遍历该hb的链表,注意链表中存储的节点是plist_node类型,而而这里的this却是futex_q类型,这种类型转换是通过c中的container_of机制实现的
	plist_for_each_entry_safe(this, next, head, list) {
		if (match_futex (&this->key, &key)) {
			...
			//唤醒对应进程
			wake_futex(this);
			if (++ret >= nr_wake)
				break;
		}
	}
	//释放自旋锁
	spin_unlock(&hb->lock);
	put_futex_key(fshared, &key);
out:
	return ret;
}

futex_wakeПроцесс выглядит следующим образом:

  1. Найдите соответствующий uaddrfutex_hash_bucket, т.е. hb в коде
  2. Добавить спин-блокировку в hb
  3. Пройдите по связанному списку fb и найдите узел, соответствующий uaddr
  4. перечислитьwake_futexвызвать процесс ожидания
  5. освободить спин-блокировку

wake_futexУстановите статус процесса формулировки наTASK_RUNNINGИ добавить его в список системного расписания, а заодно убрать процесс из очереди ожидания фьютекса, конкретный код анализироваться не будет, а кому интересно, могут изучить его самостоятельно.

End

Нижний уровень ReentrantLock, Object.wait и Thread.sleep в Java использует фьютекс для синхронизации потоков.Понимание реализации фьютекса может помочь вам лучше понять и использовать эти механизмы синхронизации верхнего уровня. Кроме того, из-за ограниченного места и энергии отсутствует конкретный анализ соответствующего содержания, связанного с планированием процессов, но это не мешает пониманию содержания статьи.