0%

mips linux spinlock

spinlock结构

1
2
3
4
5
6
7
8
9
10
11
arch_spinlock_t
typedef union {
unsigned int lock;
// union结构, 总共占用一个word
struct {
// 正在服务的号 bit 0-15
unsigned short serving_now;
// 等待取的号 bit 16-31
unsigned short ticket;
} h;
} arch_spinlock_t;

api

arch_spin_is_locked

1
2
3
4
5
6
static inline int arch_spin_is_locked(arch_spinlock_t *lock)
{
unsigned int counters = ACCESS_ONCE(lock->lock);
// 取高16位和低16位做异或运算, 如果其他的 thread 已经获取了该 lock,那么返回非零值,即ticket 和 serving_now不相同时, 返回非0, 否则返回 0, 返回0时表明没上锁或者是本thread获取了锁.
return ((counters >> 16) ^ counters) & 0xffff;
}

自旋锁的变体

接口 API 的类型 spinlock 中的定义 raw_spinlock 的定义
定义 spin lock 并初始化 DEFINE_SPINLOCK DEFINE_RAW_SPINLOCK
动态初始化 spin lock spin_lock_init raw_spin_lock_init
获取指定的 spin lock spin_lock raw_spin_lock
获取指定的 spin lock 同时 disable 本 CPU 中断 spin_lock_irq raw_spin_lock_irq
保存本 CPU 当前的 irq 状态,disable 本 CPU 中断并获取指定的 spin lock spin_lock_irqsave raw_spin_lock_irqsave
获取指定的 spin lock 同时 disable 本 CPU 的 bottom half spin_lock_bh raw_spin_lock_bh
释放指定的 spin lock spin_unlock raw_spin_unlock
释放指定的 spin lock 同时 enable 本 CPU 中断 spin_unlock_irq raw_spin_unock_irq
释放指定的 spin lock 同时恢复本 CPU 的中断状态 spin_unlock_irqstore raw_spin_unlock_irqstore
获取指定的 spin lock 同时 enable 本 CPU 的 bottom half spin_unlock_bh raw_spin_unlock_bh
尝试去获取 spin lock,如果失败,不会 spin,而是返回非零值 spin_trylock raw_spin_trylock
判断 spin lock 是否是 locked,如果其他的 thread 已经获取了该 lock,那么返回非零值,否则返回 0 spin_is_locked raw_spin_is_locked

mips 体系加锁分析

arch_spin_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
int my_ticket;
int tmp;
int inc = 0x10000;
__asm__ __volatile__ (
" .set push # arch_spin_lock \n"
" .set noreorder \n"
" \n"
"1: ll %[ticket], %[ticket_ptr] \n" # ticket_ptr(lock->lock), 取出lock->lock 到 ticket(tmp) ll 原子操作, 取出一个word
" addu %[my_ticket], %[ticket], %[inc] \n" # my_ticket <- ticket(tmp) + 0x10000 这里应该是ticket + 1的意思, ticket自增,即当前的调用取的号+1,
" sc %[my_ticket], %[ticket_ptr] \n" # lock->lock <- my_ticket # 把ticket + 1后存回到 lock->ticket, 这个代表更新当前发的所有号, 来了一个新调用, 说明这个调用要抢资源, 给它分一个号
" beqz %[my_ticket], 1b \n" # sc失败, 返回结果0, 跳转回1b, 说明有其他cpu或中断在该指令执行前偷偷调用sc 把 tick_ptr给修改了, 这时sc会失败, 返回1b 重新来这个操作
" srl %[my_ticket], %[ticket], 16 \n" # my_ticket <- ticket(tmp) >> 16 my_ticket <- lock->ticket 换算ticket + 1
" andi %[ticket], %[ticket], 0xffff \n" # ticket(tmp) = ticket(tmp) & 0xffff tmp<- lock->serving_now
" bne %[ticket], %[my_ticket], 4f \n" # lock->ticket 与 lock->serving_now 不相等, 调转到 4f, 说明不是本上下文上的锁, 需要等待, 如相等,往下走 !!!!
" subu %[ticket], %[my_ticket], %[ticket] \n" # tmp <- lock->serving_now - lock->ticket, 相等的, tmp变成0了, 因为在延迟槽里,不相等的话 tmp <- lock->serving_now - lock->ticket
"2: \n"
" .subsection 2 \n" # # 程序后半段, !!!! 拼在函数jr ra 后面,正常情况下走不到这里, 注意跳转前 subu %[ticket], %[my_ticket], %[ticket] 在延迟槽里, 总是会执行
"4: andi %[ticket], %[ticket], 0x1fff \n" # lock->ticket 与 lock->serving_now 不相等时,走到这, tmp <- tmp & 0x1fff 取(lock->serving_now)0-12位
" sll %[ticket], 5 \n" # tmp << 5
" \n"
"6: bnez %[ticket], 6b \n" # tmp 不为0 , 循环tmp-1, 直到tmp 变成0
" subu %[ticket], 1 \n" # 循环体, 每次tmp-1
" \n"
" lhu %[ticket], %[serving_now_ptr] \n" # tmp <- lock->h.serving_now, 这个地方会从内存中更新serving_now, 别的调用方可能会放锁,导致serving_now变化, 因此这里需要更新
" beq %[ticket], %[my_ticket], 2b \n" # my_ticket 与 lock->serving_now 相等,则跳到2b, 调到2b后,函数就可以马上出去了
" subu %[ticket], %[my_ticket], %[ticket] \n" # tmp <- lock->h.serving_now - lock->ticket
" b 4b \n" # 回到4b, 重新等
" subu %[ticket], %[ticket], 1 \n" # tmp - 1
" .previous \n"
" .set pop \n"
: [ticket_ptr] "+m" (lock->lock),
[serving_now_ptr] "+m" (lock->h.serving_now),
[ticket] "=&r" (tmp),
[my_ticket] "=&r" (my_ticket)
: [inc] "r" (inc));
smp_llsc_mb(); // 调用sync. 内存屏障, 保证cache同步
}

