Merge pull request #923 from Barenboim/master

Add named conditional for observer mode.
This commit is contained in:
xiehan
2022-05-30 02:19:34 +08:00
committed by GitHub
8 changed files with 461 additions and 109 deletions

View File

@@ -102,7 +102,7 @@ sudo apt-get install libworkflow1
* Timing tasks and counting tasks
* [About timer](docs/en/about-timer.md)
* [About counter](docs/en/about-counter.md)
* [Conditional and resource pool](docs/en/about-conditional.md)
* [About resource pool](docs/en/about-resource-pool.md)
* [About module](docs/en/about-module.md)
* Service governance
* [About service governance](docs/en/about-service-governance.md)

View File

@@ -85,7 +85,8 @@ make
* 其它一些重要任务与组件
* [关于定时器](docs/about-timer.md)
* [关于计数器](docs/about-counter.md)
* [条件任务与资源池](docs/about-conditional.md)
* [条件任务与观察者模式](docs/about-conditional.md)
* [关于资源池](docs/about-resource-pool.md)
* [模块任务](/docs/about-module.md)
* [DAG图任务](/docs/tutorial-11-graph_task.md)
* 服务治理

View File

@@ -1,115 +1,104 @@
# 条件任务与资源池
# 条件任务与观察者模式
在我们用workflow写异步程序时经常会遇到这样一些场景
* 任务运行时需要先从某个池子里获得一个资源。任务运行结束,则会把资源放回池子,让下一个需要资源的任务运行。
* 网络通信时需要对某一个或一些通信目标做总的并发度限制,但又不希望占用线程等待。
* 我们有许多随机到达的任务处在不同的series里。但这些任务必须**串行**的运行。
有的时候我们需要让任务在某个条件下才被执行。条件任务WFConditional就是用于解决这种问题。
条件任务是一种任务包装器,可以包装任何的任务并取代原任务。通过对条件任务发送信号来触发被包装任务的执行。
所有这些需求,都可以用资源池模块来解决。我们的[WFDnsResolver](https://github.com/sogou/workflow/blob/master/src/nameservice/WFDnsResolver.cc)就是通过这个方法来实现对dns server的并发度控制的。
# 资源池的接口
在[WFResourcePool.h](https://github.com/sogou/workflow/blob/master/src/factory/WFResourcePool.h)里,定义了资源池模块的接口:
# 条件任务的创建
在[WFTaskFactory.h](/src/factory/WFTaskFactory.h)里,可以看到条件任务的创建接口。
~~~cpp
class WFResourcePool
class WFTaskFactory
{
public:
WFConditional *get(SubTask *task, void **resbuf);
WFConditional *get(SubTask *task);
void post(void *res);
...
protected:
virtual void *pop()
{
return this->data.res[this->data.index++];
}
virtual void push(void *res)
{
this->data.res[--this->data.index] = res;
}
...
static WFConditional *create_conditional(SubTask *task);
static WFConditional *create_conditional(SubTask *task, void **msgbuf);
};
~~~
可以看到我们通过工厂的create_conditional接口创建条件任务。
其中task为被包装的任务。msgbuf是用于接收消息的缓冲区如果无需关注消息的具体内容msgbuf可以缺省。
WFConditional的主要接口
~~~cpp
class WFConditional : public WFGenericTask
{
public:
WFResourcePool(void *const *res, size_t n);
WFResourcePool(size_t n);
virtual void signal(void *msg);
...
};
~~~
#### 构造函数
第一个构造函数接受一个资源数组长度为n。数组每个元素为一个void \*。内部会再分配一份相同大小的内存,把数组复制走。
如果你的初始资源都是nullptr那么你可以使用第二个构造函数只需要传n而无需先建立一个全部为nullptr的指针数组。
大概看看内部实现就明白了:
~~~cpp
void WFResourcePool::create(size_t n)
{
this->data.res = new void *[n];
this->data.value = n;
...
}
WFResourcePool::WFResourcePool(void *const *res, size_t n)
{
this->create(n);
memcpy(this->data.res, res, n * sizeof (void *));
}
WFResourcePool::WFResourcePool(size_t n)
{
this->create(n);
memset(this->data.res, 0, n * sizeof (void *));
}
~~~
#### 使用接口
用户使用get()接口把任务打包成一个conditional。conditional是一个条件任务条件满足时运行其包装的任务。
get()接口可包含第二个参数是一个void \*\*resbuf用于保存所获得的资源。
接下来用户只需要用这个conditional取代原来的任务使用就好了可以start或串进任务流。
注意conditional是在它被执行时去尝试获得资源的而不是在它被创建的时候。要不然的话以下代码就会被卡死
~~~cpp
WFResourcePool pool(1);
int f()
{
WFHttpTask *t1 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
WFHttpTask *t2 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
WFConditional *c1 = pool.get(t1, &t1->user_data); // 用user_data来保存res是一种实用方法。
WFConditional *c2 = pool.get(t2, &t2->user_data);
c2->start();
// wait for t2 finish here.
...
c1->start();
...
}
~~~
以上代码c1先创建等待t2结束后才运行。这里并不会出现c2卡死因为conditional是在执行时才获得资源的。
当用户对资源使用完毕一般在任务callback里需要通过post()接口把资源放回池子。
post()时的res参数**无需**与get()得到res的一致。
#### 派生
从上面的pop()和push()函数我们可以看到我们对资源的使用默认是FILO即先进后出的。
使用FILO的原因是大多数场景下刚刚被释放的资源应该优先被复用。
但是用户可以通过派生的方式非常简单的实现一个FIFO资源池。只需要重写pop()和push()两个virtual函数即可。
如果需要,你还可以实现可动态扩展和收缩的资源池。
WFConditional是一种任务所以它满足普通workflow任务的一切属性。特别的接口只有signal用于发送信号。
# 示例
我们准备抓取一份URL列表但要求总的并发度不超过max_p。我们当然可以用parallel来实现但使用资源池可以更简单
以下示例通过timer和conditional实现一个延迟1秒执行的计算任务。
~~~cpp
int fetch_with_max(std::vector<std::string>& url_list, size_t max_p)
int main()
{
WFResourcePool pool(max_p);
for (std::string& url : url_list)
{
WFHttpTask *task = WFTaskFactory::create_http_task(url, [&pool](WFHttpTask *task) {
pool.post(nullptr);
});
WFConditional *cond = pool.get(task); // 无需保存res可以不传resbuf参数。
cond->start();
}
// wait_here...
WFGoTask *task = WFTaskFactory::create_go_task("test", [](){ printf("Done\n"); });
WFConditional *cond = WFTaskFactory::create_conditional(task);
WFTimerTask *timer = WFTaskFactory::create_timer_task(1, 0, [cond](void *){
cond->signal(NULL);
});
timer->start();
cond->start();
getchar();
}
~~~
这个示例里在定时器的回调里向cond发送信号让被包装的go task可以被执行。
注意无论cond->signal()与cond->start()哪一个先被调用,程序都完全正确。
# 观察者模式
我们看到如果直接对cond发送信息需要发送者直接持有cond的指针这在一些情况下并不是很方便。
于是,我们引入了观察者模式,也就是命名的条件任务。通过向某个名称发送信号,同时唤醒所有在这个名称下的条件任务。
命名条件任务的创建与唤醒:
~~~cpp
class WFTaskFactory
{
public:
static WFConditional *create_conditional(const std::string& cond_name, SubTask *task);
static WFConditional *create_conditional(const std::string& cond_name, SubTask *task, void **msgbuf);
static void signal_by_name(const std::string& cond_name, void *msg);
};
~~~
我们看到与普通条件任务唯一区别是命名条件任务创建时需要传入一个cond_name。
而signal_by_name()接口将msg发送到所有在这个名称上等待的条件任务将它们全部唤醒。这就相当于实现了观察者模式。
# 示例
还是上面的延迟计算示例我们增加到两个计算任务并用观察者模式来实现。用”slot1”作为条件任务名。
~~~cpp
int main()
{
WFGoTask *task1 = WFTaskFactory::create_go_task("test”, [](){ printf(“test1 done\n"); });
WFGoTask *task2 = WFTaskFactory::create_go_task("test”, [](){ printf(“test2 done\n"); });
WFConditional *cond1 = WFTaskFactory::create_conditional(“slot1”, task1);
WFConditional *cond2 = WFTaskFactory::create_conditional(“slot1”, task2);
WFTimerTask *timer = WFTaskFactory::create_timer_task(1, 0, [](void *){
WFTaskFactory::signal_by_name(“slot1”, NULL);
});
timer->start();
cond1->start();
cond2->start();
getchar();
}
~~~
我们看到在这个示例里timer在回调中通过signal_by_name方法同时唤醒了slot1下两个计算任务。
# 使用条件任务注意事项
Workflow里的任何任务如果创建之后不想运行都可以通过dismiss接口直接释放。
对于条件任务如果要被dismiss或者在某个被cancel的series里必须保证这个条件任务没有被signal过。
以下代码的行为无定义:
~~~cpp
int main()
{
WFEmptyTask *task = WFTaskFactory::create_empty_task();
WFConditional *cond = WFTaskFactory::create_conditional(“slot1”, task);
WFTimerTask *timer = WFTaskFactory::create_timer_task(0, 0, [](void *) {
WFTaskFactory::signal_by_name(“slot1”);
});
timer->start();
cond->dismiss(); // 取消任务
getchar();
}
~~~
显然如果timer的callback里已经执行或正在执行了signal_by_namecond被signal再dismiss()是一种错误行为。
这种情况一般也只会出现在命名条件任务里。所以dismiss一个命名条件任务需要特别的小心。

