0%

rt_thread 多核调度

概括

启用多核

1
2
3
scons --menuconfig
->rt-thread kenel
->Enable SMPe

开启后,RT_USING_SMP 被打开。

控制线程绑定到某个cpu上

线程管理
参考上面的控制线程, cmd 取 RT_THREAD_CTRL_BIND_CPU

1
rt_thread_control(&tid, RT_THREAD_CTRL_BIND_CPU, 1);

调度流程

初始化cpu资源


cpu1 起来后默认只有一个任务, 就是idle任务, 做的事情就是在等待,陷入到low standby模式

1
asm volatile ("wfe":::"memory", "cc");

ARM WFI和WFE指令

任务调度

一个任务怎么控制切换到不同的cpu core上, 以及arm整体的任务调度的机制是怎样的。

rt_thread_startup 启动线程

先从启动线程说起
线程管理

从任务切换处可以看出,查找to_thread是从pcpu维护的(优先级,任务列表)的map table中取出的最高优先级对应的任务列表中的最前面的任务, 这个过程是跟rt_schedule_insert_thread(thread)是关联的,只有往该pcpu维护的map table中插入对应的任务,后面任务才会被取出,运行在对应的pcpu上。而不是说哪个cpu执行了rt_thread_startup, 哪个cpu就执行传入的该任务
注意看_scheduler_get_highest_priority_thread的实现, 有三个table, 全局的/cpu0的/cpu1的. 其中全局的table表示该就绪列表中的任务可以跑在cpu0 和 cpu1上. cpu0/cpu1的只能跑在对应的cpu上.
调度器的实现rt_schedual, 在任务切换时, 先去全局的就绪列表上找, 如果全局就绪列表上的任务优先级高于当前cpu上的最高优先级, 则会切换到全局列表上优先级最高的任务.
这里举一个例子:
main 任务 相继启动 A 任务 B任务( 优先级 B > A > main > idle), 还有两个任务 (cpu0/cpu1 上的idle). 假如 A 任务 B 任务 bind_cpu 都是0 , 是怎么调度的?

1
2
3
4
5
6
任务切换 :
idle0 -> main( cpu0)
main (cpu0) -> A (cpu0) 叫 cpu1 --> main
idle1 (cpu1) -> main (cpu1)
main(cpu1) 叫 cpu0 --> B
A (cpu0) -> B (cpu0)

rt_scheduler_ipi_handler

在上面rt_schedule_insert_thread(thread)后, 任务被插入到bind_cpu对应的pcpu维护的任务map table中, 紧接着发送了ipi中断给到对应的cpu
rt_hw_ipi_send(RT_SCHEDULE_IPI, cpu_mask);
该中断对应的处理函数为 rt_scheduler_ipi_handler
这个函数只调用了 rt_schedule 来处理,参考上面的rt_schedule过程,说明是bind_cpu对应的cpu来进行任务调度的,正好将所插入的任务取出后,执行rt_thread_startup 传入的任务

idle任务切出-tick中断响应 delay的解析

没有其他任务的情况下, 只有idle任务在运行
默认配置每隔1ms 来一次tick, 响应tick中断

cp0 响应 IRQ_PBA8_TIMER2_3 的中断, cp1 响应 IRQ_PBA8_TIMER0_1的中断, 均为GIC中断
中断处理函数为rt_hw_timer_isr rt_hw_timer2_isr

delay流程分析

从上面流程图上可以大概了解到 在任务中调用了delay函数发生了什么
假设现在只有一个主任务和一个idle任务
主任务:

  1. 主任务中调用了delay,会将主任务附带的timer的timeout_tick 设置为当前tick + delay tick 数。
  2. 将主任务附带的timer挂入到全局rt_timer_list下
  3. suspend 主任务
  4. 调度到idle任务上

idle任务:

什么也不干

每ms的tick 响应:
响应对应cpu的tick 中断处理函数

  1. 每来一次tick, tick++
  2. –remaining_tick 这个是任务初始化时的最后一个参数, 即任务默认运行多少tick个时间片(默认一个tick 1ms)
  3. remaining tick减到0, 切换到最高优先级的任务上,此处因为只有一个主任务,主任务的优先级高于idle任务时,还是会切到主任务上。如果主任务优先级和idle任务优先级一致,则会切到idle任务上
  4. 进行timer检查, 查到delay到期的任务, 找到后,将其从rt_timer_list上删除, 将该timer对应的任务挂到ready列表里
  5. 任务调度,从ready列表中找到最高优先级的任务调度执行, 此时会调度到主任务上