subsection的作用是分割线, subsection前面的部分为函数本应该在的位置, subsection后面的部分直接拼接到了函数尾部.
比如函数调用结束后, 一般会调用jr ra 返回, 而subsection后面的部分直接拼在了这条jr ra后面, 不破坏函数的执行, 只有b语句 标签[no]f 的时候才会跳到这里,否则程序一条一条往下运行, 永远不会跑到这里

这个函数做了两件事:

  1. 每来一个调用, 就给它一个号, 号每次+1.
  2. 判断serving_now 与 ticket是否相等, 相等的话, 就退出函数, 不相等,就让这个调用方在这等着, 等ticket 与 serving_now 相等.

这里两条原子性操作 ll``sc保证读取与写入lock(共享内存)时, 读操作和写操作都是原子性的, 函数最后调用了sync, 保证了cache同步.
sync的作用可以参考, 简单概括就是 约束执行CPU上的读写操作的顺序,CPU一定是完成read memory barrier之前的读写操作之后,才开始执行read memory barrier之后的读写操作。
内存屏障(Memory Barrier)究竟是个什么鬼? - 知乎
那什么时候serving_now变化的呢. 就需要看下释放锁的函数

arch_spin_unlock

1
2
3
4
5
6
7
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
unsigned int serving_now = lock->h.serving_now + 1; // serving_now + 1
wmb(); #调用 __syncw()
lock->h.serving_now = (unsigned short)serving_now; // 存回lock->serving_now
nudge_writes(); // 调用 syncw
}

小结

多个函数片段拿锁时, 会陷入到arch_spin_lock的等待中, 每个调用都给分了一个ticket, 在subsection 4中, 每个调用持有的是my_ticket, 即调用arch_spin_lock时分给这个调用过程的ticket号, 这个号属于该次调用.
在这个循环中, 最终延时了一定数目的时钟周期后, 都要判断my_ticket与内存中的serving_now是否相等, 相等则退出arch_spin_lock的等待.
arch_spin_unlock放锁过程, 会把serving_now++, 存回到共享内存中.
每个因调用arch_spin_lock陷入到循环中的调用体, 等到自己的my_ticket与serving_now相等时, 解除等待状态.

spin_lock的主体是调用过程, 而不是本线程, 或本函数, 不支持递归拿锁或反复拿锁

取号机制可以避免有的调用过程被饿死, 防止多个调用过程之间的无序竞争.

