update tutorial format

This commit is contained in:
liyingxin
2020-07-30 20:22:02 +08:00
parent e151e8d79a
commit 71c9fd805e
14 changed files with 419 additions and 419 deletions

View File

@@ -105,4 +105,4 @@ int some_function()
}
~~~
在这个示例中当http task是连接上的首个请求时我们设置了cookie。如果不是首个请求根据约定不再设置cookie。
另外prepare函数里可以安全的使用连接上下文。同一个连接上prepare不会并发。
另外prepare函数里可以安全的使用连接上下文。同一个连接上prepare不会并发。

View File

@@ -203,4 +203,4 @@ public:
...
};
~~~
有需要的用户可以自行查阅相关代码。由于WFTaskFactory没有提供工厂函数创造container任务需要自己调用new。
有需要的用户可以自行查阅相关代码。由于WFTaskFactory没有提供工厂函数创造container任务需要自己调用new。

View File

@@ -29,14 +29,14 @@ void callback(WFXxxTask *task)
~~~cpp
enum
{
WFT_STATE_UNDEFINED = -1,
WFT_STATE_SUCCESS = CS_STATE_SUCCESS,
WFT_STATE_TOREPLY = CS_STATE_TOREPLY, /* for server task only */
WFT_STATE_NOREPLY = CS_STATE_TOREPLY + 1, /* for server task only */
WFT_STATE_SYS_ERROR = CS_STATE_ERROR,
WFT_STATE_SSL_ERROR = 65,
WFT_STATE_DNS_ERROR = 66, /* for client task only */
WFT_STATE_TASK_ERROR = 67
WFT_STATE_UNDEFINED = -1,
WFT_STATE_SUCCESS = CS_STATE_SUCCESS,
WFT_STATE_TOREPLY = CS_STATE_TOREPLY, /* for server task only */
WFT_STATE_NOREPLY = CS_STATE_TOREPLY + 1, /* for server task only */
WFT_STATE_SYS_ERROR = CS_STATE_ERROR,
WFT_STATE_SSL_ERROR = 65,
WFT_STATE_DNS_ERROR = 66, /* for client task only */
WFT_STATE_TASK_ERROR = 67
};
~~~
##### 需要关注的几个状态:

View File