任务堆栈上下文

线程创建时,可以自己指定堆栈,此时为静态堆栈。或者自动创建堆栈,堆栈的空间是malloc出来的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 使用静态堆栈初始化任务
rt_err_t rt_thread_init(struct rt_thread *thread,
const char *name,
void (*entry)(void *parameter),
void *parameter,
void *stack_start,
rt_uint32_t stack_size,
rt_uint8_t priority,
rt_uint32_t tick)
// 动态分配任务堆栈
rt_thread_t rt_thread_create(const char *name,
void (*entry)(void *parameter),
void *parameter,
rt_uint32_t stack_size,
rt_uint8_t priority,
rt_uint32_t tick)

这里暂时先讨论第一种情况,第二种无非是地址不一样, 原理都是一样的
先跟一下线程/任务创建的流程,重点看下堆栈的使用

上面的流程仅是初始化的流程, 只有entry cpsr lr parameter 是有效的, 其他的都是存的默认值(无效值)。

rt_hw_context_switch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
                rt_hw_context_switch((rt_ubase_t)&current_thread->sp, (rt_ubase_t)&to_thread->sp, to_thread);
0x60094a2c <+0>: stmfd sp!, {lr} # lr 入栈 这里的lr 是作为再切回这个任务的pc用的, 在调用rt_hw_context_switch 前会更新lr为跳rt_hw_context_switch的下一行指令.
# bl rt_hw_context_switch lr <- pc+4.
0x60094a30 <+4>: push {r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, lr} # r0-r12 lr 入栈
0x60094a34 <+8>: mrs r4, CPSR # r4 <- CPSR
0x60094a38 <+12>: tst lr, #1 # 判断lr的最低位是否为1 根据结果更新CPSR See if called from Thumb mode, 如果最低位为1, 说明是从thumb 指令call 过来的
0x60094a3c <+16>: orrne r4, r4, #32 # 是thumb 指令, SPSR 第5位置1,表示thumb模式
0x60094a40 <+20>: stmfd sp!, {r4} # r4 入栈
0x60094a44 <+24>: stmdb sp, {sp, lr}^ # usr_sp usr_lr 入栈
0x60094a48 <+28>: sub sp, sp, #8
0x60094a4c <+32>: str sp, [r0] # from_thread->sp <- sp , r0 是 &from_thread->sp , 即更新 from_thread->sp !!!!
0x60094a50 <+36>: ldr sp, [r1] # sp <- 【r1】 r1是&(to_thread->sp) !!!! 这里更新了sp 为 to_thread->sp
0x60094a54 <+40>: mov r0, r2 # r2 是to_thread 给到r0, r0 是rt_cpus_lock_status_restore的参数
0x60094a58 <+44>: bl 0x6001bba0 <rt_cpus_lock_status_restore>
0x60094a5c <+48>: b 0x60094a74 <rt_hw_context_switch_exit>
rt_hw_context_switch_exit
0x60094d98 <+0>: mov r0, sp # r0 <- sp
0x60094d9c <+4>: cps #18 # 切回irq模式
0x60094da0 <+8>: bl 0x600196b4 <rt_signal_check> # signal 检查,有可能改变r0 寄存器的值
0x60094da4 <+12>: cps #19 # 切回svc
0x60094da8 <+16>: mov sp, r0 # svc_sp <- r0 !!!! 这个地方把to_thread->sp 给了 svc_sp
0x60094dac <+20>: ldm sp, {sp, lr}^ # usr_sp, usr_lr <- svc_sp
0x60094db0 <+24>: add sp, sp, #8 # svc_sp <- svc_sp + 8
0x60094db4 <+28>: ldmfd sp!, {r1} # r1 <-svc_sp
0x60094db8 <+32>: msr SPSR_fsxc, r1 # 恢复SPSR svc_sp <- r8 r8 是保存栈的下沿,r8 + 8 的位置应该是存的r1, 即刚进来时的SPSR的值
0x60094dbc <+36>: ldm sp!, {r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, lr, pc}^ # 用户模式的 r0-r12 lr pc <- svc_sp 这个地方时满递增
# 这里缺了r13, r13 是 sp,ldm结束后更新的sp就是任务切出前的sp
# pc <- sp+60-4的位置存的数据, 给pc后,后面代码就不走了,转到pc处代码执行了, 同时因为包含pc,SPSR 被拷贝到 CPSR
# sp 满递增,回到to_thread分配堆栈的栈顶,pc为to_thread entry函数的地址, 即跳到 to_thread entry函数里
.end rt_hw_context_switch_exit

