diff --git a/README_cn.md b/README_cn.md index dfb621b5..f0f712ed 100644 --- a/README_cn.md +++ b/README_cn.md @@ -113,6 +113,7 @@ sudo dnf install workflow * [关于计数器](docs/about-counter.md) * [模块任务](docs/about-module.md) * [DAG图任务](docs/tutorial-11-graph_task.md) + * [Selector任务](docs/about-selector.md) * 任务间通信 * [条件任务与观察者模式](docs/about-conditional.md) * [资源池与消息队列](docs/about-resource-pool.md) diff --git a/docs/about-selector.md b/docs/about-selector.md new file mode 100644 index 00000000..76b96d07 --- /dev/null +++ b/docs/about-selector.md @@ -0,0 +1,46 @@ +# 关于Selector任务 + +我们业务中经常有一些需求,从几个异步分支中选择第一个成功完成的结果进行处理,丢弃其它结果。 +Selector任务就是为了上述这种多选一场景而设计的。 + +# Selector解决的问题 +常见的多选一场景例如: +* 向多个下游发送网络请求,只要任意一个下游返回正确结果,工作流程就可以继续。 +* 执行一组复杂的操作,操作执行完成或整体超时,流程都会继续。 +* 并行计算中,任何一个线程计算出预期的结果即完成,例如MD5碰撞计算。 +* 网络应用中的‘backup request’,也可以用selector配合timer来实现。 + +在selector任务被引入之前,这些场景很难被很好解决,涉及到任务生命周期以及丢弃结果的资源回收等问题。 + +# 创建Selector任务 +Selector也是一种任务,所以一般由WFTaskFactory里的工厂函数产生: +~~~cpp +using selector_callback_t = std::function; + +class WFTaskFactory +{ +public: + static WFSelectorTask *create_selector_task(size_t candidates, + selector_callback_t callback); +}; +~~~ +其中,candidates参数代表从多少个候选路径中选择。Selector任务创建后,必须有candidates次被提交才会被销毁。 +因此,用户可以放心的(也是必须的)向selector提交candidates次,无需要担心selector的生命周期问题。 + +# Selector类的接口 +WFSelectorTask类包括两个主要接口。其中,对提交者来讲,只需要关注submit函数。对于等待者,只需使用到get_message。 +~~~cpp +class WFSelectorTask : public WFGenericTask +{ +public: + virtual int submit(void *msg); + + void *get_message() const; +}; +~~~ +当第一个候选者通过submit提交了一个非空指针的msg,这个消息会被接受。如果任务已经启动,selector的callback被调用。 +和其它任何类型的任务一样,callback结束之后,任务所在的series就继续执行了,无需等待其它候选被提交。 +当一个候选消息被接受,submit函数返回1。之后的submit调用都返回0表示拒绝。一般这种情况下用户需要释放msg对应资源。 +注意空指针永远不会被接受,所以submit一个NULL永远返回0。一般来讲,submit(NULL)用于表示这个分支失败了。 +如果所有候选都提交了NULL,selector运行到callback时,state=WFT_STATE_SYS_ERROR, error=ENOMSG。 +作为等待者,在selector的callback里调用另外一个接口get_message()就可以得到被成功接受的消息了。 diff --git a/src/factory/WFTask.h b/src/factory/WFTask.h index d2595f88..78c8c318 100644 --- a/src/factory/WFTask.h +++ b/src/factory/WFTask.h @@ -485,10 +485,7 @@ public: } } - void **get_mailbox() const - { - return this->mailbox; - } + void **get_mailbox() const { return this->mailbox; } public: void set_callback(std::function cb) @@ -542,6 +539,99 @@ protected: virtual ~WFMailboxTask() { } }; +class WFSelectorTask : public WFGenericTask +{ +public: + virtual int submit(void *msg) + { + void *tmp = NULL; + int ret = 0; + + if (this->message.compare_exchange_strong(tmp, msg) && msg) + { + ret = 1; + if (this->flag.exchange(true)) + { + this->state = WFT_STATE_SUCCESS; + this->subtask_done(); + } + } + + if (--this->nleft == 0) + { + if (!this->message) + { + this->state = WFT_STATE_SYS_ERROR; + this->error = ENOMSG; + this->subtask_done(); + } + + delete this; + } + + return ret; + } + + void *get_message() const { return this->message; } + +public: + void set_callback(std::function cb) + { + this->callback = std::move(cb); + } + +protected: + virtual void dispatch() + { + if (this->flag.exchange(true)) + { + this->state = WFT_STATE_SUCCESS; + this->subtask_done(); + } + + if (--this->nleft == 0) + { + if (!this->message) + { + this->state = WFT_STATE_SYS_ERROR; + this->error = ENOMSG; + this->subtask_done(); + } + + delete this; + } + } + + virtual SubTask *done() + { + SeriesWork *series = series_of(this); + + if (this->callback) + this->callback(this); + + return series->pop(); + } + +protected: + std::atomic message; + std::atomic flag; + std::atomic nleft; + std::function callback; + +public: + WFSelectorTask(size_t candidates, + std::function&& cb) : + message(NULL), + flag(false), + nleft(candidates + 1), + callback(std::move(cb)) + { + } + +protected: + virtual ~WFSelectorTask() { } +}; + class WFConditional : public WFGenericTask { public: diff --git a/src/factory/WFTaskFactory.h b/src/factory/WFTaskFactory.h index 75f37a94..974cd960 100644 --- a/src/factory/WFTaskFactory.h +++ b/src/factory/WFTaskFactory.h @@ -89,9 +89,10 @@ using fsync_callback_t = std::function; using timer_callback_t = std::function; using counter_callback_t = std::function; -// Mailbox is like counter with data passing using mailbox_callback_t = std::function; +using selector_callback_t = std::function; + // Graph (DAG) task. using graph_callback_t = std::function; @@ -296,6 +297,13 @@ public: static void send_by_name(const std::string& mailbox_name, void *msg, size_t max); +public: + static WFSelectorTask *create_selector_task(size_t candidates, + selector_callback_t callback) + { + return new WFSelectorTask(candidates, std::move(callback)); + } + public: static WFConditional *create_conditional(SubTask *task, void **msgbuf) { @@ -336,7 +344,7 @@ public: return WFTaskFactory::release_guard(resource_name, NULL); } - static int release_guard(const std::string& resaource_name, void *msg); + static int release_guard(const std::string& resource_name, void *msg); static int release_guard_safe(const std::string& resource_name) {