// choose从句deadline对应的超时回调,销毁所有的choose从句并resume协程
static void mill_choose_callback(struct mill_timer *timer) {
struct mill_cr *cr = mill_cont(timer, struct mill_cr, timer);
struct mill_slist_item *it;
for(it = mill_slist_begin(&cr->choosedata.clauses); it; it = mill_slist_next(it)) {
struct mill_clause *itcl = mill_cont(it, struct mill_clause, chitem);
mill_assert(itcl->used);
mill_list_erase(&itcl->ep->clauses, &itcl->epitem);
}
mill_resume(cr, -1);
}
// choose deadline从句
void mill_choose_deadline_(int64_t ddline) {
if(mill_slow(mill_running->choosedata.othws || mill_running->choosedata.ddline >= 0))
mill_panic("multiple 'otherwise' or 'deadline' clauses in a choose statement");
if(ddline < 0)
return;
mill_running->choosedata.ddline = ddline;
}
// choose otherwise从句
void mill_choose_otherwise_(void) {
if(mill_slow(mill_running->choosedata.othws ||
mill_running->choosedata.ddline >= 0))
mill_panic("multiple 'otherwise' or 'deadline' clauses in a choose statement");
mill_running->choosedata.othws = 1;
}
// 往chan追加数据val
static void mill_enqueue(struct mill_chan_ *ch, void *val) {
// 如果chan上还有关联的receiver执行choose in从句,唤醒对应的协程收数据(当然先写数据再唤醒)
if(!mill_list_empty(&ch->receiver.clauses)) {
mill_assert(ch->items == 0);
struct mill_clause *cl = mill_cont(
mill_list_begin(&ch->receiver.clauses), struct mill_clause, epitem);
// 写数据
memcpy(mill_valbuf(cl->cr, ch->sz), val, ch->sz);
// 唤醒收数据的协程
mill_choose_unblock(cl);
return;
}
// 只写数据
assert(ch->items < ch->bufsz);
size_t pos = (ch->first + ch->items) % ch->bufsz;
memcpy(((char*)(ch + 1)) + (pos * ch->sz) , val, ch->sz);
++ch->items;
}
// 从chan中取队首的数据val
static void mill_dequeue(struct mill_chan_ *ch, void *val) {
// 拿chan上sender的第一个choose out从句
struct mill_clause *cl = mill_cont(
mill_list_begin(&ch->sender.clauses), struct mill_clause, epitem);
// chan中valbuf当前无数据可读
if(!ch->items) {
// 调用了chdone后肯定没有sender要发送数据了,直接拷走数据即可(chdone追加的)
if(mill_slow(ch->done)) {
mill_assert(!cl);
memcpy(val, ((char*)(ch + 1)) + (ch->bufsz * ch->sz), ch->sz);
return;
}
// 还没有调用chdone,直接从choose out从句中拷走数据,再唤醒因为执行choose out阻塞的协程
mill_assert(cl);
memcpy(val, cl->val, ch->sz);
mill_choose_unblock(cl);
return;
}
// chan中valbuf当前有数据可读
// - 读取chan中的数据;
// - 如果对应的choose out从句cl存在,则拷贝其数据到chan valbuf并唤醒执行该从句的协程
memcpy(val, ((char*)(ch + 1)) + (ch->first * ch->sz), ch->sz);
ch->first = (ch->first + 1) % ch->bufsz;
--ch->items;
if(cl) {
assert(ch->items < ch->bufsz);
size_t pos = (ch->first + ch->items) % ch->bufsz;
memcpy(((char*)(ch + 1)) + (pos * ch->sz) , cl->val, ch->sz);
++ch->items;
mill_choose_unblock(cl);
}
}
// choose wait从句
int mill_choose_wait_(void) {
struct mill_choosedata *cd = &mill_running->choosedata;
struct mill_slist_item *it;
struct mill_clause *cl;
// 每个协程都有一个对应的choosedata数据结构
//
// 如果当前有就绪的choose in/out从句,则选择一个并执行
if(cd->available > 0) {
// 只有1个就绪的choose从句直接去检查el->ep->type就知道干什么了
// 如果有多个就绪的choose从句,随机选择一个就绪的从句去执行
int chosen = cd->available == 1 ? 0 : (int)(random() % (cd->available));
for(it = mill_slist_begin(&cd->clauses); it; it = mill_slist_next(it)) {
cl = mill_cont(it, struct mill_clause, chitem);
if(!cl->available)
continue;
if(!chosen)
break;
--chosen;
}
struct mill_chan_ *ch = mill_getchan(cl->ep);
// 根据choose从句类型决定是向chan发送数据,还是从chan读取数据
if(cl->ep->type == MILL_SENDER)
mill_enqueue(ch, cl->val);
else
mill_dequeue(ch, mill_valbuf(cl->cr, ch->sz));
mill_resume(mill_running, cl->idx);
return mill_suspend();
}
// 如果没有choose in/out从句事件就绪但是有otherwise从句,直接执行otherwise从句
// - 这里实际上相当于将当前运行的协程重新加入调度队列,然后主动挂起当前协程
if(cd->othws) {
mill_resume(mill_running, -1);
return mill_suspend();
}
// 如果指定了deadline从句,为其启动一个定时器,并绑定超时回调
if(cd->ddline >= 0)
mill_timer_add(&mill_running->timer, cd->ddline, mill_choose_callback);
// 其他情况下,将当前协程和被查询的chan进行注册,等到直到有一个choose从句unblock
for(it = mill_slist_begin(&cd->clauses); it; it = mill_slist_next(it)) {
cl = mill_cont(it, struct mill_clause, chitem);
if(mill_slow(cl->ep->refs > 1)) {
if(cl->ep->tmp == -1)
cl->ep->tmp =
cl->ep->refs == 1 ? 0 : (int)(random() % cl->ep->refs);
if(cl->ep->tmp) {
--cl->ep->tmp;
cl->used = 0;
continue;
}
cl->ep->tmp = -2;
}
mill_list_insert(&cl->ep->clauses, &cl->epitem, NULL);
}
// 如果有多个协程并发的执行chdone,只可能有一个执行成功,其他的都必须阻塞在下面这行
return mill_suspend();
}
// 获取正在运行的协程的chan数据存储缓冲区valbuf
void *mill_choose_val_(size_t sz) {
return mill_valbuf(mill_running, sz);
}
// 向chan中发送数据
void mill_chs_(struct mill_chan_ *ch, void *val, size_t sz,
const char *current) {
if(mill_slow(!ch))
mill_panic("null channel used");
mill_trace(current, "chs(<%d>)", (int)ch->debug.id);
mill_choose_init(current);
mill_running->state = MILL_CHS;
struct mill_clause cl;
mill_choose_out_(&cl, ch, val, sz, 0);
mill_choose_wait_();
}
// 从chan中接收数据
void *mill_chr_(struct mill_chan_ *ch, size_t sz, const char *current) {
if(mill_slow(!ch))
mill_panic("null channel used");
mill_trace(current, "chr(<%d>)", (int)ch->debug.id);
mill_running->state = MILL_CHR;
mill_choose_init(current);
struct mill_clause cl;
mill_choose_in_(&cl, ch, sz, 0);
mill_choose_wait_();
return mill_choose_val_(sz);
}
// chan上的chdone操作
void mill_chdone_(struct mill_chan_ *ch, void *val, size_t sz,
const char *current) {
if(mill_slow(!ch))
mill_panic("null channel used");
mill_trace(current, "chdone(<%d>)", (int)ch->debug.id);
if(mill_slow(ch->done))
mill_panic("chdone on already done-with channel");
if(mill_slow(ch->sz != sz))
mill_panic("send of a type not matching the channel");
/* Panic if there are other senders on the same channel. */
if(mill_slow(!mill_list_empty(&ch->sender.clauses)))
mill_panic("send to done-with channel");
/* Put the channel into done-with mode. */
ch->done = 1;
// 在valbuf末尾再追加一个元素,不能chs往valbuf中写因为这样没有receiver的情况下会阻塞
memcpy(((char*)(ch + 1)) + (ch->bufsz * ch->sz) , val, ch->sz);
// 追加上述一个多余的元素后,需要唤醒chan上所有等待的receiver
while(!mill_list_empty(&ch->receiver.clauses)) {
struct mill_clause *cl = mill_cont(
mill_list_begin(&ch->receiver.clauses), struct mill_clause, epitem);
memcpy(mill_valbuf(cl->cr, ch->sz), val, ch->sz);
mill_choose_unblock(cl);
}
}