任务保存栈这里是静态的栈, 从栈顶开始依次为图中的顺序 entry lr r12-r1 r0(函数的参数)cpsr (usr_lr usr_sp) 这两个貌似没用到。 同时任务保存栈的栈顶(栈顶递减)也是该进入该任务 entry 函数后的sp。
第一次切到该任务时,只有entry cpsr lr parameter 是有效的
该任务被切出时,即切到别的任务时, 切出前,该任务角色为 from_thread, entry lr r12-r1 r0(函数的参数)cpsr 要保护起来。

delay导致的切出

这里先看第一个场景, delay函数调用后,任务被切出时,发生了什么?
这个场景最终还是会调到rt_hw_context_switch , 重点看对from_thread做了什么
从 rt_hw_context_switch 的分析可以看出,一上来就把 lr r0-r12 cpsr usr_sp usr_lr 入栈了, 这个入栈时候的sp 并不是任务保存栈的栈顶, 而是任务执行到哪了,经过数次函数调用,sp递减之后的值。但这个值还是在为任务分配的保存栈上。
在上述寄存器入栈后,from_thread->sp的值被更新成了 这个压栈的sp。
再次切到这个任务后,这个任务变成了to_thread, 通过to_thread->sp即可找到上次压栈的位置。 这个位置增加(保存r0 r12 lr lr cpsr usr_lr usr_sp) +72 后,为函数切走前的函数栈sp的位置。
通过这种方式实现任务栈的保存恢复,
任务栈只能任务自己使用,在被切出前,无论多少次函数调用,函数调用引起的压栈,都存到了这个任务栈上,函数被切走前,需要将当前的寄存器压到这个栈可用位置的尾巴上,将这个位置记下来更新到 thread->sp.
恢复任务栈时,通过thread->sp 的位置向上递增数个位置恢复出 函数切走前的寄存器, 然后重新置sp为函数切走前的sp。
thread结构体中的sp 就是一个游标, 记录了任务entry的执行情况

补充 arm相关 任务调度相关

_vector_irq

