Enable creating a 'Go' task with running time limit.

This commit is contained in:
XieHan
2022-06-13 21:18:05 +08:00
parent 15f1524067
commit 41cf46d10c
4 changed files with 121 additions and 4 deletions

View File

@@ -51,6 +51,23 @@ int main(void)
唯一一点不同是go task创建时不传callback但和其它任务一样可以set_callback。
如果go task函数的某个参数是引用需要使用std::ref否则会变成值传递这是c++11的特征。
# 带执行时间限制的go task
WFGoTask是到目前为止唯一支持带执行时限的一种任务。通过create_timedgo_task接口可以创建带时间限制的go task
~~~cpp
class WFTaskFactory
{
/* Create 'Go' task with running time limit in seconds plus nanoseconds.
* If time exceeded, WFT_STATE_ABORTED will be got in callback. */
template<class FUNC, class... ARGS>
static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
FUNC&& func, ARGS&&... args);
};
~~~
相比创建普通的go taskcreate_timedgo_task函数需要多传两个参数seconds和nanoseconds。
如果func的运行时间到达seconds+nanosconds时限task直接callback且state为WFT_STATE_ABORTED。
注意框架无法中断用户执行中的任务。func依然会继续执行到结束但不会再次callback。另外nanoseconds取值区间在\[0,10亿
# 把workflow当成线程池
用户可以只使用go task这样可以将workflow退化成一个线程池而且线程数量默认等于机器cpu数。

View File

@@ -41,7 +41,7 @@ protected:
long nanoseconds;
public:
__WFTimerTask(time_t seconds, long nanoseconds, CommScheduler *scheduler,
__WFTimerTask(CommScheduler *scheduler, time_t seconds, long nanoseconds,
timer_callback_t&& cb) :
WFTimerTask(scheduler, std::move(cb))
{
@@ -53,9 +53,9 @@ public:
WFTimerTask *WFTaskFactory::create_timer_task(unsigned int microseconds,
timer_callback_t callback)
{
return new __WFTimerTask((time_t)(microseconds / 1000000),
return new __WFTimerTask(WFGlobal::get_scheduler(),
(time_t)(microseconds / 1000000),
(long)(microseconds % 1000000 * 1000),
WFGlobal::get_scheduler(),
std::move(callback));
}
@@ -69,7 +69,7 @@ WFTimerTask *WFTaskFactory::create_timer_task(const std::string& name,
WFTimerTask *WFTaskFactory::create_timer_task(time_t seconds, long nanoseconds,
timer_callback_t callback)
{
return new __WFTimerTask(seconds, nanoseconds, WFGlobal::get_scheduler(),
return new __WFTimerTask(WFGlobal::get_scheduler(), seconds, nanoseconds,
std::move(callback));
}
@@ -582,3 +582,53 @@ void WFTaskFactory::signal_by_name(const std::string& cond_name, void *msg)
__ConditionalMap::get_instance()->signal(cond_name, msg);
}
/**************** Timed Go Task *****************/
void __WFTimedGoTask::dispatch()
{
WFTimerTask *timer;
timer = WFTaskFactory::create_timer_task(this->seconds, this->nanoseconds,
__WFTimedGoTask::timer_callback);
timer->user_data = this;
this->__WFGoTask::dispatch();
timer->start();
}
SubTask *__WFTimedGoTask::done()
{
if (this->callback)
this->callback(this);
return series_of(this)->pop();
}
void __WFTimedGoTask::handle(int state, int error)
{
if (--this->ref == 3)
{
this->state = state;
this->error = error;
this->subtask_done();
}
if (--this->ref == 0)
delete this;
}
void __WFTimedGoTask::timer_callback(WFTimerTask *timer)
{
__WFTimedGoTask *task = (__WFTimedGoTask *)timer->user_data;
if (--task->ref == 3)
{
task->state = WFT_STATE_ABORTED;
task->error = 0;
task->subtask_done();
}
if (--task->ref == 0)
delete task;
}

View File

@@ -288,6 +288,13 @@ public:
static WFGoTask *create_go_task(const std::string& queue_name,
FUNC&& func, ARGS&&... args);
/* Create 'Go' task with running time limit in seconds plus nanoseconds.
* If time exceeded, state WFT_STATE_ABORTED will be got in callback. */
template<class FUNC, class... ARGS>
static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
FUNC&& func, ARGS&&... args);
public:
static WFGraphTask *create_graph_task(graph_callback_t callback)
{

View File

@@ -28,6 +28,7 @@
#include <string>
#include <functional>
#include <utility>
#include <atomic>
#include "WFGlobal.h"
#include "Workflow.h"
#include "WFTask.h"
@@ -57,6 +58,35 @@ public:
}
};
class __WFTimedGoTask : public __WFGoTask
{
protected:
virtual void dispatch();
virtual SubTask *done();
protected:
virtual void handle(int state, int error);
protected:
static void timer_callback(WFTimerTask *timer);
protected:
time_t seconds;
long nanoseconds;
std::atomic<int> ref;
public:
__WFTimedGoTask(time_t seconds, long nanoseconds,
ExecQueue *queue, Executor *executor,
std::function<void ()>&& func) :
__WFGoTask(queue, executor, std::move(func)),
ref(4)
{
this->seconds = seconds;
this->nanoseconds = nanoseconds;
}
};
template<class FUNC, class... ARGS>
inline WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
FUNC&& func, ARGS&&... args)
@@ -68,6 +98,19 @@ inline WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
std::move(tmp));
}
template<class FUNC, class... ARGS>
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
FUNC&& func, ARGS&&... args)
{
auto&& tmp = std::bind(std::forward<FUNC>(func),
std::forward<ARGS>(args)...);
return new __WFTimedGoTask(seconds, nanoseconds,
WFGlobal::get_exec_queue(queue_name),
WFGlobal::get_compute_executor(),
std::move(tmp));
}
class __WFDynamicTask : public WFDynamicTask
{
protected: