协程

实际上协程的概念比线程还要早,按照Knuth的说法”子例程是协程的特例”,一个子例程就是一次子函数调用,那么实际上协程就是类函数一样的程序组件,可以在一个线程里面轻松创建数十万个协程,就像数十万次函数调用一样,只不过子例程只有一个调用入口起始点,返回之后就结束了,而协程入口既可以是起始点,又可以从上一个返回点继续执行,也就是说协程之间可以通过 yield 方式转移执行权,对称(symmetric)平级地调用对方,而不是像例程那样上下级调用关系。当然 Knuth的”特例”指的是协程也可以模拟例程那样实现上下级调用关系,这就叫非对称协程(asymmetric coroutines),C标准库给我们提供了两种协程调度原语:一种是setjmp/longjmp,另一种是ucontext组件,它们内部(当然是用汇编语言)实现了协程的上下文切换,github上也有很多已经实现了的协程开源库:libtask,libco等;

protothreads的实现

(1)lc-switch.h是协程最基础的宏定义的封装,lc_t是一个unsigned short类型,LC_INIT初始化,LC_RESUME和LC_SET组合应该是switch的奇淫巧计,LC_END就是一个结尾的标识;

typedef unsigned short lc_t;
#define LC_INIT(s) s = 0
#define LC_RESUME(s) switch(s) { case 0:
#define LC_SET(s) s = __LINE__; case __LINE__:
#define LC_END(s) }

那么思考一下:如何通过每次调用函数实现返回0到9?
当然最简单的方法就是直接用静态变量如:int function(void) { static int i = 0; i++; return i; },这种类似的方法就不说了,来说说用switch实现类似的方法:

#include <stdio.h>

int function2(void) 
{
  static int i, state = 0;
  switch (state) 
  {
    case 0:
    for (i = 0; i < 10; i++) 
    {
      state = 1;
      return i;
      case 1:;
    }
  }
}
int main()
{
    fprintf(stdout, "func0 : %d\n", function2());
    fprintf(stdout, "func1 : %d\n", function2());
    fprintf(stdout, "func2 : %d\n", function2());
    fprintf(stdout, "func3 : %d\n", function2());
    fprintf(stdout, "func4 : %d\n", function2());
    fprintf(stdout, "func5 : %d\n", function2());
    fprintf(stdout, "func6 : %d\n", function2());

    return 0;
}
结果:
LINKXZHOU-MC0:test_code linkxzhou$ ./gt_01
func0 : 0
func1 : 1
func2 : 2
func3 : 3
func4 : 4
func5 : 5
func6 : 6

分析:通过switch的case来控制跳转,state控制当前的实际状态,这样就实现了协程的状态保存;

(2)pt.h提供了调用的各种原语,下面就分析protothreads如何通过宏来控制状态跳转:

struct pt {
  lc_t lc;
};
#define PT_WAITING 0
#define PT_YIELDED 1
#define PT_EXITED  2
#define PT_ENDED   3

struct pt结构体和状态宏,其中struct pt中的lc变量是保存状态使用;

#define PT_INIT(pt)   LC_INIT((pt)->lc)
#define PT_THREAD(name_args) char name_args
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
                   PT_INIT(pt); return PT_ENDED; }

PT_INIT(pt) 是pt的lc初始化,宏定义展开是:(pt)->lc = 0;
PT_THREAD(name_args) 是用于函数的定义,如:PT_THREAD(codelock_thread(struct pt *pt)) -> char codelock_thread(struct pt *pt);
PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc) 调用函数的开始位置,定义展开是: { char PT_YIELD_FLAG = 1; switch((pt)->lc) { case 0: ;
PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; PT_INIT(pt); return PT_ENDED; },PT_BEGIN 和 PT_END一定要成对出现,定义展开是:}; PT_YIELD_FLAG = 0; (pt)->lc = 0;return PT_ENDED; } ;

/* 协程阻塞点(blocking),本质上等同于 PT_YIELD_UNTIL,只
不过退出码是 PT_WAITING,用来模拟信号量同步 */
#define PT_WAIT_UNTIL(pt, condition)          \
  do {            \
    LC_SET((pt)->lc);       \
    if(!(condition)) {        \
      return PT_WAITING;      \
    }           \
  } while(0)

#define PT_WAIT_WHILE(pt, cond)  PT_WAIT_UNTIL((pt), !(cond))
#define PT_WAIT_THREAD(pt, thread) PT_WAIT_WHILE((pt), PT_SCHEDULE(thread))
#define PT_SPAWN(pt, child, thread)   \
  do {            \
    PT_INIT((child));       \
    PT_WAIT_THREAD((pt), (thread));   \
  } while(0)

