mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
Merge pull request #1370 from Barenboim/master
Add WFFacilities::ReplyGuard
This commit is contained in:
@@ -192,3 +192,43 @@ server任务的callback和client一样,是在http交互完成之后被调用
|
||||
这里需要说明一下,回复消息的时机是在series里所有其它任务被执行完后,自动回复,所以并没有task->reply()接口。
|
||||
但是,有task->noreply()调用,如果对server任务执行了这个调用,在原本回复的时刻,直接关闭连接。但callback依然会被调用(状态为NOREPLY)。
|
||||
在server任务的callback里,同样可以通过series_of()操作获得任务的series。那么,我们依然可以往这个series里追加新任务,虽然回复已经完成。
|
||||
|
||||
# 另外一种实现异步Server的便利方法
|
||||
|
||||
由于很多用户会直观的觉得,server的process函数结束server处理流程就结束并回复了。所以,经常有用户在process里使用wait group进行等待:
|
||||
~~~cpp
|
||||
int process(WFHttpTask *server_task)
|
||||
{
|
||||
WFFacilities::WaitGroup wait_group(1);
|
||||
WFHttpTask *task = WFTaskFactory::create_http_task(..., [&wait_group, server_task]{WFHttpTask *task) {
|
||||
*server_task->get_resp() = std::move(*task->get_resp());
|
||||
wait_group.done();
|
||||
});
|
||||
task->start();
|
||||
wait_group.wait();
|
||||
}
|
||||
~~~
|
||||
我们需要强调,以上的代码是一种不高效的写法,因为这会让一个线程进入等待。等价的高效写法是:
|
||||
~~~cpp
|
||||
int process(WFHttpTask *server_task)
|
||||
{
|
||||
WFHttpTask *task = WFTaskFactory::create_http_task(..., [server_task]{WFHttpTask *task) {
|
||||
*server_task->get_resp() = std::move(*task->get_resp());
|
||||
});
|
||||
series_of(server_task)->push_back(task);
|
||||
}
|
||||
~~~
|
||||
但鉴于很多用户不想了解series用法,我们加入一个便利类ReplyGuard,让用户可以在任何时候回复请求,用法如下:
|
||||
~~~cpp
|
||||
int process(WFHttpTask *server_task)
|
||||
{
|
||||
auto *guard = new WFFacilities::ReplyGuard(server_task);
|
||||
WFHttpTask *task = WFTaskFactory::create_http_task(..., [guard, server_task]{WFHttpTask *task) {
|
||||
*server_task->get_resp() = std::move(*task->get_resp());
|
||||
delete guard; // 此时server才会回复。
|
||||
});
|
||||
task->start();
|
||||
}
|
||||
~~~
|
||||
WFFacilities::ReplyGuard用于阻止一个server task的回复,只有这个guard被析构,才会触发回复。
|
||||
使用ReplyGuard一般不影响原server task series的使用,用户依然可以push_back任务。但**避免再调用series的cancel()**。
|
||||
|
||||
@@ -114,6 +114,8 @@ public:
|
||||
/* The following functions are intended for task implementations only. */
|
||||
SubTask *pop();
|
||||
|
||||
SubTask *get_last_task() const { return this->last; }
|
||||
|
||||
void set_last_task(SubTask *last)
|
||||
{
|
||||
last->set_pointer(this);
|
||||
@@ -125,8 +127,6 @@ public:
|
||||
const ParallelTask *get_in_parallel() const { return this->in_parallel; }
|
||||
|
||||
protected:
|
||||
SubTask *get_last_task() const { return this->last; }
|
||||
|
||||
void set_in_parallel(const ParallelTask *task) { this->in_parallel = task; }
|
||||
|
||||
void dismiss_recursive();
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#ifndef _WFFACILITIES_H_
|
||||
#define _WFFACILITIES_H_
|
||||
|
||||
#include <assert.h>
|
||||
#include "WFFuture.h"
|
||||
#include "WFTaskFactory.h"
|
||||
|
||||
@@ -76,6 +77,49 @@ public:
|
||||
WFFuture<void> future;
|
||||
};
|
||||
|
||||
public:
|
||||
class ReplyGuard
|
||||
{
|
||||
public:
|
||||
ReplyGuard(SubTask *task)
|
||||
{
|
||||
SeriesWork *series = series_of(task);
|
||||
assert(series);
|
||||
assert(task == series->get_last_task());
|
||||
this->cond = new Conditional(task, this);
|
||||
series->set_last_task(this->cond);
|
||||
}
|
||||
|
||||
~ReplyGuard()
|
||||
{
|
||||
if (this->cond)
|
||||
this->cond->signal(NULL);
|
||||
}
|
||||
|
||||
protected:
|
||||
class Conditional : public WFConditional
|
||||
{
|
||||
public:
|
||||
Conditional(SubTask *task, ReplyGuard *guard) :
|
||||
WFConditional(task)
|
||||
{
|
||||
this->guard = guard;
|
||||
}
|
||||
|
||||
/* Make it compatible with series->cancel(). */
|
||||
virtual ~Conditional()
|
||||
{
|
||||
if (this->task)
|
||||
{
|
||||
this->task = NULL;
|
||||
this->guard->cond = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
ReplyGuard *guard;
|
||||
} *cond;
|
||||
};
|
||||
|
||||
private:
|
||||
static void __timer_future_callback(WFTimerTask *task);
|
||||
static void __fio_future_callback(WFFileIOTask *task);
|
||||
|
||||
Reference in New Issue
Block a user