115
docs/about-resource-pool.md Normal file
View File

@@ -0,0 +1,115 @@
# 条件任务与资源池
在我们用workflow写异步程序时经常会遇到这样一些场景
* 任务运行时需要先从某个池子里获得一个资源。任务运行结束,则会把资源放回池子,让下一个需要资源的任务运行。
* 网络通信时需要对某一个或一些通信目标做总的并发度限制,但又不希望占用线程等待。
* 我们有许多随机到达的任务处在不同的series里。但这些任务必须**串行**的运行。
所有这些需求,都可以用资源池模块来解决。我们的[WFDnsResolver](https://github.com/sogou/workflow/blob/master/src/nameservice/WFDnsResolver.cc)就是通过这个方法来实现对dns server的并发度控制的。
# 资源池的接口
在[WFResourcePool.h](https://github.com/sogou/workflow/blob/master/src/factory/WFResourcePool.h)里,定义了资源池模块的接口:
~~~cpp
class WFResourcePool
{
public:
WFConditional *get(SubTask *task, void **resbuf);
WFConditional *get(SubTask *task);
void post(void *res);
...
protected:
virtual void *pop()
{
return this->data.res[this->data.index++];
}
virtual void push(void *res)
{
this->data.res[--this->data.index] = res;
}
...
public:
WFResourcePool(void *const *res, size_t n);
WFResourcePool(size_t n);
...
};
~~~
#### 构造函数
第一个构造函数接受一个资源数组长度为n。数组每个元素为一个void \*。内部会再分配一份相同大小的内存,把数组复制走。
如果你的初始资源都是nullptr那么你可以使用第二个构造函数只需要传n而无需先建立一个全部为nullptr的指针数组。
大概看看内部实现就明白了:
~~~cpp
void WFResourcePool::create(size_t n)
{
this->data.res = new void *[n];
this->data.value = n;
...
}
WFResourcePool::WFResourcePool(void *const *res, size_t n)
{
this->create(n);
memcpy(this->data.res, res, n * sizeof (void *));
}
WFResourcePool::WFResourcePool(size_t n)
{
this->create(n);
memset(this->data.res, 0, n * sizeof (void *));
}
~~~
#### 使用接口
用户使用get()接口把任务打包成一个conditional。conditional是一个条件任务条件满足时运行其包装的任务。
get()接口可包含第二个参数是一个void \*\*resbuf用于保存所获得的资源。
接下来用户只需要用这个conditional取代原来的任务使用就好了可以start或串进任务流。
注意conditional是在它被执行时去尝试获得资源的而不是在它被创建的时候。要不然的话以下代码就会被卡死
~~~cpp
WFResourcePool pool(1);
int f()
{
WFHttpTask *t1 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
WFHttpTask *t2 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
WFConditional *c1 = pool.get(t1, &t1->user_data); // 用user_data来保存res是一种实用方法。
WFConditional *c2 = pool.get(t2, &t2->user_data);
c2->start();
// wait for t2 finish here.
...
c1->start();
...
}
~~~
以上代码c1先创建等待t2结束后才运行。这里并不会出现c2卡死因为conditional是在执行时才获得资源的。
当用户对资源使用完毕一般在任务callback里需要通过post()接口把资源放回池子。
post()时的res参数**无需**与get()得到res的一致。
#### 派生
从上面的pop()和push()函数我们可以看到我们对资源的使用默认是FILO即先进后出的。
使用FILO的原因是大多数场景下刚刚被释放的资源应该优先被复用。
但是用户可以通过派生的方式非常简单的实现一个FIFO资源池。只需要重写pop()和push()两个virtual函数即可。
如果需要,你还可以实现可动态扩展和收缩的资源池。
# 示例
我们准备抓取一份URL列表但要求总的并发度不超过max_p。我们当然可以用parallel来实现但使用资源池可以更简单
~~~cpp
int fetch_with_max(std::vector<std::string>& url_list, size_t max_p)
{
WFResourcePool pool(max_p);
for (std::string& url : url_list)
{
WFHttpTask *task = WFTaskFactory::create_http_task(url, [&pool](WFHttpTask *task) {
pool.post(nullptr);
});
WFConditional *cond = pool.get(task); // 无需保存res可以不传resbuf参数。
cond->start();
}
// wait_here...
}
~~~

View File

@@ -22,7 +22,7 @@
#include "WFTask.h"
#include "WFResourcePool.h"
class __WFConditional : public WFConditional
class __RPConditional : public WFConditional
{
public:
struct list_head list;
@@ -33,14 +33,14 @@ public:
virtual void signal(void *res) { }
public:
__WFConditional(SubTask *task, void **resbuf,
__RPConditional(SubTask *task, void **resbuf,
struct WFResourcePool::Data *data) :
WFConditional(task, resbuf)
{
this->data = data;
}
__WFConditional(SubTask *task,
__RPConditional(SubTask *task,
struct WFResourcePool::Data *data) :
WFConditional(task)
{
@@ -48,7 +48,7 @@ public:
}
};
void __WFConditional::dispatch()
void __RPConditional::dispatch()
{
struct WFResourcePool::Data *data = this->data;
@@ -64,12 +64,12 @@ void __WFConditional::dispatch()
WFConditional *WFResourcePool::get(SubTask *task, void **resbuf)
{
return new __WFConditional(task, resbuf, &this->data);
return new __RPConditional(task, resbuf, &this->data);
}
WFConditional *WFResourcePool::get(SubTask *task)
{
return new __WFConditional(task, &this->data);
return new __RPConditional(task, &this->data);
}
void WFResourcePool::create(size_t n)
@@ -101,7 +101,7 @@ void WFResourcePool::post(void *res)
data->mutex.lock();
if (++data->value <= 0)
{
cond = list_entry(data->wait_list.next, __WFConditional, list);
cond = list_entry(data->wait_list.next, __RPConditional, list);
list_del(data->wait_list.next);
}
else

View File

@@ -343,3 +343,242 @@ WFMailboxTask *WFTaskFactory::create_mailbox_task(mailbox_callback_t callback)
{
return new WFMailboxTask(std::move(callback));
}
/****************** Named Conditional ******************/
class __WFConditional;
struct __conditional_node
{
struct list_head list;
__WFConditional *cond;
};
struct __ConditionalList
{
__ConditionalList(const std::string& str):
name(str)
{
INIT_LIST_HEAD(&this->head);
}
void push_back(struct __conditional_node *node)
{
list_add_tail(&node->list, &this->head);
}
bool empty() const
{
return list_empty(&this->head);
}
void del(struct __conditional_node *node)
{
list_del(&node->list);
}
struct rb_node rb;
struct list_head head;
std::string name;
friend class __ConditionalMap;
};
class __ConditionalMap
{
public:
static __ConditionalMap *get_instance()
{
static __ConditionalMap kInstance;
return &kInstance;
}
WFConditional *create(const std::string& name,
SubTask *task, void **msgbuf);
WFConditional *create(const std::string& name, SubTask *task);
void signal(const std::string& name, void *msg);
void signal(struct __ConditionalList *conds,
struct __conditional_node *node,
void *msg);
void remove(struct __ConditionalList *conds,
struct __conditional_node *node);
private:
struct __ConditionalList *get_list(const std::string& name);
__ConditionalMap()
{
conds_map_.rb_node = NULL;
}
struct rb_root conds_map_;
std::mutex mutex_;
};
class __WFConditional : public WFConditional
{
public:
__WFConditional(SubTask *task, void **msgbuf,
struct __ConditionalList *conds) :
WFConditional(task, msgbuf),
conds_(conds)
{
node_.cond = this;
conds_->push_back(&node_);
}
__WFConditional(SubTask *task, struct __ConditionalList *conds) :
WFConditional(task),
conds_(conds)
{
node_.cond = this;
conds_->push_back(&node_);
}
virtual ~__WFConditional()
{
if (!this->flag)
__ConditionalMap::get_instance()->remove(conds_, &node_);
}
virtual void signal(void *msg)
{
__ConditionalMap::get_instance()->signal(conds_, &node_, msg);
}
private:
struct __conditional_node node_;
struct __ConditionalList *conds_;
friend class __ConditionalMap;
};
WFConditional *__ConditionalMap::create(const std::string& name,
SubTask *task, void **msgbuf)
{
std::lock_guard<std::mutex> lock(mutex_);
struct __ConditionalList *conds = get_list(name);
return new __WFConditional(task, msgbuf, conds);
}
WFConditional *__ConditionalMap::create(const std::string& name,
SubTask *task)
{
std::lock_guard<std::mutex> lock(mutex_);
struct __ConditionalList *conds = get_list(name);
return new __WFConditional(task, conds);
}
struct __ConditionalList *__ConditionalMap::get_list(const std::string& name)
{
struct rb_node **p = &conds_map_.rb_node;
struct rb_node *parent = NULL;
struct __ConditionalList *conds;
while (*p)
{
parent = *p;
conds = rb_entry(*p, struct __ConditionalList, rb);
if (name < conds->name)
p = &(*p)->rb_left;
else if (name > conds->name)
p = &(*p)->rb_right;
else
break;
}
if (*p == NULL)
{
conds = new struct __ConditionalList(name);
rb_link_node(&conds->rb, parent, p);
rb_insert_color(&conds->rb, &conds_map_);
}
return conds;
}
void __ConditionalMap::signal(const std::string& name, void *msg)
{
struct rb_node **p = &conds_map_.rb_node;
struct __ConditionalList *conds = NULL;
mutex_.lock();
while (*p)
{
conds = rb_entry(*p, struct __ConditionalList, rb);
if (name < conds->name)
p = &(*p)->rb_left;
else if (name > conds->name)
p = &(*p)->rb_right;
else
{
rb_erase(&conds->rb, &conds_map_);
break;
}
}
mutex_.unlock();
if (!conds)
return;
struct list_head *pos;
struct list_head *tmp;
struct __conditional_node *node;
list_for_each_safe(pos, tmp, &conds->head)
{
node = list_entry(pos, struct __conditional_node, list);
node->cond->WFConditional::signal(msg);
}
delete conds;
}
void __ConditionalMap::signal(struct __ConditionalList *conds,
struct __conditional_node *node,
void *msg)
{
mutex_.lock();
conds->del(node);
if (conds->empty())
{
rb_erase(&conds->rb, &conds_map_);
delete conds;
}
mutex_.unlock();
node->cond->WFConditional::signal(msg);
}
void __ConditionalMap::remove(struct __ConditionalList *conds,
struct __conditional_node *node)
{
mutex_.lock();
conds->del(node);
if (conds->empty())
{
rb_erase(&conds->rb, &conds_map_);
delete conds;
}
mutex_.unlock();
}
WFConditional *WFTaskFactory::create_conditional(const std::string& cond_name,
SubTask *task, void **msgbuf)
{
return __ConditionalMap::get_instance()->create(cond_name, task, msgbuf);
}
WFConditional *WFTaskFactory::create_conditional(const std::string& cond_name,
SubTask *task)
{
return __ConditionalMap::get_instance()->create(cond_name, task);
}
void WFTaskFactory::signal_by_name(const std::string& cond_name, void *msg)
{
__ConditionalMap::get_instance()->signal(cond_name, msg);
}

View File

@@ -275,6 +275,14 @@ public:
return new WFConditional(task);
}
static WFConditional *create_conditional(const std::string& cond_name,
SubTask *task, void **msgbuf);
static WFConditional *create_conditional(const std::string& cond_name,
SubTask *task);
static void signal_by_name(const std::string& cond_name, void *msg);
public:
template<class FUNC, class... ARGS>
static WFGoTask *create_go_task(const std::string& queue_name,