#define PT_RESTART(pt)        \
do {            \
  PT_INIT(pt);        \
  return PT_WAITING;      \
} while(0)

#define PT_EXIT(pt)       \
  do {            \
    PT_INIT(pt);        \
    return PT_EXITED;     \
  } while(0)
/* 协程调度,调用协程 f 并检查它的退出码,直到协程终止返回 0,否则返回 1 */
#define PT_SCHEDULE(f) ((f) < PT_EXITED)
/* 协程出让点,如果此时协程状态变量 lc 已经变为 __LINE__ 跳转过来的,
那么 PT_YIELD_FLAG = 1,表示从出让点继续执行*/
#define PT_YIELD(pt)        \
  do {            \
    PT_YIELD_FLAG = 0;        \
    LC_SET((pt)->lc);       \
    if(PT_YIELD_FLAG == 0) {      \
      return PT_YIELDED;      \
    }           \
  } while(0)
/* 附加出让条件 */
#define PT_YIELD_UNTIL(pt, cond)    \
  do {            \
    PT_YIELD_FLAG = 0;        \
    LC_SET((pt)->lc);       \
    if((PT_YIELD_FLAG == 0) || !(cond)) { \
      return PT_YIELDED;      \
    }           \
  } while(0)

这里我主要主要的宏,PT_WAIT_UNTIL其完全展开如下:

 do {
    (pt)->lc = __LINE__; case __LINE__:;
    if(!(condition)) { // 条件不成立则直接进入PT_WAITING
      return PT_WAITING;
    }
  } while(0)

那么结合PT_BEGIN就形成了switch(…) {case 0: while (…) { … case LINE: …}}的形式,从而根据switch的变量实现各种状态跳转的源语; (3)pt-sem.h是扩展的信号量

struct pt_sem {
  unsigned int count;
};
// 初始化为某个值
#define PT_SEM_INIT(s, c) (s)->count = c
// 条件等待
#define PT_SEM_WAIT(pt, s)  \
  do {            \
    PT_WAIT_UNTIL(pt, (s)->count > 0);    \
    --(s)->count;       \
  } while(0)
// 发送信号
#define PT_SEM_SIGNAL(pt, s) ++(s)->count

protothreads的使用

protothreads的使用通用的一个方式:

PT_THREAD(thread1(struct pt *pt))
{
  PT_BEGIN(pt);
  while(1) {
    ...                         // 业务逻辑
    PT_WAIT_UNTIL(pt, 判断条件1);
    ...                         // 业务逻辑
    PT_WAIT_UNTIL(pt, 判断条件2);
    ...                         // 业务逻辑
  }

  PT_END(pt);
}
PT_THREAD(thread2(struct pt *pt))
{
  PT_BEGIN(pt);
  while(1) {
    ...                         // 业务逻辑
    PT_WAIT_UNTIL(pt, 判断条件1);
    ...                         // 业务逻辑
    PT_WAIT_UNTIL(pt, 判断条件2);
    ...                         // 业务逻辑
  }

  PT_END(pt);
}
...

// 定义静态变量
static struct pt pt1, pt2 xxx;
int main()
{
  ...
  PT_INIT(&pt1);
  PT_INIT(&pt2);
  PT_INIT(xxx) .... // 添加上述的静态变量
  
  while(1) {
    thread1(&pt1);
    thread2(&pt2);
    ... // 可以继续添加运行的协程
    ... // 其他的业务逻辑
  }

  return 0;
}

当然有一个更全面的例子,可以参考pt-1.4源码中的example-buffer.c,通过改写后的信号量来处理生产者何消费者模型,整体框架就是一个asymmetric coroutines,包括一个主协程driver_thread和两个子协程producer和consumer;上面讲了很多pt的使用,那么它的缺点呢?
(1)其实pt使用与保存栈方法的协程都是stackless,它本身没有一个局部变量,但这不是问题,首先我们已经假定运行环境是单线程的,其次在一个简化的需求下也用不了多少”局部变量”;(不过这个缺点可以通过参考该文件解决:https://www.chiark.greenend.org.uk/~sgtatham/coroutine.h). (2)pt使用了switch-case,那么我们在使用过程中不能再在中间加switch-case的语法;
(3)一个协程内部可以调用其它例程,但必须保证该例程是非阻塞的,否则所在线程内的所有协程都将被阻塞;

参考引用

(1)一个“蝇量级” C 语言协程库