Files
workflow/docs/tutorial-08-matrix_multiply.md
2020-08-05 14:42:13 +08:00

158 lines
5.5 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 自定义计算任务matrix_multiply
# 示例代码
[tutorial-08-matrix_multiply.cc](../tutorial/tutorial-08-matrix_multiply.cc)
# 关于matrix_multiply
程序执行代码里两个矩阵的乘法,并将相乘结果打印在屏幕上。
示例的主要目的是展现怎么实现一个自定义CPU计算任务。
# 定义计算任务
定义计算任务需要提供3个基本信息分别为INPUTOUTPUT和routine。
INPUT和OUTPUT是两个模板参数可以是任何类型。routine表示从INPUT到OUTPUT的过程定义如下
~~~cpp
template <class INPUT, class OUTPUT>
class __WFThreadTask
{
...
std::function<void (INPUT *, OUTPUT *)> routine;
...
};
~~~
可以看出routine是一个简单的从INPUT到OUTPUT的计算过程。INPUT指针不要求是const但用户也可以传const INPUT *的函数。
比如一个加法任务,就可这么做:
~~~cpp
struct add_input
{
int x;
int y;
};
struct add_ouput
{
int res;
};
void add_routine(const add_input *input, add_output *output)
{
output->res = input->x + input->y;
}
typedef WFThreadTask<add_input, add_output> add_task;
~~~
在我们的矩阵乘法的示例里,输入是两个矩阵,输出为一个矩阵。其定义如下:
~~~cpp
namespace algorithm
{
using Matrix = std::vector<std::vector<double>>;
struct MMInput
{
Matrix a;
Matrix b;
};
struct MMOutput
{
int error;
size_t m, n, k;
Matrix c;
};
void matrix_multiply(const MMInput *in, MMOutput *out)
{
...
}
}
~~~
矩阵乘法存在有输入矩阵不合法的问题所以output里多了一个error域用来表示错误。
# 生成计算任务
定义好输入输出的类型以及算法的过程之后就可以通过WFThreadTaskFactory工厂来产生计算任务了。
在[WFTaskFactory.h](../src/factory/WFTaskFactory.h)里,计算工厂类的定义如下:
~~~cpp
template <class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
using T = WFThreadTask<INPUT, OUTPUT>;
public:
static T *create_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine,
std::function<void (T *)> callback);
...
};
~~~
与之前的网络工厂类或算法工厂类略有不同这个类需要INPUT和OUTPUT两个模板参数。
queue_name相关的知识在上一个示例里已经有介绍。routine就是你的计算过程callback是回调。
在我们的示例里,我们看到了这个调用的使用:
~~~cpp
using MMTask = WFThreadTask<algorithm::MMInput,
algorithm::MMOutput>;
using namespace algorithm;
int main()
{
typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
MMTask *task = MMFactory::create_thread_task("matrix_multiply_task",
matrix_multiply,
callback);
MMInput *input = task->get_input();
input->a = {{1, 2, 3}, {4, 5, 6}};
input->b = {{7, 8}, {9, 10}, {11, 12}};
...
}
~~~
产生了task之后通过get_input()接口得到输入数据的指针。这个可以类比网络任务的get_req()。
任务的发起和结束什么,与网络任务并没有什么区别。同样,回调也很简单:
~~~cpp
void callback(MMTask *task) // MMtask = WFThreadTask<MMInput, MMOutput>
{
MMInput *input = task->get_input();
MMOutput *output = task->get_output();
assert(task->get_state() == WFT_STATE_SUCCESS);
if (output->error)
printf("Error: %d %s\n", output->error, strerror(output->error));
else
{
printf("Matrix A\n");
print_matrix(input->a, output->m, output->k);
printf("Matrix B\n");
print_matrix(input->b, output->k, output->n);
printf("Matrix A * Matrix B =>\n");
print_matrix(output->c, output->m, output->n);
}
}
~~~
普通的计算任务可以忽略失败的可能性结束状态肯定是SUCCESS。
callback里简单打印了输入输出。如果输入数据不合法则打印错误。
# 算法与协议的对称性
在我们的体系里,算法与协议在一个非常抽象的层面上是具有高度对称性的。
有自定义算法的线程任务,那显然也存在自定义协议的网络任务。
自定义算法要求提供算法的过程,而自定义协议则需要用户提供序列化和反序列化的过程,[简单的用户自定义协议client/server](./tutorial-10-user_defined_protocol.md)有介绍。
无论是自定义算法还是自定义协议,我们都必须强调算法和协议都是非常纯粹的。
例如算法就是一个从INPUT到OUPUT的转换过程算法并不知道taskseries等的存在。
HTTP协议的实现上也只关心序列化反序列化无需要关心什么是task。而是在http task里去引用HTTP协议。
# 线程任务与网络任务的复合性
在这个示例里我们通过WFThreadTaskFactory构建了一个线程任务。可以说这是一种最简单的计算任务构建大多数情况下也够用了。
同样用户可以非常简单的定义一个自有协议的server和client。
但在上一个示例里我们看到我们可以通过算法工厂产生一个并行排序任务这显然不是通过一个routine就能做到的。
对于网络任务比如一个kafka任务可能要经过与多台机器的交互才能得到结果但对用户来讲是完全透明的。
所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。