# Linux Futex 快速用户空间互斥体原理

在 pthread 编程中,我们通常使用如下代码来完成互斥量和条件变量的使用,那么本文将详细说明 Linux 提供的 futex 与pthread 线程库的搭配使用原理。

pthread_mutex_t _mutex[1];
pthread_mutex_init (_mutex, NULL); // 初始化互斥体
pthread_mutex_lock(_mutex); // 互斥体上锁
// doSomething
pthread_mutex_unlock(_mutex); // 对互斥体解锁
1
2
3
4
5

# pthread_mutex_init 函数

该函数用于初始化 pthread_mutex_t 结构。注意,mutex 可以用于 进程之间共享、Robust 健壮锁(线程或者进程挂掉自动释放)、可重入锁、自适应锁等等,但是我们这里讨论简单的线程互斥锁(不可重入),此时较为简单,因为 线程之间 共享地址和数据,所以这里简单初始化即可,读者不必深入研究。

static const struct pthread_mutexattr default_mutexattr = {
    // 默认互斥体,表示在线程中共享使用,而不是在进程之间使用
    .mutexkind = PTHREAD_MUTEX_NORMAL
};// 互斥体锁类型枚举
enum {
    PTHREAD_MUTEX_TIMED_NP,
    PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP
}int __pthread_mutex_init (mutex, mutexattr)
     pthread_mutex_t *mutex;
     const pthread_mutexattr_t *mutexattr;
{
  const struct pthread_mutexattr *imutexattr;
  imutexattr = ((const struct pthread_mutexattr *) mutexattr
        ?: &default_mutexattr); // 如果没有指定 mutexattr,那么取默认值
  memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T); // 将内部值初始化0
  mutex->__data.__kind = imutexattr->mutexkind & ~PTHREAD_MUTEXATTR_FLAG_BITS; // 设置 mutex 锁类型if ((imutexattr->mutexkind & PTHREAD_MUTEXATTR_FLAG_ROBUST) != 0)
      mutex->__data.__kind |= PTHREAD_MUTEX_ROBUST_NORMAL_NP;
  }
  ...
  return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# pthread_mutex_lock 函数

该函数将会对 pthread_mutex_t 加锁,流程如下:

1、原子性将 (mutex)->**data.**lock 从 0 变为 1,若成功,那么不需要进入系统调用,直接返回

2、若失败,那么进入 futex 系统调用阻塞当前线程

我们看到这里就是 futex 快速的意义所在:如果没有用户态的竞争,不会进入系统调用,也即不需要上下文切换。

int __pthread_mutex_lock (mutex)
    pthread_mutex_t *mutex;
{
    if (__builtin_expect (type == PTHREAD_MUTEX_TIMED_NP, 1)) // 普通线程间共享、互斥不可重入锁
    {
        LLL_MUTEX_LOCK (mutex); // 使用该函数上锁
    }
    ...
    // 获取当前线程 ID 保存到 mutex 中,记录当前获取锁的线程
    pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
    mutex->__data.__owner = id;
    return 0;
}# define LLL_MUTEX_LOCK(mutex)   lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)) // 第一个参数为 lock 参数,用于 futex 使用,第二个参数 用于判断是否为线程共享锁,显然我们这里是
​
​
# define __lll_lock_asm_start LOCK_INSTR "cmpxchgl %1, %2\n\t"  // 尝试获取锁#define lll_lock(futex, private) 
  (void)                                      
    ({ int ignore1, ignore2;                              
       if (__builtin_constant_p (private) && (private) == LLL_PRIVATE)    // 我们看这里即可    
     __asm __volatile (__lll_lock_asm_start                   
               "jnz _L_lock_%="  // 首先尝试了 cmpxchgl %1, %2,也即 cmpxchgl %1, %futex => 用 eax 的 0 与 futex 比较,若 futex 是0,那么不需要进入 系统调用,将 1 替换到 futex 中即可
                       
               ".subsection 1"  // 将 subsection 和 previous 之间的函数放置另外的 section 1 节中
               ".type _L_lock_%=,@function" // 定义一个函数,名为:_L_lock_%=
               "_L_lock_%=:"                      
               "1: leal %2, %%ecx"      // 获取 futex 的地址放入  ecx        
               "2: call __lll_lock_wait_private"    // 调用该函数进入系统调用      
               "3: jmp 18f"                   // 结束后 跳转到 18 标号处 退出
               "4: .size _L_lock_%=, 4b-1b"           
               ".previous"                    
          
               "18:"                          
               : "=a" (ignore1), "=c" (ignore2), "=m" (futex)     
               : "0" (0), "1" (1), "m" (futex),  // 输入时: eax = 1  ecx = 1 (内连汇编语法,详见混沌专题之并发原理)        
                 "i" (MULTIPLE_THREADS_OFFSET)            
               : "memory");                       
       else                                   
     {                                    
       ... // 进程间锁
     }                                    
    })// 进入系统调用
