# Linux Futex 快速用户空间互斥体原理
在 pthread 编程中,我们通常使用如下代码来完成互斥量和条件变量的使用,那么本文将详细说明 Linux 提供的 futex 与pthread 线程库的搭配使用原理。
pthread_mutex_t _mutex[1];
pthread_mutex_init (_mutex, NULL); // 初始化互斥体
pthread_mutex_lock(_mutex); // 互斥体上锁
// doSomething
pthread_mutex_unlock(_mutex); // 对互斥体解锁
2
3
4
5
# pthread_mutex_init 函数
该函数用于初始化 pthread_mutex_t 结构。注意,mutex 可以用于 进程之间共享、Robust 健壮锁(线程或者进程挂掉自动释放)、可重入锁、自适应锁等等,但是我们这里讨论简单的线程互斥锁(不可重入),此时较为简单,因为 线程之间 共享地址和数据,所以这里简单初始化即可,读者不必深入研究。
static const struct pthread_mutexattr default_mutexattr = {
// 默认互斥体,表示在线程中共享使用,而不是在进程之间使用
.mutexkind = PTHREAD_MUTEX_NORMAL
};
// 互斥体锁类型枚举
enum {
PTHREAD_MUTEX_TIMED_NP,
PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP
}
int __pthread_mutex_init (mutex, mutexattr)
pthread_mutex_t *mutex;
const pthread_mutexattr_t *mutexattr;
{
const struct pthread_mutexattr *imutexattr;
imutexattr = ((const struct pthread_mutexattr *) mutexattr
?: &default_mutexattr); // 如果没有指定 mutexattr,那么取默认值
memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T); // 将内部值初始化0
mutex->__data.__kind = imutexattr->mutexkind & ~PTHREAD_MUTEXATTR_FLAG_BITS; // 设置 mutex 锁类型
if ((imutexattr->mutexkind & PTHREAD_MUTEXATTR_FLAG_ROBUST) != 0)
mutex->__data.__kind |= PTHREAD_MUTEX_ROBUST_NORMAL_NP;
}
...
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# pthread_mutex_lock 函数
该函数将会对 pthread_mutex_t 加锁,流程如下:
1、原子性将 (mutex)->**data.**lock 从 0 变为 1,若成功,那么不需要进入系统调用,直接返回
2、若失败,那么进入 futex 系统调用阻塞当前线程
我们看到这里就是 futex 快速的意义所在:如果没有用户态的竞争,不会进入系统调用,也即不需要上下文切换。
int __pthread_mutex_lock (mutex)
pthread_mutex_t *mutex;
{
if (__builtin_expect (type == PTHREAD_MUTEX_TIMED_NP, 1)) // 普通线程间共享、互斥不可重入锁
{
LLL_MUTEX_LOCK (mutex); // 使用该函数上锁
}
...
// 获取当前线程 ID 保存到 mutex 中,记录当前获取锁的线程
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
mutex->__data.__owner = id;
return 0;
}
# define LLL_MUTEX_LOCK(mutex) lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)) // 第一个参数为 lock 参数,用于 futex 使用,第二个参数 用于判断是否为线程共享锁,显然我们这里是
# define __lll_lock_asm_start LOCK_INSTR "cmpxchgl %1, %2\n\t" // 尝试获取锁
#define lll_lock(futex, private)
(void)
({ int ignore1, ignore2;
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) // 我们看这里即可
__asm __volatile (__lll_lock_asm_start
"jnz _L_lock_%=" // 首先尝试了 cmpxchgl %1, %2,也即 cmpxchgl %1, %futex => 用 eax 的 0 与 futex 比较,若 futex 是0,那么不需要进入 系统调用,将 1 替换到 futex 中即可
".subsection 1" // 将 subsection 和 previous 之间的函数放置另外的 section 1 节中
".type _L_lock_%=,@function" // 定义一个函数,名为:_L_lock_%=
"_L_lock_%=:"
"1: leal %2, %%ecx" // 获取 futex 的地址放入 ecx
"2: call __lll_lock_wait_private" // 调用该函数进入系统调用
"3: jmp 18f" // 结束后 跳转到 18 标号处 退出
"4: .size _L_lock_%=, 4b-1b"
".previous"
"18:"
: "=a" (ignore1), "=c" (ignore2), "=m" (futex)
: "0" (0), "1" (1), "m" (futex), // 输入时: eax = 1 ecx = 1 (内连汇编语法,详见混沌专题之并发原理)
"i" (MULTIPLE_THREADS_OFFSET)
: "memory");
else
{
... // 进程间锁
}
})
// 进入系统调用
__lll_lock_wait_private:
pushl %edx
pushl %ebx
pushl %esi
movl $2, %edx // 将 2 放入 edx
movl %ecx, %ebx // 将 futex 地址放入 ebx
xorl %esi, %esi // esi 用于保存超时时间,此时没有,所以清 0
movl $(FUTEX_WAIT | FUTEX_PRIVATE_FLAG), ecx // 通知内核执行 FUTEX_WAIT 操作
cmpl %edx, %eax // 看看 无锁状态下的 futex值 是否为 当前需要设置的值,调用该函数前,我们看到 eax 的值为0,此时将会跳转到 标号 2 处执行
jne 2f
1: movl $SYS_futex, %eax // 系统调用号放入 eax
int $0x80 // 进入系统调用
// 读者注意:系统调用返回后,仍需要判断 futex 的值是否为 0
2: movl %edx, %eax // 将 2 放入 eax 中
xchgl %eax, (%ebx) // 交换 futex 与 eax 的值,此时 futex 设置为 2,eax 中保存原有的 futex 的值
testl %eax, %eax // 此时若 futex 的值不为 0,那么跳转到 标号 1 处进入系统调用
jnz 1b
// 当系统调用返回时,还原之前保存的寄存器值
popl %esi
popl %ebx
popl %edx
ret
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# pthread_mutex_unlock 原理
该函数用于释放 mutex 锁,我们看到原理很简单:原子性将 mutex->**data.**lock 减 1 ,若结果为 0 不需要进入系统调用,若不为 0,那么进入系统调用唤醒 其他线程。
int __pthread_mutex_unlock (mutex)
pthread_mutex_t *mutex;
{
return __pthread_mutex_unlock_usercnt (mutex, 1);
}
int __pthread_mutex_unlock_usercnt (mutex, decr)
pthread_mutex_t *mutex;
int decr;
{
int type = PTHREAD_MUTEX_TYPE_ELISION (mutex); // 获取锁类型
...
if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
== PTHREAD_MUTEX_TIMED_NP)
{ // 这里我们关注 正常的互斥锁
mutex->__data.__owner = 0; // 将锁定的持有 线程 id 清除
if (decr)
--mutex->__data.__nusers; // 减少锁使用数量
lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)); // 执行解锁操作
return 0;
}
...
}
// 原子性释放锁操作
# define __lll_unlock_asm LOCK_INSTR "subl $1, %0\n\t"
#define lll_unlock(futex, private)
(void)
({ int ignore;
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE)
__asm __volatile (__lll_unlock_asm // 等价于 LOCK_INSTR "subl $1, futex " 对 futex 原子性减 1
"jne _L_unlock_%=\n\t" // 若减 1 值不为 0,那么跳转到 _L_unlock_%= 函数 ,否则退出
".subsection 1\n\t"
".type _L_unlock_%=,@function\n" // 定义 _L_unlock_%= 函数
"_L_unlock_%=:\n"
"1:\tleal %0, %%eax\n" // 取 futex 地址放入 eax 中
"2:\tcall __lll_unlock_wake_private\n" // 调用 __lll_unlock_wake_private 函数
"3:\tjmp 18f\n"
"4:\t.size _L_unlock_%=, 4b-1b\n\t"
".previous\n"
"18:"
: "=m" (futex), "=&a" (ignore)
: "m" (futex), "i" (MULTIPLE_THREADS_OFFSET)
: "memory"); \
else \
{ \
... // 省略进程间共享锁
}
})
// 进入内核唤醒进程
__lll_unlock_wake_private:
pushl %ebx
pushl %ecx
pushl %edx
movl %eax, %ebx // 将 futex 的值放入 ebx 中
movl $0, (%eax) // 将 futex 中的值设置为 0 ,表示此时无锁状态(那么考虑下?如果现在用户态的线程调用 lock函数调用锁,此时 是不是可以直接获取,那么内核唤醒后的线程是否能获取锁呢?)
movl $(FUTEX_WAKE | FUTEX_PRIVATE_FLAG), %ecx // 通知内核执行唤醒操作
movl $1, %edx // 通知内核只唤醒一个线程(互斥锁 没必要唤醒所有线程)
movl $SYS_futex, %eax // eax 保存 sys_futex 系统调用号
int $0x80 // 进入系统调用
// 系统调用返回,其他线程被唤醒,还原保存的寄存器即可
popl %edx
popl %ecx
popl %ebx
ret
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# sys_futex 原理
上面我们看到使用 sys_futex 系统调用完成阻塞和唤醒操作:FUTEX_WAIT(标识阻塞) 、FUTEX_WAKE(唤醒)。现在我们来看看该系统调用的实现过程。
asmlinkage long sys_futex(u32 __user *uaddr, int op, u32 val,
struct timespec __user *utime, u32 __user *uaddr2,
u32 val3)
{
struct timespec ts;
ktime_t t, *tp = NULL;
u32 val2 = 0;
int cmd = op & FUTEX_CMD_MASK;
if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI)) { // 指定了超时时间的,那么我们拷贝时间并生成 ktime_t 结构,这里忽略
if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
return -EFAULT;
if (!timespec_valid(&ts))
return -EINVAL;
t = timespec_to_ktime(ts);
if (cmd == FUTEX_WAIT)
t = ktime_add(ktime_get(), t);
tp = &t;
}
// 如果使用 FUTEX_REQUEUE 命令 那么转换时间值为 val2
if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE)
val2 = (u32) (unsigned long) utime;
return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
// 这里我们关注 futex_wait 和 futex_wake 函数即可,可以看到他们与 u32 __user *uaddr2, u32 val2, u32 val3 参数无关,因为它们都是用于 requeue操作的,这里我们没有涉及到该概念,所以我们了解即可
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
u32 __user *uaddr2, u32 val2, u32 val3)
{
int ret;
int cmd = op & FUTEX_CMD_MASK;
struct rw_semaphore *fshared = NULL;
if (!(op & FUTEX_PRIVATE_FLAG)) // 若不是线程间futex,那么需要获取 mmap_sem 信号量用于操作时互斥
fshared = ¤t->mm->mmap_sem;
switch (cmd) {
case FUTEX_WAIT:
ret = futex_wait(uaddr, fshared, val, timeout);
break;
case FUTEX_WAKE:
ret = futex_wake(uaddr, fshared, val);
break;
... // 其他操作我们忽略,这里关注 等待和唤醒
}
return ret;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# futex_wait 函数
该函数用于在用户态竞争 futex 变量失败时调用,此时将尝试阻塞当前线程,流程如下:
1、创建表示 futex 元数据的 struct futex_q
2、根据 struct futex_q q 的元数据 找到它应该放入 的 futex_hash_bucket 结构
3、获取 用户空间中 当前 futex 的值
4、若 futex 的值 发生改变,此时退出到用户空间重新尝试获取锁
5、若 futex 的值 未改变,那么将 struct futex_q q 放入 futex_hash_bucket 结构,同时将进程等待节点放入 struct futex_q q 的等待队列,然后调用 schedule 函数 切换其他进程执行,直到被唤醒或者等待超时
static int futex_wait(u32 __user *uaddr, struct rw_semaphore *fshared,
u32 val, ktime_t *abs_time)
{
struct task_struct *curr = current;
DECLARE_WAITQUEUE(wait, curr); // 初始化等待节点 wait_queue_t
struct futex_q q; // 表示当前 futex 的元数据信息
struct futex_hash_bucket *hb; // 每个 futex 元数据 将会放置在全局结构 static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS]; 中
u32 uval; // 保存当前用户空间 futex 的值
...
retry:
if (fshared) // 若为进程间共享锁,那么需要获取到保护该进程 mm_struct 的锁
down_read(fshared);
ret = get_futex_key(uaddr, fshared, &q.key); // 找到该 futex 所在的 mm 和虚拟内存地址(读者可自己阅读,非常容易懂:进程共享的话 找到 vma 然后检测权限,没问题则赋值给 futex_key ,也即 &q.key,如果是线程间共享,那么非常简单,直接赋值 mm 和 futex 地址到 futex_key 即可)
if (unlikely(ret != 0))
goto out_release_sem;
hb = queue_lock(&q, -1, NULL); // 根据当前 futex_q 的元数据信息计算hash值,找到需要将其放入的 struct futex_hash_bucket futex_queues 索引下标出的 struct futex_hash_bucket 指针,同时对 这个 struct futex_hash_bucket 结构 上自旋锁(因为 futex_q 需要放入到该 futex_hash_bucket 结构中,为了避免多线程竞争所以需要自旋锁)
ret = get_futex_value_locked(&uval, uaddr); // 原子性获取到当前用户空间的 futex 值,放入 uval 中
if (unlikely(ret)) {
... // 失败后释放锁重新开始
}
ret = -EWOULDBLOCK;
if (uval != val) // 当前用户态的 futex 值 与 传入的比较值,失败,那么我们必然不能够 阻塞,需要返回用户态重新尝试获取锁
goto out_unlock_release_sem;
// 如果此时用户空间 futex 的值没有发生变化,那么当前线程需要阻塞等待 futex 释放,将 futex_q 放入 futex_hash_bucket 队列,同时释放 futex_hash_bucket 的自旋锁
__queue_me(&q, hb);
// 若为进程间共享锁,那么这里需要释放保护 mm 的信号量,因为进程在睡眠时,不能持有该锁
if (fshared)
up_read(fshared);
// 设置当前进程状态为可中断的阻塞状态,然后将当前进程放入 futex_q 的阻塞列表
__set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&q.waiters, &wait);
if (likely(!plist_node_empty(&q.list))) { // 若 futex_hash_bucket 队列中不为空(当前进程仍处于队列中等待),那么根据当前线程设置的超时时间来进行阻塞,我们这里没有超时时间,所以直接调用 schedule 函数交出控制权
if (!abs_time)
schedule();
else {
... // 否则进行超时等待
}
}
// 被唤醒后,或者当前 futex_hash_bucket 队列中只存在当前 进程,那么将进程状态恢复为 执行状态
__set_current_state(TASK_RUNNING);
if (!unqueue_me(&q)) // 将当前进程从 futex_hash_bucket 队列中去除,若成功则返回
return 0;
if (rem) // 若发生超时那么返回超时错误,在上述 else块中发生,这里了解即可
return -ETIMEDOUT;
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# futex_wake 函数
该函数用于释放调用 futex_wait 等待的进程,流程如下:
1、根据 futex 地址 ,找到对应的 futex_key key,根据该 key 找到 所述的 struct futex_hash_bucket *hb
2、对 futex_hash_bucket 结构上自旋锁
3、遍历该 futex_hash_bucket 中的所有 futex_key ,找到与需要唤醒的 futex 的进程所匹配的 futex_q,然后唤醒阻塞在上面的线程,nr_wake 用于表示唤醒的进程个数
static int futex_wake(u32 __user *uaddr, struct rw_semaphore *fshared, int nr_wake)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key;
int ret;
if (fshared) // 操作进程间共享的 futex 都需要锁定 mm 的锁,保护 mm 和 vma 不会发生改变
down_read(fshared);
ret = get_futex_key(uaddr, fshared, &key); // 获取该 futex 的元数据 futex_key key
if (unlikely(ret != 0))
goto out;
hb = hash_futex(&key); // 找到该key 所属的 futex_hash_bucket
spin_lock(&hb->lock); // 获取自旋锁,保证原子性
head = &hb->chain; // 获取头结点
plist_for_each_entry_safe(this, next, head, list) { // 遍历所有节点,找到匹配 当前 futex 的 key,然后唤醒等待在该 futex的进程
if (match_futex (&this->key, &key)) { // 为何需要匹配?因为 hash 冲突,可能该 futex 已经被别的线程释放了,不是当前 需要的 futex 元数据
...
wake_futex(this); // 唤醒阻塞在 当前 futex 上的线程
if (++ret >= nr_wake) // 唤醒 nr_wake 个 匹配成功的 futex 等待进程
break;
}
}
...
}
static void wake_futex(struct futex_q *q)
{
plist_del(&q->list, &q->list.plist); // 首先将当前 futex_q 节点 从 futex_hash_bucket 中移出
if (q->filp) // 若指定了唤醒了通知的 fd,那么发送信号通知,这里我们了解即可
send_sigio(&q->filp->f_owner, q->fd, POLL_IN);
wake_up_all(&q->waiters); // 唤醒所有等待在该 futex_q 上的进程
smp_wmb(); // 保证在释放 lock_ptr 锁之前,前面的 store 操作全部提交到内存
q->lock_ptr = NULL;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 总结
1、对于 pthread_mutex_lock 而言,需要用户空间的代码作为配合实现 快速互斥体的语义:没有竞争时无需进入内核。在 pthread 线程库中,默认无锁状态 futex 为 0,原子性将其 设置为 1(使用 lock cmpxchg 来完成该操作)若 CAS 成功,那么返回,否则 设置 futex 的值 为 2,并陷入内核等待
2、对于 pthread_mutex_unlock 而言,pthread 库首先原子性对 futex 值减 1,若结果为0,那么表示没有竞争,此时无需进入内核空间唤醒阻塞线程,若结果不为0, 比如上述的 2 -1 ,那么表示内核态有阻塞线程,那么首先释放锁,然后进入内核唤醒一个等待的线程(每个线程一个 futex_q) ,然后该线程回到用户态尝试获取锁
3、如果其他线程在线程未陷入内核态之前,但已经释放锁后,那么在内核中没有对当前 futex 的值进行判断,直接就唤醒,所以该线程唤醒后,依然拿不到锁,又得回去阻塞,但这是无解的操作,因为用户态什么时候 cas 到了 futex 内核无法得知,所以需要线程自己回到用户态自己重试