@@ -30,19 +30,18 @@ using upstream_route_t = std::function<unsigned int (const char *, const char *,
class UpstreamManager
{
public:
static int upstream_create_consistent_hash(const std::string& name,
upstream_route_t consitent_hash);
static int upstream_create_consistent_hash(const std::string& name,
upstream_route_t consitent_hash);
static int upstream_create_weighted_random(const std::string& name,
bool try_another);
static int upstream_create_manual(const std::string& name,
upstream_route_t select,
bool try_another,
upstream_route_t consitent_hash);
static int upstream_create_weighted_random(const std::string& name,
bool try_another);
static int upstream_create_manual(const std::string& name,
upstream_route_t select,
bool try_another,
upstream_route_t consitent_hash);
static int upstream_delete(const std::string& name);
static int upstream_delete(const std::string& name);
...
};
~~~
@@ -137,7 +136,6 @@ int main()
UpstreamManager::upstream_add_server("redis.name", "10.135.35.55");
...
UpstreamManager::upstream_add_server("redis.name", "10.135.35.62");
auto *task = WFTaskFactory::create_redis_task("redis://:mypassword@redis.name/2?a=hello#111", ...);
...
@@ -164,21 +162,21 @@ struct EndpointParams
// In UpstreamMananger.h
struct AddressParams
{
struct EndpointParams endpoint_params; ///< Connection config
unsigned int dns_ttl_default; ///< in seconds, DNS TTL when network request success
unsigned int dns_ttl_min; ///< in seconds, DNS TTL when network request fail
struct EndpointParams endpoint_params; ///< Connection config
unsigned int dns_ttl_default; ///< in seconds, DNS TTL when network request success
unsigned int dns_ttl_min; ///< in seconds, DNS TTL when network request fail
/**
* - The max_fails directive sets the number of consecutive unsuccessful attempts to communicate with the server.
* - After 30s following the server failure, upstream probe the server with some live clients requests.
* - If the probes have been successful, the server is marked as a live one.
* - If max_fails is set to 1, it means server would out of upstream selection in 30 seconds when failed only once
*/
unsigned int max_fails; ///< [1, INT32_MAX] max_fails = 0 means max_fails = 1
unsigned short weight; ///< [1, 65535] weight = 0 means weight = 1. only for master
#define SERVER_TYPE_MASTER 0
#define SERVER_TYPE_SLAVE 1
int server_type; ///< default is SERVER_TYPE_MASTER
int group_id; ///< -1 means no group. Slave without group will backup for any master
unsigned int max_fails; ///< [1, INT32_MAX] max_fails = 0 means max_fails = 1
unsigned short weight; ///< [1, 65535] weight = 0 means weight = 1. only for master
#define SERVER_TYPE_MASTER 0
#define SERVER_TYPE_SLAVE 1
int server_type; ///< default is SERVER_TYPE_MASTER
int group_id; ///< -1 means no group. Slave without group will backup for any master
};
~~~
大多数参数的作用一眼了然。其中endpoint_params和dns相关参数可以覆盖全局的配置。

View File

@@ -13,18 +13,18 @@
~~~cpp
struct EndpointParams
{
size_t max_connections;
int connect_timeout;
int response_timeout;
int ssl_connect_timeout;
size_t max_connections;
int connect_timeout;
int response_timeout;
int ssl_connect_timeout;
};
static constexpr struct EndpointParams ENDPOINT_PARAMS_DEFAULT =
{
.max_connections = 200,
.connect_timeout = 10 * 1000,
.response_timeout = 10 * 1000,
.ssl_connect_timeout = 10 * 1000,
.max_connections = 200,
.connect_timeout = 10 * 1000,
.response_timeout = 10 * 1000,
.ssl_connect_timeout = 10 * 1000,
};
~~~
其中与超时相关的配置包括以下3项。
@@ -40,24 +40,24 @@ static constexpr struct EndpointParams ENDPOINT_PARAMS_DEFAULT =
~~~cpp
struct WFGlobalSettings
{
EndpointParams endpoint_params;
unsigned int dns_ttl_default;
unsigned int dns_ttl_min;
int dns_threads;
int poller_threads;
int handler_threads;
int compute_threads;
EndpointParams endpoint_params;
unsigned int dns_ttl_default;
unsigned int dns_ttl_min;
int dns_threads;
int poller_threads;
int handler_threads;
int compute_threads;
};
static constexpr struct WFGlobalSettings GLOBAL_SETTINGS_DEFAULT =
{
.endpoint_params = ENDPOINT_PARAMS_DEFAULT,
.dns_ttl_default = 12 * 3600, /* in seconds */
.dns_ttl_min = 180, /* reacquire when communication error */
.dns_threads = 8,
.poller_threads = 2,
.handler_threads = 20,
.compute_threads = -1
.endpoint_params = ENDPOINT_PARAMS_DEFAULT,
.dns_ttl_default = 12 * 3600, /* in seconds */
.dns_ttl_min = 180, /* reacquire when communication error */
.dns_threads = 8,
.poller_threads = 2,
.handler_threads = 20,
.compute_threads = -1
};
//compute_threads<=0 means auto-set by system cpu number
~~~
@@ -97,10 +97,10 @@ class WFNetworkTask : public CommRequest
{
...
public:
/* All in milleseconds. timeout == -1 for unlimited. */
void set_send_timeout(int timeout) { this->send_timeo = timeout; }
void set_receive_timeout(int timeout) { this->receive_timeo = timeout; }
void set_keep_alive(int timeout) { this->keep_alive_timeo = timeout; }
/* All in milleseconds. timeout == -1 for unlimited. */
void set_send_timeout(int timeout) { this->send_timeo = timeout; }
void set_receive_timeout(int timeout) { this->receive_timeo = timeout; }
void set_keep_alive(int timeout) { this->keep_alive_timeo = timeout; }
...
}
~~~

View File

@@ -1,6 +1,5 @@
# 关于定时器
和网络任务,计算任务或文件任务一样,定时器是我们框架中的一种基础任务。
定时器的作用是不占线程的等待一个确定时间同样通过callback来通知定时器到期。
# 定时器的创建

View File

@@ -42,25 +42,25 @@ Upstream和域名DNS解析都可以将一组ip配置到一个Host但是
class UpstreamManager
{
public:
static int upstream_create_consistent_hash(const std::string& name,
upstream_route_t consitent_hash);
static int upstream_create_weighted_random(const std::string& name,
bool try_another);
static int upstream_create_manual(const std::string& name,
upstream_route_t select,
bool try_another,
upstream_route_t consitent_hash);
static int upstream_delete(const std::string& name);
static int upstream_create_consistent_hash(const std::string& name,
upstream_route_t consitent_hash);
static int upstream_create_weighted_random(const std::string& name,
bool try_another);
static int upstream_create_manual(const std::string& name,
upstream_route_t select,
bool try_another,
upstream_route_t consitent_hash);
static int upstream_delete(const std::string& name);
public:
static int upstream_add_server(const std::string& name,
const std::string& address);
static int upstream_add_server(const std::string& name,
const std::string& address,
const struct AddressParams *address_params);
static int upstream_remove_server(const std::string& name,
const std::string& address);
...
static int upstream_add_server(const std::string& name,
const std::string& address);
static int upstream_add_server(const std::string& name,
const std::string& address,
const struct AddressParams *address_params);
static int upstream_remove_server(const std::string& name,
const std::string& address);
...
}
~~~
@@ -68,8 +68,8 @@ public:
配置一个本地反向代理将本地发出的my_proxy.name所有请求均匀的打到6个目标server上
~~~cpp
UpstreamManager::upstream_create_weighted_random(
"my_proxy.name",
true);//如果遇到熔断机器,再次尝试直至找到可用或全部熔断
"my_proxy.name",
true);//如果遇到熔断机器,再次尝试直至找到可用或全部熔断
UpstreamManager::upstream_add_server("my_proxy.name", "192.168.2.100:8081");
UpstreamManager::upstream_add_server("my_proxy.name", "192.168.2.100:8082");
@@ -91,8 +91,8 @@ http_task->start();
配置一个本地反向代理将本地发出的weighted.random所有请求按照5/20/1的权重分配打到3个目标server上
~~~cpp
UpstreamManager::upstream_create_weighted_random(
"weighted.random",
false);//如果遇到熔断机器,不再尝试,这种情况下此次请求必定失败
"weighted.random",
false);//如果遇到熔断机器,不再尝试,这种情况下此次请求必定失败
AddressParams address_params = ADDRESS_PARAMS_DEFAULT;
address_params.weight = 5;//权重为5
@@ -112,8 +112,8 @@ http_task->start();
### 例3 在多个目标中按照框架默认的一致性哈希访问
~~~cpp
UpstreamManager::upstream_create_consistent_hash(
"abc.local",
nullptr);//nullptr代表使用框架默认的一致性哈希函数
"abc.local",
nullptr);//nullptr代表使用框架默认的一致性哈希函数
UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8081");
UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8082");
@@ -134,21 +134,21 @@ http_task->start();
### 例4 自定义一致性哈希函数
~~~cpp
UpstreamManager::upstream_create_consistent_hash(
"abc.local",
[](const char *path, const char *query, const char *fragment) -> unsigned int {
unsigned int hash = 0;
"abc.local",
[](const char *path, const char *query, const char *fragment) -> unsigned int {
unsigned int hash = 0;
while (*path)
hash = (hash * 131) + (*path++);
while (*path)
hash = (hash * 131) + (*path++);
while (*query)
hash = (hash * 131) + (*query++);
while (*query)
hash = (hash * 131) + (*query++);
while (*fragment)
hash = (hash * 131) + (*fragment++);
while (*fragment)
hash = (hash * 131) + (*fragment++);
return hash;
});
return hash;
});
UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8081");
UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8082");
@@ -166,12 +166,12 @@ http_task->start();
### 例5 自定义选取策略
~~~cpp
UpstreamManager::upstream_create_manual(
"xyz.cdn",
[](const char *path, const char *query, const char *fragment) -> unsigned int {
return atoi(fragment);
},
true,//如果选择到已经熔断的目标,将进行二次选取
nullptr);//nullptr代表二次选取时使用框架默认的一致性哈希函数
"xyz.cdn",
[](const char *path, const char *query, const char *fragment) -> unsigned int {
return atoi(fragment);
},
true,//如果选择到已经熔断的目标,将进行二次选取
nullptr);//nullptr代表二次选取时使用框架默认的一致性哈希函数
UpstreamManager::upstream_add_server("xyz.cdn", "192.168.2.100:8081");
UpstreamManager::upstream_add_server("xyz.cdn", "192.168.2.100:8082");
@@ -191,8 +191,8 @@ http_task->start();
### 例6 简单的主备模式
~~~cpp
UpstreamManager::upstream_create_weighted_random(
"simple.name",
true);//一主一备这项设什么没区别
"simple.name",
true);//一主一备这项设什么没区别
AddressParams address_params = ADDRESS_PARAMS_DEFAULT;
address_params.server_type = SERVER_TYPE_MASTER;
@@ -215,8 +215,8 @@ redis_task->get_req()->set_query("MGET", {"key1", "key2", "key3", "key4"});
### 例7 主备+一致性哈希+分组
~~~cpp
UpstreamManager::upstream_create_consistent_hash(
"abc.local",
nullptr);//nullptr代表使用框架默认的一致性哈希函数
"abc.local",
nullptr);//nullptr代表使用框架默认的一致性哈希函数
AddressParams address_params = ADDRESS_PARAMS_DEFAULT;
address_params.server_type = SERVER_TYPE_MASTER;
@@ -266,42 +266,42 @@ round-robin/weighted-round-robin视为与[1]等价,暂不提供
~~~cpp
struct EndpointParams
{
size_t max_connections;
int connect_timeout;
int response_timeout;
int ssl_connect_timeout;
size_t max_connections;
int connect_timeout;
int response_timeout;
int ssl_connect_timeout;
};
static constexpr struct EndpointParams ENDPOINT_PARAMS_DEFAULT =
{
.max_connections = 200,
.connect_timeout = 10 * 1000,
.response_timeout = 10 * 1000,
.ssl_connect_timeout = 10 * 1000,
.max_connections = 200,
.connect_timeout = 10 * 1000,
.response_timeout = 10 * 1000,
.ssl_connect_timeout = 10 * 1000,
};
struct AddressParams
{
struct EndpointParams endpoint_params;
unsigned int dns_ttl_default;
unsigned int dns_ttl_min;
unsigned int max_fails;
unsigned short weight;
#define SERVER_TYPE_MASTER 0
#define SERVER_TYPE_SLAVE 1
int server_type;
int group_id;
struct EndpointParams endpoint_params;
unsigned int dns_ttl_default;
unsigned int dns_ttl_min;
unsigned int max_fails;
unsigned short weight;
#define SERVER_TYPE_MASTER 0
#define SERVER_TYPE_SLAVE 1
int server_type;
int group_id;
};
static constexpr struct AddressParams ADDRESS_PARAMS_DEFAULT =
{
.endpoint_params = ENDPOINT_PARAMS_DEFAULT,
.dns_ttl_default = 12 * 3600,
.dns_ttl_min = 180,
.max_fails = 200,
.weight = 1, //only for master of UPSTREAM_WEIGHTED_RANDOM
.server_type = SERVER_TYPE_MASTER,
.group_id = -1,
.endpoint_params = ENDPOINT_PARAMS_DEFAULT,
.dns_ttl_default = 12 * 3600,
.dns_ttl_min = 180,
.max_fails = 200,
.weight = 1, //only for master of UPSTREAM_WEIGHTED_RANDOM
.server_type = SERVER_TYPE_MASTER,
.group_id = -1,
};
~~~
每个Address都可以配置自己的自定义参数

