ICS Lab1 Coroutine Report

ICS Lab1 Coroutine实验报告

张凯文 2021010729 计12班 2022年11月21日

Task 1: 协程库的实现

代码实现

首先,在coroutine_pool.h中,实现了(不考虑协程ready属性的)协程池以串行调用执行协程的核心函数coroutine_pool::serial_execute_all

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void coroutine_pool::serial_execute_all() {
is_parallel = false;
g_pool = this;

/* START OF MY CODE */
int unfinished = coroutines.size(); //记录未执行完成的协程总数
while(unfinished) { //仍有协程未执行完时,需要重新轮询
for(int i = 0; i < coroutines.size(); ++i) { //通过vector下标轮询协程
if(!coroutines[i]->finished) { //如果未执行完
context_id = i; //更新当前正在执行的协程id
coroutines[i]->resume(); //恢复执行当前协程
} else {
if(coroutines[i]->ready) { //如果已经当前询问的协程执行完成且之前没有更新过unfinished,更新标记
--unfinished;
coroutines[i]->ready = false;
}
}
}
}
/* END OF MY CODE */

for (auto context : coroutines) {
delete context;
}
coroutines.clear();
}

然后,在context.hcommon.h中分别实现了切入和切出协程的函数resumeyield:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* 恢复协程函数运行。
* 调用 coroutine_switch,保存当前协程池的执行现场(将 callee-saved 寄存器内容暂存到当前协程的 caller_register 数组中);
* 恢复当前协程在上一次 yield 时的执行现场(将 callee_registers 数组中保存的内容恢复到真实寄存器上,设置协程栈帧(%rsp指向的位置),将 rip 恢复到协程 yield 之后所需要执行的指令地址)。
*/
virtual void coroutine_context::resume() {
coroutine_switch(caller_registers, callee_registers);
}

/* 协程主动暂停执行,保存协程的寄存器和栈帧。
* 将上下文转换至 coroutine_pool.serial_execute_all() 中的上下文进行重新的 schedule 调用。
*/
void yield() {
if (!g_pool->is_parallel) {
// 从 g_pool 中获取当前协程状态
auto context = g_pool->coroutines[g_pool->context_id];

// 调用 coroutine_switch 切换到 coroutine_pool 上下文
coroutine_switch(context->callee_registers, context->caller_registers);
}
}

并在context.S中编写实现coroutine_switch的汇编代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
.global coroutine_switch
coroutine_switch:
# 保存 callee-saved 寄存器到 %rdi 指向的上下文
movq %rsp, 64(%rdi)
movq %rbx, 72(%rdi)
movq %rbp, 80(%rdi)
movq %r12, 88(%rdi)
movq %r13, 96(%rdi)
movq %r14, 104(%rdi)
movq %r15, 112(%rdi)
# 保存的上下文中 rip 指向 ret 指令的地址(.coroutine_ret)
leaq .coroutine_ret(%rip), %rax
movq %rax, 120(%rdi)

# 从 %rsi 指向的上下文恢复 callee-saved 寄存器
movq 64(%rsi), %rsp
movq 72(%rsi), %rbx
movq 80(%rsi), %rbp
movq 88(%rsi), %r12
movq 96(%rsi), %r13
movq 104(%rsi), %r14
movq 112(%rsi), %r15
# 最后 jmpq 到上下文保存的 rip
jmpq *120(%rsi)

.coroutine_ret:
ret

分析

一个协程(coroutine_context类的实例)保存的核心信息包括:

  • 协程函数F及其参数Args
  • 一个指向协程函数栈帧基址的指针stack
  • 两个暂存协程池和协程函数寄存器状态的数组caller_registerscallee_registers

初始化一个协程时(未被协程调用之前):

  1. 首先在堆上为当前协程要执行的函数分配一块指定大小的栈帧空间(默认16KB),并将callee_registersrsp位置初始化为“栈顶”地址(这块空间最高位地址);
  2. 同样在callee_registers中,将最初的rip设置为coroutine_entry的地址,并将r12r13位置分别设置为coroutine_main函数的地址和参数(指向当前协程自身的指针this)。