GIC 中断路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
=> 0x60094aa0 <+0>:     clrex                         # 独占访问结束时,清除cpu中本地处理器针对某块内存区域的独占访问标志
0x60094aa4 <+4>: push {r0, r1} # 寄存器入栈, r1 r0 入栈 存到sp sp+4
0x60094aa8 <+8>: cps #19 # 通过CPS指令直接修改CPSR寄存器的M[4:0],让处理器进入不同的模式, 19 svc模式 10011 !!!! 注意这个地方切换后, sp会变
0x60094aac <+12>: mov r0, sp # r0 <- sp (svc_sp)
0x60094ab0 <+16>: mov r1, lr # r1 <- lr (svc_lr)
0x60094ab4 <+20>: cps #18 # 10010 切换到 IRQ 模式 !!!! sp 变了
0x60094ab8 <+24>: sub lr, lr, #4 # lr = lr-4 流水线结构
0x60094abc <+28>: stmdb r0!, {r1, lr} # lr r1 入栈, 此时r0 就是sp
0x60094ac0 <+32>: stmdb r0!, {r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12} # r12-r2 入栈, r0 满递减, 但是sp还是保持 r0, r1入栈后的位置
0x60094ac4 <+36>: pop {r1, r2} # sp 出栈, 将刚进来时的r0,r1 赋给 r1 r2
0x60094ad4 <+52>: stmdb r0, {sp, lr}^ # usr_sp usr_lr 入栈 !!! 使用“^”后缀进行数据传送且寄存器列表不包含 PC 时,加载/存储的是用户模式的寄存器,而不是当前模式的寄存器
0x60094ad8 <+56>: sub r0, r0, #8 # 上一条 r0 没更新, 这个地方-8 相当于更新了
0x60094adc <+60>: mov r8, r0 # 上面入栈的有 r12-r2 刚进来时的 r1, r0 , SPSR usr_lr usr_sp, 将保存的栈的下沿给r8 保存栈位于 svc_sp处
0x60094ae0 <+64>: bl 0x600192dc <rt_interrupt_enter>
rt_interrupt_enter:
void rt_interrupt_enter(void)
{
level = rt_hw_interrupt_disable();
rt_interrupt_nest ++;
RT_OBJECT_HOOK_CALL(rt_interrupt_enter_hook,());
rt_hw_interrupt_enable(level);
}
.end rt_interrupt_enter
0x60094ae4 <+68>: bl 0x60023c54 <rt_hw_trap_irq>
.rt_hw_trap_irq
void rt_hw_trap_irq(void)
{
extern struct rt_irq_desc isr_table[];
int_ack = rt_hw_interrupt_get_irq(); # call arm_gic_get_active_irq #Interrupt Acknowledge Register 0xc 9:0 bit The interrupt ID.
// 获得中断号
ir = int_ack & GIC_ACK_INTID_MASK;
/* get interrupt service routine */
isr_func = isr_table[ir].handler; // 就一个中断向量处理函数表, 多cpu的话,谁注册谁处理 local 中断?看起来是,调试发现只响应对应cpu的
isr_table[ir].counter++;
if (isr_func)
{
/* Interrupt for myself. */
param = isr_table[ir].param;
/* turn to interrupt service routine */
// 进入中断处理函数
isr_func(ir, param);
}
rt_hw_interrupt_ack(int_ack);
}
.end rt_hw_trap_irq
0x60094ae8 <+72>: bl 0x60019340 <rt_interrupt_leave> # 与rt_interrupt_enter对应, 保护中断路由 通过rt_interrupt_nest++ rt_interrupt_nest--的方式表示中断嵌套
0x60094aec <+76>: cps #19 # 切回svc模式
0x60094af0 <+80>: mov sp, r8 # svc_sp <- r8 r8 是保存栈的下沿
0x60094af4 <+84>: mov r0, r8 # r0 <- r8
0x60094af8 <+88>: bl 0x6001a9c8 <rt_scheduler_do_irq_switch> # rt_schedual的变体,后面需要分析 与rt_schedual最主要的区别是 irq_switch_flag=0时, 函数返回什么都不做 =1时才可能做后面的任务调度
# rt_schedule在有中断嵌套时,将该flag设置为1 //TODO
0x60094afc <+92>: b 0x60094d98 <rt_hw_context_switch_exit>
rt_hw_context_switch_exit
0x60094d98 <+0>: mov r0, sp # r0 <- sp
0x60094d9c <+4>: cps #18 # 切回irq模式
0x60094da0 <+8>: bl 0x600196b4 <rt_signal_check> # signal 检查,有可能改变r0 寄存器的值
0x60094da4 <+12>: cps #19 # 切回svc
0x60094da8 <+16>: mov sp, r0 # svc_sp <- r0
0x60094dac <+20>: ldm sp, {sp, lr}^ # usr_sp, usr_lr <- svc_sp
0x60094db0 <+24>: add sp, sp, #8 # svc_sp <- svc_sp + 8
0x60094db4 <+28>: ldmfd sp!, {r1} # r1 <-svc_sp
0x60094db8 <+32>: msr SPSR_fsxc, r1 # 恢复SPSR svc_sp <- r8 r8 是保存栈的下沿,r8 + 8 的位置应该是存的r1, 即刚进来时的SPSR的值
0x60094dbc <+36>: ldm sp!, {r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, lr, pc}^ # 用户模式的 r0-r12 lr pc <- svc_sp 这个地方时满递增, 这里缺了r13, r13 是 sp,在上面已经处理了
# pc <- sp+60-4的位置存的数据, 给pc后,后面代码就不走了,转到pc处代码执行了, 同时因为包含pc,SPSR 被拷贝到 CPSR
# 这个地方跟 stmdb r0, {sp, lr}^ 呼应
.end rt_hw_context_switch_exit

参考
【解答】arm架构的linux内核中,clrex指令的作用是什么,内核中什么时候才会用到?_Heron——Linux & ARM-CSDN博客
arm 汇编指令 CPS_Louis@L.M.的博客-CSDN博客
20201102025124361.png

rt_hw_context_switch_interrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
   # r0 为 context , r1 current_thread->sp  r2 to_thread->sp r3 to_thread