View File

@@ -85,4 +85,5 @@ public:
~~~
相信这个cursor在使用上应该不会有什么疑惑。
之后一行resp->get_parsed_body()获得response的http body。这个调用在任务成功的状态下必然返回truebody指向数据区。
这个调用得到的是原始的http body不解码chunk编码。如需解码chunk编码可使用[HttpUtil.h](../src/protocol/HttpUtil.h)里的HttpChunkCursor。
这个调用得到的是原始的http body不解码chunk编码。如需解码chunk编码可使用[HttpUtil.h](../src/protocol/HttpUtil.h)里的HttpChunkCursor。

View File

@@ -35,7 +35,7 @@ public:
int start(int family, unsigned short port);
int start(const char *host, unsigned short port);
int start(int family, const char *host, unsigned short port);
int start(const struct sockaddr *bind_addr, socklen_t addrlen);
int start(const struct sockaddr *bind_addr, socklen_t addrlen);
/* To start an SSL server */
int start(unsigned short port, const char *cert_file, const char *key_file);
@@ -45,8 +45,8 @@ public:
const char *cert_file, const char *key_file);
int start(int family, const char *host, unsigned short port,
const char *cert_file, const char *key_file);
int start(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file);
int start(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file);
/* For graceful restart. */
int serve(int listen_fd);
@@ -92,38 +92,38 @@ WFHttpServer::WFServer(http_process_t proc) :
~~~cpp
void process(WFHttpTask *server_task)
{
protocol::HttpRequest *req = server_task->get_req();
protocol::HttpResponse *resp = server_task->get_resp();
long seq = server_task->get_task_seq();
protocol::HttpHeaderCursor cursor(req);
std::string name;
std::string value;
char buf[8192];
int len;
protocol::HttpRequest *req = server_task->get_req();
protocol::HttpResponse *resp = server_task->get_resp();
long seq = server_task->get_task_seq();
protocol::HttpHeaderCursor cursor(req);
std::string name;
std::string value;
char buf[8192];
int len;
/* Set response message body. */
resp->append_output_body_nocopy("<html>", 6);
len = snprintf(buf, 8192, "<p>%s %s %s</p>", req->get_method(),
req->get_request_uri(), req->get_http_version());
resp->append_output_body(buf, len);
/* Set response message body. */
resp->append_output_body_nocopy("<html>", 6);
len = snprintf(buf, 8192, "<p>%s %s %s</p>", req->get_method(),
req->get_request_uri(), req->get_http_version());
resp->append_output_body(buf, len);
while (cursor.next(name, value))
{
len = snprintf(buf, 8192, "<p>%s: %s</p>", name.c_str(), value.c_str());
resp->append_output_body(buf, len);
}
while (cursor.next(name, value))
{
len = snprintf(buf, 8192, "<p>%s: %s</p>", name.c_str(), value.c_str());
resp->append_output_body(buf, len);
}
resp->append_output_body_nocopy("</html>", 7);
resp->append_output_body_nocopy("</html>", 7);
/* Set status line if you like. */
resp->set_http_version("HTTP/1.1");
resp->set_status_code("200");
resp->set_reason_phrase("OK");
/* Set status line if you like. */
resp->set_http_version("HTTP/1.1");
resp->set_status_code("200");
resp->set_reason_phrase("OK");
resp->add_header_pair("Content-Type", "text/html");
resp->add_header_pair("Server", "Sogou WFHttpServer");
if (seq == 9) /* no more than 10 requests on the same connection. */
resp->add_header_pair("Connection", "close");
resp->add_header_pair("Content-Type", "text/html");
resp->add_header_pair("Server", "Sogou WFHttpServer");
if (seq == 9) /* no more than 10 requests on the same connection. */
resp->add_header_pair("Connection", "close");
// print log
...
@@ -147,8 +147,8 @@ public:
函数中另外一个变量seq通过server_task->get_task_seq()得到表示该请求是当前连接上的第几次请求从0开始计。
程序中完成10次请求之后就强行关闭连接于是
~~~cpp
if (seq == 9) /* no more than 10 requests on the same connection. */
resp->add_header_pair("Connection", "close");
if (seq == 9) /* no more than 10 requests on the same connection. */
resp->add_header_pair("Connection", "close");
~~~
关闭连接还可以通过task->set_keep_alive()接口来完成但对于http协议还是推荐使用设置header的方式。
这个示例中因为返回的页面很小我们没有关注回复成功与否。下一个示例http_proxy我们将看到如何获得回复的状态。

View File

@@ -39,12 +39,12 @@ int main(int argc, char *argv[])
~~~cpp
static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT =
{
.max_connections = 1000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 60 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
.max_connections = 1000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 60 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
};
~~~
max_connections最大连接数1000达到上限之后会关闭最久未使用的keep-alive连接。没找到keep-alive连接则拒绝新连接。
@@ -66,45 +66,45 @@ GET / HTTP/1.1
~~~cpp
void process(WFHttpTask *proxy_task)
{
auto *req = proxy_task->get_req();
SeriesWork *series = series_of(proxy_task);
WFHttpTask *http_task; /* for requesting remote webserver. */
auto *req = proxy_task->get_req();
SeriesWork *series = series_of(proxy_task);
WFHttpTask *http_task; /* for requesting remote webserver. */
tutorial_series_context *context = new tutorial_series_context;
context->url = req->get_request_uri();
context->proxy_task = proxy_task;
tutorial_series_context *context = new tutorial_series_context;
context->url = req->get_request_uri();
context->proxy_task = proxy_task;
series->set_context(context);
series->set_callback([](const SeriesWork *series) {
delete (tutorial_series_context *)series->get_context();
});
series->set_context(context);
series->set_callback([](const SeriesWork *series) {
delete (tutorial_series_context *)series->get_context();
});
http_task = WFTaskFactory::create_http_task(req->get_request_uri(), 0, 0,
http_callback);
http_task = WFTaskFactory::create_http_task(req->get_request_uri(), 0, 0,
http_callback);
const void *body;
size_t len;
const void *body;
size_t len;
/* Copy user's request to the new task's reuqest using std::move() */
req->set_request_uri(http_task->get_req()->get_request_uri());
req->get_parsed_body(&body, &len);
req->append_output_body_nocopy(body, len);
*http_task->get_req() = std::move(*req);
/* Copy user's request to the new task's reuqest using std::move() */
req->set_request_uri(http_task->get_req()->get_request_uri());
req->get_parsed_body(&body, &len);
req->append_output_body_nocopy(body, len);
*http_task->get_req() = std::move(*req);
/* also, limit the remote webserver response size. */
http_task->get_resp()->set_size_limit(200 * 1024 * 1024);
/* also, limit the remote webserver response size. */
http_task->get_resp()->set_size_limit(200 * 1024 * 1024);
*series << http_task;
*series << http_task;
}
~~~
以上是process的全部内容。先解释向web server发送的http请求的构造。
req->get_request_uri()调用得到浏览器请求的完整URL通过这个URL构建发往server的http任务。
这个http任务重试与重定向都是0因为重定向是由浏览器处理遇到302等会重新发请求。
~~~cpp
req->set_request_uri(http_task->get_req()->get_request_uri());
req->get_parsed_body(&body, &len);
req->append_output_body_nocopy(body, len);
*http_task->get_req() = std::move(*req);
req->set_request_uri(http_task->get_req()->get_request_uri());
req->get_parsed_body(&body, &len);
req->append_output_body_nocopy(body, len);
*http_task->get_req() = std::move(*req);
~~~
上面4个语句其实是在生成发往web server的http请求。req是我们收到的http请求我们最终要通过std::move()把它直接移动到新请求上。
第一行实际上就是将request_uri里的http://host:port部分去掉只保留path之后的部分。
@@ -131,15 +131,15 @@ void process(WFHttpTask *proxy_task)
{
SeriesWork *series = series_of(proxy_task);
...
tutorial_series_context *context = new tutorial_series_context;
context->url = req->get_request_uri();
context->proxy_task = proxy_task;
tutorial_series_context *context = new tutorial_series_context;
context->url = req->get_request_uri();
context->proxy_task = proxy_task;
series->set_context(context);
series->set_callback([](const SeriesWork *series) {
delete (tutorial_series_context *)series->get_context();
});
...
series->set_context(context);
series->set_callback([](const SeriesWork *series) {
delete (tutorial_series_context *)series->get_context();
});
...
}
~~~
之前client的示例中我们说过任何一个运行中的任务都处在一个series里server任务也不例外。
@@ -148,36 +148,36 @@ void process(WFHttpTask *proxy_task)
~~~cpp
void http_callback(WFHttpTask *task)
{
int state = task->get_state();
auto *resp = task->get_resp();
SeriesWork *series = series_of(task);
tutorial_series_context *context =
(tutorial_series_context *)series->get_context();
auto *proxy_resp = context->proxy_task->get_resp();
int state = task->get_state();
auto *resp = task->get_resp();
SeriesWork *series = series_of(task);
tutorial_series_context *context =
(tutorial_series_context *)series->get_context();
auto *proxy_resp = context->proxy_task->get_resp();
...
if (state == WFT_STATE_SUCCESS)
{
const void *body;
size_t len;
if (state == WFT_STATE_SUCCESS)
{
const void *body;
size_t len;
/* set a callback for getting reply status. */
context->proxy_task->set_callback(reply_callback);
/* set a callback for getting reply status. */
context->proxy_task->set_callback(reply_callback);
/* Copy the remote webserver's response, to proxy response. */
if (resp->get_parsed_body(&body, &len))
resp->append_output_body_nocopy(body, len);
*proxy_resp = std::move(*resp);
...
}
else
{
// return a "404 Not found" page
...
/* Copy the remote webserver's response, to proxy response. */
if (resp->get_parsed_body(&body, &len))
resp->append_output_body_nocopy(body, len);
*proxy_resp = std::move(*resp);
...
}
else
{
// return a "404 Not found" page
...
}
}
~~~
我们只关注成功的情况。只要可以从web server得到一个完整http页面不管什么返回码都是成功。所有失败的情况简单返回一个404页面。
我们只关注成功的情况。只要可以从web server得到一个完整http页面不管什么返回码都是成功。所有失败的情况简单返回一个404页面。
因为返回给用户的数据可能很大在我们这个示例里设置为200MB上限。所以和之前的示例不同我们需要查看reply成功/失败状态。
http server任务和我们自行创建的http client任务的类型是完全相同的都是WFHttpTask。不同的是server任务是框架创建的它的callback初始为空。
server任务的callback和client一样是在http交互完成之后被调用。所以对server任务来讲就是reply完成之后被调用。

View File

@@ -22,14 +22,14 @@ class Workflow
{
...
public:
static ParallelWork *
create_parallel_work(parallel_callback_t callback);
static ParallelWork *
create_parallel_work(parallel_callback_t callback);
static ParallelWork *
create_parallel_work(SeriesWork *const all_series[], size_t n,
parallel_callback_t callback);
static ParallelWork *
create_parallel_work(SeriesWork *const all_series[], size_t n,
parallel_callback_t callback);
...
...
}
~~~
第一个接口创建一个空的并行任务第二个接口用一个series数组创建并行任务。
@@ -38,34 +38,34 @@ public:
~~~cpp
int main(int argc, char *argv[])
{
ParallelWork *pwork = Workflow::create_parallel_work(callback);
SeriesWork *series;
WFHttpTask *task;
HttpRequest *req;
tutorial_series_context *ctx;
int i;
ParallelWork *pwork = Workflow::create_parallel_work(callback);
SeriesWork *series;
WFHttpTask *task;
HttpRequest *req;
tutorial_series_context *ctx;
int i;
for (i = 1; i < argc; i++)
{
std::string url(argv[i]);
for (i = 1; i < argc; i++)
{
std::string url(argv[i]);
...
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
[](WFHttpTask *task)
{
// store resp to ctx.
});
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
[](WFHttpTask *task)
{
// store resp to ctx.
});
req = task->get_req();
// add some headers.
...
req = task->get_req();
// add some headers.
...
ctx = new tutorial_series_context;
ctx->url = std::move(url);
series = Workflow::create_series_work(task, NULL);
series->set_context(ctx);
pwork->add_series(series);
}
...
ctx = new tutorial_series_context;
ctx->url = std::move(url);
series = Workflow::create_series_work(task, NULL);
series->set_context(ctx);
pwork->add_series(series);
}
...
}
~~~
从代码中看到我们先创建http任务但http任务并不能直接加入到并行任务里需要先用它创建一个series。
@@ -76,41 +76,41 @@ int main(int argc, char *argv[])
http任务的callback是一个简单的lambda函数把抓取结果保存在自己的series context里以便并行任务获取。
~~~cpp
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
[](WFHttpTask *task)
[](WFHttpTask *task)
{
tutorial_series_context *ctx =
(tutorial_series_context *)series_of(task)->get_context();
ctx->state = task->get_state();
ctx->error = task->get_error();
ctx->resp = std::move(*task->get_resp());
});
tutorial_series_context *ctx =
(tutorial_series_context *)series_of(task)->get_context();
ctx->state = task->get_state();
ctx->error = task->get_error();
ctx->resp = std::move(*task->get_resp());
});
~~~
这个做法是必须的因为http任务在callback之后就会被回收我们只能把resp通过std::move()操作移走。
而在并行任务的callback里我们可以很方便的获得结果
~~~cpp
void callback(ParallelWork *pwork)
{
tutorial_series_context *ctx;
const void *body;
size_t size;
size_t i;
tutorial_series_context *ctx;
const void *body;
size_t size;
size_t i;
for (i = 0; i < pwork->size(); i++)
{
ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
printf("%s\n", ctx->url.c_str());
if (ctx->state == WFT_STATE_SUCCESS)
{
ctx->resp.get_parsed_body(&body, &size);
printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : "");
fwrite(body, 1, size, stdout);
printf("\n");
}
else
printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);
for (i = 0; i < pwork->size(); i++)
{
ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
printf("%s\n", ctx->url.c_str());
if (ctx->state == WFT_STATE_SUCCESS)
{
ctx->resp.get_parsed_body(&body, &size);
printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : "");
fwrite(body, 1, size, stdout);
printf("\n");
}
else
printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);
delete ctx;
}
delete ctx;
}
}
~~~
在这里我们看到ParallelWork的两个新接口size()和series_at(i)分别获得它的并行series个数和第i个并行series。
@@ -123,4 +123,5 @@ void callback(ParallelWork *pwork)
并行任务是一种任务所以并行任务的启动并没有什么特别可以直接调用start()也可以用它建立或启动一个series。
在这个示例里我们启动一个series在这个series的callback里唤醒主进程正常退出程序。
我们也可以在并行任务的callback里唤醒主进程程序行为上区别不大。但在series callback里唤醒更加规范一点。
我们也可以在并行任务的callback里唤醒主进程程序行为上区别不大。但在series callback里唤醒更加规范一点。