__lll_lock_wait_private:
    pushl   %edx
    pushl   %ebx
    pushl   %esi
    movl    $2, %edx // 将 2 放入 edx
    movl    %ecx, %ebx // 将 futex 地址放入 ebx
    xorl    %esi, %esi  // esi 用于保存超时时间,此时没有,所以清 0
    movl    $(FUTEX_WAIT | FUTEX_PRIVATE_FLAG), ecx // 通知内核执行 FUTEX_WAIT 操作
    cmpl    %edx, %eax  // 看看 无锁状态下的 futex值 是否为 当前需要设置的值,调用该函数前,我们看到 eax 的值为0,此时将会跳转到 标号 2 处执行
    jne 2f1:  movl    $SYS_futex, %eax // 系统调用号放入 eax
    int $0x80 // 进入系统调用
    
    // 读者注意:系统调用返回后,仍需要判断  futex 的值是否为 0
2:  movl    %edx, %eax  // 将 2 放入 eax 中
    xchgl   %eax, (%ebx) // 交换 futex 与 eax 的值,此时 futex 设置为 2,eax 中保存原有的 futex 的值
    testl   %eax, %eax  // 此时若 futex 的值不为 0,那么跳转到 标号 1 处进入系统调用
    jnz 1b
​
    // 当系统调用返回时,还原之前保存的寄存器值
    popl    %esi
    popl    %ebx
    popl    %edx
    ret
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

# pthread_mutex_unlock 原理

该函数用于释放 mutex 锁,我们看到原理很简单:原子性将 mutex->**data.**lock 减 1 ,若结果为 0 不需要进入系统调用,若不为 0,那么进入系统调用唤醒 其他线程。

int __pthread_mutex_unlock (mutex)
     pthread_mutex_t *mutex;
{
  return __pthread_mutex_unlock_usercnt (mutex, 1); 
}int __pthread_mutex_unlock_usercnt (mutex, decr)
     pthread_mutex_t *mutex;
     int decr;
{
  int type = PTHREAD_MUTEX_TYPE_ELISION (mutex); // 获取锁类型
  ...
  if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
      == PTHREAD_MUTEX_TIMED_NP)
    {  // 这里我们关注 正常的互斥锁
      mutex->__data.__owner = 0; // 将锁定的持有 线程 id 清除
      if (decr)
         --mutex->__data.__nusers; // 减少锁使用数量
      lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)); // 执行解锁操作
      return 0;
    }
  ...
}// 原子性释放锁操作
# define __lll_unlock_asm LOCK_INSTR "subl $1, %0\n\t"#define lll_unlock(futex, private) 
  (void)                                      
    ({ int ignore;                                
       if (__builtin_constant_p (private) && (private) == LLL_PRIVATE)        
     __asm __volatile (__lll_unlock_asm     // 等价于 LOCK_INSTR "subl $1, futex " 对 futex 原子性减 1 
               "jne _L_unlock_%=\n\t"   // 若减 1 值不为 0,那么跳转到 _L_unlock_%= 函数 ,否则退出
                       
               ".subsection 1\n\t"                    
               ".type _L_unlock_%=,@function\n" // 定义 _L_unlock_%= 函数
               "_L_unlock_%=:\n"                      
               "1:\tleal %0, %%eax\n"   // 取 futex 地址放入 eax 中            
               "2:\tcall __lll_unlock_wake_private\n"   // 调用  __lll_unlock_wake_private 函数    
               "3:\tjmp 18f\n"                    
               "4:\t.size _L_unlock_%=, 4b-1b\n\t"            
               ".previous\n"    
                       
               "18:"                          
               : "=m" (futex), "=&a" (ignore)             
               : "m" (futex), "i" (MULTIPLE_THREADS_OFFSET)       
               : "memory");                       \
       else                                   \
     {                                    \
       ... // 省略进程间共享锁
     }                                    
    })// 进入内核唤醒进程