0x60094d84 <+0>: str r0, [r1] # [current_thread->sp] <- context(from thread)
0x60094d88 <+4>: ldr sp, [r2] # sp <- [to_thread->sp]
0x60094d8c <+8>: mov r0, r3 # r0 <- to_thread r0 是传给 rt_cpus_lock_status_restore的入参
0x60094d90 <+12>: bl 0x600171f8 <rt_cpus_lock_status_restore> # 【!thread->cpus_lock_nest】 时 rt_hw_spin_unlock(&_cpus_lock);
0x60094d94 <+16>: b 0x60094d98 <rt_hw_context_switch_exit>
rt_hw_context_switch_exit
0x60094d98 <+0>: mov r0, sp # r0 <- sp
0x60094d9c <+4>: cps #18 # 切回irq模式
0x60094da0 <+8>: bl 0x600196b4 <rt_signal_check> # signal 检查,有可能改变r0 寄存器的值
0x60094da4 <+12>: cps #19 # 切到svc
0x60094da8 <+16>: mov sp, r0 # svc_sp <- r0 (to_thread->sp)
0x60094dac <+20>: ldm sp, {sp, lr}^ # usr_sp, usr_lr <- 【svc_sp】满递增
0x60094db0 <+24>: add sp, sp, #8 # svc_sp <- svc_sp + 8
0x60094db4 <+28>: ldmfd sp!, {r1} # r1 <- svc_sp
0x60094db8 <+32>: msr SPSR_fsxc, r1 # SPSR <- r1 恢复SPSR svc_sp <- r8 r8 是保存栈的下沿,r8 + 8 的位置应该是存的r1, 即刚进来时的SPSR的值
0x60094dbc <+36>: ldm sp!, {r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, lr, pc}^ # 用户模式的 r0-r12 lr pc <- 【svc_sp】 这个地方时满递增,
# pc <- sp+60-4的位置存的数据, 给pc后,后面代码就不走了,转到pc处代码执行 (执行to_thread的任务代码)
# 因为包含pc,SPSR 被拷贝到 CPSR
.end rt_hw_context_switch_exit

rt_schedual 与 rt_scheduler_do_irq_switch

一个是运行在非中断服务上下文, 一个是运行在中断服务上下文
pcpu->irq_nest 为标志, 进入中断时会响应rt_interrupt_enter, 该值+1. 中断处理函数处理完毕后, 调用rt_interrupt_leave, 该值-1.
该处讨论的是外部中断来了以后, 与调度器调度的处理逻辑.

  • 首先从rt_schedual的实现来看, 在中断处理函数处理完毕前是禁用调度器的.
  • rt_scheduler_do_irq_switch 只能在中断服务中调用.
  • 只有在处理外部中断时, 触发过rt_schedual, 中断处理完毕后, 进接着调用rt_scheduler_do_irq_switch才会往下走. 如果调度器的状态正常, 且irq_nest为0, 会进行任务调度.
    • 表明因外部中断处理而耽搁的任务调度要在中断处理完毕后马上进行.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void rt_schedule(void)    
{
level = rt_hw_interrupt_disable();
...
/* whether do switch in interrupt */

if (pcpu->irq_nest)
{
pcpu->irq_switch_flag = 1;
rt_hw_interrupt_enable(level);
goto __exit;
}
}
void rt_scheduler_do_irq_switch(void *context)
{
...
if (pcpu->irq_switch_flag == 0)
{
rt_hw_interrupt_enable(level);
return;
}
...
if (current_thread->scheduler_lock_nest == 1 && pcpu->irq_nest == 0)
{
... 切换任务
}
}

spin_lock 与中断状态

api

1
2
3
void rt_hw_spin_lock_init(rt_hw_spinlock_t *lock);
void rt_hw_spin_lock(rt_hw_spinlock_t *lock);
void rt_hw_spin_unlock(rt_hw_spinlock_t *lock);

整个调度中, 与自旋锁相关的有两个锁 _cpus_lock ``_rt_critical_lock
自旋锁的状态之前已经分析过, 可以参考这篇文章
(8 条消息) Linux 内核同步(二):自旋锁(Spinlock)_StephenZhou-CSDN 博客_spinlock

rt_cpus_lock

rt_hw_interrupt_disable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
rt_base_t rt_cpus_lock(void)
{
rt_base_t level;
struct rt_cpu* pcpu;
// 禁用中断, 同时返回禁用前的cspr
level = rt_hw_local_irq_disable();
// 当前cpu句柄
pcpu = rt_cpu_self();
if (pcpu->current_thread != RT_NULL)
{
register rt_ubase_t lock_nest = pcpu->current_thread->cpus_lock_nest;
// 当前cpu的当前任务的cpus_lock_nest+1 (cpus lock count)
pcpu->current_thread->cpus_lock_nest++;
// 当前cpu的当前任务cpus_lock_nest 为0 时, 加锁, 当前cpu的当前任务的scheduler_lock_nest+1 (scheduler lock count)
if (lock_nest == 0)
{
pcpu->current_thread->scheduler_lock_nest++;
rt_hw_spin_lock(&_cpus_lock);
}
}
return level;
}