View File

@@ -51,15 +51,15 @@ using Matrix = std::vector<std::vector<double>>;
struct MMInput
{
Matrix a;
Matrix b;
Matrix a;
Matrix b;
};
struct MMOutput
{
int error;
size_t m, n, k;
Matrix c;
int error;
size_t m, n, k;
Matrix c;
};
void matrix_multiply(const MMInput *in, MMOutput *out)
@@ -80,12 +80,12 @@ template <class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
using T = WFThreadTask<INPUT, OUTPUT>;
using T = WFThreadTask<INPUT, OUTPUT>;
public:
static T *create_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine,
std::function<void (T *)> callback);
static T *create_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine,
std::function<void (T *)> callback);
...
};
~~~
@@ -101,15 +101,15 @@ using namespace algorithm;
int main()
{
typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
MMTask *task = MMFactory::create_thread_task("matrix_multiply_task",
matrix_multiply,
callback);
MMTask *task = MMFactory::create_thread_task("matrix_multiply_task",
matrix_multiply,
callback);
MMInput *input = task->get_input();
MMInput *input = task->get_input();
input->a = {{1, 2, 3}, {4, 5, 6}};
input->b = {{7, 8}, {9, 10}, {11, 12}};
...
input->a = {{1, 2, 3}, {4, 5, 6}};
input->b = {{7, 8}, {9, 10}, {11, 12}};
...
}
~~~
产生了task之后通过get_input()接口得到输入数据的指针。这个可以类比网络任务的get_req()。
@@ -117,22 +117,22 @@ int main()
~~~cpp
void callback(MMTask *task) // MMtask = WFThreadTask<MMInput, MMOutput>
{
MMInput *input = task->get_input();
MMOutput *output = task->get_output();
MMInput *input = task->get_input();
MMOutput *output = task->get_output();
assert(task->get_state() == WFT_STATE_SUCCESS);
assert(task->get_state() == WFT_STATE_SUCCESS);
if (output->error)
printf("Error: %d %s\n", output->error, strerror(output->error));
else
{
printf("Matrix A\n");
print_matrix(input->a, output->m, output->k);
printf("Matrix B\n");
print_matrix(input->b, output->k, output->n);
printf("Matrix A * Matrix B =>\n");
print_matrix(output->c, output->m, output->n);
}
if (output->error)
printf("Error: %d %s\n", output->error, strerror(output->error));
else
{
printf("Matrix A\n");
print_matrix(input->a, output->m, output->k);
printf("Matrix B\n");
print_matrix(input->b, output->k, output->n);
printf("Matrix A * Matrix B =>\n");
print_matrix(output->c, output->m, output->n);
}
}
~~~
普通的计算任务可以忽略失败的可能性结束状态肯定是SUCCESS。
@@ -153,4 +153,5 @@ HTTP协议的实现上也只关心序列化反序列化无需要关心什
同样用户可以非常简单的定义一个自有协议的server和client。
但在上一个示例里我们看到我们可以通过算法工厂产生一个并行排序任务这显然不是通过一个routine就能做到的。
对于网络任务比如一个kafka任务可能要经过与多台机器的交互才能得到结果但用户来讲是完全透明的。
所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。
所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。