当协程池第一次调用当前协程时,上述在callee_registers中初始化的寄存器信息就会被拷贝到真实寄存器上,因此第一次调用该协程时首先执行的是地址保存在rip中的coroutine_entry

1
2
3
coroutine_entry:
movq %r13, %rdi
callq *%r12

coroutine_entry将r13中保存的参数拷贝到rdi(保存调用函数的第一个参数)中,随即调用地址保存在r12中的coroutine_main函数:

1
2
3
4
5
6
7
8
void coroutine_main(struct basic_context *context) {
context->run();
context->finished = true;
coroutine_switch(context->callee_registers, context->caller_registers);

// unreachable
assert(false);
}

至此,当前协程调用run(),所保存的协程函数F才真正开始执行起来。在F执行过程中,随时有可能调用yield主动暂停执行,保存当下执行现场(协程的寄存器和栈帧),将CPU控制权还给协程池的调度函数,直到下一次resume恢复上一次的执行现场继续执行。这样的过程可能重复出现,直到F函数彻底执行完毕退出,协程主函数coroutine_maincontext->run()这条语句才算执行完成,随后将当前协程contextfinished状态标为true,并将控制权交还给协程池,当前协程执行完毕。

在协程切换时,栈和寄存器的变化可以用下图说明:

Task 2: 实现 sleep 函数

代码实现

coroutine_pool.h中,考虑协程的ready属性后,实现了完整的coroutine_pool::serial_execute_all函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void serial_execute_all() {
is_parallel = false;
g_pool = this;

int unfinished = coroutines.size();
while(unfinished) {
for(int i = 0; i < coroutines.size(); ++i) {
if(!coroutines[i]->finished) {
/* START OF MY TASK2 CODE */
if(coroutines[i]->ready) { //如果当前询问的协程是ready的,才真正执行调用
context_id = i;
coroutines[i]->resume();
} else {
coroutines[i]->ready = coroutines[i]->ready_func(); //如果还没ready,就不用费事切换进去了,转而通过ready_func查看创建时间戳,更新ready状态
}
/* END OF MY TASK2 CODE */
} else {
if(coroutines[i]->ready) {
--unfinished;
coroutines[i]->ready = false;
}
}
}
}

for (auto context : coroutines) {
delete context;
}
coroutines.clear();
}

并在common.h中完成sleep函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void sleep(uint64_t ms) {
if (g_pool->is_parallel) {
auto cur = get_time();
while (
std::chrono::duration_cast<std::chrono::milliseconds>(get_time() - cur)
.count() < ms)
;
} else {
/* START OF MY CODE */
auto context = g_pool->coroutines[g_pool->context_id]; // 从 g_pool 中获取当前协程状态
context->ready = false; //将协程置为不可用状态
// 获取当前时间,更新 ready_func
// ready_func:检查当前时间,如果已经超时,则返回 true (注册该函数相当于打下一个时间戳)
auto cur = get_time();
context->ready_func = [cur, ms]() {
return std::chrono::duration_cast<std::chrono::milliseconds>(get_time() - cur)
.count() >= ms;
};
yield(); // 切出协程,切换到 coroutine_pool 上下文
/* END OF MY CODE */
}
}

分析

sleep_sort的基本执行流程为:

  1. 在第一次轮询时,所有初始化的协程均为ready状态,会依次执行sleep函数,将协程的ready状态置为false,并注册ready_func,打下开始睡眠时的时间戳,然后切出;
  2. 之后协程池会不断重复轮询各个协程,调用ready_func更新各个协程的ready状态;
  3. 睡眠时间最少的协程将最早达到预定时间,最早被更新为ready状态,那么在紧接着的下一次轮询中该协程就会被恢复调用,执行printf语句打印保存的x值(睡眠时间的ms数),因此,x越小睡眠时间越少,就越先被打印出来,从而实现从小到大的排序功能;
  4. 协程一旦被执行完成,会被更新为finished状态,那么在下一次轮询到它时,会将unfinished数减1,同时再次将它的ready状态置为false,这里的ready起到标记该已完成的协程是否更新过unfinished数的作用,防止同一个已完成的协程反复更新unfinished数导致协程池调度函数异常提前退出;
  5. 上述2-4步会重复执行,直到unfinished数为0,全部协程执行完成,相应地,输入的整数也被由小到大依次打印。

