DumpStack

静下来,享受技术!
  1. 首页
  2. locking
  3. 正文

内核同步机制之(五):rwsem - 读写锁

2022年9月18日 3082点热度 2人点赞 2条评论

关注公众号不迷路:DumpStack

扫码加关注

目录

  • 一、数据结构
    • 1.1 rw_semaphore
      • 1.1.1 count成员不同bit的含义
      • 1.1.2 owner成员不同bit的含义
    • 1.2 rwsem_waiter - 描述一个waiter
    • 1.3 rwsem_waiter_type - 标记waiter是reader还是writer
  • 二、概念梳理
    • 2.1 什么是乐观自旋?
    • 2.2 什么是"偷锁"?
    • 2.3 HANDOFF机制
  • 三、osq机制
    • 3.1 简介
    • 3.2 数据结构
      • 3.2.1 optimistic_spin_queue
      • 3.2.2 optimistic_spin_node
    • 3.3 osq_lock
      • 3.3.1 encode_cpu
      • 3.3.2 node_cpu
      • 3.3.3 decode_cpu
    • 3.4 osq_unlock
  • 四、乐观自旋
    • 4.1 rwsem_can_spin_on_owner - 判断当前场景是否可以自旋
      • 4.1.1 rwsem_owner_flags - 剥离owner的flags和task_struct字段
      • 4.1.2 owner_on_cpu - 判读持锁者owner当前是不是正在running
    • 4.2 rwsem_optimistic_spin - 乐观自旋流程
      • 4.2.1 rwsem_try_read_lock_unqueued - reader尝试"偷锁"
      • 4.2.2 rwsem_try_write_lock_unqueued - writer尝试"偷锁"
      • 4.2.3 rwsem_rspin_threshold - 计算当reader在临界区时,writer自旋的阈值
    • 4.3 rwsem_spin_on_owner - 围着writer转
      • 4.3.1 owner_state - 描述临界区的状态
      • 4.3.2 rwsem_owner_state - 获取临界区的状态
  • 五、唤醒逻辑
    • 5.1 rwsem_first_waiter - 取出wait_list链表最前面的waiter
    • 5.2 rwsem_mark_wake - 从wait_list链表上搜集要唤醒的线程
      • 5.1.1 rwsem_wake_type - 要唤醒哪些线程
    • 5.3 rwsem_wake - 唤醒wait_list链表上的线程
  • 六、读操作
    • 6.1 down_read - 拿不到锁进D状态
    • 6.2 __down_read_trylock - 直接拿锁,当reader在临界区,并且wait_list上没有阻塞的线程时,reader可以直接拿到锁
      • 6.2.1 rwsem_set_reader_owned - 将current设置为读owner
      • 6.2.2 __rwsem_set_reader_owned - 将指定的task设置为读owner
    • 6.3 __down_read
    • 6.4 rwsem_read_trylock - 再次尝试直接拿锁,当reader在临界区,并且wait_list上没有阻塞的线程时,reader可以直接拿到锁
      • 6.4.1 rwsem_set_nonspinnable - 设置RWSEM_NONSPINNABLE标记,关闭自旋
    • 6.5 rwsem_down_read_slowpath - 慢速路径持锁
      • 6.5.1 rwsem_reader_phase_trylock - 在挂入wait_list之前,再挣扎一下
    • 6.6 up_read - reader退出临界区时释放锁
    • 6.7 __up_read - reader退出临界区时释放锁
      • 6.7.1 rwsem_clear_reader_owned - reader退出临界区时清除owner
      • 6.7.2 clear_wr_nonspinnable - 只清除RWSEM_WR_NONSPINNABLE标记,优先让writer通过自旋拿锁
      • 6.7.3 rwsem_test_oflags - 检查owner的某个标记位是否被设置
  • 七、写操作
    • 7.1 down_write - 拿不到锁进D状态
    • 7.2 __down_write_trylock - 快速路径,无人拿锁时直接获取到锁
      • 7.2.1 rwsem_set_owner - writer拿到锁时将current设置为owner
    • 7.3 __down_write
    • 7.4 rwsem_down_write_slowpath - 慢速持锁路径
      • 7.4.1 writer_wait_state - 标记正在申请writer锁的线程的状态
      • 7.4.2 rwsem_try_write_lock - 尝试获取锁或者设置RWSEM_FLAG_HANDOFF标记
    • 7.5 up_write
    • 7.6 __up_write
      • 7.6.1 rwsem_clear_owner - writer退出临界区时清除owner,直接设置为0
  • 八、其他扩展接口
    • 8.1 down_read_interruptible - 拿不到锁进sleep
    • 8.2 down_read_killable - 拿不到锁将自己kill掉
    • 8.3 down_read_trylock - 拿不到锁后也不睡眠
    • 8.4 down_write_killable - 拿不到锁后将自己kill掉
    • 8.5 down_write_trylock - 拿不到锁后也不睡眠
  • 九、要点总结
    • 9.1 当reader在临界区时,owner的task_struct字段指向的线程并不一定在临界区
    • 9.2 当reader在临界区时,后来的reader能否直接进入临界区?
    • 9.3 什么时候禁止自旋(设置nonspinable标记)
    • 9.4 什么时候允许自旋(清除nonspinable标记)
    • 9.5 什么时候清除RWSEM_WR_NONSPINNABLE,允许writer自旋
    • 9.6 什么时候设置handoff
  • 十、参考文档
  • 关注公众号不迷路:DumpStack

 

 

 

 

rwsem特性:

a) 允许多个reader同时进入临界区;

b) 只允许一个writer同时进入临界区;

c) 当临界区是reader时,其他reader可以进入临界区,但是writer必须等着;

d) 当临界区是writer时,其他任何线程,不管是reader还是writer,都必须等着;

 

本文分析基于Linux5.10.61

 

一、数据结构

1.1 rw_semaphore

在设计rw_semaphore数据结构时考虑到了cache的一致性,我们自己在编码时也应该考虑下面信息,具体参见下面最前面的一段注释

a) 对于无竞争的rwsem,count和owner是一个task在获取rwsem时需要访问的唯一字段,因此,将它们彼此相邻放置,可以增加它们共享cacheline的机会

b) 在竞争rwsem中,owner可能是此结构中最常访问的字段,因为持有osq锁的乐观等待者将在owner上自旋。对于嵌入的rwsem,上层结构中的其它热字段应远离rwsem,以减少它们共享相同cacheline导致cacheline抖动问题的机会

/*

* For an uncontended rwsem, count and owner are the only fields a task

* needs to touch when acquiring the rwsem. So they are put next to each

* other to increase the chance that they will share the same cacheline.

*

* In a contended rwsem, the owner is likely the most frequently accessed

* field in the structure as the optimistic waiter that holds the osq lock

* will spin on owner. For an embedded rwsem, other hot fields in the

* containing structure should be moved further away from the rwsem to

* reduce the chance that they will share the same cacheline causing

* cacheline bouncing problem.

*/

struct rw_semaphore {

    //count的不用的bit代表不同的含义

    atomic_long_t count;

    /*

     * Write owner or one of the read owners as well flags regarding

     * the current state of the rwsem. Can be used as a speculative

     * check to see if the write owner is running on the cpu.

     */

    //如果是读持锁,只能记录最后一个进入临界区的reader

    //如果是写持锁,因为临界区只允许一个writer,owner记录的是那个唯一的writer

    atomic_long_t owner;

#ifdef CONFIG_RWSEM_SPIN_ON_OWNER

    struct optimistic_spin_queue osq; /* spinner MCS lock */

#endif

    raw_spinlock_t wait_lock;

 

    //阻塞的线程都挂在这个链表上,新加进来的task挂在链表末尾,唤醒从链表最前面开始唤醒

    struct list_head wait_list;

#ifdef CONFIG_DEBUG_RWSEMS

    void *magic;

#endif

#ifdef CONFIG_DEBUG_LOCK_ALLOC

    struct lockdep_map dep_map;

#endif

};

 

1.1.1 count成员不同bit的含义

各个字段含义如下:

 

相关宏定义如下:

/*

* On 64-bit architectures, the bit definitions of the count are:

*

* Bit 0 - writer locked bit

* Bit 1 - waiters present bit

* Bit 2 - lock handoff bit

* Bits 3-7 - reserved

* Bits 8-62 - 55-bit reader count

* Bit 63 - read fail bit

*

* On 32-bit architectures, the bit definitions of the count are:

*

* Bit 0 - writer locked bit

* Bit 1 - waiters present bit

* Bit 2 - lock handoff bit

* Bits 3-7 - reserved

* Bits 8-30 - 23-bit reader count

* Bit 31 - read fail bit

*

* It is not likely that the most significant bit (read fail bit) will ever

* be set. This guard bit is still checked anyway in the down_read() fastpath

* just in case we need to use up more of the reader bits for other purpose

* in the future.

*

* atomic_long_fetch_add() is used to obtain reader lock, whereas

* atomic_long_cmpxchg() will be used to obtain writer lock.

*

* There are three places where the lock handoff bit may be set or cleared.

* 1) rwsem_mark_wake() for readers.

* 2) rwsem_try_write_lock() for writers.

* 3) Error path of rwsem_down_write_slowpath().

*

* For all the above cases, wait_lock will be held. A writer must also

* be the first one in the wait_list to be eligible for setting the handoff

* bit. So concurrent setting/clearing of handoff bit is not possible.

*/

 

//RWSEM_WRITER_LOCKED : 标记当前是reader还是writer在临界区

//RWSEM_FLAG_WAITERS : 标记当前wait_list链表上有阻塞的线程

//RWSEM_FLAG_HANDOFF : 禁止自旋"偷锁",下一次将锁交给wait_list最前面的waiter

//RWSEM_FLAG_READFAIL : reader在临界区,并且临界区的reader已经非常多了,计数已经溢出

// 因为其位于最高bit,所以代码中一般通过判断正负,来判断是否有太多

// reader在临界区了

#define RWSEM_WRITER_LOCKED    (1UL << 0)                                //bit0

#define RWSEM_FLAG_WAITERS    (1UL << 1)                                //bit1

#define RWSEM_FLAG_HANDOFF    (1UL << 2)                                //bit2

#define RWSEM_FLAG_READFAIL    (1UL << (BITS_PER_LONG - 1))            //最高bit

 

//RWSEM_READER_BIAS : 实现对count字段加1

//RWSEM_READER_MASK : 不为0时,表示当前有reader在临界区

#define RWSEM_READER_SHIFT    8

#define RWSEM_READER_BIAS        (1UL << RWSEM_READER_SHIFT)            //可直线count字段加1

#define RWSEM_READER_MASK        (~(RWSEM_READER_BIAS - 1))            //bit[8:63]

 

//RWSEM_WRITER_MASK : 为1时,表示当前writer在临界区

//RWSEM_LOCK_MASK : 该字段不为0,表示当前有人在临界区,不管是reader还是writer

#define RWSEM_WRITER_MASK        RWSEM_WRITER_LOCKED                        //bit0

#define RWSEM_LOCK_MASK        (RWSEM_WRITER_MASK|RWSEM_READER_MASK)    //bit0和bit[8:63]

 

//除去count和resv字段之外的所有字段,如果该字段不为0,表示后

//来的reader不能通过快速路径拿到锁,必须进行乐观自旋或者睡眠了

// a) 已经有writer进入临界区

// b) 或有人(可能是reader或writer)阻塞在这把锁上

// c) 或当前已近禁止自旋"偷锁"

// d) 临界区的reader已经撑爆了

#define RWSEM_READ_FAILED_MASK    (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\

                 RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL)

 

1.1.2 owner成员不同bit的含义

各个字段含义如下:

 

相关宏定义如下:

/*

* The least significant 3 bits of the owner value has the following

* meanings when set.

* - Bit 0: RWSEM_READER_OWNED - The rwsem is owned by readers

* - Bit 1: RWSEM_RD_NONSPINNABLE - Readers cannot spin on this lock.

* - Bit 2: RWSEM_WR_NONSPINNABLE - Writers cannot spin on this lock.

*

* When the rwsem is either owned by an anonymous writer, or it is

* reader-owned, but a spinning writer has timed out, both nonspinnable

* bits will be set to disable optimistic spinning by readers and writers.

* In the later case, the last unlocking reader should then check the

* writer nonspinnable bit and clear it only to give writers preference

* to acquire the lock via optimistic spinning, but not readers. Similar

* action is also done in the reader slowpath.

 

* When a writer acquires a rwsem, it puts its task_struct pointer

* into the owner field. It is cleared after an unlock.

*

* When a reader acquires a rwsem, it will also puts its task_struct

* pointer into the owner field with the RWSEM_READER_OWNED bit set.

* On unlock, the owner field will largely be left untouched. So

* for a free or reader-owned rwsem, the owner value may contain

* information about the last reader that acquires the rwsem.

*

* That information may be helpful in debugging cases where the system

* seems to hang on a reader owned rwsem especially if only one reader

* is involved. Ideally we would like to track all the readers that own

* a rwsem, but the overhead is simply too big.

*

* Reader optimistic spinning is helpful when the reader critical section

* is short and there aren't that many readers around. It makes readers

* relatively more preferred than writers. When a writer times out spinning

* on a reader-owned lock and set the nospinnable bits, there are two main

* reasons for that.

*

* 1) The reader critical section is long, perhaps the task sleeps after

* acquiring the read lock.

* 2) There are just too many readers contending the lock causing it to

* take a while to service all of them.

*

* In the former case, long reader critical section will impede the progress

* of writers which is usually more important for system performance. In

* the later case, reader optimistic spinning tends to make the reader

* groups that contain readers that acquire the lock together smaller

* leading to more of them. That may hurt performance in some cases. In

* other words, the setting of nonspinnable bits indicates that reader

* optimistic spinning may not be helpful for those workloads that cause

* it.

*

* Therefore, any writers that had observed the setting of the writer

* nonspinnable bit for a given rwsem after they fail to acquire the lock

* via optimistic spinning will set the reader nonspinnable bit once they

* acquire the write lock. Similarly, readers that observe the setting

* of reader nonspinnable bit at slowpath entry will set the reader

* nonspinnable bits when they acquire the read lock via the wakeup path.

*

* Once the reader nonspinnable bit is on, it will only be reset when

* a writer is able to acquire the rwsem in the fast path or somehow a

* reader or writer in the slowpath doesn't observe the nonspinable bit.

*

* This is to discourage reader optmistic spinning on that particular

* rwsem and make writers more preferred. This adaptive disabling of reader

* optimistic spinning will alleviate the negative side effect of this

* feature.

*/

//RWSEM_READER_OWNED : 标记这把锁是否被reader持有,1读持锁;0写持锁

//RWSEM_RD_NONSPINNABLE : 是否允许reader执行乐观自旋逻辑,还是直接进阻塞状态

//RWSEM_WR_NONSPINNABLE : 是否允许writer执行乐观自旋逻辑,还是直接进阻塞状态

#define RWSEM_READER_OWNED        (1UL << 0)

#define RWSEM_RD_NONSPINNABLE        (1UL << 1)

#define RWSEM_WR_NONSPINNABLE        (1UL << 2)

 

#define RWSEM_NONSPINNABLE        (RWSEM_RD_NONSPINNABLE | RWSEM_WR_NONSPINNABLE)

#define RWSEM_OWNER_FLAGS_MASK    (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE)

 

1.2 rwsem_waiter - 描述一个waiter

下面last_rowner成员,在5cfd92e12e13中被提交,但是在617f3ef95177中又被移除了,对应的patch如下:


 

struct rwsem_waiter {

    //用于将该结构挂在rwsem->wait_list链表上

    struct list_head list;

 

    //waiter的task_struct结构

    struct task_struct *task;

 

    //标记老子是一个reader还是writer

    enum rwsem_waiter_type type;

 

    //一个时间戳,waiter如果在链表上阻塞超过4ms,并且当前也已经是

    //链表最前面的一个waiter了,就设置HANDOFF标记防止被自旋者"偷锁"

    unsigned long timeout;

 

    //记录在自旋或睡眠前,也就是申请锁的时候锁的owner

    //上一次在rwsem_down_read_slowpath中,上一次读时候的锁的owner

    unsigned long last_rowner;

};

 

1.3 rwsem_waiter_type - 标记waiter是reader还是writer

enum rwsem_waiter_type {

    RWSEM_WAITING_FOR_WRITE,            //当前线程是一个writer

    RWSEM_WAITING_FOR_READ            //当前线程是一个reader

};

 

二、概念梳理

本章参考文档:

https://zhuanlan.zhihu.com/p/390107537

 

2.1 什么是乐观自旋?

举个例子,不带乐观自旋的锁,若A线程持有锁且正在运行,此时B线程想拿锁,发现锁正被A拿着,B将立即进入睡眠状态,在A线程释放锁后再去唤醒B线程来拿锁。

但是我们通常认为持有锁的A若是在运行的情况下,通常会很快的退出临界区并释放锁,那么B去睡眠则是不必要的,毕竟睡眠和唤醒也是有代价的,完全可以多等一会。那么当开启乐观自旋的机制后,若B拿不到锁,且发现锁的持有者A仍占用着cpu运行时,则不再去睡眠,而是像自旋锁一样进行自旋等待(实际是个for循环,和spin毛关系没有),直到A释放锁,但期间若A失去cpu(即p->on_cpu为0,可能是进runnable状态或者进入睡眠状态),那么B将不会继续傻傻的自旋等待,而是进入睡眠

 

2.2 什么是"偷锁"?

当锁支持乐观自旋机制时,那么就会存在这样的情况:wait_list中有一些线程在睡眠并等待被唤醒拿锁,同时还有一些线程不在wait_list中且不断的通过乐观自旋等锁,那么wait_list上的线程大概率是抢不过自旋拿锁的进程的,这是因为调度延时的存在,因为当wait_list上的线程被唤醒到真正的真正获得cpu运行,这期间可能已经过去很长时间,锁早就被自旋等锁的线程给"偷走"了。

既然"偷锁"有可能导致wait_list上的进程饥饿,那么为什么还要允许"偷锁"的发生呢?其实允许"偷锁"的行为可以算是对锁的一种性能优化,若是不允许"偷锁"那么下次拿锁的就一定是唤醒的wait_list上的等锁进程,但是从进程被唤醒到该进程真正获得cpu运行,这中间会存在一个时间差,即调度延时。且在这段时间内若还有其他进程一直在trylock尝试偷锁那么将不会偷成功,这些进程都会因此block住。由于调度延时某些场景下会比较久(例如整机高负载时,runqueue上的进程很多),所以还不如先将锁交给正在trylock的进程,也就是允许发生一次"偷锁"。

 