View File

@@ -50,27 +50,27 @@ void process(WFHttpTask *server_task, const char *root)
// generate abs path.
...
int fd = open(abs_path.c_str(), O_RDONLY);
if (fd >= 0)
{
size_t size = lseek(fd, 0, SEEK_END);
void *buf = malloc(size); /* As an example, assert(buf != NULL); */
WFFileIOTask *pread_task;
int fd = open(abs_path.c_str(), O_RDONLY);
if (fd >= 0)
{
size_t size = lseek(fd, 0, SEEK_END);
void *buf = malloc(size); /* As an example, assert(buf != NULL); */
WFFileIOTask *pread_task;
pread_task = WFTaskFactory::create_pread_task(fd, buf, size, 0,
pread_callback);
/* To implement a more complicated server, please use series' context
* instead of tasks' user_data to pass/store internal data. */
pread_task->user_data = resp; /* pass resp pointer to pread task. */
server_task->user_data = buf; /* to free() in callback() */
server_task->set_callback([](WFHttpTask *t){ free(t->user_data); });
series_of(server_task)->push_back(pread_task);
}
else
{
resp->set_status_code("404");
resp->append_output_body("<html>404 Not Found.</html>");
}
pread_task = WFTaskFactory::create_pread_task(fd, buf, size, 0,
pread_callback);
/* To implement a more complicated server, please use series' context
* instead of tasks' user_data to pass/store internal data. */
pread_task->user_data = resp; /* pass resp pointer to pread task. */
server_task->user_data = buf; /* to free() in callback() */
server_task->set_callback([](WFHttpTask *t){ free(t->user_data); });
series_of(server_task)->push_back(pread_task);
}
else
{
resp->set_status_code("404");
resp->append_output_body("<html>404 Not Found.</html>");
}
}
~~~
与http_proxy产生一个新的http client任务不同这里我们通过factory产生了一个pread任务。
@@ -78,10 +78,10 @@ void process(WFHttpTask *server_task, const char *root)
~~~cpp
struct FileIOArg
{
int fd;
void *buf;
size_t count;
off_t offset;
int fd;
void *buf;
size_t count;
off_t offset;
};
...
@@ -93,11 +93,11 @@ class WFTaskFactory
{
public:
...
static WFFileIOTask *create_pread_task(int fd, void *buf, size_t count, off_t offset,
fio_callback_t callback);
static WFFileIOTask *create_pread_task(int fd, void *buf, size_t count, off_t offset,
fio_callback_t callback);
static WFFileIOTask *create_pwrite_task(int fd, void *buf, size_t count, off_t offset,
fio_callback_t callback);
static WFFileIOTask *create_pwrite_task(int fd, void *buf, size_t count, off_t offset,
fio_callback_t callback);
...
}
~~~
@@ -113,18 +113,18 @@ using namespace protocol;
void pread_callback(WFFileIOTask *task)
{
FileIOArg *arg = task->get_arg();
long ret = task->get_retval();
HttpResponse *resp = (HttpResponse *)task->user_data;
FileIOArg *arg = task->get_arg();
long ret = task->get_retval();
HttpResponse *resp = (HttpResponse *)task->user_data;
close(arg->fd);
if (ret < 0)
{
resp->set_status_code("503");
resp->append_output_body("<html>503 Internal Server Error.</html>");
}
else /* Use '_nocopy' carefully. */
resp->append_output_body_nocopy(arg->buf, ret);
close(arg->fd);
if (ret < 0)
{
resp->set_status_code("503");
resp->append_output_body("<html>503 Internal Server Error.</html>");
}
else /* Use '_nocopy' carefully. */
resp->append_output_body_nocopy(arg->buf, ret);
}
~~~
文件任务的get_arg()得到输入参数这里是FileIOArg结构。