下面以$n=3$,输入序列为$3\ 1\ 2$为例,展示sleep_sort中不同协程的运行情况(注:协程函数F仅展示代入参数实现实际功能的伪代码):


目前的协程库实现方式是不断重复轮询各个协程ready_func是否为true更新ready状态实现的,事实上,也可以通过在协程池中根据第一次轮询时各个协程睡眠时间长短维护一个优先级队列(睡眠时间最短的协程在队首),后续每次只须询问队首协程是否ready,调用ready_func更新队首协程状态即可(避免了调用后续协程的ready_func做无效更新),直到队首协程readytrue,执行该协程并将其弹出优先级队列,以此类推,直到队列为空为止。

Task 3: 利用协程优化二分查找

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void lookup_coroutine(const uint32_t *table, size_t size, uint32_t value,
uint32_t *result) {
size_t low = 0;
while ((size / 2) > 0) {
size_t half = size / 2;
size_t probe = low + half;

/* START OF MY CODE */
// 使用 __builtin_prefetch 预取容易产生缓存缺失的内存
// 并调用 yield
__builtin_prefetch(&table[probe]);
yield();
/* END OF MY CODE */

uint32_t v = table[probe];
if (v <= value) {
low = probe;
}
size -= half;
}
*result = low;
}

分析

利用协程主要就是为了优化从table中取值(uint32_t v = table[probe])这一句,由于二分查找的非局部性,待取的值很可能不在缓存中,这时可以不要立刻去内存中找值,而是切换到其他协程干其他活的同时让 CPU 异步将要用的数据预取到缓存区,等到再返回到刚才使用预取指令的协程的时候,CPU 已经把数据读取到了缓存中,从而省下了去内存中读取数据的这一段 IO 时间。

以下是性能提升效果的展示:

测试环境:Linux hp 5.4.0-132-generic #148-Ubuntu 系统;x86_64 架构;g++ (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0 编译器

  1. 改变 batch size(协程池中一次放入的协程个数)对性能的影响:

    可以发现,在 batch size 设为 8~32 时对二分查找性能提升的效果较明显,当 batch size 设得太小时,CPU 还没来得及将数据预取到缓存区就再次轮询到原协程,与并行执行相比反而影响了性能;batch size 过大时可能由于不同协程间切换操作过多等也会影响性能。

  2. 改变排序数组大小对性能的影响:

    可以发现,当排序数组过小时,缓存大部分时候足以包含到待取数据,引入协程串行执行也将成为画蛇添足,会影响性能;只有当待排序数组足够大时才有性能提升效果。

注:源代码和实验数据放在代码包/exp目录下。

总结与感想

本次协程实验使我第一次站在系统层面理解程序运行的过程,协程库的实现让我深入理解了协程池调度函数和各个协程之间的相互关系,其中协程切换汇编代码的实现让我对内存的布局结构以及协程切换、函数调用时寄存器和栈的变化有了进一步的理解,深刻认识到寄存器、栈帧的状态和返回地址记录一个函数的执行现场的作用,能够通过设置rip指向的地址直接控制程序执行的走向是最让我感到激动和成就感的部分;协程睡眠的实现使我学会通过注册函数储存和访问历史信息;二分查找的性能优化使我对缓存预取和协程串行运行的实际意义有了直观的认识。整个实验完成下来,从最开始的举步维艰到逐步对整个协程运行架构有了整体深刻的理解,我深感受益匪浅。

注:本次实验主体部分均为我个人独立完成,主要用到的参考是助教老师的框架导读;Task3中对l m b不同参数的自动化测试脚本/exp/experiment.py由我的舍友宋曦轩编写,秉承不重复造轮子的原则我复用了他的代码;对实验结果的可视化分析代码/exp/plot.py由我个人编写完成。
另注:本报告主要长度来自对修改代码完整函数的展现以及可视化图片占据的空间,文字说明部分已尽可能通过结点/数字列表等方式整理精简呈现。