2.3 HANDOFF机制

因为乐观自旋"偷锁"逻辑的存在,导致正在乐观自旋的申请者很容易就能获得锁,而wait_list链表最前面的线程迟迟得不到锁,所以"偷锁"在一定程度的造成wait_list的等待者长时间"饥饿",如果任由"偷锁"下去,那么wait_list最前面的线程就会被饿死,严重时直接导致系统crash,所以"偷锁"要有个度,这个度就是:当wait_list最前面的线程阻塞时间超过一定的阈值(4ms),就将锁打上RWSEM_FLAG_HANDOFF标记,禁止乐观自旋"偷锁",这样在下一次锁交换时,就一定能够将锁交给wait_list链表最前面的线程

 

三、osq机制

涉及文件:

U:\linux-5.10.61\kernel\locking\osq_lock.c

U:\linux-5.10.61\include\linux\osq_lock.h

 

3.1 简介

osq(Optimistic spin queue),即乐观自旋队列,其工作原理示意图如下:

 

 

字如其名,Optimistic spin queue就是乐观自旋队列的意思,也就是形成一组处于自旋状态的任务队列。和等待队列wait_list不一样,这个队列中的任务都是当前正在执行的任务。Osq并没有直接将这些任务的task struct形成队列结构,而是把per-CPU的mcs lock对象串联形成队列。Mcs lock中有cpu number,通过这些cpu number可以定位到指定cpu上的current thread,也就定位到了自旋的任务。

 

虽然都是自旋,但是自旋方式并不一样。Osq队列中的头部节点是持有osq锁的,只有该任务处于对mutex的owner进行乐观自旋的状态(我们称之mutex乐观自旋)。Osq队列中的其他节点都是自旋在自己的mcs lock上(我们称之mcs乐观自旋)。当头部的mcs lock释放掉后(结束mutex乐观自旋,持有了mutex锁),它会将mcs lock传递给下一个节点,从而让spinner队列上的任务一个个的按顺序进入mutex的乐观自旋,从而避免了cache-line bouncing带来的性能开销。

 

3.2 数据结构

3.2.1 optimistic_spin_queue

struct optimistic_spin_queue {

    /*

     * Stores an encoded value of the CPU # of the tail node in the queue.

     * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.

     */

    atomic_t tail;

};

 

3.2.2 optimistic_spin_node

/*

* An MCS like lock especially tailored for optimistic spinning for sleeping

* lock implementations (mutex, rwsem, etc).

*/

struct optimistic_spin_node {

    struct optimistic_spin_node *next, *prev;

    int locked; /* 1 if lock acquired */

    int cpu; /* encoded CPU # + 1 value */

};

 

3.3 osq_lock

 

bool osq_lock(struct optimistic_spin_queue *lock)

{

    struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);

    struct optimistic_spin_node *prev, *next;

    int curr = encode_cpu(smp_processor_id());

    int old;

 

    node->locked = 0;

    node->next = NULL;

    node->cpu = curr;

 

    /*

     * We need both ACQUIRE (pairs with corresponding RELEASE in

     * unlock() uncontended, or fastpath) and RELEASE (to publish

     * the node fields we just initialised) semantics when updating

     * the lock tail.

     */

    old = atomic_xchg(&lock->tail, curr);

    if (old == OSQ_UNLOCKED_VAL)

        return true;

 

    prev = decode_cpu(old);

    node->prev = prev;

 

    /*

     * osq_lock()            unqueue

     *

     * node->prev = prev        osq_wait_next()

     * WMB                MB

     * prev->next = node        next->prev = prev // unqueue-C

     *

     * Here 'node->prev' and 'next->prev' are the same variable and we need

     * to ensure these stores happen in-order to avoid corrupting the list.

     */

    smp_wmb();

 

    WRITE_ONCE(prev->next, node);

 

    /*

     * Normally @prev is untouchable after the above store; because at that

     * moment unlock can proceed and wipe the node element from stack.

     *

     * However, since our nodes are static per-cpu storage, we're

     * guaranteed their existence -- this allows us to apply

     * cmpxchg in an attempt to undo our queueing.

     */

 

    /*

     * Wait to acquire the lock or cancelation. Note that need_resched()

     * will come with an IPI, which will wake smp_cond_load_relaxed() if it

     * is implemented with a monitor-wait. vcpu_is_preempted() relies on

     * polling, be careful.

     */

    if (smp_cond_load_relaxed(&node->locked, VAL || need_resched() ||

                 vcpu_is_preempted(node_cpu(node->prev))))

        return true;

 

    /* unqueue */

    /*

     * Step - A -- stabilize @prev

     *

     * Undo our @prev->next assignment; this will make @prev's

     * unlock()/unqueue() wait for a next pointer since @lock points to us

     * (or later).

     */

 

    for (;;) {

        /*

         * cpu_relax() below implies a compiler barrier which would

         * prevent this comparison being optimized away.

         */

        if (data_race(prev->next) == node &&

         cmpxchg(&prev->next, node, NULL) == node)

            break;

 

        /*

         * We can only fail the cmpxchg() racing against an unlock(),

         * in which case we should observe @node->locked becomming

         * true.

         */

        if (smp_load_acquire(&node->locked))

            return true;

 

        cpu_relax();

 

        /*

         * Or we race against a concurrent unqueue()'s step-B, in which

         * case its step-C will write us a new @node->prev pointer.

         */

        prev = READ_ONCE(node->prev);

    }

 

    /*

     * Step - B -- stabilize @next

     *

     * Similar to unlock(), wait for @node->next or move @lock from @node

     * back to @prev.

     */

 

    next = osq_wait_next(lock, node, prev);

    if (!next)

        return false;

 

    /*

     * Step - C -- unlink

     *

     * @prev is stable because its still waiting for a new @prev->next

     * pointer, @next is stable because our @node->next pointer is NULL and

     * it will wait in Step-A.

     */

 

    WRITE_ONCE(next->prev, prev);

    WRITE_ONCE(prev->next, next);

 

    return false;

}

 

3.3.1 encode_cpu

/*

* We use the value 0 to represent "no CPU", thus the encoded value

* will be the CPU number incremented by 1.

*/

static inline int encode_cpu(int cpu_nr)

{

    return cpu_nr + 1;

}

 

3.3.2 node_cpu

static inline int node_cpu(struct optimistic_spin_node *node)

{

    return node->cpu - 1;

}

 

3.3.3 decode_cpu

static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)

{

    int cpu_nr = encoded_cpu_val - 1;

 

    return per_cpu_ptr(&osq_node, cpu_nr);

}

 

3.4 osq_unlock

void osq_unlock(struct optimistic_spin_queue *lock)

{

    struct optimistic_spin_node *node, *next;

    int curr = encode_cpu(smp_processor_id());

 

    /*

     * Fast path for the uncontended case.

     */

    if (likely(atomic_cmpxchg_release(&lock->tail, curr,

                     OSQ_UNLOCKED_VAL) == curr))

        return;

 

    /*

     * Second most likely case.

     */

    node = this_cpu_ptr(&osq_node);

    next = xchg(&node->next, NULL);

    if (next) {

        WRITE_ONCE(next->locked, 1);

        return;

    }

 

    next = osq_wait_next(lock, node, NULL);

    if (next)

        WRITE_ONCE(next->locked, 1);

}

 

 

 

 

 

 

 

 

 

四、乐观自旋

注意:这里说的自旋并不是靠自旋锁实现的,而是在一个for循环中不停的轮训实现的

 

4.1 rwsem_can_spin_on_owner - 判断当前场景是否可以自旋

是否允许自旋是可以通过CONFIG_RWSEM_SPIN_ON_OWNER宏控制的,当开启该宏时,下面场景不允许自旋:

a) 如果申请锁的人在下一个调度时机就会被调度出去,那么就没必要自旋了

b) 如果锁已经被打上nospinable标签(禁止自旋"偷锁"),也没必要自旋,直接挂入链表进阻塞状态

c) 如果是writer在临界区,但是这个writer拿锁不干活,即当前不是正在运行的状态,可能是runnable或者已经进入阻塞状态,但是不管是哪种,都表示这个writer短时间内是完不成了,此时也不用自旋

 

情景分析:如果当前reader或者writer在申请锁,并且锁未被打上nospinable标签,则有:

a) 如果临界区没人,则自旋

b) 如果reader在临界区,则自旋,因为我们认为reader一般都是很快就会退出临界区

c) 如果是writer在临界区,并且正在running,则自旋

d) 如果是writer在临界区,但是当前writer是处于runnable状态,则不可以自旋

 

传入的nonspinable参数取决于锁的申请者是reader还是writer:

a) 如果是reader,则从rwsem_down_read_slowpath中传入RWSEM_RD_NONSPINNABLE

b) 如果是writer,则从rwsem_down_write_slowpath中传入RWSEM_WR_NONSPINNABLE

 

注意:这里判断是否可以自旋时,并不知道调用者是reader还是writer,也就是说所有读或者写都是有可能自旋的

#ifdef CONFIG_RWSEM_SPIN_ON_OWNER

static inline bool rwsem_can_spin_on_owner(

            struct rw_semaphore *sem,

            unsigned long nonspinnable)        //锁的申请者是读者自旋还是写者自旋

{

    struct task_struct *owner;

    unsigned long flags;

    bool ret = true;

 

    //1.如果正在申请锁的线程current在下一个调度时机即将调度出去,那么也没有必要自旋了

    if (need_resched()) {

        lockevent_inc(rwsem_opt_fail);

        return false;

    }

 

    preempt_disable();

    rcu_read_lock();

 

    //2.代码走到这里,表示申请锁的这个线程一时半会是不会调度出去的,此时就需要判断

    // 是不是需要自旋等待了,而一个线程是否需要自旋等待取决于当前正在持锁的这个线程

    // 下面剥离owner的flags和task_struct字段,分别保存在flags和owner变量中

    owner = rwsem_owner_flags(sem, &flags);

 

    /*

     * Don't check the read-owner as the entry may be stale.

     */

    //3.下面场景也不允许自旋

    // a) 锁已经被打上nonspinnable标签(已经禁止自旋"偷锁"),那就不用自旋

    // b) 锁虽然没有被打上nonspinnable标签,但是锁已经被writer持有,

    // 但是writer当前不是处于running状态,可能是runnable或者已

    // 经进入阻塞状态,但是不管是那种状态,都表示这个writer短时间

    // 不会释放锁(runnable表示至少一个调度时间片都没有完成,也表

    // 明短时间不会释放锁)

 

    // 注意:当锁没有被打上nonspinnable标签时,并且满足下面两种情况允许自旋

    // a) 如果是reader在临界区的话,后面来的不管是reader还是writer,

    // 都可以自旋一会,为啥捏???

    // 这主要考虑reader很快就能结束,自旋一会可能很快就能拿到锁

    // b) 如果是writer在临界区的话,并且这个writer当前正在running

    // 那么也会返回true,笔者感觉这里有赌的成分,因为内核简单的认

    // 为只有在runnable或阻塞状态的writer,才表示这个writer短时

    // 间之内不会释放锁,那么既然writer很快能退出,那么我们也等一会把

    if ((flags & nonspinnable) ||

     (owner && !(flags & RWSEM_READER_OWNED) && !owner_on_cpu(owner)))

        ret = false;

    rcu_read_unlock();

    preempt_enable();

 

    lockevent_cond_inc(rwsem_opt_fail, !ret);

    return ret;

}

#else

static inline bool rwsem_can_spin_on_owner(

            struct rw_semaphore *sem,

            unsigned long nonspinnable)

{

    return false;

}

#endif

 

4.1.1 rwsem_owner_flags - 剥离owner的flags和task_struct字段

/*

* Return the real task structure pointer of the owner and the embedded

* flags in the owner. pflags must be non-NULL.

*/

static inline struct task_struct * rwsem_owner_flags(

            struct rw_semaphore *sem,

            unsigned long *pflags)

{

    unsigned long owner = atomic_long_read(&sem->owner);

 

    //1.返回低3bit

    *pflags = owner & RWSEM_OWNER_FLAGS_MASK;

 

    //2.屏蔽掉低3bit,返回真正的task_struct结构

    return (struct task_struct *)(owner & ~RWSEM_OWNER_FLAGS_MASK);

}

 

4.1.2 owner_on_cpu - 判读持锁者owner当前是不是正在running

如果持锁者处于runnable状态,或者已经进入阻塞状态,或者这个持锁者所在的cpu已经被抢占了,则返回false

static inline bool owner_on_cpu(struct task_struct *owner)

{

    /*

     * As lock holder preemption issue, we both skip spinning if

     * task is not on cpu or its cpu is preempted

     */

    //第一个条件表示持锁者当前正在running,不是出于runnable和阻塞状态

    //第二个条件对arm64来说始终为false,我们暂不深究

    return owner->on_cpu && !vcpu_is_preempted(task_cpu(owner));

}

 

4.2 rwsem_optimistic_spin - 乐观自旋流程

返回值:拿到锁后返回true

注意:自旋期间,trace上的表现还是running

 

关于下面对rt线程的优化操作参见下面patch

 

乐观自旋逻辑如下:

#ifdef CONFIG_RWSEM_SPIN_ON_OWNER

static bool rwsem_optimistic_spin(

            struct rw_semaphore *sem,

            bool wlock)                //指示申请锁的人是reader还是writer,