rt_cpus_unlock

rt_hw_interrupt_enable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void rt_cpus_unlock(rt_base_t level)
{
struct rt_cpu* pcpu = rt_cpu_self();

if (pcpu->current_thread != RT_NULL)
{
pcpu->current_thread->cpus_lock_nest--;
// 当前cpu的当前任务 cpus_lock_nest 先 -1, 传进来是1的情况下 scheduler_lock_nest-1, 释放_cpus_lock锁
if (pcpu->current_thread->cpus_lock_nest == 0)
{
pcpu->current_thread->scheduler_lock_nest--;
rt_hw_spin_unlock(&_cpus_lock);
}
}
// 启用中断
rt_hw_local_irq_enable(level);
}

cpus_lock_nest作用

这个函数看来是跟 rt_cpus_lock 配对的, cpus_lock_nest变量的作用是为了避免线程多次调用rt_cpus_lock , 卡死在rt_hw_spin_lock处.
因此一个cpu的同一个任务可以多次调用rt_cpus_lock , 而不会卡在rt_hw_spin_lock处. 只要成对调用rt_cpus_unlock, 最后锁的状态就能放开.
自旋锁的实现是针对每个调用过程的, 如果不加这个字段, 同一任务连续两次调用rt_cpus_lock 会阻塞在rt_hw_spin_lock

scheduler_lock_nest作用

调度器锁定状态, 这个变量为1时, 才能进行调度

rt_enter_critical

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void rt_enter_critical(void)
{
register rt_base_t level;
struct rt_thread *current_thread;
// 禁用中断
level = rt_hw_local_irq_disable();

current_thread = rt_cpu_self()->current_thread;
if (!current_thread)
{
// 当前cpu上没有运行任务时, 启用中断, 退出. 什么都不做
rt_hw_local_irq_enable(level);
return;
}
{
register rt_uint16_t lock_nest = current_thread->cpus_lock_nest;
current_thread->cpus_lock_nest++;
if (lock_nest == 0)
{
// scheduler_lock_nest + 1
current_thread->scheduler_lock_nest ++;
rt_hw_spin_lock(&_cpus_lock);
}
}
current_thread->critical_lock_nest ++;
// scheduler_lock_nest 多加了一次
current_thread->scheduler_lock_nest ++;
// 启用中断
rt_hw_local_irq_enable(level);
}

该函数的作用是不能进行调度, 但可以响应中断
进入临界区的实现与rt_cpus_lock的实现差不多, 多出的操作是

  1. scheduler_lock_nest 多加了一次 (lock scheduler for local cpu),为什么多加了一次, 表示禁用调度器
  2. critical_lock_nest 加了一次 (critical for local cpu )
  3. 中断的状态, 出了函数后, 中断的状态没变, 但rt_cpu_lock出来后正常是把中断禁用了, rt_cpu_unlock出来后正常是把中断启用了.

rt_exit_critical

离开临界区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void rt_exit_critical(void)
{
register rt_base_t level;
struct rt_thread *current_thread;

// 禁用中断
level = rt_hw_local_irq_disable();

current_thread = rt_cpu_self()->current_thread;
if (!current_thread)
{
rt_hw_local_irq_enable(level);
return;
}

current_thread->scheduler_lock_nest --;
current_thread->critical_lock_nest --;

current_thread->cpus_lock_nest--;
if (current_thread->cpus_lock_nest == 0)
{
current_thread->scheduler_lock_nest --;
// 释放锁
rt_hw_spin_unlock(&_cpus_lock);
}
if (current_thread->scheduler_lock_nest <= 0)
{
current_thread->scheduler_lock_nest = 0;
// 启用中断
rt_hw_local_irq_enable(level);
rt_schedule();
}
else
{
// 启用中断
rt_hw_local_irq_enable(level);
}
}

_cpus_lock cpus_lock_nest