View File

@@ -86,65 +86,65 @@ namespace protocol
int TutorialMessage::encode(struct iovec vectors[], int max/*max==8192*/)
{
uint32_t n = htonl(this->body_size);
uint32_t n = htonl(this->body_size);
memcpy(this->head, &n, 4);
vectors[0].iov_base = this->head;
vectors[0].iov_len = 4;
vectors[1].iov_base = this->body;
vectors[1].iov_len = this->body_size;
memcpy(this->head, &n, 4);
vectors[0].iov_base = this->head;
vectors[0].iov_len = 4;
vectors[1].iov_base = this->body;
vectors[1].iov_len = this->body_size;
return 2; /* return the number of vectors used, no more then max. */
return 2; /* return the number of vectors used, no more then max. */
}
int TutorialMessage::append(const void *buf, size_t size)
{
if (this->head_received < 4)
{
size_t head_left;
void *p;
if (this->head_received < 4)
{
size_t head_left;
void *p;
p = &this->head[head_received];
head_left = 4 - this->head_received;
if (size < 4 - this->head_received)
{
memcpy(p, buf, size);
this->head_received += size;
return 0;
}
p = &this->head[head_received];
head_left = 4 - this->head_received;
if (size < 4 - this->head_received)
{
memcpy(p, buf, size);
this->head_received += size;
return 0;
}
memcpy(p, buf, head_left);
size -= head_left;
buf = (const char *)buf + head_left;
memcpy(p, buf, head_left);
size -= head_left;
buf = (const char *)buf + head_left;
p = this->head;
this->body_size = ntohl(*(uint32_t *)p);
if (this->body_size > this->size_limit)
{
errno = EMSGSIZE;
return -1;
}
p = this->head;
this->body_size = ntohl(*(uint32_t *)p);
if (this->body_size > this->size_limit)
{
errno = EMSGSIZE;
return -1;
}
this->body = (char *)malloc(this->body_size);
if (!this->body)
return -1;
this->body = (char *)malloc(this->body_size);
if (!this->body)
return -1;
this->body_received = 0;
}
this->body_received = 0;
}
size_t body_left = this->body_size - this->body_received;
size_t body_left = this->body_size - this->body_received;
if (size > body_left)
{
errno = EBADMSG;
return -1;
}
if (size > body_left)
{
errno = EBADMSG;
return -1;
}
memcpy(this->body, buf, body_left);
if (size < body_left)
return 0;
memcpy(this->body, buf, body_left);
if (size < body_left)
return 0;
return 1;
return 1;
}
}
@@ -193,25 +193,25 @@ template<class REQ, class RESP>
class WFNetworkTaskFactory
{
private:
using T = WFNetworkTask<REQ, RESP>;
using T = WFNetworkTask<REQ, RESP>;
public:
static T *create_client_task(TransportType type,
const std::string& host,
unsigned short port,
int retry_max,
std::function<void (T *)> callback);
static T *create_client_task(TransportType type,
const std::string& host,
unsigned short port,
int retry_max,
std::function<void (T *)> callback);
static T *create_client_task(TransportType type,
const std::string& url,
int retry_max,
std::function<void (T *)> callback);
static T *create_client_task(TransportType type,
const std::string& url,
int retry_max,
std::function<void (T *)> callback);
static T *create_client_task(TransportType type,
const URI& uri,
int retry_max,
std::function<void (T *)> callback);
...
static T *create_client_task(TransportType type,
const URI& uri,
int retry_max,
std::function<void (T *)> callback);
...
};
~~~
其中TransportType指定传输层协议，目前可选的值包括TT_TCP、TT_UDP、TT_SCTP和TT_TCP_SSL。
@@ -223,18 +223,18 @@ using namespace protocol;
class MyFactory : public WFTaskFactory
{
public:
static WFTutorialTask *create_tutorial_task(const std::string& host,
unsigned short port,
int retry_max,
tutorial_callback_t callback)
{
using NTF = WFNetworkTaskFactory<TutorialRequest, TutorialResponse>;
WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port,
retry_max,
std::move(callback));
task->set_keep_alive(30 * 1000);
return task;
}
static WFTutorialTask *create_tutorial_task(const std::string& host,
unsigned short port,
int retry_max,
tutorial_callback_t callback)
{
using NTF = WFNetworkTaskFactory<TutorialRequest, TutorialResponse>;
WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port,
retry_max,
std::move(callback));
task->set_keep_alive(30 * 1000);
return task;
}
};
~~~
可以看到，我们用了WFNetworkTaskFactory&lt;TutorialRequest, TutorialResponse&gt;类来创建client任务。