{                                        //readr传false,writer传true

    bool taken = false;

    int prev_owner_state = OWNER_NULL;

    int loop = 0;

    u64 rspin_threshold = 0;

 

    //1.wlock标记申请锁的人是想以读的身份还是写的身份进入临界区,

    // 申请者要读临界区传false,要写临界区传true,下面转换标记位

    unsigned long nonspinnable = wlock ?

                RWSEM_WR_NONSPINNABLE : RWSEM_RD_NONSPINNABLE;

 

    preempt_disable();

 

    /* sem->wait_lock should not be held when doing optimistic spinning */

    //2.标记乐观自旋开始

    if (!osq_lock(&sem->osq))

        goto done;

 

    /*

     * Optimistically spin on the owner field and attempt to acquire the

     * lock whenever the owner changes. Spinning will be stopped when:

     * 1) the owning writer isn't running; or

     * 2) readers own the lock and spinning time has exceeded limit.

     */

    //3.乐观自旋外循环,当出现下面情况时,退出自旋

    // a) writer持锁,但是writer并没有在运行,而是已经进入阻塞状态

    // b) reader持锁,但是申请锁的人是writer,并且这个writer已经自旋超时

    //注意:只要没有超时,当reader在临界区时,后来申请锁的人就可以一直等下去

    for (;;) {

        enum owner_state owner_state;

 

        //3.1 内循环,顾名思义,围着这个owner转,直到owner或者flags发生变化

        // 才会退出小循环,实际上,当rwsem_spin_on_owner围着owner转时,这

        // 个owner一定是writer,其他情况该函数直接返回。函数返回值可为:

        // a) OWNER_NULL: 没人在临界区

        // b) OWNER_WRITER: writer在临界区

        // c) OWNER_READER: reader在临界区

        // d) OWNER_NONSPINNABLE: 锁已经被打上nospin标记

 

        // OWNER_SPINNABLE定义为(OWNER_NULL | OWNER_WRITER | OWNER_READER)

        // 也就是说,当临界区没人、写在临界区、读在临界区时,都是有可能自旋的

        // 下面的逻辑表示,一旦发现锁已经被打上nospin标记(禁止自旋),就直接

        // 退出大循环,并返回false,自旋拿锁失败,后面将会进入睡眠状态

        owner_state = rwsem_spin_on_owner(sem, nonspinnable);

        if (!(owner_state & OWNER_SPINNABLE))

            break;

 

        //3.2 代码走到这里,这把锁一定没有被打上nospinable标记,此时的临界区要么

        // 没人,要么是reader或者writer,此时需要进一步判断是否需要继续自旋

 

        /*

         * Try to acquire the lock

         */

        //3.3 执行"偷锁"逻辑

        // 从上面小循环中返回,代码走到这里有下面三种情况

        // a) OWNER_NULL: 临界区没人,那么可以在这里尝试获取锁

        // b) OWNER_READER: reader在临界区,此时如果申请锁的人也是reader的话,

        // 那么可以直接进入临界区

        // c) OWNER_WRITER: writer在临界区,在经过上面rwsem_spin_on_owner中

        // 返回,表示有可能owner或者owner的flags发生了变化,

        // 下面尝试获取锁

        //注意:

        // 如果wait_list链表上有线程阻塞着,则这些阻塞的线程肯定是比这里正在自旋的

        // 申请者要先申请锁,但是在这里,后来的申请者却通过自旋的方式却先拿到了锁,

        // 就像是后来的申请者插队了,这公平吗?这就是传说中的"偷锁"!!!

 

        //注意:

        // 当一个reader在申请锁的时候,不管临界区时reader还是writer,即使这把锁

        // 被打上了RWSEM_FLAG_HANDOFF标记,如果reader在这里"偷锁"失败,reader

        // 也会继续自旋下去,直到在上面rwsem_spin_on_owner中发现这把锁已经被打上

        // nonspinable标记时(临界区reader已经撑爆,或者有一个writer已经自旋超时

        // 了),才会结束自旋

        taken = wlock ? rwsem_try_write_lock_unqueued(sem)

                         : rwsem_try_read_lock_unqueued(sem);

        if (taken)

            break;

 

        /*

         * Time-based reader-owned rwsem optimistic spinning

         */

        //3.4 下面的逻辑表示当前是writer在自旋申请锁,但是当前是reader在临界区,

        // 因为我们认为reader一般很快会退出临界区,所以此时writer会再等一会,

        // 但是需要注意的是,由上面对rwsem_try_read_lock_unqueued的分析可知,

        // 在writer等待期间,当reader在临界区并且锁没有被打上RWSEM_FLAG_HANDOFF

        // 标记时,那么后来的reader会通过自旋"偷锁"的方式源源不断的进入临界区,

        // 也就是说在writer自旋期间,可能会不断的有reader插队,这样writer不就

        // 永远拿不到锁了吗?为了确保writer不被饿死,不能一直让reader源源不断的

        // 进入临界区吧!解决办法就是禁止reader自旋。

        //注意:

        // 这里将锁的RWSEM_RD_NONSPINNABLE和RWSEM_WR_NONSPINNABLE位全部置位,

        // 因为此时writer已经长时间自旋都没有拿到锁了,全部置位主要有两个原因:

        // i) 设置RWSEM_WR_NONSPINNABLE,告诉后来的writer不要做无意义的

        // 尝试了,前辈已翻车你就不要自旋了,直接进sleep状态

        // ii) 设置RWSEM_RD_NONSPINNABLE,后来的reader也不不会再执行自旋

        // 的逻辑,也就不会再出现reader源源不断进入临界区的情况,从源头

        // 上防止reader的临界区越来越大

        if (wlock && (owner_state == OWNER_READER)) {

            /*

             * Re-initialize rspin_threshold every time when

             * the owner state changes from non-reader to reader.

             * This allows a writer to steal the lock in between

             * 2 reader phases and have the threshold reset at

             * the beginning of the 2nd reader phase.

             */

            //3.5 在reader刚进临界区的时候计算writer自旋的阈值

            // (这个writer在上一次循环时,临界区还不是reader(可能没人,

            // 也可能是writer在临界区),但是这次循环却发现当前临界区已经是

            // reader了,说明这些reader是刚被放进来的)

            if (prev_owner_state != OWNER_READER) {

                //3.5.1 在重新设置阈值之前,再次检查锁是否被打上了nospinnable

                // 标记,如果已经被打上了nonspinnable标记,则表示已经禁止

                // writer自旋,此时直接结束自旋,(注意:此时nonspinnable

                // 为RWSEM_WR_NONSPINNABLE)

 

                // reader在临界区时,什么时候会被打上nospin标记呢?

                // a) 临界区已经有很多reader了,已经溢出了

                // b) 有一个writer已经自旋等了很长时间,再等就饿死了

                if (rwsem_test_oflags(sem, nonspinnable))

                    break;

 

                //3.5.2 重新计算自旋的时间

                rspin_threshold = rwsem_rspin_threshold(sem);

                loop = 0;

            }

 

            /*

             * Check time threshold once every 16 iterations to

             * avoid calling sched_clock() too frequently so

             * as to reduce the average latency between the times

             * when the lock becomes free and when the spinner

             * is ready to do a trylock.

             */

            //3.6 代码如果走到这个分支,则表示这个writer在上一次循环时发现临界区时reader,

            // 这次循环发现临界区还是reader,因为当reader在临界区时,后续申请锁的reader

            // 是可以直接进入临界区的(只要临界区的reader没有被撑爆,并且锁没有被置为

            // RWSEM_FLAG_HANDOFF标记,详见rwsem_try_read_lock_unqueued),也就是说,

            // 当这个writer在这里自旋时,可能会源源不断的有新的reader进入临界区,如

            // 此下去的话,writer会一直自旋下去也拿不到锁,这显然不合理,所以我们在这

            // 里设置了一个阈值,当writer自旋一段时间后没有拿到锁,就结束自旋,返回false

 

            //注意到:

            // writer在自旋超期拿锁失败退出时,将锁的nospin标记置位,这是为啥捏?

            // 试想,一旦nospin标记被置位,那么后来的申请锁的人(不管是reader还

            // 是writer),都不会自旋了,而是直接进入D状态。为啥要这么做捏?

            // 这还是防止这个writer饿死!试想,只要允许自旋,后来的reader就有可

            // 能在自旋期间拿到锁直接进入临界区(只要临界区的reader没有被撑爆,并

            // 且锁没有被置为RWSEM_FLAG_HANDOFF标记,详见rwsem_try_read_lock_unqueued),

            // 要知道wait_list上阻塞的线程是先申请锁的啊,这里通过自旋拿锁不就相

            // 当于插队一样吗,这对于wait_list上阻塞的线程显然是不公平的,因为这

            // 样wait_list链表上的writer就会越等越久,因为前面不断有人插队,这

            // 样writer不就饿死了吗!所以,这里为了防止writer饿死,在进D状态之

            // 前,关闭插队机制,即对这把锁打上nospin标记

 

            // 换句话说,只要wait_list上有writer,后来的申请者,不管是reader

            // 还是writer,都一定不能不能自旋

 

            // 另外一个注意点,下面每循环16次检查一次自旋时间是否已经超过预期限制

            // 的值,这主要是为了防止频繁的调用sched_clock

            else if (!(++loop & 0xf) && (sched_clock() > rspin_threshold)) {

                rwsem_set_nonspinnable(sem);

                lockevent_inc(rwsem_opt_nospin);

                break;

            }

        }

 

        /*

         * An RT task cannot do optimistic spinning if it cannot

         * be sure the lock holder is running or live-lock may

         * happen if the current task and the lock holder happen

         * to run in the same CPU. However, aborting optimistic

         * spinning while a NULL owner is detected may miss some

         * opportunity where spinning can continue without causing

         * problem.

         *

         * There are 2 possible cases where an RT task may be able

         * to continue spinning.

         *

         * 1) The lock owner is in the process of releasing the

         * lock, sem->owner is cleared but the lock has not

         * been released yet.

         * 2) The lock was free and owner cleared, but another

         * task just comes in and acquire the lock before

         * we try to get it. The new owner may be a spinnable

         * writer.

         *

         * To take advantage of two scenarios listed agove, the RT

         * task is made to retry one more time to see if it can

         * acquire the lock or continue spinning on the new owning

         * writer. Of course, if the time lag is long enough or the

         * new owner is not a writer or spinnable, the RT task will

         * quit spinning.

         *

         * If the owner is a writer, the need_resched() check is

         * done inside rwsem_spin_on_owner(). If the owner is not

         * a writer, need_resched() check needs to be done here.

         */

        //3.7 关于rt线程的优化

        // 由上面的注释可知,当rt线程和持锁的线程运行在同一个cpu上时,可能会

        // 产生live-lock,(笔者在这里把他翻译为"活锁")。那么什么是"活锁"呢?

        // 试想,在同一个cpu上,一个普通的cfs线程先拿到锁进入临界区,此时一个

        // rt线程来申请锁,那么这个rt线程必然会打断这个cfs线程,只要这个rt

        // 在自旋,那么这个cfs线程就永远得不到运行,也就永远无法释放锁,这就是

        // "活锁"

        // 所以,这里理论上只要探测到时rt线程就应该立刻返回,实际上原来的代码就

        // 是这样的逻辑,修改前代码如下,在990fa7384a305这笔提交做了修改,因为

        // 作者Waiman Long通过实验发现,让rt线程在这里多尝试一次是可以提升性能,

        // 具体参见上面的patch

        // if (!sem->owner && (need_resched() || rt_task(current)))

        // break;

        // 下面我们来分析一下这段代码的具体逻辑,从哪看出是让rt线程"再试一次"的呢

 

        // 实际上,如果申请锁的人是rt的话,代码走到这里时,owner_state一定不是

        // OWNER_WRITER,只能是OWNER_NULL或者OWNER_READER。为啥捏,我们先假设

        // owner_state是OWNER_WRITER,看看会发生什么情况:此时临界区时writer,

        // 并且这个rt线程和这个writer是在同一个cpu上,那么这个rt线程一定会抢

        // 这个writer的cpu资源,导致这个writer的owner_on_cpu为false,所以在

        // 上面的rwsem_spin_on_owner中就会返回OWNER_NONSPINNABLE,所以代码根本

        // 不会走到这里!!!所以,对于申请锁的人如果是rt的话,代码走到这里owner_state

        // 一定是OWNER_NULL或者OWNER_READER

 

        // 代码走到这里,这个rt线程还是没抢到锁,有下面三个可能

        // a) owner_state为OWNER_NULL的第一种情况:当前持锁者正在释放锁,sem->owner

        // 已经被设置为NULL(注意:只有up_writer中才会调用rwsem_clear_owner

        // 接口将owner设置为NULL,reader退出临界区时并不会设置owner),导致上

        // 面rwsem_spin_on_owner中返回的是OWNER_NULL,但锁还没有完全被释放

        // (这里说的没有完全释放的意思是,在up_writer释放锁的时候,锁的

        // RWSEM_FLAG_HANDOFF标记并没有清除,所以下一个会将锁交给wait_list最

        // 前面的那个线程,到时我们在上面"偷锁"的逻辑中也获取不到锁)

        // b) owner_state为OWNER_NULL的另一种情况:虽然锁已经被完全释放了,sem->owner

        // 已经被设置为NULL,并且RWSEM_FLAG_HANDOFF标记也已经被清除了,但是因

        // 为当前在其他cpu上还可能存在其他线程也在自旋抢这把锁,因为上面"偷锁"

        // 逻辑本质上也是atomic操作,所以在上面"偷锁"逻辑中,很可能这个rt线程

        // 没有抢过其他cpu上的线程(即使其他cpu上的线程时cfs线程)

        // c) 而如果owner_state为OWNER_READER的话,则表示当前临界区是reader。那

        // 么如果这个rt线程是reader,(临界区的reader没有被撑爆,并且锁没有被置

        // 为RWSEM_FLAG_HANDOFF标记,后续的reader可以直接进入临界区),此时这个

        // rt线程是可以直接进入临界区的,此时代码不会走到这里,因为在上面"偷锁"

        // 的位置就拿到锁退出了,所以这种情况也是不可能的;如果这个rt是writer,

        // 则只能自旋。

 

        // prev_owner_state记录的是这个rt线程上一次自旋时临界区的状态,

        // owner_state记录的是本次自旋时临界区的状态。由上面分析可知,rt

        // 线程走到这里,"owner_state != OWNER_WRITER"一定成立,但是

        // "owner_state != OWNER_WRITER"成立并不表示这个申请锁的线程是rt

 

        // 如果一个rt线程连续两次走到这里,则表示其已经自旋了两次了,如果自旋

        // 了两次owner_state状态都没有变化,则停止rt的自旋

 

        if (owner_state != OWNER_WRITER) {

            //3.7.1 如果当前正在申请锁的线程马上要调度出去了,则返回false,停止自旋

            if (need_resched())

                break;

 

            //3.7.2 如图rt自旋两次,owner_state都没有发生变化,则停止自旋

            if (rt_task(current) &&

             (prev_owner_state != OWNER_WRITER))

                break;

        }

        prev_owner_state = owner_state;

 

        /*

         * The cpu_relax() call is a compiler barrier which forces

         * everything in this loop to be re-loaded. We don't need

         * memory barriers as we'll eventually observe the right

         * values at the cost of a few extra spins.

         */

        //3.8 原来这就是他妈的自旋?和spin没有半毛钱关系

        cpu_relax();

    }

 

    //4.标记自旋结束

    osq_unlock(&sem->osq);

done:

    preempt_enable();

    lockevent_cond_inc(rwsem_opt_fail, !taken);

 

    //5.乐观自旋拿到锁后返回true,否则返回false

    return taken;

}

 

#else

static inline bool rwsem_optimistic_spin(struct rw_semaphore *sem, bool wlock)

{

    return false;

}

#endif

 

4.2.1 rwsem_try_read_lock_unqueued - reader尝试"偷锁"

reader在乐观自旋期间,通过执行该函数"偷锁"。需要注意的是,因为正在执行自旋逻辑的不仅仅是你一个reader,可能还有其他的reader,也可能有其他的writer,他们同时执行"偷锁"逻辑,那么这把锁会被谁"偷"走就不一定了(谁执行atomic操作能成功要看运气),如果这把锁已经被writer"偷"走(或者这把锁原本就是被writer持有的),或者锁已经被打上了RWSEM_FLAG_HANDOFF标记(禁止"偷锁"),那么这个reader在这里就"偷锁"失败了。

所以:当reader在临界区,只要临界区没有被reader撑爆,并且锁没有被打上RWSEM_FLAG_HANDOFF标记,后来的reader是可以直接进入临界区的

/*

* Try to acquire read lock before the reader is put on wait queue.

* Lock acquisition isn't allowed if the rwsem is locked or a writer handoff

* is ongoing.

*/

static inline bool rwsem_try_read_lock_unqueued(struct rw_semaphore *sem)

{

    long count = atomic_long_read(&sem->count);

 

    //1.下面两种场景reader是"偷"不到锁的

    // a) 有一个writer在临界区

    // b) 锁已经被打上RWSEM_FLAG_HANDOFF标记,禁止"偷锁"

    if (count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))

        return false;

 

    //2.代码走到这里,临界区可能没人或是reader,并且当前是允许"偷锁"的

    // rwsem允许多个reader同时进入临界区,count字段记录的当前有多少个

    // reader在临界区,下面对count执行加1标记拿锁。但是在这个原子操作

    // 的过程中这把锁还是有可能被其他cpu上的其他线程拿走的,所以这里要判断

    // 下面原子操作的返回值是加之前的值,所以这里需要注意,即使原子加完之后,

    // 这个reader也是有可能拿不到锁的,此时函数返回false

    count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);

    if (!(count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {

        rwsem_set_reader_owned(sem);

        lockevent_inc(rwsem_opt_rlock);

        return true;

    }

 

    /* Back out the change */

    //3.拿锁失败,说明在执行原子操作期间,存在下面两种情况导致拿锁失败,

    // 此时需要将count重新减回去

    // a) 原子操作期间,锁被另一个cpu上的一个writer给抢了

    // c) 原子操作期间,锁突然被打上RWSEM_FLAG_HANDOFF标记,禁止偷锁

    atomic_long_add(-RWSEM_READER_BIAS, &sem->count);

    return false;

}

 

4.2.2 rwsem_try_write_lock_unqueued - writer尝试"偷锁"

执行自旋逻辑的writer在这里尝试"偷锁"进入临界区

/*

* Try to acquire write lock before the writer has been put on wait queue.

*/

static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)

{

    long count = atomic_long_read(&sem->count);

 

    //1.下面任一条件,writer在这里都会"偷锁"失败

    // a) 临界区有人(RWSEM_LOCK_MASK标记被置位),因为对于writer来说,

    // 只要临界区有人,writer都不能进入临界区

    // b) 锁已经被打上RWSEM_FLAG_HANDOFF标记,禁止"偷锁"

    while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {

        //2.进入该循环表示当前临界区没人,并且当前还是允许"偷锁"的

        // writer拿锁就是通过原子操作将count的RWSEM_WRITER_LOCKED标记置位,

        // 当然,这个原子操作拿锁的过程,也有可能其他cpu上的线程拿走,下面原子

        // 操作返回true表示xchg操作真的成功,也就是说这个writer已经拿到锁了;

        // 如果原子操作返回false,则表示锁被其他cpu上的其他线程拿走了,此时继

        // 续while循环判断,如果真的是锁被别人拿走了,就返回false拿锁失败

        // 注意:每次原子操作,count变量都会被修改

        if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,

                    count | RWSEM_WRITER_LOCKED)) {

            rwsem_set_owner(sem);

            lockevent_inc(rwsem_opt_wlock);

            return true;

        }

    }

    return false;

}

 

4.2.3 rwsem_rspin_threshold - 计算当reader在临界区时,writer自旋的阈值

当writer申请锁的时候,如果发现临界区是reader,writer就会自旋等待,但是因为当前是reader在临界区,可能会有其他的reader源源不断的进入临界区(如果临界区的reader没有被撑爆,并且锁没有被置为RWSEM_FLAG_HANDOFF标记,后来的reader可以直接进入临界区的),这将导致reader的临界区不断扩大,但是writer总不能一直自旋下去吧!所以当我们发现writer自旋的时间超过一定的阈值时,就会做下面两件事:

a) writer结束自旋,之后便会进入睡眠状态

b) 顺便将锁的RWSEM_RD_NONSPINNABLE和RWSEM_WR_NONSPINNABLE位全部置位,因为此时writer已经长时间自旋都没有拿到锁了,这么做主要有两个原因:

i) 设置RWSEM_WR_NONSPINNABLE,告诉后来的writer不要做无意义的尝试了,不要自旋了,直接进sleep状态

ii) 设置RWSEM_RD_NONSPINNABLE,后来的reader也不不会再执行自旋的逻辑,也就不会再出现reader源源不断进入临界区的情况,从源头上防止reader的临界区越来越大

 

而rwsem_rspin_threshold要完成的工作就是计算这个阈值,writer最多允许自旋多长时间。显然,这个阈值的大小和当前临界区reader的个数相关。

/*

* Calculate reader-owned rwsem spinning threshold for writer

*

* The more readers own the rwsem, the longer it will take for them to

* wind down and free the rwsem. So the empirical formula used to

* determine the actual spinning time limit here is:

*

* Spinning threshold = (10 + nr_readers/2)us

*

* The limit is capped to a maximum of 25us (30 readers). This is just

* a heuristic and is subjected to change in the future.

*/

static inline u64 rwsem_rspin_threshold(struct rw_semaphore *sem)

{

    long count = atomic_long_read(&sem->count);

    int readers = count >> RWSEM_READER_SHIFT;

    u64 delta;

 

    if (readers > 30)

        readers = 30;

    delta = (20 + readers) * NSEC_PER_USEC / 2;

 

    return sched_clock() + delta;

}

 

4.3 rwsem_spin_on_owner - 围着writer转

要点:只要writer一直在运行,小循环就不会退出,一直围着这个writer转,当writer的task或flag发生变化时,退出小循环

 

后面申请锁的人(可能是reader,也可能是writer),在进入这个小循环之前,会先经过判断,下面几个情况根本就不需要进入小循环:

a) 没人在临界区:这时候后来的申请者不需要自旋就能拿到锁,返回小循环后在大循环里先尝试拿锁,如果拿不到的话则继续自旋

b) reader在临界区:如果申请锁的人是reader,因为多个reader可以直接进入临界区(临界区的reader没有被撑爆,并且锁没有被置为RWSEM_FLAG_HANDOFF标记),退出小循环在大循环里面尝试拿锁,如果拿不到锁的话则继续自旋;如果申请锁的人是writer,此时也返回,在大循环中判断是否自旋超时了。

c) 锁被打上了nonspinable标记:显式打上了该标签,还自旋个毛啊,结束整个自旋流程

 

当进入了小循环时,此时临界区一定为writer,下面情况会退出小循环

a) 如果owner或flag发生变化(例如task发生了变化,或者同一个task从writer变成了reader)

b) 如果writer进入runnable或者进入阻塞状态(writer短时间之内是不能退出了)

 

static noinline enum owner_state rwsem_spin_on_owner(

            struct rw_semaphore *sem,

            unsigned long nonspinnable)    //可为RWSEM_WR_NONSPINNABLE或RWSEM_RD_NONSPINNABLE