arch_spin_trylock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static inline unsigned int arch_spin_trylock(arch_spinlock_t *lock)
{
int tmp, tmp2, tmp3;
int inc = 0x10000;

__asm__ __volatile__ (
" .set push # arch_spin_trylock \n"
" .set noreorder \n"
" \n"
"1: ll %[ticket], %[ticket_ptr] \n" # tmp <- lock->lock
" srl %[my_ticket], %[ticket], 16 \n" # my_ticket <- lock->ticket
" andi %[now_serving], %[ticket], 0xffff \n" # now_serving <- lock->serving
" bne %[my_ticket], %[now_serving], 3f \n" # my_ticket == now_serving? (等于则继续) : 3f (不等, 跳3f)
" addu %[ticket], %[ticket], %[inc] \n" # ticket + 1
" sc %[ticket], %[ticket_ptr] \n" # 更新lock->ticket, 即lock->ticket ++
" beqz %[ticket], 1b \n" # ticket 溢出, 回到1b
" li %[ticket], 1 \n" # 函数出去, 返回ticket (tmp) 返回1, 表示能拿到锁
"2: \n"
" .subsection 2 \n"
"3: b 2b \n"
" li %[ticket], 0 \n" # 函数出去, 返回ticket(tmp) 返回0, 表示拿不到锁
" .previous \n"
" .set pop \n"
: [ticket_ptr] "+m" (lock->lock),
[ticket] "=&r" (tmp),
[my_ticket] "=&r" (tmp2),
[now_serving] "=&r" (tmp3)
: [inc] "r" (inc));
// sync
smp_llsc_mb();
return tmp;
}

尝试拿锁, 即判断my_ticket 与 now_serving 是否相等, 相等则返回1, 表示能拿到锁

读写锁

允许多个调用过程同时持同一个读锁, 但同一时刻只能有一个写锁.

arch_read_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline void arch_read_lock(arch_rwlock_t *rw)
{
unsigned int tmp;
do {
__asm__ __volatile__(
"1: ll %1, %2 # arch_read_lock \n" # tmp <- rw->lock
" bltz %1, 1b \n" # tmp 小于0, 循环, 即有写锁情况下 (0x80000000=-1), 阻塞
" addu %1, 1 \n" # tmp + 1
"2: sc %1, %0 \n" # 存回rw->lock, 即serving+1
: "=m" (rw->lock), "=&r" (tmp)
: "m" (rw->lock)
: "memory");
} while (!tmp); # tmp是0, 循环, 不为0, 退出循环
smp_llsc_mb();
}

有写锁时阻塞, 没有写锁, 多个读可以同时调用

arch_read_unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static inline void arch_read_unlock(arch_rwlock_t *rw)
{
unsigned int tmp;
// 同步
smp_mb__before_llsc();


do {
__asm__ __volatile__(
"1: ll %1, %2 # arch_read_unlock \n" # tmp <- rw->lock
" sub %1, 1 \n" # tmp - 1
" sc %1, %0 \n" # 存回rw->lock, 即serving-1
: "=m" (rw->lock), "=&r" (tmp)
: "m" (rw->lock)
: "memory");
} while (!tmp);

}

arch_write_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline void arch_write_lock(arch_rwlock_t *rw)
{
unsigned int tmp;
do {
__asm__ __volatile__(
"1: ll %1, %2 # arch_write_lock \n" # tmp <- rw->lock
" bnez %1, 1b \n" # 不为0 循环, 即有读锁或写锁时, 自旋等待
" lui %1, 0x8000 \n" # tmp <- 0x80000000
"2: sc %1, %0 \n" # rw->lock 更新为0x80000000
: "=m" (rw->lock), "=&r" (tmp)
: "m" (rw->lock)
: "memory");
} while (!tmp);
smp_llsc_mb();
}

为0时才可以持锁(有读锁或写锁时,自旋等待), 持锁后, lock被写入0x80000000. 锁上期间新的调用过程拿不到锁, 直到该调用过程把锁放掉

arch_write_unlock

1
2
3
4
5
6
7
8
9
10
static inline void arch_write_unlock(arch_rwlock_t *rw)
{
smp_mb();
__asm__ __volatile__(
" # arch_write_unlock \n"
" sw $0, %0 \n" # rw->lock更新为0
: "=m" (rw->lock)
: "m" (rw->lock)
: "memory");
}