__lll_unlock_wake_private:
    pushl   %ebx
    pushl   %ecx
    pushl   %edx
    movl    %eax, %ebx // 将 futex 的值放入 ebx 中
    movl    $0, (%eax) // 将 futex 中的值设置为 0 ,表示此时无锁状态(那么考虑下?如果现在用户态的线程调用 lock函数调用锁,此时 是不是可以直接获取,那么内核唤醒后的线程是否能获取锁呢?)
    movl    $(FUTEX_WAKE | FUTEX_PRIVATE_FLAG), %ecx // 通知内核执行唤醒操作
    movl    $1, %edx    // 通知内核只唤醒一个线程(互斥锁 没必要唤醒所有线程)
    movl    $SYS_futex, %eax // eax 保存 sys_futex 系统调用号
    int $0x80 // 进入系统调用
        
    // 系统调用返回,其他线程被唤醒,还原保存的寄存器即可
    popl    %edx
    popl    %ecx
    popl    %ebx
    ret    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

# sys_futex 原理

上面我们看到使用 sys_futex 系统调用完成阻塞和唤醒操作:FUTEX_WAIT(标识阻塞) 、FUTEX_WAKE(唤醒)。现在我们来看看该系统调用的实现过程。

asmlinkage long sys_futex(u32 __user *uaddr, int op, u32 val,
              struct timespec __user *utime, u32 __user *uaddr2,
              u32 val3)
{
    struct timespec ts;
    ktime_t t, *tp = NULL;
    u32 val2 = 0;
    int cmd = op & FUTEX_CMD_MASK;if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI)) { // 指定了超时时间的,那么我们拷贝时间并生成 ktime_t 结构,这里忽略
        if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
            return -EFAULT;
        if (!timespec_valid(&ts))
            return -EINVAL;
​
        t = timespec_to_ktime(ts);
        if (cmd == FUTEX_WAIT)
            t = ktime_add(ktime_get(), t);
        tp = &t;
    }// 如果使用 FUTEX_REQUEUE 命令 那么转换时间值为 val2
    if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE)
        val2 = (u32) (unsigned long) utime;return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}// 这里我们关注 futex_wait 和 futex_wake 函数即可,可以看到他们与  u32 __user *uaddr2, u32 val2, u32 val3 参数无关,因为它们都是用于 requeue操作的,这里我们没有涉及到该概念,所以我们了解即可
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
        u32 __user *uaddr2, u32 val2, u32 val3)
{
    int ret;
    int cmd = op & FUTEX_CMD_MASK;
    struct rw_semaphore *fshared = NULL;if (!(op & FUTEX_PRIVATE_FLAG)) // 若不是线程间futex,那么需要获取 mmap_sem 信号量用于操作时互斥
        fshared = &current->mm->mmap_sem;switch (cmd) {
    case FUTEX_WAIT:
        ret = futex_wait(uaddr, fshared, val, timeout);
        break;
    case FUTEX_WAKE:
        ret = futex_wake(uaddr, fshared, val);
        break;
    ... // 其他操作我们忽略,这里关注 等待和唤醒
    }
    return ret;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# futex_wait 函数

该函数用于在用户态竞争 futex 变量失败时调用,此时将尝试阻塞当前线程,流程如下:

1、创建表示 futex 元数据的 struct futex_q

2、根据 struct futex_q q 的元数据 找到它应该放入 的 futex_hash_bucket 结构

3、获取 用户空间中 当前 futex 的值

4、若 futex 的值 发生改变,此时退出到用户空间重新尝试获取锁

5、若 futex 的值 未改变,那么将 struct futex_q q 放入 futex_hash_bucket 结构,同时将进程等待节点放入 struct futex_q q 的等待队列,然后调用 schedule 函数 切换其他进程执行,直到被唤醒或者等待超时

static int futex_wait(u32 __user *uaddr, struct rw_semaphore *fshared,
              u32 val, ktime_t *abs_time)
{
    struct task_struct *curr = current;
    DECLARE_WAITQUEUE(wait, curr); // 初始化等待节点   wait_queue_t
    struct futex_q q; // 表示当前 futex 的元数据信息
    struct futex_hash_bucket *hb; // 每个 futex 元数据 将会放置在全局结构 static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS]; 中
    u32 uval; // 保存当前用户空间 futex 的值
    ...
 retry:
    if (fshared)  // 若为进程间共享锁,那么需要获取到保护该进程 mm_struct 的锁
        down_read(fshared);
    ret = get_futex_key(uaddr, fshared, &q.key); // 找到该 futex 所在的 mm 和虚拟内存地址(读者可自己阅读,非常容易懂:进程共享的话 找到 vma 然后检测权限,没问题则赋值给 futex_key ,也即 &q.key,如果是线程间共享,那么非常简单,直接赋值 mm 和 futex 地址到 futex_key 即可)
    if (unlikely(ret != 0)) 
        goto out_release_sem;
    hb = queue_lock(&q, -1, NULL); // 根据当前 futex_q 的元数据信息计算hash值,找到需要将其放入的 struct futex_hash_bucket futex_queues 索引下标出的 struct futex_hash_bucket 指针,同时对 这个 struct futex_hash_bucket  结构 上自旋锁(因为 futex_q 需要放入到该 futex_hash_bucket 结构中,为了避免多线程竞争所以需要自旋锁)
​
    ret = get_futex_value_locked(&uval, uaddr); // 原子性获取到当前用户空间的 futex 值,放入 uval 中
    if (unlikely(ret)) {
        ... // 失败后释放锁重新开始
    }
    ret = -EWOULDBLOCK;
    if (uval != val) // 当前用户态的 futex 值 与 传入的比较值,失败,那么我们必然不能够 阻塞,需要返回用户态重新尝试获取锁
        goto out_unlock_release_sem;// 如果此时用户空间 futex 的值没有发生变化,那么当前线程需要阻塞等待 futex 释放,将 futex_q 放入 futex_hash_bucket 队列,同时释放 futex_hash_bucket 的自旋锁
    __queue_me(&q, hb);
    
    // 若为进程间共享锁,那么这里需要释放保护 mm  的信号量,因为进程在睡眠时,不能持有该锁
    if (fshared)
        up_read(fshared);
    // 设置当前进程状态为可中断的阻塞状态,然后将当前进程放入 futex_q 的阻塞列表
    __set_current_state(TASK_INTERRUPTIBLE);
    add_wait_queue(&q.waiters, &wait);if (likely(!plist_node_empty(&q.list))) { // 若 futex_hash_bucket 队列中不为空(当前进程仍处于队列中等待),那么根据当前线程设置的超时时间来进行阻塞,我们这里没有超时时间,所以直接调用 schedule 函数交出控制权
        if (!abs_time)
            schedule();
        else {
            ... // 否则进行超时等待
        }
    }
    // 被唤醒后,或者当前 futex_hash_bucket 队列中只存在当前 进程,那么将进程状态恢复为 执行状态
    __set_current_state(TASK_RUNNING);if (!unqueue_me(&q)) // 将当前进程从 futex_hash_bucket 队列中去除,若成功则返回
        return 0;
    
    if (rem) // 若发生超时那么返回超时错误,在上述 else块中发生,这里了解即可
        return -ETIMEDOUT;
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# futex_wake 函数

该函数用于释放调用 futex_wait 等待的进程,流程如下:

1、根据 futex 地址 ,找到对应的 futex_key key,根据该 key 找到 所述的 struct futex_hash_bucket *hb

2、对 futex_hash_bucket 结构上自旋锁

3、遍历该 futex_hash_bucket 中的所有 futex_key ,找到与需要唤醒的 futex 的进程所匹配的 futex_q,然后唤醒阻塞在上面的线程,nr_wake 用于表示唤醒的进程个数

static int futex_wake(u32 __user *uaddr, struct rw_semaphore *fshared, int nr_wake)
{
    struct futex_hash_bucket *hb;
    struct futex_q *this, *next;
    struct plist_head *head;
    union futex_key key;
    int ret;if (fshared) // 操作进程间共享的 futex 都需要锁定 mm 的锁,保护 mm 和 vma 不会发生改变
        down_read(fshared);
​
    ret = get_futex_key(uaddr, fshared, &key); // 获取该 futex 的元数据 futex_key key
    if (unlikely(ret != 0))
        goto out;
    
    hb = hash_futex(&key); // 找到该key 所属的 futex_hash_bucket
    spin_lock(&hb->lock); // 获取自旋锁,保证原子性
    head = &hb->chain; // 获取头结点
    plist_for_each_entry_safe(this, next, head, list) { // 遍历所有节点,找到匹配 当前 futex 的 key,然后唤醒等待在该 futex的进程
        if (match_futex (&this->key, &key)) { // 为何需要匹配?因为 hash 冲突,可能该 futex 已经被别的线程释放了,不是当前 需要的 futex 元数据
            ...
            wake_futex(this); // 唤醒阻塞在 当前 futex 上的线程
            if (++ret >= nr_wake) // 唤醒 nr_wake 个 匹配成功的 futex 等待进程
                break;
        }
    }
    ...
}static void wake_futex(struct futex_q *q)
{
    plist_del(&q->list, &q->list.plist); // 首先将当前 futex_q 节点 从 futex_hash_bucket 中移出
    if (q->filp) // 若指定了唤醒了通知的 fd,那么发送信号通知,这里我们了解即可
        send_sigio(&q->filp->f_owner, q->fd, POLL_IN);
    wake_up_all(&q->waiters); // 唤醒所有等待在该 futex_q 上的进程
    smp_wmb(); // 保证在释放 lock_ptr 锁之前,前面的 store 操作全部提交到内存
    q->lock_ptr = NULL;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 总结

1、对于 pthread_mutex_lock 而言,需要用户空间的代码作为配合实现 快速互斥体的语义:没有竞争时无需进入内核。在 pthread 线程库中,默认无锁状态 futex 为 0,原子性将其 设置为 1(使用 lock cmpxchg 来完成该操作)若 CAS 成功,那么返回,否则 设置 futex 的值 为 2,并陷入内核等待

2、对于 pthread_mutex_unlock 而言,pthread 库首先原子性对 futex 值减 1,若结果为0,那么表示没有竞争,此时无需进入内核空间唤醒阻塞线程,若结果不为0, 比如上述的 2 -1 ,那么表示内核态有阻塞线程,那么首先释放锁,然后进入内核唤醒一个等待的线程(每个线程一个 futex_q) ,然后该线程回到用户态尝试获取锁

3、如果其他线程在线程未陷入内核态之前,但已经释放锁后,那么在内核中没有对当前 futex 的值进行判断,直接就唤醒,所以该线程唤醒后,依然拿不到锁,又得回去阻塞,但这是无解的操作,因为用户态什么时候 cas 到了 futex 内核无法得知,所以需要线程自己回到用户态自己重试