{

    struct task_struct *new, *owner;

    unsigned long flags, new_flags;

    enum owner_state state;

 

    //1.由rwsem_owner_state的实现可知,当返回的state不为OWNER_WRITER时,有下面几种可能

    // a) 没人在临界区,这时候后来的申请者不需要自旋就能拿到锁,退出小循环在大循环中尝试拿锁

    // b) reader在临界区,因为多个reader可以直接进入临界区,退出小循环在大循环中尝试拿锁

    // c) 锁被打上了nonspinable标记,显式打上了该标签,还自旋个毛啊

    owner = rwsem_owner_flags(sem, &flags);

    state = rwsem_owner_state(owner, flags, nonspinnable);

 

    if (state != OWNER_WRITER)

        return state;

 

    //2.代码都到这里说明这把锁是被writer持有,也就是说spin on owner一定是围着writer转

    // 下面这个循环实际是在等锁切换writer,当writer发生切换时退出这里的循环

    rcu_read_lock();

    for (;;) {

        /*

         * When a waiting writer set the handoff flag, it may spin

         * on the owner as well. Once that writer acquires the lock,

         * we can spin on it. So we don't need to quit even when the

         * handoff bit is set.

         */

        //3.由下面的逻辑可知,下面两种情况都会被认为是切换了writer,退出小循环

        // a) 持锁的task发生变化

        // b) 持锁的task没有变,但是flag发生了变化,例如nonspinable标记发生了变化

        // 注意: 这里只是返回,并不是已经拿到到锁了,是否需要继续自旋由新的持锁者决定

        new = rwsem_owner_flags(sem, &new_flags);

        if ((new != owner) || (new_flags != flags)) {

            state = rwsem_owner_state(new, new_flags, nonspinnable);

            break;

        }

 

        /*

         * Ensure we emit the owner->on_cpu, dereference _after_

         * checking sem->owner still matches owner, if that fails,

         * owner might point to free()d memory, if it still matches,

         * the rcu_read_lock() ensures the memory stays valid.

         */

        barrier();

 

        //4.代码走到这里说明持锁的writer还没有释放锁,状态也没变,

        // 但是满足下面条件,也不需要继续自旋了

        // a) 锁的申请者即将调度出去,(自己都要挂了,还自旋个毛啊)

        // b) 或者当前持锁的writer不是正在running,可能是处于runnable

        // 状态,或者是进入D状态了。真是舔狗的宿命...

        if (need_resched() || !owner_on_cpu(owner)) {

            state = OWNER_NONSPINNABLE;

            break;

        }

 

        //5.代码走到这里表示writer还没有从临界区出来,并且当前还在运行,那么就再等一下吧

        cpu_relax();

    }

    rcu_read_unlock();

 

    return state;

}

 

4.3.1 owner_state - 描述临界区的状态

临界区的状态由下面4种

OWNER_NULL: 没人在临界区

OWNER_READER : reader在临界区

OWNER_WRITER : writer在临界区

OWNER_NONSPINNABLE : 不允许自旋

 

/*

* The rwsem_spin_on_owner() function returns the folowing 4 values

* depending on the lock owner state.

* OWNER_NULL : owner is currently NULL

* OWNER_WRITER: when owner changes and is a writer

* OWNER_READER: when owner changes and the new owner may be a reader.

* OWNER_NONSPINNABLE:

*         when optimistic spinning has to stop because either the

*         owner stops running, is unknown, or its timeslice has

*         been used up.

*/

enum owner_state {

    OWNER_NULL                = 1 << 0,            //没人在临界区

    OWNER_WRITER            = 1 << 1,            //写在临界区

    OWNER_READER            = 1 << 2,            //读在临界区

    OWNER_NONSPINNABLE        = 1 << 3,            //owner结束自旋

};

 

//由下面的注释可知,当临界区有reader, writer或者根本就没人在临界区的时候,都有可能使申请者自旋

#define OWNER_SPINNABLE        (OWNER_NULL | OWNER_WRITER | OWNER_READER)

 

4.3.2 rwsem_owner_state - 获取临界区的状态

临界区的状态是由owner成员的task_struct和flags字段共同决定,判断流程如下:

a) 先将owner成员的task_struct和flags字段剥离

b) 再根据上面剥离的字段判断临界区的状态

static noinline enum owner_state rwsem_spin_on_owner(

            struct rw_semaphore *sem,

            unsigned long nonspinnable)

{

    struct task_struct *new, *owner;

    unsigned long flags, new_flags;

    enum owner_state state;

 

    owner = rwsem_owner_flags(sem, &flags);

    state = rwsem_owner_state(owner, flags, nonspinnable);

    if (state != OWNER_WRITER)

        return state;

    ...

}

 

只要owner自旋没有被显式标记为nonspinable(bit1和bit2被置一),都是有可能产生自旋的

static inline enum owner_state rwsem_owner_state(

            struct task_struct *owner,

            unsigned long flags,            //owner成员的低3bit的flags

            unsigned long nonspinnable)    //可为RWSEM_WR_NONSPINNABLE或RWSEM_RD_NONSPINNABLE

{

    //1.判断临界区当前是否允许reader或writer自旋

    // 传入的nonspinnable可为RWSEM_WR_NONSPINNABLE或RWSEM_RD_NONSPINNABLE

    // 该字段由申请锁的人传入,如果申请锁的人是reader,则传入RWSEM_RD_NONSPINNABLE,

    // 表示这个reader是想判断这个临界区当前是否允许读者自旋;如果申请锁的是人writer,

    // 则传入RWSEM_WR_NONSPINNABLE,表示这个writer是想判断这个临界区当前是否允许写者自旋

    if (flags & nonspinnable)

        return OWNER_NONSPINNABLE;

 

    //2.下面判断到底是谁在临界区,是reader还是writer,或者根本就没人

    // 但是这并不表示申请锁的人就是可以自旋的,而是表示需要进一步判断

 

    //2.1 reader在临界区

    if (flags & RWSEM_READER_OWNED)

        return OWNER_READER;

 

    //2.2 代码走到这里只有两种可能,这两种情况也是可能产生自旋的

    // a) 写持锁

    // b) 没人持锁

    return owner ? OWNER_WRITER : OWNER_NULL;

}

 

五、唤醒逻辑

5.1 rwsem_first_waiter - 取出wait_list链表最前面的waiter

注意:这个waiter可能是reader,也可能是writer

#define rwsem_first_waiter(sem) \

    list_first_entry(&sem->wait_list, struct rwsem_waiter, list)

 

5.2 rwsem_mark_wake - 从wait_list链表上搜集要唤醒的线程

wait_list链表上的线程都是那些没有申请到锁,进入阻塞状态的线程,既有reader,也有writer,具体要唤醒哪些类型的线程,由wake_type参数决定。函数rwsem_mark_wake的执行逻辑为:根据wake_type,将要唤醒的task先临时添加进wake_q,等待后面一起唤醒

/*

* handle the lock release when processes blocked on it that can now run

* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must

* have been set.

* - there must be someone on the queue

* - the wait_lock must be held by the caller

* - tasks are marked for wakeup, the caller must later invoke wake_up_q()

* to actually wakeup the blocked task(s) and drop the reference count,

* preferably when the wait_lock is released

* - woken process blocks are discarded from the list after having task zeroed

* - writers are only marked woken if downgrading is false

*/

static void rwsem_mark_wake(

            struct rw_semaphore *sem,

            enum rwsem_wake_type wake_type,            //要唤醒什么类型的task

            struct wake_q_head *wake_q)                //将要唤醒的task先添加到一个临时链表上

{

    struct rwsem_waiter *waiter, *tmp;

    long oldcount, woken = 0, adjustment = 0;

    struct list_head wlist;

 

    lockdep_assert_held(&sem->wait_lock);

 

    /*

     * Take a peek at the queue head waiter such that we can determine

     * the wakeup(s) to perform.

     */

    //1.获取wait_list最前面的task,队列最前面的task是等待最久的,就唤醒他了

    waiter = rwsem_first_waiter(sem);

 

    //2.如果wait_list链表最前面是writer,则不会跳过这个writer去唤醒reader

    // 下面我们情景分析一下这段代码,满足下面if条件表示链表最前面的是writer:

    // a) 如果唤醒类型是RWSEM_WAKE_ANY,那么直接将这个writer唤醒

    // b) 如果唤醒类型不是RWSEM_WAKE_ANY,那么wake_type只能是RWSEM_WAKE_READERS

    // 或RWSEM_WAKE_READ_OWNED,也就是说本次一定是想唤醒reader,我们看到下面的

    // 代码逻辑就直接return了,并不会真的执行唤醒reader的动作,为啥呢?

    // 试想,如果真的去唤醒reader的话,那么这个writer就要再多等一会,所以为了防

    // 止writer饿死,这个时候不去唤醒任何人

 

    // RWSEM_WAKE_ANY参数应该这么理解:本次要唤醒谁取决于链表最前面线程

    // a) 如果链表最前面的是reader,那么本次会唤醒链表上最前面的256个reader一起进入临界区;

    // b) 如果链表最前面的是writer,那么本次只会唤醒链表最前面的这个writer进入临界区;

    // 这么设计的原因主要是防止writer饿死,详见__up_read中的描述

 

    // wake_type可取的值还可能是RWSEM_WAKE_READERS或RWSEM_WAKE_READ_OWNED

    // 这两个type都是表示要唤醒reader,但是背景不一样

    // RWSEM_WAKE_READERS : 唤醒链表上的reader

    // RWSEM_WAKE_READ_OWNED: 用于标记waker已经拿到了reader锁,我是来带领

    // wait_list链表上的其他reader一起进入临界区的

    if (waiter->type == RWSEM_WAITING_FOR_WRITE) {

        if (wake_type == RWSEM_WAKE_ANY) {

            /*

             * Mark writer at the front of the queue for wakeup.

             * Until the task is actually later awoken later by

             * the caller, other writers are able to steal it.

             * Readers, on the other hand, will block as they

             * will notice the queued writer.

             */

            wake_q_add(wake_q, waiter->task);

            lockevent_inc(rwsem_wake_writer);

        }

        return;

    }

 

    //3.代码走到这里,wait_list最前面一定是一个reader,唤醒时一定是从链表最前

    // 面开始向后依次唤醒,所以最前面的是reader的话,下一个一定是唤醒reader,

    // 下面的逻辑是唤醒reader

 

    /*

     * No reader wakeup if there are too many of them already.

     */

    //4.出现负数表示最高bit位RWSEM_FLAG_READFAIL已经被置一

    // 也就是说当前临界区里已经有很多reader了,此时就不要再新的reader了

    if (unlikely(atomic_long_read(&sem->count) < 0))

        return;

 

    /*

     * Writers might steal the lock before we grant it to the next reader.

     * We prefer to do the first reader grant before counting readers

     * so we can bail out early if a writer stole the lock.

     */

    //5.如果当前wake_type不是RWSEM_WAKE_READ_OWNED,则表示当前的waker

    // 还没有拿到reader锁,这里唤醒的reader是第一次进入临界区

    if (wake_type != RWSEM_WAKE_READ_OWNED) {

        struct task_struct *owner;

 

        //5.1 如果writer偷了reader的锁,并且最前面的reader阻塞超时,

        // 则设置HANDOFF标记,禁止偷锁

        // 在当前cpu上要唤醒链表上的reader去拿锁,同时在其他cpu上可能存在

        // 一个writer也在申请锁(通过自旋的方式),他们通过原子操作公平的竞争,

        // 谁先抢到谁拿锁,如果下面的if条件成立,则表示在另一个cpu上的那个

        // writer已经通过自旋的方式成功拿到锁了,当前cpu上的这个reader拿锁

        // 失败,这就是writer"偷"锁的一种场景!!!

        // 这样链表最前面的reader又要继续处于D状态等下去,但是如果这个reader

        // 处于D状态的时间已经很长了,再阻塞下去可能就会导致系统crash,这时就会

        // 强制设置HANDOFF标记,禁止"偷锁"

        adjustment = RWSEM_READER_BIAS;

        oldcount = atomic_long_fetch_add(adjustment, &sem->count);

        if (unlikely(oldcount & RWSEM_WRITER_MASK)) {

            /*

             * When we've been waiting "too" long (for writers

             * to give up the lock), request a HANDOFF to

             * force the issue.

             */

            //注意:

            //这里在设置RWSEM_FLAG_HANDOFF标记时有个小技巧,if条件里

            //面的adjustment是减去RWSEM_FLAG_HANDOFF,和后面原子操作

            //中的atomic_long_add(-adjustment, &sem->count)结合,就

            //是设置RWSEM_FLAG_HANDOFF位了

            if (!(oldcount & RWSEM_FLAG_HANDOFF) &&

             time_after(jiffies, waiter->timeout)) {

                adjustment -= RWSEM_FLAG_HANDOFF;

                lockevent_inc(rwsem_rlock_handoff);

            }

 

            atomic_long_add(-adjustment, &sem->count);

            return;

        }

 

        /*

         * Set it to reader-owned to give spinners an early

         * indication that readers now have the lock.

         * The reader nonspinnable bit seen at slowpath entry of

         * the reader is copied over.

         */

        //5.2 代码走到这里,表示这个wait_list链表最前面的reader已经成功拿到了锁

        // 下面就需要设置锁的owner,但是在设置owner之前,还有一个工作要做,

        // 就是继承last_rowner的nonspin标记,为啥捏??????????

        owner = waiter->task;

        if (waiter->last_rowner & RWSEM_RD_NONSPINNABLE) {

            owner = (void *)((unsigned long)owner | RWSEM_RD_NONSPINNABLE);

            lockevent_inc(rwsem_opt_norspin);

        }

        __rwsem_set_reader_owned(sem, owner);

    }

 

    /*

     * Grant up to MAX_READERS_WAKEUP read locks to all the readers in the

     * queue. We know that the woken will be at least 1 as we accounted

     * for above. Note we increment the 'active part' of the count by the

     * number of readers before waking any processes up.

     *

     * This is an adaptation of the phase-fair R/W locks where at the

     * reader phase (first waiter is a reader), all readers are eligible

     * to acquire the lock at the same time irrespective of their order

     * in the queue. The writers acquire the lock according to their

     * order in the queue.

     *

     * We have to do wakeup in 2 passes to prevent the possibility that

     * the reader count may be decremented before it is incremented. It

     * is because the to-be-woken waiter may not have slept yet. So it

     * may see waiter->task got cleared, finish its critical section and

     * do an unlock before the reader count increment.

     *

     * 1) Collect the read-waiters in a separate list, count them and

     * fully increment the reader count in rwsem.

     * 2) For each waiters in the new list, clear waiter->task and

     * put them into wake_q to be woken up later.

     */

    //6.代码走到这里,一定是reader在临界区了,因为rwsem是允许多个reader

    // 同时进入临界区,所以下面需要唤醒wait_list链表上的其他reader,手牵

    // 手一起进入临界区,一次最多唤醒256个reader,所以对于reader,rwsem

    // 是一个相对公平的锁,因为wait_list链表上的reader不管顺序是怎么排序

    // 的,他们都可以一起进去临界区,同时获取锁,但是对于writer,是有顺序之

    // 分的,wait_list链表前面的writer肯定是比后面的writer先获取到锁

 

    //收集要唤醒的reader的工作必须分两步

    // a) 先将这些reader收集在一个单独的链表上,并完成计数,并将计数的值先加

    // 到sem->count中去,需要注意的是,这时候这些reader还没有真正的被唤醒

    // b) 对上面链表上收集到的每一个reader,清空waiter->task成员,并将这

    // 些task加到wake_q中去,以便稍后唤醒

 

    //为啥要分两步呢?上面的注释已经给出答案:

    // 防止rwsem->count成员的count字段,在增加之前被减少的可能,这是因为

    // 可能存在这一的情况:即将被唤醒的waiter可能根本就没有睡眠(参见

    // rwsem_down_read_slowpath的实现),所以在慢速路径中这个waiter可能会

    // 看到waiter->task成员被清空,于是就结束临界区,并且执行unlock操作,

    // 而在unlock中会将rwsem->count减1,但是这时候rwsem->count成员的

    // count字段还没有执行加操作,先减的话可能会出问题

 

    //6.1 第一步,先收集要唤醒的reader,最多不能超过256个,先将要唤醒的

    // reader添加到临时链表wlist上,等到第二步将其放到wake_q上

    //注意:这里在遍历链表时,跳过了writer,只唤醒reader

    INIT_LIST_HEAD(&wlist);

    list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {

        if (waiter->type == RWSEM_WAITING_FOR_WRITE)

            continue;

 

        woken++;

        list_move_tail(&waiter->list, &wlist);

 

        /*

         * Limit # of readers that can be woken up per wakeup call.

         */

        if (woken >= MAX_READERS_WAKEUP)

            break;

    }

 

    //6.1.1 计算count要调整的值

    adjustment = woken * RWSEM_READER_BIAS - adjustment;

    lockevent_cond_inc(rwsem_wake_reader, woken);

 

    //6.1.2 如果此时wait_list为空的话,则取消RWSEM_FLAG_WAITERS标记,

    // 表示链表上已经没有waiter了

    if (list_empty(&sem->wait_list)) {

        /* hit end of list above */

        adjustment -= RWSEM_FLAG_WAITERS;

    }

 

    /*

     * When we've woken a reader, we no longer need to force writers

     * to give up the lock and we can clear HANDOFF.

     */

 

    //6.1.3 由上面的分析可知,上面设置HANDOFF是为了防止其他writer偷了链表

    // 最前面的reader的锁,从而临时禁止偷锁,现在链表最前面的reader已

    // 经得到唤醒,需要取消HANDOFF标记,重新允许自旋的线程偷锁

    if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))

        adjustment -= RWSEM_FLAG_HANDOFF;

 

    if (adjustment)

        atomic_long_add(adjustment, &sem->count);

 

    /* 2nd pass */

    //6.2 第二步:将临时链表wlist上的添加进唤醒队列wake_q中去,以便后面唤醒

    list_for_each_entry_safe(waiter, tmp, &wlist, list) {

        struct task_struct *tsk;

 

        tsk = waiter->task;

        get_task_struct(tsk);

 

        /*

         * Ensure calling get_task_struct() before setting the reader

         * waiter to nil such that rwsem_down_read_slowpath() cannot

         * race with do_exit() by always holding a reference count

         * to the task to wakeup.

         */

 

        //6.2 1 为啥要将waiter->task清空呢?

        // 这主要和rwsem_down_read_slowpath中的唤醒逻辑有关,在线程被

        // 唤醒时,会先判断这个waiter->task成员,如果该成员被清空,则表

        // 示其被真正的唤醒了,退出for循环,详见rwsem_down_read_slowpath实现

        smp_store_release(&waiter->task, NULL);

        /*

         * Ensure issuing the wakeup (either by us or someone else)

         * after setting the reader waiter to nil.

         */

        //6.2.2 确保在waiter->task设置为NULL后,再执行唤醒动作

        // 这里还不是唤醒,只是将其添加到队列中

        wake_q_add_safe(wake_q, tsk);

    }

}

 

