mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
Update tutorial-08-matrix_multiply.md
This commit is contained in:
@@ -86,9 +86,15 @@ public:
|
||||
static T *create_thread_task(const std::string& queue_name,
|
||||
std::function<void (INPUT *, OUTPUT *)> routine,
|
||||
std::function<void (T *)> callback);
|
||||
|
||||
static T *create_thread_task(time_t seconds, long nanoseconds,
|
||||
const std::string& queue_name,
|
||||
std::function<void (INPUT *, OUTPUT *)> routine,
|
||||
std::function<void (T *)> callback);
|
||||
...
|
||||
};
|
||||
~~~
|
||||
这里包含两个创建任务的接口。第二个接口支持用户传入一下任务运行时间限制,我们在一下节介绍这个功能。
|
||||
与之前的网络工厂类或算法工厂类略有不同,这个类需要INPUT和OUTPUT两个模板参数。
|
||||
queue_name相关的知识在上一个示例里已经有介绍。routine就是你的计算过程,callback是回调。
|
||||
在我们的示例里,我们看到了这个调用的使用:
|
||||
@@ -138,6 +144,96 @@ void callback(MMTask *task) // MMtask = WFThreadTask<MMInput, MMOutput>
|
||||
普通的计算任务可以忽略失败的可能性,结束状态肯定是SUCCESS。
|
||||
callback里简单打印了输入输出。如果输入数据不合法,则打印错误。
|
||||
|
||||
# 带运行时间限制的计算任务
|
||||
|
||||
显然,我们的框架无法打断用户的计算任务,因为用户的计算任务是一个函数,用户需要自行确保函数可以正常结束。
|
||||
但我们支持用户指定一个时间限制,当计算无法在指定时间内完成,任务可以提前回到callback。带运行时间限制的接口定义如下:
|
||||
~~~cpp
|
||||
template <class INPUT, class OUTPUT>
|
||||
class WFThreadTaskFactory
|
||||
{
|
||||
private:
|
||||
using T = WFThreadTask<INPUT, OUTPUT>;
|
||||
|
||||
public:
|
||||
static T *create_thread_task(time_t seconds, long nanoseconds,
|
||||
const std::string& queue_name,
|
||||
std::function<void (INPUT *, OUTPUT *)> routine,
|
||||
std::function<void (T *)> callback);
|
||||
...
|
||||
};
|
||||
~~~
|
||||
参数seconds和nanoseconds构成了运行时限。在这里,nanoseconds的取值范围在\[0,1000000000)。
|
||||
当任务无法在运行时限内结束,会直接回到callback,并且任务的状态为WFT_STATE_ABORTED。
|
||||
还是用matrix_multiply的例子,我们可以这样写:
|
||||
~~~cpp
|
||||
void callback(MMTask *task) // MMtask = WFThreadTask<MMInput, MMOutput>
|
||||
{
|
||||
MMInput *input = task->get_input();
|
||||
MMOutput *output = task->get_output();
|
||||
|
||||
if (task->get_state() == WFT_STATE_ABORTED)
|
||||
{
|
||||
printf("Run out of time.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
assert(task->get_state() == WFT_STATE_SUCCESS)
|
||||
|
||||
if (output->error)
|
||||
printf("Error: %d %s\n", output->error, strerror(output->error));
|
||||
else
|
||||
{
|
||||
printf("Matrix A\n");
|
||||
print_matrix(input->a, output->m, output->k);
|
||||
printf("Matrix B\n");
|
||||
print_matrix(input->b, output->k, output->n);
|
||||
printf("Matrix A * Matrix B =>\n");
|
||||
print_matrix(output->c, output->m, output->n);
|
||||
}
|
||||
}
|
||||
|
||||
using namespace algorithm;
|
||||
|
||||
int main()
|
||||
{
|
||||
typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
|
||||
MMTask *task = MMFactory::create_thread_task(0, 1000000,
|
||||
"matrix_multiply_task",
|
||||
matrix_multiply,
|
||||
callback);
|
||||
|
||||
MMInput *input = task->get_input();
|
||||
|
||||
input->a = {{1, 2, 3}, {4, 5, 6}};
|
||||
input->b = {{7, 8}, {9, 10}, {11, 12}};
|
||||
...
|
||||
}
|
||||
~~~
|
||||
上面的示例,限制了任务运行时间不超过1毫秒,否则,以WFT_STATE_ABORTD的状态返回。
|
||||
再次提醒,我们并不会中断用户的实际运行函数。当任务超时并callback,计算函数还会一直运行直到结束。
|
||||
如果用户希望函数不再继续执行,需要在代码中自行加入检查点来实现这样的功能。可以在INPUT里加入flag,例如:
|
||||
~~~cpp
|
||||
void callback(MMTask *task) // MMtask = WFThreadTask<MMInput, MMOutput>
|
||||
{
|
||||
if (task->get_state() == WFT_STATE_ABORTED)
|
||||
{
|
||||
task->get_input()->flag = true;
|
||||
printf("Run out of time.\n");
|
||||
return;
|
||||
}
|
||||
...
|
||||
}
|
||||
|
||||
void matrix_multiply(const MMInput *in, MMOutput *out)
|
||||
{
|
||||
while (!in->flag)
|
||||
{
|
||||
....
|
||||
}
|
||||
}
|
||||
~~~
|
||||
|
||||
# 算法与协议的对称性
|
||||
|
||||
在我们的体系里,算法与协议在一个非常抽象的层面上是具有高度对称性的。
|
||||
@@ -154,4 +250,3 @@ HTTP协议的实现上,也只关心序列化反序列化,无需要关心什
|
||||
但在上一个示例里我们看到,我们可以通过算法工厂产生一个并行排序任务,这显然不是通过一个routine就能做到的。
|
||||
对于网络任务,比如一个kafka任务,可能要经过与多台机器的交互才能得到结果,但对用户来讲是完全透明的。
|
||||
所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。
|
||||
|
||||
|
||||
Reference in New Issue
Block a user