_cpus_lock是全局变量, 而前面api里涉及的scheduler_lock_nest cpus_lock_nest critical_lock_nest 都是属于thread的成员变量.
这个地方应该怎样理解呢?
首先先分析下调度过程.
假设两个cpu:
A C thread 跑在cpu0 上, B thread跑在cpu1上.
A 是当前正在运行的任务, A -> C的过程
rt_schedual, 进来时, 先调用rt_cpu_lock 禁用中断, 并上锁. A thread的cpus_lock_nestscheduler_lock_nest 都变成1
切到C时会调用rt_cpus_lock_status_restore, 在C thread的cpus_lock_nest是0时,才能放锁. 不为0的时候, 不能放锁.
rt_cpus_lock_status_restore 当前线程没有锁住cpu, 需要放锁.
这个地方是为什么?
我猜测应该是跟调度过程有关:
A 和 C都在cpu0上, 一次调度过程, spin_lock的加锁放锁是成对的, 调度过程中相当于把调度器锁上了, 如果这个时候cpu1也要进行调度, 那不好意思, 要等cpu0调度完(放锁后)才能进行调度
在调度器的上下文中, 是可能发生在同一个cpu上的两个任务的 cpu_lock_nest 都是1的情况
rt_schedual中, 进入调度状态, 假设A是current_thread, C 是to_thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void rt_schedule(void)
{
// A线程持锁, A 为current_thread A cpus_lock_nest = 1, 关闭中断
level = rt_hw_interrupt_disable();
...
// 切换到C 任务
rt_hw_context_switch((rt_ubase_t)&current_thread->sp,
(rt_ubase_t)&to_thread->sp, to_thread);

rt_hw_context_switch:
mov r0, r2
bl rt_cpus_lock_status_restore // 这个地方要检查to_thread->cpus_lock_nest是0时, 要放锁

----->
// C 任务执行, 切回到A任务
void rt_schedule(void)
{
// C任务持锁, C 为current_thread C任务 cpus_lock_nest = 1, 此时C任务怎么拿到的锁?
// 其实这个地方理解的没问题, 但是锁在前面的rt_cpus_lock_status_restore的调用的地方已经放掉了, 因为C任务在前面的切换过程中是to_thread. 它在那时的cpus_lock_nest是0, 已经放锁了.
level = rt_hw_interrupt_disable();
...
// 要切换到A 任务, 这个时候看 A 和 C的 cpus_lock_nest 都为1, 但是锁在切换过程中已经被放了.
rt_hw_context_switch((rt_ubase_t)&current_thread->sp,
(rt_ubase_t)&to_thread->sp, to_thread);
}

// 回到A 任务
rt_hw_interrupt_enable(level);
}
  1. 这个锁的处理比较绕. 就记住一点调度是以cpu为单位的. 但是arch_spin_lock是以调度过程为单位的, 因此要封装成cpu持锁的形式, 而一个cpu在一个时刻上只能有一个任务在执行, 因此这就是为什么要对current_thread 的cpus_lock_nest 做嵌套处理, 用它来控制何时加锁, 何时放锁了

但是这个过程不能想当然, 在cpu的任务切换期间还是要对这个cpus_lock_nest 做特殊处理.

  1. arm上实现的cpus_lock_nest 的ldrex``strex 都有cpu的独占标记.

对于Non-shareable memory:
通过local monitor的监控cpu的独占标记, ldrex 可以重复调, 但是存指令 strex 只有在前一条指令是ldrex 的时候才能操作成功.

  • 如果调用strex前没有调用过ldrex 设置标记, 则strex 会失败.
  • 如果调用strex前调用了strex把独占标记清了, 这次strex也会失败. 需要重新调用ldrex, 再调用strex, 才能操作成功.
    thread 1 thread 2 local monitor的状态
    Open Access state
    LDREX Exclusive Access state
    LDREX Exclusive Access state
    Modify Exclusive Access state
    STREX Open Access state
    Modify Open Access state
    STREX 在Open Access state的状态下,执行STREX指令会导致该指令执行失败
    保持Open Access state,直到下一个LDREX指令

scheduler_lock_nest

调度器上锁状态.
current_thread->**scheduler_lock_nest** == 1) , 才会进行任务切换.
这个是嵌套在cpus_lock引用计数里的, 只有当前核持有spinlock锁时, scheduler_lock_nest 才会上锁
参考rt_enter_criticalrt_exit_critical的实现, 这个变量为1时, 可以进行调度, 变量不为1时, 不能进行调度.
rt_enter_critical进入临界区的意思就是不能进行调度, 但可以响应中断