5.1.1 rwsem_wake_type - 要唤醒哪些线程

在调用rwsem_mark_wake唤醒wait_list上的线程时,rwsem_wake_type类型用于指示本次要唤醒哪些waiter

a) RWSEM_WAKE_ANY : 表示唤醒wait_list最前面的线程,他可能是reader,也可能是writer

b) RWSEM_WAKE_READERS : 只唤醒链表上的reader,唤醒的时候会跳过writer

c) RWSEM_WAKE_READ_OWNED : 标记执行唤醒动作的waker,这个waker已经拿到了reader锁

enum rwsem_wake_type {

    RWSEM_WAKE_ANY,            /* Wake whatever's at head of wait list */

    RWSEM_WAKE_READERS,        /* Wake readers only */

    RWSEM_WAKE_READ_OWNED        /* Waker thread holds the read lock */

};

 

5.3 rwsem_wake - 唤醒wait_list链表上的线程

下面两条路径会调用本函数:

a) __up_write : 那个唯一的writer退出临界区了

b) __up_reader : 临界区的所有reader全部都退出临界区了

 

但是不管是哪一个路径,都表示:

a) 临界区没人了;

b) wait_list链表上还有人等着;

 

这时候调用rwsem_mark_wake函数从wait_list上重新唤醒一波线程进入临界区,传入的参数为RWSEM_WAKE_ANY,该参数表示根据wait_list链表最前面的线程是reader还是writer来决定唤醒哪些线程

a) 如果链表最前面的是reader,那么本次会唤醒链表上最前面的256个reader一起进入临界区;

b) 如果链表最前面的是writer,那么本次只会唤醒链表最前面的这个writer进入临界区;

/*

* handle waking up a waiter on the semaphore

* - up_read/up_write has decremented the active part of count if we come here

*/

static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count)

{

    unsigned long flags;

    DEFINE_WAKE_Q(wake_q);

 

    //1.先将要唤醒的线程放到临时链表wake_q上,然后一起唤醒

    raw_spin_lock_irqsave(&sem->wait_lock, flags);

    if (!list_empty(&sem->wait_list))

        rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

 

    //2.执行唤醒的动作

    wake_up_q(&wake_q);

 

    return sem;

}

 

六、读操作

6.1 down_read - 拿不到锁进D状态

void __sched down_read(struct rw_semaphore *sem)

{

    //1.增加抢占点,可能会进入睡眠

    // 打开CONFIG_PREEMPT_VOLUNTARY宏表示自愿抢占,是内核的一种抢占模型,

    // 打开该宏后,might_sleep、might_resched这类接口才会有实现,否则就是

    // 空的,在内核中会有多个模块调用这两个函数,例如mutex、信号量、驱动等。

    // 这两个函数要完成的工作实际就是判断current的TIF_NEED_RESCHED标记是

    // 否被打上,也就是说判断是否要自愿让出cpu,如果被打上的话,就主动调用

    // schedule函数调度其他线程运行,通过这样的方式,相当于在代码中会增加了

    // 抢占点,这里先简单了解一下,后面在调度章节详细分析

    might_sleep();

 

    //2.只有在使能CONFIG_DEBUG_LOCK_ALLOC时才有效

    rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

 

    //3.这里才是真正的持锁

    LOCK_CONTENDED(sem, __down_read_trylock, __down_read);

}

 

CONFIG_LOCK_STAT宏实现如下,快速路径尝试失败,才会走到慢速路径

#ifdef CONFIG_LOCK_STAT

#define LOCK_CONTENDED(_lock, try, lock)            \

do {                                \

    if (!try(_lock)) {                    \

        lock_contended(&(_lock)->dep_map, _RET_IP_);    \

        lock(_lock);                    \

    }                            \

    lock_acquired(&(_lock)->dep_map, _RET_IP_);            \

} while (0)

#else

#define LOCK_CONTENDED(_lock, try, lock) \

    lock(_lock)

#endif

 

6.2 __down_read_trylock - 直接拿锁,当reader在临界区,并且wait_list上没有阻塞的线程时,reader可以直接拿到锁

当一个reader在申请rwsem时,如果没人在临界区,或者同时满足下面条件,这个reader能直接拿到锁

a) 当前是reader在临界区

b) 并且wait_list上没有阻塞的线程时

 

直接拿锁成功返回1,失败返回0

static inline int __down_read_trylock(struct rw_semaphore *sem)

{

    long tmp;

 

    //1.调试相关,暂时不关注

    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);

 

    /*

     * Optimize for the case when the rwsem is not locked at all.

     */

    //2.尝试直接拿锁,(所谓拿到锁,就是成功对这个原子变量执行了加1操作)

    // reader拿到锁时完成下面两个工作

    // a) 将sem->count的count字段加1

    // b) 将sem->owner设置为current

    // 需要注意的是,对于reader来说,owner记录的是最后一个进入临界区的reader

    // 但是并不表示owner指向的task一定就在临界区,详见rwsem_clear_reader_owned

 

    // 首先来看一下atomic_try_cmpxchg_acquire原子操作:

    // 如果第一个参数和第二个参数相等,则将第三个参数赋值给第一个参数,并返回true,

    // 否则不执行赋值操作,并返回false,关键字try用于标记是否完成和chg操作

    // 注意:

    // 不管是否交换成功,每次调用atomic_try_cmpxchg_acquire操作后,tmp

    // 的值都会被修改,tmp会被改为原子变量的旧值,结合这里的do-while表示:

    // 只要count没有被打上RWSEM_READ_FAILED_MASK标记,这个reader就

    // 会一直循环尝试修改这个原子变量的值,直到成功

 

    // RWSEM_READ_FAILED_MASK定义如下:

    // (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL),

    // 这表示满足下面任意一个条件,reader就会退出循环,快速持锁路径都会失败:

    // a) 如果有writer已经进入临界区

    // b) wait_list上有人(可能是reader或writer),即有人阻塞在这把锁上

    // c) 锁已经被打上HANDOFF标记

    // d) 临界区的reader已经撑爆了

 

    // 换句话说,只有同时满足下面条件,reader才能直接拿到锁

    // a) 当前在reader在临界区

    // b) 并且wait_list上没有阻塞的线程时

    // c) 锁没有被打上HANDOFF标记,即允许偷锁

    // (严格意义上来讲,这里不应该叫偷锁,应该叫插队拿锁)

 

    // 还有一个问题,那么这里为什么要不停的循环尝试呢?

    // 这主要是防止多个cpu同时修改这个count值撞到一块去了,也就是多个cpu

    // 上在同一时刻恰好都有reader需要进入临界区时,这时候多个cpu会同时来修

    // 改这个count的值,一个cpu在写count时,另一个cpu就必须等着

 

    // 返回值:直接拿锁成功返回1,失败返回0

    tmp = RWSEM_UNLOCKED_VALUE;

    do {

        if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,

                    tmp + RWSEM_READER_BIAS)) {

            rwsem_set_reader_owned(sem);

 

            return 1;

        }

    } while (!(tmp & RWSEM_READ_FAILED_MASK));

    return 0;

}

 

6.2.1 rwsem_set_reader_owned - 将current设置为读owner

注意:当reader在临界区时,sem->owner中记录的task并不一定在临界区!!!这是因为当reader在退出临界区时执行的rwsem_clear_reader_owned实际上是空的

static inline void rwsem_set_reader_owned(struct rw_semaphore *sem)

{

    //将当前线程current标记为读的owner

    __rwsem_set_reader_owned(sem, current);

}

 

6.2.2 __rwsem_set_reader_owned - 将指定的task设置为读owner

/*

* The task_struct pointer of the last owning reader will be left in

* the owner field.

*

* Note that the owner value just indicates the task has owned the rwsem

* previously, it may not be the real owner or one of the real owners

* anymore when that field is examined, so take it with a grain of salt.

*

* The reader non-spinnable bit is preserved.

*/

static inline void __rwsem_set_reader_owned(

            struct rw_semaphore *sem,

            struct task_struct *owner)

{

    //1.在设置read_owner时,除了设置task_struct之外,还完成下面3个工作

    // a) 标记RWSEM_READER_OWNED,这个很好理解,因为现在是reader持锁

    // b) 保留owner中原来的RWSEM_RD_NONSPINNABLE位的值

    // c) 需要特别注意的是,此处还清除了RWSEM_WR_NONSPINNABLE位被清零,

    // 也就是说,当reader在临界区时,我们认为reader很快就会退出临界

    // 区,writer应该再自旋等一下,不用直接进D状态

    // 需要注意的是,只要有reader进入临界区,都会清一次RWSEM_RD_NONSPINNABLE

    // 标记位,也就是说后来的writer都会自旋

    unsigned long val = (unsigned long)owner | RWSEM_READER_OWNED |

        (atomic_long_read(&sem->owner) & RWSEM_RD_NONSPINNABLE);

 

    atomic_long_set(&sem->owner, val);

}

 

6.3 __down_read

static inline void __down_read(struct rw_semaphore *sem)

{

    //背景:代码走到__down_read这里,一定表示前面在调用

    // __down_read_trylock时失败了,也就是说此时可能

    // a) 临界区里是writer

    // b) 或者临界区里是reader,但是wait_list上有线程阻塞着

 

    //1.首先是尝试走快速路径持锁,拿到锁后就直接设置owner了

    if (!rwsem_read_trylock(sem)) {

 

        //2.上面快速路径拿锁失败,不得不走到这里执行慢速路径

        rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE);

        DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);

    } else {

        //3.快速路径持锁成功后设置owner

        // 对于reader,owner记录的是最后一次进入临界区的reader

        rwsem_set_reader_owned(sem);

    }

}

 

6.4 rwsem_read_trylock - 再次尝试直接拿锁,当reader在临界区,并且wait_list上没有阻塞的线程时,reader可以直接拿到锁

感觉这个逻辑和__down_read_trylock是一样的啊,为啥要做成两个函数呢?

返回true持锁成功,返回false持锁失败

static inline bool rwsem_read_trylock(struct rw_semaphore *sem)

{

    //1.原子操作返回的值是加完1之后的值,注意这里的逻辑

    // 不管这个reader是否能够拿到锁,也不管wait_list链表上是否有线程在等着,

    // 上来就先将rwsem->count成员的count字段加1,要知道,count字段中记录的

    // 是在临界区的reader的个数啊,你这个reader还不一定能成功拿到锁呢,凭啥上

    // 来就加1呢?这么自信吗?

    // 实际上如果这里持锁失败,则会进入慢速路径,在慢速路径中会将这个1减去的

    // 那么为啥要提前加呢?难道是起到了提前预警的作用?

    long cnt = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count);

 

    //2.这里判断小于0实际是判断count的最高bit位RWSEM_FLAG_READFAIL

    // RWSEM_FLAG_READFAIL位被置一表示已经有很多reader在临界区了

    // 这时候设置nospinable标记,也就是后续不管是reader还是writer

    // 都不需要再自旋了,直接进阻塞状态,因为服务完这么多reader需要很长时间

    if (WARN_ON_ONCE(cnt < 0))

        rwsem_set_nonspinnable(sem);

 

    //3.RWSEM_READ_FAILED_MASK的定义如下

    // (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL)

    // 这表示:

    // a) 如果临界区的是writer,后来的reader快速持锁失败

    // b) 如果wait_list上有等待的线程,则后来的reader也会持锁失败

    // c) 如果锁已经被打上HANDOFF标记,则后来的reader持锁失败

    // d) 如果临界区的reader已经特别多了,后来的reader也会持锁失败

 

    // 也就是说,只有在当临界区全是reader,并且wait_list上没有阻塞的线程时,

    // 后来的reader才可以直接拿到锁进入临界区

    return !(cnt & RWSEM_READ_FAILED_MASK);

}

 

6.4.1 rwsem_set_nonspinnable - 设置RWSEM_NONSPINNABLE标记,关闭自旋

RWSEM_NONSPINNABLE是RWSEM_RD_NONSPINNABLE和RWSEM_WR_NONSPINNABLE,设置这两个bit表示reader和writer都不能自旋,直接进阻塞状态,共有两个地方会调用该函数设置nonspinable标记

a) 在rwsem_read_trylock中,发现已经有很多reader在临界区了,此时设置nonspinnable标记,这样后续申请锁的人,不管是reader还是wirter,都不会在自旋了,直接进阻塞状态,因为要等待这些reader都处理完可能要很长时间

b) 在rwsem_optimistic_spin中,reader在临界区,当一个writer已经自旋超时后,会设置nonspinnable标记并退出自旋,这样后续申请锁的人,不管是reader还是wirter,都不会在自旋了,直接进D状态。表示前面已经有一个writer已经很急迫了,关闭了自旋,也就切断了后续的reader源源不断的通过自旋偷锁的方式进入临界区,避免reader的临界区越来越大,这也是防止writer饿死的一种机制。从这一点也可以看出,只有wait_list上有writer阻塞着,后面申请锁的线程,不管是reader还是writer,都不能自旋,直接进阻塞状态

 

/*

* Set the RWSEM_NONSPINNABLE bits if the RWSEM_READER_OWNED flag

* remains set. Otherwise, the operation will be aborted.

*/

static inline void rwsem_set_nonspinnable(struct rw_semaphore *sem)

{

    unsigned long owner = atomic_long_read(&sem->owner);

 

    do {

        //1.下面的条件表示:

        // a) 如果是reader在临界区时,继续向下,一定会设置RWSEM_NONSPINNABLE,

        // 此时后来的reader或者writer都不允许自旋

        // b) 如果是writer或者没人在临界区时,直接返回,RWSEM_NONSPINNABLE标记

        // 保持原来的状态,此时后来的reader或者writer允许自旋,为啥呀?

        // 该函数的调用时机,都是reader在临界区,根本的目的是禁止reader通过

        // 乐观自旋的方式源源不断的进入临界区,导致reader的临界区越来越大,这

        // 也是防止writer饿死的一种机制

        if (!(owner & RWSEM_READER_OWNED))

            break;

 

        if (owner & RWSEM_NONSPINNABLE)

            break;

 

    //2.代码走到这里,表示reader在临界区,并且还没有设置

    // RWSEM_NONSPINNABLE标记,下面就需要设置该标记位

 

    // atomic_long_try_cmpxchg有三个参数,如果第一个参数等于第二个参数,

    // 则执行xchg操作将第三个参数的值赋值给第一个参数,并返回true;不相

    // 等则不执行xchg操作,也就不执行赋值操作,并返回false;并且每次原子

    // 操作后,不管是否成功,owner每次都会被更改为最新读取到的sem->owner

    // 的值,函数返回标记是否已经成功执行了xchg操作;

 

    // 所以,这里的循环的意思是:只有真正的执行了xchg操作时,atomic_long_try_cmpxchg

    // 才会返回true,表示已经成功将RWSEM_NONSPINNABLE设置上,此时退出循环;

    // 如果在执行原子变量期间owner成员发生变化(可能值持锁者发生变化,也可能

    // 是标记发生变化),atomic_long_try_cmpxchg没有真正的执行xchg操作,

    // 此时返回false,继续循环操作,直到成功设置RWSEM_NONSPINNABLE标记为止

    } while (!atomic_long_try_cmpxchg(&sem->owner, &owner, owner | RWSEM_NONSPINNABLE));

}

 

6.5 rwsem_down_read_slowpath - 慢速路径持锁

该函数传入的参数state表示如果reader持锁失败,则将这个reader设置为什么状态,可取值为:

a) TASK_UNINTERRUPTIBLE : 进入D状态

b) TASK_INTERRUPTIBLE : 进入sleep状态

c) TASK_KILLABLE : 直接杀死

/*

* Wait for the read lock to be granted

*/

static struct rw_semaphore __sched * rwsem_down_read_slowpath(

            struct rw_semaphore *sem,

            int state)                        //持锁失败时线程进入的状态

{

    //1.adjustment的初值为-1,表示对count字段减1

    // 因为在调用rwsem_down_read_slowpath之前一定已经调用过rwsem_read_trylock

    // 由前面的分析可知,在rwsem_read_trylock中不管三七二十一直接对count加1了

    // 走到这里肯定是没拿到锁,需要减掉了,但是为啥要提前加呢????

    long count, adjustment = -RWSEM_READER_BIAS;

    struct rwsem_waiter waiter;

 

    //2.定义一个等待队列,批量唤醒,这个在后面分析

    DEFINE_WAKE_Q(wake_q);

    bool wake = false;

 

    /*

     * Save the current read-owner of rwsem, if available, and the

     * reader nonspinnable bit.

     */

    //3.备份当前rwsem的owner,owner的值可为:

    // a) 如果是读持锁,只能记录最后一个进入临界区的reader,但是需要注意的是,这个

    // owner指向的task当前并不一定在临界区哦,详见rwsem_clear_reader_owned实现

    // b) 如果是写持锁,因为临界区只允许一个writer,owner记录的是那个唯一的writer

 

    // 如果当前是writer在临界区,则只保留RWSEM_RD_NONSPINNABLE标记,为啥捏????

    // 原因就是方便在后面rwsem_reader_phase_trylock中判断锁的持有者是否被更换了。

    // 这里主要是考虑到了这种情况:同一个task,可能一会是reader,而在另一个时刻却

    // 摇身一变成了writer,过一会又会在变成reader,这种情况下Linux也会被看做是不

    // 同的owner持了锁,但是这种情况下owner的task_struct字段却又是一样的,这样在

    // rwsem_reader_phase_trylock中通过(owner ^ last_rowner)的方式判断是否切换过

    // owner就会失败,所以这里如果是writer在临界区,会将task_struct字段清空

 

    // 题外话,不得不吐槽一下这里的代码逻辑,真是太乱了,怎么审核过的!!!

    waiter.last_rowner = atomic_long_read(&sem->owner);

    if (!(waiter.last_rowner & RWSEM_READER_OWNED))

        waiter.last_rowner &= RWSEM_RD_NONSPINNABLE;

 

    //4.判断这个reader是否满足乐观自旋的条件,下面情况不满足乐观自旋的条件,

    // 详见rwsem_can_spin_on_owner实现

    // a) current,也就是当前申请锁的这个reader如果不久就会被调度出去,

    // 在下一个调度时机就会让出cpu,那么就没必要乐观自旋了

    // b) 如果这把锁已经被持锁者打上RWSEM_RD_NONSPINNABLE标签

    // 设置nospinable时机详见后面要点总结

    // c) 虽然这把锁没有被打上RWSEM_RD_NONSPINNABLE,但是在spin on

    // writer的过程中,如果这个writer已经不在cpu上了(on_cpu为0,

    // 也就是不是在running状态,可能是runnable或已经进入阻塞状态),

    // 则表示这个writer短时间内是不会退出临界区的,那么后来的reader

    // 也没必要乐观自旋了

    if (!rwsem_can_spin_on_owner(sem, RWSEM_RD_NONSPINNABLE))

        goto queue;

 

    //5.代码走到这里表示后来的这个reader在当前场景下是满足乐观自旋的条件的

 

    /*

     * Undo read bias from down_read() and do optimistic spinning.

     */

    //6.还记得我们在rwsem_read_trylock中不管三七二十一,先对count字段执行加1

    // 操作吗,现在这个reader需要执行乐观自旋,短时间拿不到锁,进不了临界区,所

    // 以先将count减1,因为count字段记录的是在临界区的reader的数量

    // 将adjustment清0表示这里已经减去1了,不需要后面再调整了

    // 但是为啥要提前加捏??难道是想提前预警吗?

    atomic_long_add(-RWSEM_READER_BIAS, &sem->count);

    adjustment = 0;

 

    //7.乐观自旋也不是一直自旋下去,也不一定能拿到锁,下面情况会拿锁失败

    // a) 自旋者如果是rt线程,不管是reader还是writer,如果连续尝试过两次自旋失败,就结束自旋

    // b) 自旋者如果是writer,当writer自旋超时(临界区是reader),拿锁失败,结束自旋

    // c) 自旋者如果是reader,会一直自旋下去,除非自己马上要调度出去,或者当一个writer已

    // 经处于自旋状态等临界区的reader超时了,自旋的reader也会退出自旋。(因为此时会设置

    // RWSEM_RD_NONSPINNABLE和RWSEM_WR_NONSPINNABLE标记,详见rwsem_optimistic_spin

    // 实现。换句话说,只要wait_list上有writer,后来的申请者,不管是reader还是writer,

    // 都一定不能不能自旋)

    if (rwsem_optimistic_spin(sem, false)) {

        /* rwsem_optimistic_spin() implies ACQUIRE on success */

        /*

         * Wake up other readers in the wait list if the front

         * waiter is a reader.

         */

        //8.代码走到这里表示本人reader通过乐观自旋已经拿到锁了,此时临界区已经是

        // reader的天下,因为允许reader同时进入临界区,所以这里唤醒wait_list

        // 链表上阻塞的reader,手牵手一起进入临界区。具体做法是先将这些reader

        // 添加进临时的等待队列wake_q,然后一起唤醒

 

        // 这里有个问题,在判断wait_list链表上是否有阻塞的线程时,为啥已经通过count

        // 的RWSEM_FLAG_WAITERS字段判断了,还要再通过list_empty判断一次呢????

        if ((atomic_long_read(&sem->count) & RWSEM_FLAG_WAITERS)) {

 

            //8.1 将wait_list链表上的reader先添加进wake_q

            raw_spin_lock_irq(&sem->wait_lock);

            if (!list_empty(&sem->wait_list))

                rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q);

            raw_spin_unlock_irq(&sem->wait_lock);

 

            //8.2 再讲wake_q链表上的所有reader全部唤醒

            wake_up_q(&wake_q);

        }

 

        //8.3 返回非空表示成功拿到锁

        return sem;

    }

 

    //9.代码走到这里表示reader通过乐观自旋没有拿到锁,下面还有一次机会

    // 在真正的挂入链表之前,再尝试一次吧

    // waiter.last_rowner中记录的那个在乐观自旋前,锁的持有者,

    // 我们想通过这个知道到底临界区的持锁者是否换过一波

    else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) {

        /* rwsem_reader_phase_trylock() implies ACQUIRE on success */

        return sem;

    }

 

    //10.代码走到这里表示乐观自旋拿锁失败,下面也需要将本reader挂到链表上了

 

queue:

    //11.代码走到这里表示不得不把本reader挂到wait_list上了

    // 下面初始化rwsem_waiter结构,阻塞超时时间是4ms,需要注意的是,

    // 这里并不是说阻塞超过4ms就退出,而是阻塞超过4ms设置HANDOFF标

    // 记,防止饿死,禁止哪些后来的申请锁的人在自旋过程中拿到锁

    waiter.task = current;

    waiter.type = RWSEM_WAITING_FOR_READ;

    waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;

 

    raw_spin_lock_irq(&sem->wait_lock);

 

    //12.如果链表为空,则说明之前没有其他的线程阻塞在这把锁上

    // 第一次有线程被加到wait_list上,需要设置RWSEM_FLAG_WAITERS标记

    if (list_empty(&sem->wait_list)) {

        /*

         * In case the wait queue is empty and the lock isn't owned

         * by a writer or has the handoff bit set, this reader can

         * exit the slowpath and return immediately as its

         * RWSEM_READER_BIAS has already been set in the count.

         */

        //13.尝试"偷锁",这里实际上是偷了正在自旋的writer的锁

        // a) 如果adjustment为0,则表示本reader在此之前就已经执行过自旋逻辑了,

        // 给你机会你不中用啊,还是老实去wait_list上呆着吧

        // b) 如果adjustment不为0,则表示这个reader之前不满足自旋的条件,前面并

        // 没有乐观自旋过,如果同时满足下面两个条件,则在这里偷锁

        // i) 当前是临界区没人或者是reader

        // ii) 并且锁没有被加上HANDOFF标记,也就是说还是允许"偷锁"的

        if (adjustment && !(atomic_long_read(&sem->count) &

                (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {

            /* Provide lock ACQUIRE */

            smp_acquire__after_ctrl_dep();

            raw_spin_unlock_irq(&sem->wait_lock);

            rwsem_set_reader_owned(sem);

            lockevent_inc(rwsem_rlock_fast);

            return sem;

        }

 

        //14.加上RWSEM_FLAG_WAITERS标记

        adjustment += RWSEM_FLAG_WAITERS;

    }

 

    //15.将reader添加进wait_list链表,添加到末尾

    list_add_tail(&waiter.list, &sem->wait_list);

 

    /* we're now waiting on the lock, but no longer actively locking */

    //16.调整rwsem->count成员

    if (adjustment)

        count = atomic_long_add_return(adjustment, &sem->count);

    else

        count = atomic_long_read(&sem->count);

 

    /*

     * If there are no active locks, wake the front queued process(es).

     *

     * If there are no writers and we are first in the queue,

     * wake our own waiter to join the existing active readers !

     */

    //17.满足下面条件表示当前临界区已经没人了

    // 为啥临界区没人了,这个reader却无法进入临界区呢???

 

    // 当同时满足下面两个条件时,即使临界区是空的,后来的reader也不能进入临界区

    // a) 如果wait_list上已经有writer阻塞着,则后来的这个reader则在慢速路径中首先不会自旋

    // b) 如果链表最前面的writer已经阻塞超时了,并且已经设置了RWSEM_FLAG_HANDOFF标记,则在

    // 上面也是有可能拿到锁的

 

    // 下面的操作:

    // a) 清除RWSEM_WR_NONSPINNABLE标记,后来的writer在申请锁的时候又可以乐观自旋了

    // 为什么呢要这么做呢?为什么不把RWSEM_RD_NONSPINNABLE标记也清掉了?

    // 详细参见后面clear_wr_nonspinnable的分析

    // b) 并将wake标记为true,因为临界区没人了,可以从wait_list链表上重新唤醒一波进入临界区了

    if (!(count & RWSEM_LOCK_MASK)) {

        clear_wr_nonspinnable(sem);

        wake = true;

    }

 

    //18.满足下面任意一个条件,去wait_list上唤醒一波

    // a) 如果当前临界区里已经没人了;

    // b) 或者是reader在临界区,并且本reader在上面也刚刚被加入队列,这是为啥捏???

    //注意:

    // 这里的rwsem_mark_wake传入的参数为RWSEM_WAKE_ANY,表示会根据

    // 链表最前面的线程,决定唤醒reader还是writer

    if (wake || (!(count & RWSEM_WRITER_MASK) &&

         (adjustment & RWSEM_FLAG_WAITERS)))

        rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

 

    raw_spin_unlock_irq(&sem->wait_lock);

    wake_up_q(&wake_q);

 

    /* wait to be given the lock */

    //19.下面将要进入阻塞状态了

    for (;;) {

        //19.1 设置当前线程的状态

        set_current_state(state);

 

        //19.2 注意:这个waiter可能都还没有真正的睡眠就被唤醒了,也就是刚

        // 被加到队列中,就在上面的rwsem_mark_wake中被唤醒了。也正是

        // 因为在这里是判断waiter.task是否为空,所以在rwsem_mark_wake

        // 时需要分两步唤醒

        if (!smp_load_acquire(&waiter.task)) {

            /* Matches rwsem_mark_wake()'s smp_store_release(). */

            break;

        }

 

        //19.3 检查是否有挂起的信号等待处理,如果有挂起的信号等待处理,

        // 则进入该分支处理信号,可以被kill之类的操作唤醒

        if (signal_pending_state(state, current)) {

            raw_spin_lock_irq(&sem->wait_lock);

 

            //19.4 被信号唤醒后先判断一下,是否拿到锁了

            if (waiter.task)

                goto out_nolock;

            raw_spin_unlock_irq(&sem->wait_lock);

            /* Ordered by sem->wait_lock against rwsem_mark_wake(). */

            break;

        }

 

        //19.5 调度其他线程执行

        schedule();

        lockevent_inc(rwsem_sleep_reader);

    }

 

    //20.代码走到这里表示已经成功拿到锁了,重新设置为running状态

    __set_current_state(TASK_RUNNING);

    lockevent_inc(rwsem_rlock);

    return sem;

 

out_nolock:

    //21.代码走到这里表示拿锁失败,从D状态中唤醒时因为被kill等操作唤醒的

    // 下面将其从链表上摘下来,如果wait_list链表空了的话,则清除

    // RWSEM_FLAG_WAITERS和RWSEM_FLAG_HANDOFF标记

    list_del(&waiter.list);

    if (list_empty(&sem->wait_list)) {

        atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF, &sem->count);

    }

 

    //22.重新设置running状态,并返回拿锁失败

    raw_spin_unlock_irq(&sem->wait_lock);

    __set_current_state(TASK_RUNNING);

    lockevent_inc(rwsem_rlock_fail);

    return ERR_PTR(-EINTR);

}

 

6.5.1 rwsem_reader_phase_trylock - 在挂入wait_list之前,再挣扎一下

由下面的注释可知,只有在一个reader经过乐观自旋还是没有获取到锁时,才会调用到这个函数,这个函数的作用就是,在乐观自旋失败,本reader即将被挂入链表进入阻塞状态之前,再挣扎一把,看看能不能获取到锁。

怎么个挣扎法呢?

本reader在执行乐观自旋之前锁的持有者被记录在last_rowner中,乐观自旋结束后锁的新持有者被记录在sem->owner中,为了方便描述,我们把这个新的owner记为new_owner,则存在下面情况:

a) 新的持有者new_owner是一个writer,那么reader是注定在短时间内拿不到锁了,这时候返回false

b) 如果新的持有者new_owner也是一个reader,那么还存在下面两个情况:

i) new_owner和last_rowner一样,也就是说在本reader乐观自旋期间,临界区的reader们压根就没变过,之前是哪些人现在还是哪些人,没有任何一个新的owner进入过临界区,那还等个毛啊,注定在临界区的reader们短时间之内是不会退出了。举个例子,你去办事大厅取号办事,前面排了很多人,要等,此时你突然肚子疼想拉屎,拉屎前你发现现在窗口正在办理业务的是张三,你一泡屎拉了10分钟,回来后发现窗口前还是在办张三的业务,这说明张三的业务很负载,短时间之内是办理不完了,此时返回false,不挣扎了,去wait_list上迷瞪一会吧;

ii) 但是如果new_owner和last_owner不一样,那至少说明还是有新的reader可以进入临界区的。也就是说你在拉屎前发现正在办理张三的业务,拉完屎后办理的是李四的业务,至少办事流程是走的,还是有希望的,这时候可以尝试一下

 

/*

* This function is called when the reader fails to acquire the lock via

* optimistic spinning. In this case we will still attempt to do a trylock

* when comparing the rwsem state right now with the state when entering

* the slowpath indicates that the reader is still in a valid reader phase.

* This happens when the following conditions are true:

*

* 1) The lock is currently reader owned, and

* 2) The lock is previously not reader-owned or the last read owner changes.

*

* In the former case, we have transitioned from a writer phase to a

* reader-phase while spinning. In the latter case, it means the reader

* phase hasn't ended when we entered the optimistic spinning loop. In

* both cases, the reader is eligible to acquire the lock. This is the

* secondary path where a read lock is acquired optimistically.

*

* The reader non-spinnable bit wasn't set at time of entry or it will

* not be here at all.

*/

static inline bool rwsem_reader_phase_trylock(

            struct rw_semaphore *sem,

            unsigned long last_rowner)    //记录乐观自旋之前的reader

{

    unsigned long owner = atomic_long_read(&sem->owner);

 

    //1.经历一段时间的乐观自旋后,新的持有者是一个writer,

    // 那么reader是注定在短时间内拿不到锁了,这时候返回false

    if (!(owner & RWSEM_READER_OWNED))

        return false;

 

    //2.代码走到这表示当前还是reader们在临界区,此时的rwsem->owner记录的是最

    // 后一个进入临界区的reader,(owner ^ last_rowner)表示如果乐观自旋前后的

    // owner不一样,则说明临界区里还是有新的reader进去了,为什么下一个不能是

    // 我呢,此时可以再次尝试一下获取锁,说不定就拿到了

 

    // 注意:

    // 在执行rwsem_try_read_lock_unqueued时,申请锁的是reader,锁的owner也是reader

    if (((owner ^ last_rowner) & ~RWSEM_OWNER_FLAGS_MASK) &&

                    rwsem_try_read_lock_unqueued(sem)) {

        lockevent_inc(rwsem_opt_rlock2);

        lockevent_add(rwsem_opt_fail, -1);

        return true;

    }

 

    //3.代码走到这里,虽然此时临界区里面还是reader的天下,但是可能存在下面两种情况

    // a) 临界区的reader们就没换过,也就是说这里的所有reader操作都是很耗时的

    // b) 虽然不断有新的reader进入临界区,但是轮也轮不到我啊!崩溃

    return false;

}

 

6.6 up_read - reader退出临界区时释放锁

void up_read(struct rw_semaphore *sem)

{

    rwsem_release(&sem->dep_map, _RET_IP_);

    __up_read(sem);

}

 

6.7 __up_read - reader退出临界区时释放锁

注意:reader退出临界区时,是一个一个退出的,此时的owner并不一定是指向自己,并且当临界区还是reader的天下时,owner的task_struct字段并不一定在临界区

/*

* unlock after reading

*/

static inline void __up_read(struct rw_semaphore *sem)

{

    long tmp;

 

    //1.调试信息

    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);

    DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);

 

    //2.清除owner,正式项目没有开启CONFIG_DEBUG_RWSEMS宏,啥也不干

    // 也就是说即使owner所指向的task_struct已经退出临界区,但是这个

    // owner的task_struct可能依然指向他(假设没有新的reader进入临界区)

    rwsem_clear_reader_owned(sem);

 

    //3.有一个reader已经退出临界区了,count减1

    tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);

    DEBUG_RWSEMS_WARN_ON(tmp < 0, sem);

 

    //4.所有reader退出临界区时,唤醒wait_list链表上阻塞的线程进入临界区

    // 满足下面条件表示临界区已经没人了,但是wait_list上还有阻塞着的线程,

    // 此时从wait_list链表上唤醒新的任务进入临界区

 

    // 在唤醒wait_list链表上其他线程时,为啥要等所有reader都退出时才唤醒呢?

    // 因为多个reader是可以同时进入临界区的,并且一般reader在临界区的时间都

    // 很短,在临界区的reader的数量又很多。如过不加上上面两个条件,那么只要有

    // reader退出,就执行rwsem_wake操作,那么将会带来下面两个问题:

    // a) 频繁的遍历wait_list挑选要唤醒的线程,带来额外的开销

    // b) 因为当前reader在临界区,所以rwsem_wake只会唤醒reader,

    // 那么就会导致这样的现象:只要wait_list上有reader,就会被

    // 唤醒进入临界区,这样wait_list上的reader源源不断的进入临

    // 界区,reader的临界区不断变大,writer却迟迟不能等到锁,直

    // 到wait_list上没有任何reader后,writer才允许进入临界区,

    // 这样writer不就饿死了吗

 

    // 加上上面两个条件限制,完美的解决了这个问题,并不是每次有reader退出临界区,

    // 都会从wait_list链表上重新唤醒新的线程进入临界区,而是当临界区的reader全

    // 部退出临界区后才重新从wait_list上唤醒新的线程进入临界区,而这些新唤醒的线

    // 程可能是reader,也可能是writer,如果是writer,一次只能放一个进来,如果是

    // reader,一次最多放256个进来

 

    // 这里为啥只清除RWSEM_WR_NONSPINNABLE呢,而RWSEM_RD_NONSPINNABLE保持原样呢?

    // 此时临界区为空了,此时一切从头再来,所以这里理论上应该将RWSEM_WR_NONSPINNABLE

    // 和RWSEM_RD_NONSPINNABLE全部都清除才合理,即后面来的不管是reader还是writer

    // 都又可以通过自旋拿锁了,但是这里偏偏只清除RWSEM_WR_NONSPINNABLE呢,而

    // RWSEM_RD_NONSPINNABLE保持原样,这是为啥呢?

    // 试想,当前时刻是所有的reader都退出临界区,因为rwsem是允许多个reader同时进

    // 入临界区的,当reader在临界区时,后来的reader是可以直接进临界区的(详见

    // __down_read_trylock实现),所以当所有reader全部退出临界区时,reader可能已经

    // 霸占这把锁相当一段时间了,如果我们在这里将RWSEM_RD_NONSPINNABLE也清除,那么很

    // 可能后来的reader又会通过自旋的方式进入临界区,又会出现长时间霸占锁的情况,这对

    // writer不公平!所以我们在这里只将RWSEM_WR_NONSPINNABLE标记清零,也就是说值允

    // 许writer自旋拿锁,而后面的reader会直接被挂进wait_list上

    if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) == RWSEM_FLAG_WAITERS)) {

        clear_wr_nonspinnable(sem);

        rwsem_wake(sem, tmp);

    }

}

 

6.7.1 rwsem_clear_reader_owned - reader退出临界区时清除owner

注意:只有在使能CONFIG_DEBUG_RWSEMS才会清除owner成员中的task_struct字段

#ifdef CONFIG_DEBUG_RWSEMS

/*

* With CONFIG_DEBUG_RWSEMS configured, it will make sure that if there

* is a task pointer in owner of a reader-owned rwsem, it will be the

* real owner or one of the real owners. The only exception is when the

* unlock is done by up_read_non_owner().

*/

static inline void rwsem_clear_reader_owned(struct rw_semaphore *sem)

{

    unsigned long val = atomic_long_read(&sem->owner);

 

    //因为rwsem允许多个reader在临界区,此时rwsem->owner中记录的是最后一个进入

    //临界区的线程。所以当一个reader退出临界区时,rwsem->owner并不一定是指向这个

    //reader的,需要判断,如果不一致的话则不做任何操作;一致则会清除owner成员中的

    //task_struct字段,保留flags字段,这时候task_struct字段是空的

 

    //注意:

    // 使能CONFIG_DEBUG_RWSEMS时,如果没有新的reader进入临界区,只是不断的有

    // reader退出临界区,那么此时owner的task_struct字段会一直是空的,所以,

    // 如果当使能CONFIG_DEBUG_RWSEMS宏时,发现owner字段为空的话,并不表示当前

    // 临界区没人哦

    while ((val & ~RWSEM_OWNER_FLAGS_MASK) == (unsigned long)current) {

        if (atomic_long_try_cmpxchg(&sem->owner, &val,

                     val & RWSEM_OWNER_FLAGS_MASK))

            return;

    }

}

#else

//注意:

// 没有开启CONFIG_DEBUG_RWSEMS宏时,啥也不干。也就是说即使owenr指向的task_struct

// 字段已经退出临界区了,也不会将这个task_struct字段清除或重新赋值(我们假设没有新的

// reader进入临界区),所以在我们分析dump文件时,如果临界区是reader时,owner的

// task_struct字段所指向的线程并不一定在临界区

static inline void rwsem_clear_reader_owned(struct rw_semaphore *sem)

{

}

#endif

 

6.7.2 clear_wr_nonspinnable - 只清除RWSEM_WR_NONSPINNABLE标记,优先让writer通过自旋拿锁

由下面的注释可知,该函数仅在所有reader退出临界区时才会被调用,该函数共有下面两个调用位置:

a) 在__up_read中,也就是所有reader退出临界区的时候,发现wait_list链表上有人等着的时候,会清除RWSEM_WR_NONSPINNABLE标记

b) 在rwsem_down_read_slowpath中,当一个reader因申请不到锁被挂入wait_list后,发现临界区没人了

 

清除RWSEM_WR_NONSPINNABLE标记后,后续的writer再申请锁时,会先尝试自旋,而不是直接进入D状态。那么既然临界区没人了,为什么只清除RWSEM_WR_NONSPINNABLE而不清除RWSEM_RD_NONSPINNABLE呢?由下面的注释可知,所有的reader都退出临界区之后,说明这个rwsem可能已经被reader持有了很长一段时间,这时候只将RWSEM_WR_NONSPINNABLE清除是为了确保优先让writer获取到锁,因为这时候后来的reader可能会直接进D状态,而后来的writer会尝试自旋拿锁

/*

* Clear the owner's RWSEM_WR_NONSPINNABLE bit if it is set. This should

* only be called when the reader count reaches 0.

*

* This give writers better chance to acquire the rwsem first before

* readers when the rwsem was being held by readers for a relatively long

* period of time. Race can happen that an optimistic spinner may have

* just stolen the rwsem and set the owner, but just clearing the

* RWSEM_WR_NONSPINNABLE bit will do no harm anyway.

*/

static inline void clear_wr_nonspinnable(struct rw_semaphore *sem)

{

    //如果确实设置了该标记位,则清除掉

    if (rwsem_test_oflags(sem, RWSEM_WR_NONSPINNABLE))

        atomic_long_andnot(RWSEM_WR_NONSPINNABLE, &sem->owner);

}

 

6.7.3 rwsem_test_oflags - 检查owner的某个标记位是否被设置

/*

* Test the flags in the owner field.

*/

static inline bool rwsem_test_oflags(struct rw_semaphore *sem, long flags)

{

    return atomic_long_read(&sem->owner) & flags;

}

 

七、写操作

申请锁的人是想以写的身份进入临界区

 

7.1 down_write - 拿不到锁进D状态

/*

* lock for writing

*/

void __sched down_write(struct rw_semaphore *sem)

{

    might_sleep();

    rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);

    LOCK_CONTENDED(sem, __down_write_trylock, __down_write);

}

 

7.2 __down_write_trylock - 快速路径,无人拿锁时直接获取到锁

static inline int __down_write_trylock(struct rw_semaphore *sem)

{

    long tmp;

 

    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);

 

    //如果当前临界区没人持锁的话,则直接获取到锁,并打上RWSEM_WRITER_LOCKED标记

    tmp = RWSEM_UNLOCKED_VALUE;

    if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED)) {

        rwsem_set_owner(sem);

        return true;

    }

    return false;

}

 

7.2.1 rwsem_set_owner - writer拿到锁时将current设置为owner

/*

* All writes to owner are protected by WRITE_ONCE() to make sure that

* store tearing can't happen as optimistic spinners may read and use

* the owner value concurrently without lock. Read from owner, however,

* may not need READ_ONCE() as long as the pointer value is only used

* for comparison and isn't being dereferenced.

*/

static inline void rwsem_set_owner(struct rw_semaphore *sem)

{

    //该函数一定是writer进入临界区的时候才会调用,需要注意的是,该函数在设置

    //owner的同时,还将下面三个标记位清空了:

    //a) RWSEM_READER_OWNED : 这个很好理解,因为当前是writer进入临界区

    //b) RWSEM_RD_NONSPINNABLE : 后来的reader有可以通过自旋的方式去拿锁了

    //c) RWSEM_WR_NONSPINNABLE : 后来的writer有可以通过自旋的方式去拿锁了

    atomic_long_set(&sem->owner, (long)current);

}

 

7.3 __down_write

/*

* lock for writing

*/

static inline void __down_write(struct rw_semaphore *sem)

{

    long tmp = RWSEM_UNLOCKED_VALUE;

 

    //为啥在这里又试一次呢????

    if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED)))

        rwsem_down_write_slowpath(sem, TASK_UNINTERRUPTIBLE);

    else

        rwsem_set_owner(sem);

}

 

7.4 rwsem_down_write_slowpath - 慢速持锁路径

/*

* Wait until we successfully acquire the write lock

*/

static struct rw_semaphore * rwsem_down_write_slowpath(

            struct rw_semaphore *sem,

            int state)                //线程挂在链表上的状态

{

    long count;

    bool disable_rspin;

 

    //0.注意,这个wstate是一个局部变量,每个任务的栈空间都有一个,

    // 也就是说该变量是针对正在申请锁的这个writer的

    //注意:

    // RWSEM_FLAG_HANDOFF和WRITER_HANDOFF的区别,前者针对整个锁,

    // 后者针对本writer,因为wstate是一个局部变量,每一个申请锁的

    // writer线程的任务栈里面都有一个wstate变量,所以WRITER_HANDOFF

    // 是针对一个指定的线程

    enum writer_wait_state wstate;

    struct rwsem_waiter waiter;

    struct rw_semaphore *ret = sem;

    DEFINE_WAKE_Q(wake_q);

 

    /* do optimistic spinning and steal lock if possible */

    //1.尝试通过乐观自旋的方式去拿锁

    if (rwsem_can_spin_on_owner(sem, RWSEM_WR_NONSPINNABLE) &&

     rwsem_optimistic_spin(sem, true)) {

        /* rwsem_optimistic_spin() implies ACQUIRE on success */

        return sem;

    }

 

    //2.代码走到这里,表示乐观自旋拿锁失败,下面进入睡眠逻辑

 

    /*

     * Disable reader optimistic spinning for this rwsem after

     * acquiring the write lock when the setting of the nonspinnable

     * bits are observed.

     */

    //3.记录锁的RWSEM_NONSPINNABLE标记,以便在后面恢复,为啥捏????

    disable_rspin = atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE;

 

    /*

     * Optimistic spinning failed, proceed to the slowpath

     * and block until we can acquire the sem.

     */

    //4.准备waiter结构,阻塞超过4ms后设置RWSEM_FLAG_HANDOFF

    waiter.task = current;

    waiter.type = RWSEM_WAITING_FOR_WRITE;

    waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;

 

    raw_spin_lock_irq(&sem->wait_lock);

 

    /* account for this before adding a new element to the list */

    //5.标记新插进来的writer是不是链表上的第一个

    wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;

 

    //6.插链表,注意,这里仅仅是插入链表,还没有真正的进入睡眠状态(sleep状态或D状态)

    list_add_tail(&waiter.list, &sem->wait_list);

 

    /* we're now waiting on the lock */

    //7.为了缩短在wait_list上的阻塞时间,唤醒在本writer前面的线程!

 

    // 满足该条件,表示刚插进来的writer并不是链表最前面的线程,

    // 前面还有其他线程,这时候将前面阻塞的线程唤醒,为啥捏??

    // 试想,wait_list链表上的线程的唤醒顺序一定是从前向后依次

    // 唤醒的,当前我是一个writer插在链表的末尾,一定要等到前

    // 面的线程全部被唤醒后才能有机会唤醒我,所以如果当前临界区

    // 没人或者全是reader的时候,还不如先唤醒一波线程进入临界

    // 区赶紧处理,这样我这个writer也可以少等一会,缩短阻塞时间

    if (wstate == WRITER_NOT_FIRST) {

        count = atomic_long_read(&sem->count);

 

        /*

         * If there were already threads queued before us and:

         * 1) there are no active locks, wake the front

         * queued process(es) as the handoff bit might be set.

         * 2) there are no active writers and some readers, the lock

         * must be read owned; so we try to wake any read lock

         * waiters that were queued ahead of us.

         */

        //7.1 对于rwsem来说,如果临界区是writer时,则其他任何

        // 线程都不能进入临界区,所以此时无法唤醒任何线程

        if (count & RWSEM_WRITER_MASK)

            goto wait;

 

        //7.2 代码走到这里,说明临界区要么没人,要么是reader在临界区,

        // 但是不管是哪种情况,都可以唤醒一波线程

        // a) 如果临界区是reader的话,则唤醒前面的reader

        // b) 如果临界区没人的话,则根据链表最前面的线程是reader

        // 还是writer决定唤醒谁

        rwsem_mark_wake(sem,

                (count & RWSEM_READER_MASK) ? RWSEM_WAKE_READERS : RWSEM_WAKE_ANY,

                &wake_q);

 

        if (!wake_q_empty(&wake_q)) {

            /*

             * We want to minimize wait_lock hold time especially

             * when a large number of readers are to be woken up.

             */

            raw_spin_unlock_irq(&sem->wait_lock);

            wake_up_q(&wake_q);

            wake_q_init(&wake_q);    /* Used again, reinit */

            raw_spin_lock_irq(&sem->wait_lock);

        }

    } else {

        //7.3 代码走到这里,表示在这个writer插链表之前,链表上是空的,

        // writer插入链表后,自己就是链表最前面的线程了,也就不需要

        // 再唤醒其他人,并且因为链表上第一次有了waiter,此时设置

        // RWSEM_FLAG_WAITERS标记

        atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);

    }

 

wait:

    /* wait until we successfully acquire the lock */

    //8.下面进入阻塞状态(sleep状态或D状态),直到真正的拿到锁才会

    // 退出下面的循环。需要注意的是,下面是两个嵌套的循环,外循环只

    // 有在真正的拿到锁才退出

    set_current_state(state);

    for (;;) {

        //9.唤醒后尝试获取锁,成功拿锁后退出外循环

        if (rwsem_try_write_lock(sem, wstate)) {

            /* rwsem_try_write_lock() implies ACQUIRE on success */

            break;

        }

 

        raw_spin_unlock_irq(&sem->wait_lock);

 

        /*

         * After setting the handoff bit and failing to acquire

         * the lock, attempt to spin on owner to accelerate lock

         * transfer. If the previous owner is a on-cpu writer and it

         * has just released the lock, OWNER_NULL will be returned.

         * In this case, we attempt to acquire the lock again

         * without sleeping.

         */

        //10.由上面的注释可知,如果刚刚被唤醒的writer被打上WRITER_HANDOFF

        // 标记,则说明这个writer已经很急迫了(此时他一定是wait_list链表

        // 前面第一个线程),此时如果临界区没人,此时就不要再睡眠了,直接跳转

        // 到trylock_again再次尝试获取锁,直到这把锁已经被别人拿走,才会重

        // 新进入睡眠状态

 

        // rwsem_spin_on_owner在下面场景中会返回OWNER_NULL

        // a) 当前临界区没人,直接返回OWNER_NULL,根本不会乐观自旋

        // b) 上一个锁的owner是一个writer,这个writer退出临界区后,

        // 没有其他人拿锁进入临界区,此时也会返回OWNER_NULL

        if (wstate == WRITER_HANDOFF &&

         rwsem_spin_on_owner(sem, RWSEM_NONSPINNABLE) == OWNER_NULL)

            goto trylock_again;

 

        /* Block until there are no active lockers. */

        //11.代码走到这里,则有下面两种可能,则但是不管是哪种,都躲不了进睡眠状

        // 态了(sleep状态或D状态)

        // a) 要么本writer当前并不是很急迫(还没有设置WRITER_HANDOFF标记,

        // 可能其不是链表最前面的线程,或者阻塞等待没有超时),此时肯定要先

        // 执行链表最前面的线程啊,本writer也只能进睡眠状态了(即sleep状

        // 态或D状态,对于哪些处于sleep状态,并被kill等信号唤醒的线程,

        // 在下面for循环刚开始的时候识别到由未处理的信号就退出了)

        // b) 要么本writer虽然当前已经很急迫(WRITER_HANDOFF标记已经被设置),

        // 但是此时临界区是有人的,此时writer也只能进睡眠状态了(sleep状

        // 态或D状态)

        for (;;) {

            //11.1 在下一次唤醒的时候,首先检查是否有挂起的信号等待处理,

            // 如果有挂起的信号等待处理,则跳转到out_nolock,此时就

            // 拿锁失败了,例如kill等信号是可以唤醒这些睡眠的writer

            // 的,当然,这里说的睡眠状态的writer是指sleep状态,对

            // 于D状态,是无法响应信号的

            if (signal_pending_state(state, current))

                goto out_nolock;

 

            //11.2 代码走到这里,表示没有需要处理的信号,继续进睡眠状态(sleep

            // 状态或D状态),只有下面两种情况才会唤醒这个writer,才会从

            // schedule中退出:

            // a) 被kill等信号唤醒(只针对sleep状态的writer)

            // b) 临界区没人了,退出临界区时被唤醒wait_list上阻塞的线程

            schedule();

            lockevent_inc(rwsem_sleep_writer);

 

            //11.3 被唤醒后重新设置状态睡眠状态(sleep状态),因为此时还不一定

            // 能获取到锁,防止其被调度执行

            set_current_state(state);

 

            /*

             * If HANDOFF bit is set, unconditionally do

             * a trylock.

             */

            //11.4 如果设置了WRITER_HANDOFF标记,则表示当前这个writer

            // 已经是链表最前面的线程,并且已经很急迫了,此时无条件的

            // 退出内循环,在外循环中重新尝试拿锁

            //注意:

            // 处于sleep状态的writer被kill唤醒,在这里是有可能拿

            // 到锁的,拿到锁访问完临界区,在下一个退货用户空间的时候

            // 处理pending的型号,这时候才真正的被杀死

            if (wstate == WRITER_HANDOFF)

                break;

 

            //11.5 本writer睡了一段时间后被唤醒,wait_list前面可能有线程进

            // 入临界区,本writer在wait_list链表上的位置可能已经发生了

            // 变化,此时重新判断本writer是不是链表最前面的线程,如果是,

            // 则设置WRITER_FIRST标记

            //注意:

            // 这里你可以有这样的疑惑,既然这个writer都被唤醒了,那么他怎

            // 么可能不是wait_list链表最前面的线程呢?对于writer的唤醒不

            // 是严格的从先往后的顺序唤醒的吗?

            // 我们在上面讲过,唤醒有两种情况:

            // a) 如果是因为临界区空了,被退出临界区的线程唤醒的,那么这个

            // writer必定是wait_list链表前面的线程

            // b) 但是如果这个writer是被kill等异常信号唤醒的,那么他就

            // 不一定是链表最前面的线程了哦

            if ((wstate == WRITER_NOT_FIRST) &&

             (rwsem_first_waiter(sem) == &waiter))

                wstate = WRITER_FIRST;

 

            //11.6 如果当前临界区没人了,则退出内循环,尝试在外循环中拿锁

            // 这里你可能有这样的疑问:老子都被唤醒了,临界区可能有人

            // 那为啥唤醒我,既然唤醒我必然是没人啊!!!实际上根据上面

            // 吗?的分析,唤醒由两种可能:

            // a) 被kill信号唤醒,此时无法确定临界区是否有人。

            // 如果此时临界区有人则继续向下进入内存换,被信号唤醒的

            // 线程会在下一次内循环中处理kill信号;但是只要临界区

            // 没人的话,就退出内循环,在外循环中尝试拿锁,所以kill

            // 的线程还是有可能拿到锁的(当然是哪些处于sleep状态的

            // 线程,D状态时不会响应信号的)

            // b) 被退出临界区的线程唤醒,此时临界区实际上也不一定是空的。

            // 因为在我们刚被唤醒时,可能出现锁被一个自旋的申请者拿走了,

            // 也就是说锁被"偷"了,这时候临界区也不是空的

            count = atomic_long_read(&sem->count);

            if (!(count & RWSEM_LOCK_MASK))

                break;

 

            /*

             * The setting of the handoff bit is deferred

             * until rwsem_try_write_lock() is called.

             */

            //11.7 代码走到这里,说明临界区里面还有人

            // 由上面的分析可知,锁被"偷"了可能会导致我这个writer长时间

            // 获取不到锁,这不公平,所以,当我们阻塞的时间超过4ms时,会设

            // 置WRITER_HANDOFF标记,(在rwsem_try_write_lock中会根据

            // WRITER_HANDOFF标记对锁打上RWSEM_FLAG_HANDOFF标记),禁止其

            // 他线程通过自旋的方式拿锁,也就是防止锁被"偷"了

            // 需要注意的是,如果当前阻塞的线程是rt线程,为了不让rt线程阻

            // 塞太长时间,这里也设置WRITER_HANDOFF标记,而不管是否会超时

            if ((wstate == WRITER_FIRST) && (rt_task(current) ||

             time_after(jiffies, waiter.timeout))) {

                wstate = WRITER_HANDOFF;

                lockevent_inc(rwsem_wlock_handoff);

                break;

            }

        }

trylock_again:

        raw_spin_lock_irq(&sem->wait_lock);

    }

 

    //12.代码走到这里表示writer已经成功拿到锁,下面设置状态,并将其从wait_list上摘下来

    __set_current_state(TASK_RUNNING);

    list_del(&waiter.list);

 

    //13.如果这个writer在阻塞之前,这把锁被打上了nospin标记,则在拿到锁之后恢复,为啥捏

    rwsem_disable_reader_optspin(sem, disable_rspin);

    raw_spin_unlock_irq(&sem->wait_lock);

    lockevent_inc(rwsem_wlock);

 

    return ret;

 

out_nolock:

    //14.代码走到这里,一定是收到了kill等信号退出了,拿锁失败

    __set_current_state(TASK_RUNNING);

    raw_spin_lock_irq(&sem->wait_lock);

    list_del(&waiter.list);

 

    //14.1 取消RWSEM_FLAG_HANDOFF标记,

    // 只要这个线程被设置了WRITER_HANDOFF,那么其一定是链表最前面的线程了,

    // 此时如果他被kill掉的话,不应该把RWSEM_FLAG_HANDOFF标记继续保留着,

    // 因为RWSEM_FLAG_HANDOFF是针对全局的(而WRITER_HANDOFF是针对当前这

    // 个writer的)

    if (unlikely(wstate == WRITER_HANDOFF))

        atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);

 

    //14.2 如果链表上为空了,则清除将RWSEM_FLAG_WAITERS标记清零,否则,继续唤醒一波

    // 这里唤醒链表上的其他线程的原因和上面一样,都是为了使wait_list上的线程阻塞

    // 的时间短一些

    if (list_empty(&sem->wait_list))

        atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);

    else

        rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

    raw_spin_unlock_irq(&sem->wait_lock);

    wake_up_q(&wake_q);

    lockevent_inc(rwsem_wlock_fail);

 

    return ERR_PTR(-EINTR);

}

 

7.4.1 writer_wait_state - 标记正在申请writer锁的线程的状态

a) WRITER_NOT_FIRST: writer不是wait_list链表最前面的线程

b) WRITER_FIRST: writer是wait_list链表最前面的线程

c) WRITER_HANDOFF: writer不是wait_list链表最前面的线程,并且handoff

enum writer_wait_state {

    WRITER_NOT_FIRST,    /* Writer is not first in wait list */

    WRITER_FIRST,        /* Writer is first in wait list */

    WRITER_HANDOFF        /* Writer is first & handoff needed */

};

 

7.4.2 rwsem_try_write_lock - 尝试获取锁或者设置RWSEM_FLAG_HANDOFF标记

本函数完成的操作:要么成功拿到锁,要么设置RWSEM_FLAG_HANDOFF标记。

锁的状态和线程的状态共有下面6种场景,下面依次分析这6种场景完成的操作

场景

锁状态

线程状态

1

has_handoff

WRITER_NOT_FIRST

2

has_handoff

WRITER_FIRST

3

has_handoff

WRITER_HANDOFF

4

!has_handoff

WRITER_NOT_FIRST

5

!has_handoff

WRITER_FIRST

6

!has_handoff

WRITER_HANDOFF

 

函数实现如下:

/*

* This function must be called with the sem->wait_lock held to prevent

* race conditions between checking the rwsem wait list and setting the

* sem->count accordingly.

*

* If wstate is WRITER_HANDOFF, it will make sure that either the handoff

* bit is set or the lock is acquired with handoff bit cleared.

*/

static inline bool rwsem_try_write_lock(

            struct rw_semaphore *sem,

            enum writer_wait_state wstate)        //记录当前正在申请锁的线程的状态

{

    long count, new;

 

    lockdep_assert_held(&sem->wait_lock);

 

    count = atomic_long_read(&sem->count);

    do {

        //1.记录这把锁是否设置了RWSEM_FLAG_HANDOFF标记

        bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

 

        //2.下面两个条件对应上面表格中的场景1

        // a) has_handoff: 禁止"偷锁",wait_list链表最前面的线程很急迫

        // b) WRITER_NOT_FIRST: 当前这个writer并不是链表最前面的的线程

 

        // 因为本函数的作用是:要么拿到锁,要么设置RWSEM_FLAG_HANDOFF标记

        // 但是现在RWSEM_FLAG_HANDOFF标记已经被别人抢先设置了,并且当前本

        // writer也不是链表最前面的线程,所以这里的RWSEM_FLAG_HANDOFF标记

        // 对本writer是无效的。这种场景啥也干不了,直接返回

        if (has_handoff && wstate == WRITER_NOT_FIRST)

            return false;

 

        new = count;

 

        //3.上面表格中场景2,3,4,5,6会走到这里

        if (count & RWSEM_LOCK_MASK) {

            //3.1 进入该分支表示临界区有人,此时无论如何这个writer都不能拿

            // 到锁,顶多是设置RWSEM_FLAG_HANDOFF标记就退出了,以便在

            // 下一次拿锁

 

            // 下面if条件对应上表中场景2,3,4,5,直接返回

            // 对于场景2,3: RWSEM_FLAG_HANDOFF标记已经被设置上了,无需

            // 重复设置,并且因为此时这个writer是链表最前

            // 面的线程,下一次锁交接的时候就能获取锁;

            // 对于场景4: 因为此时本writer并不是wait_list链表最前面的

            // 线程,此时即使是设置了RWSEM_FLAG_HANDOFF标记

            // 对自己也是无效的,所以也不需要设置

            // 对于场景5: 虽然锁的RWSEM_FLAG_HANDOFF标记没有被设置上,

            // 并且此时本writer也已经是wait_list链表最前面

            // 的线程了,但是这个writer阻塞的时间还不够,还不

            // 够紧急,没有被打上WRITER_HANDOFF标记,此时也

            // 不用设置RWSEM_FLAG_HANDOFF标记

            if (has_handoff || (wstate != WRITER_HANDOFF))

                return false;

 

            //3.2 只有上面场景6才会走到这里

            // 此时锁没有被标记RWSEM_FLAG_HANDOFF,并且这个waiter已经是

            // wait_list链表最前面的线程,并且已经很紧急了。已经被打上了

            // WRITER_HANDOFF标记,这时候对这把锁标记RWSEM_FLAG_HANDOFF

            // 后,在下一个锁交换的时候就能够拿到锁

            new |= RWSEM_FLAG_HANDOFF;

        } else {

            //3.3 进入该分支,表示当前临界区已经没人了,此时任何线程都可以通过

            // 原子操作公平的竞争这把锁,而不管其在链表中的位置,即使它不是

            // 链表最前面的线程

 

            // 上面表格中场景2,3,4,5,6都能走到这,但是需要注意的是,对于场

            // 景2,3表示锁已经被打上了RWSEM_WRITER_LOCKED标记,并且本writer

            // 就是wait_list链表最前面的线程,下一个锁交换时,一定会将锁交

            // 给本writer

 

            // a) writer即将进入临界区,打上RWSEM_WRITER_LOCKED标记

            // b) 如果已经设置了RWSEM_FLAG_HANDOFF标记,则清除,未设置的话

            // 则本身就是0

            new |= RWSEM_WRITER_LOCKED;

            new &= ~RWSEM_FLAG_HANDOFF;

 

            //3.4 如果链表上只有本writer一个线程在等锁了,那么成功

            // 拿锁后将RWSEM_FLAG_WAITERS清除

            if (list_is_singular(&sem->wait_list))

                new &= ~RWSEM_FLAG_WAITERS;

        }

 

    //4.进行xchg操作

    // a) 如果sem->count和count不相等,则说明锁的状态已经发生变化,此时根本就不

    // 会进行xchg操作,原子操作返回false,循环继续,继续下一次xchg

    // b) 如果sem->count和count相等,则进行xchg操作,但是如果在原子操作期间,

    // 锁可能被其他cpu上的其他线程拿走了,锁的状态又发生了变化,则原子操作也返

    // 回false,循环继续,继续下一次xchg

    // c) 如果sem->count和count相等,则进行xchg操作,如果成功完成xchg操作,

    // 则原子操作返回true,循环结束

    // 需要注意的是,这里的count变量在每次执行完原子操作后(不管原子操作是否成功),

    // 都会被重新赋值的

    } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));

 

    /*

     * We have either acquired the lock with handoff bit cleared or

     * set the handoff bit.

     */

    //5.代码走到这里,仅表示上面xchg已经成功了,但是这并不表示已经成功拿到锁了,

    // 由上面分析可知,如果临界区有人,这个writer是拿不到锁的,顶多是对锁设置

    // RWSEM_FLAG_HANDOFF标记。所以代码走到这里有下面两种情况

    // a) 已经拿到了锁,并且将RWSEM_FLAG_HANDOFF标记清除

    // b) 或者根本没有拿到锁,但是已经设置了RWSEM_FLAG_HANDOFF标记

    if (new & RWSEM_FLAG_HANDOFF)

        return false;

 

    //6.代码走到这表示成功拿到锁,设置owner后退出

    rwsem_set_owner(sem);

    return true;

}

 

7.5 up_write

/*

* release a write lock

*/

void up_write(struct rw_semaphore *sem)

{

    rwsem_release(&sem->dep_map, _RET_IP_);

    __up_write(sem);

}

 

7.6 __up_write

/*

* unlock after writing

*/

static inline void __up_write(struct rw_semaphore *sem)

{

    long tmp;

 

    DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);

    /*

     * sem->owner may differ from current if the ownership is transferred

     * to an anonymous writer by setting the RWSEM_NONSPINNABLE bits.

     */

    DEBUG_RWSEMS_WARN_ON((rwsem_owner(sem) != current) &&

             !rwsem_test_oflags(sem, RWSEM_NONSPINNABLE), sem);

 

    //1.因为rwsem只允许一个writer在临界区,所以当这个writer退出临界区时,会直接

    // 将sem->owner设置为0,不仅清除了task_struct字段,还清除了nonspin标记

    rwsem_clear_owner(sem);

 

    //2.count字段将RWSEM_WRITER_LOCKED标记清零,表示当前不是write在临界区

    tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count);

 

    //3.如果链表中还有其他线程在等这把锁,则唤醒链表最前面的线程

    if (unlikely(tmp & RWSEM_FLAG_WAITERS))

        rwsem_wake(sem, tmp);

}

 

7.6.1 rwsem_clear_owner - writer退出临界区时清除owner,直接设置为0

因为rwsem只允许一个writer退出临界区,所以当一个writer退出临界区时,临界区一定空了

static inline void rwsem_clear_owner(struct rw_semaphore *sem)

{

    //注意:这里直接设置为0,表示nonspinable标记也清零了,后来申请锁的人

    // 不管是reader还是writer,都会重新尝试自旋

    atomic_long_set(&sem->owner, 0);

}

 

八、其他扩展接口

8.1 down_read_interruptible - 拿不到锁进sleep

int __sched down_read_interruptible(struct rw_semaphore *sem)

{

    might_sleep();

    rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

 

    if (LOCK_CONTENDED_RETURN(sem, __down_read_trylock, __down_read_interruptible)) {

        rwsem_release(&sem->dep_map, _RET_IP_);

        return -EINTR;

    }

 

    return 0;

}

 

8.2 down_read_killable - 拿不到锁将自己kill掉

int __sched down_read_killable(struct rw_semaphore *sem)

{

    might_sleep();

    rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

 

    if (LOCK_CONTENDED_RETURN(sem, __down_read_trylock, __down_read_killable)) {

        rwsem_release(&sem->dep_map, _RET_IP_);

        return -EINTR;

    }

 

    return 0;

}

 

8.3 down_read_trylock - 拿不到锁后也不睡眠

/*

* trylock for reading -- returns 1 if successful, 0 if contention

*/

int down_read_trylock(struct rw_semaphore *sem)

{

    int ret = __down_read_trylock(sem);

 

    if (ret == 1)

        rwsem_acquire_read(&sem->dep_map, 0, 1, _RET_IP_);

    return ret;

}

 

8.4 down_write_killable - 拿不到锁后将自己kill掉

/*

* lock for writing

*/

int __sched down_write_killable(struct rw_semaphore *sem)

{

    might_sleep();

    rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);

 

    if (LOCK_CONTENDED_RETURN(sem, __down_write_trylock,

                 __down_write_killable)) {

        rwsem_release(&sem->dep_map, _RET_IP_);

        return -EINTR;

    }

 

    return 0;

}

 

8.5 down_write_trylock - 拿不到锁后也不睡眠

/*

* trylock for writing -- returns 1 if successful, 0 if contention

*/

int down_write_trylock(struct rw_semaphore *sem)

{

    int ret = __down_write_trylock(sem);

 

    if (ret == 1)

        rwsem_acquire(&sem->dep_map, 0, 1, _RET_IP_);

 

    return ret;

}

 

九、要点总结

9.1 当reader在临界区时,owner的task_struct字段指向的线程并不一定在临界区

因为rwsem是允许多个reader同时进入临界区的,当有reader退出临界区时,会调用rwsem_clear_reader_owned函数将owner字段清空。但是当没有开启CONFIG_DEBUG_RWSEMS宏时,该函数啥也不干。也就是说可能存在这种情况:即使owenr指向的task_struct字段已经退出临界区了,也不会将这个task_struct字段清除或重新赋值(我们假设没有新的reader进入临界区)。极端情况下,当所有的reader都退出临界区后,这个owner还是指向哪个曾经的reader,所以在我们分析dump文件时,如果临界区是reader时,owner的task_struct字段所指向的线程并不一定在临界区

那么什么时候清除这个task_struct结构呢?难道是当writer进入并退出临界区的时候

 

9.2 当reader在临界区时,后来的reader能否直接进入临界区?

当reader在临界区时,后来的reader不一定能够直接进入临界区,如果满足下面任一情况,后来的这个reader一定会进入慢速路径(乐观自旋或者睡眠状态),详见__down_read_trylock和rwsem_read_trylock和RWSEM_READ_FAILED_MASK宏的实现

a) RWSEM_FLAG_WAITERS: 有人阻塞在wait_list上

b) RWSEM_FLAG_HANDOFF: 锁已经被打上HANDOFF标记,禁止自旋偷锁

c) RWSEM_FLAG_READFAIL: 临界区的reader已经撑爆了

 

但是在慢速路径中,如果是reader在临界区,后来的reader是有可能"偷"到锁进入临界区的,除非存在下面情况导致偷锁失败,详见rwsem_try_read_lock_unqueued实现

a) wait_list最前面的线程已经很紧急,锁已经被打上RWSEM_FLAG_HANDOFF

 

总结:如果临界区的reader没有被撑爆,并且锁没有被置为RWSEM_FLAG_HANDOFF标记,后来的reader可以直接进入临界区的

 

9.3 什么时候禁止自旋(设置nonspinable标记)

下面两个场景:

a) 当临界区的reader已经撑爆时,会同时设置RWSEM_RD_NONSPINNABLE和RWSEM_WR_NONSPINNABLE标记,详见rwsem_read_trylock

b) 当一个writer处于自旋状态等临界区的reader,并且已经超时了,会同时设置RWSEM_RD_NONSPINNABLE和RWSEM_WR_NONSPINNABLE标记,详见rwsem_optimistic_spin实现。(换句话说,只要wait_list上有writer,后来的申请者,不管是reader还是writer,都一定不能自旋)

 

9.4 什么时候允许自旋(清除nonspinable标记)

a) 因为rwsem一次只允许一个writer进入临界区,所以当一个writer退出临界区时,临界区一定空了,此时会同时清除RWSEM_WR_NONSPINNABLE和RWSEM_RD_NONSPINNABLE标记,后面再有reader会在writer来申请锁,都可以自旋等锁了,详见__up_writer -> rwsem_clear_owner

b) 当writer进入临界区是,调用rwsem_set_owner,将nonspinable清除,允许reader和writer重新通过自旋拿锁

 

9.5 什么时候清除RWSEM_WR_NONSPINNABLE,允许writer自旋

a) 每当有新的reader进入临界区的时候,都会调用rwsem_set_reader_owned函数设置锁的owner,但是在rwsem_set_reader_owned接口的实现中会将owner的RWSEM_WR_NONSPINNABLE位被清零,也就是说,只要源源不断的有新的reader进入临界区,RWSEM_WR_NONSPINNABLE位就会一直处于0的状态,也就是说后来的writer都可以尝试自旋等锁了

b) 在rwsem_down_read_slowpath中,当判断当前没人在临界区的时候,将清除RWSEM_WR_NONSPINNABLE标记,表示writer可以自旋了,详见rwsem_down_read_slowpath -> clear_wr_nonspinnable

 

9.6 什么时候设置handoff

wait_list最前面的线程已经很紧急,锁已经被打上RWSEM_FLAG_HANDOFF

 

十、参考文档

https://blog.csdn.net/wangpengqi/article/details/8043340

https://www.dazhuanlan.com/yingji_224/topics/1860486

http://www.wowotech.net/kernel_synchronization/504.html

https://www.cnblogs.com/hellokitty2/p/16343272.html

https://blog.csdn.net/wangquan1992/article/details/124863014

https://blog.csdn.net/jianchi88/article/details/123638915

https://zhuanlan.zhihu.com/p/71156910

https://www.jianshu.com/p/a9817c353ef5

https://blog.codingnow.com/2014/12/skynet_spinlock.html

 

 

 

关注公众号不迷路:DumpStack

扫码加关注

本作品采用 知识共享署名-非商业性使用 4.0 国际许可协议 进行许可
标签: 暂无
最后更新:2022年9月18日

tmmdh

这个人很懒,什么都没留下

打赏 点赞
< 上一篇
下一篇 >

文章评论

  • Jemmy

    非常不错的文章。网站可以考虑支持下 RSS 订阅嘛

    2023年12月2日
    回复
  • feifei

    CONFIG_LOCK_STAT 宏都是默认关的, LOCK_CONTENDED 相当于只执行了 down_read(sem) down_write(sem) , 没有在LOCK_CONTENDED 里try

    2024年11月28日
    回复
  • 取消回复

    COPYRIGHT © 2022 dumpstack.cn. ALL RIGHTS RESERVED.

    浙ICP备2022000966号