diff --git a/docs/about-connection-context.md b/docs/about-connection-context.md index 81cbb549..b1188dbc 100644 --- a/docs/about-connection-context.md +++ b/docs/about-connection-context.md @@ -105,4 +105,4 @@ int some_function() } ~~~ 在这个示例中,当http task是连接上的首个请求是,我们设置了cookie。如果不是首个请求,根据约定,不再设置cookie。 -另外,prepare函数里,可以安全的使用连接上下文。同一个连接上,prepare不会并发。 \ No newline at end of file +另外,prepare函数里,可以安全的使用连接上下文。同一个连接上,prepare不会并发。 diff --git a/docs/about-counter.md b/docs/about-counter.md index 8decfa3e..73d7f044 100644 --- a/docs/about-counter.md +++ b/docs/about-counter.md @@ -203,4 +203,4 @@ public: ... }; ~~~ -有需要的用户可以自行查阅相关代码。由于WFTaskFactory没有提供工厂函数,创造container任务需要自己调用new。 \ No newline at end of file +有需要的用户可以自行查阅相关代码。由于WFTaskFactory没有提供工厂函数,创造container任务需要自己调用new。 diff --git a/docs/about-error.md b/docs/about-error.md index 75b9bed5..448e7efe 100644 --- a/docs/about-error.md +++ b/docs/about-error.md @@ -29,14 +29,14 @@ void callback(WFXxxTask *task) ~~~cpp enum { - WFT_STATE_UNDEFINED = -1, - WFT_STATE_SUCCESS = CS_STATE_SUCCESS, - WFT_STATE_TOREPLY = CS_STATE_TOREPLY, /* for server task only */ - WFT_STATE_NOREPLY = CS_STATE_TOREPLY + 1, /* for server task only */ - WFT_STATE_SYS_ERROR = CS_STATE_ERROR, - WFT_STATE_SSL_ERROR = 65, - WFT_STATE_DNS_ERROR = 66, /* for client task only */ - WFT_STATE_TASK_ERROR = 67 + WFT_STATE_UNDEFINED = -1, + WFT_STATE_SUCCESS = CS_STATE_SUCCESS, + WFT_STATE_TOREPLY = CS_STATE_TOREPLY, /* for server task only */ + WFT_STATE_NOREPLY = CS_STATE_TOREPLY + 1, /* for server task only */ + WFT_STATE_SYS_ERROR = CS_STATE_ERROR, + WFT_STATE_SSL_ERROR = 65, + WFT_STATE_DNS_ERROR = 66, /* for client task only */ + WFT_STATE_TASK_ERROR = 67 }; ~~~ ##### 需要关注的几个状态: diff --git a/docs/about-service-management.md b/docs/about-service-management.md index 43529e07..bab20ec2 100644 --- a/docs/about-service-management.md +++ b/docs/about-service-management.md @@ -30,19 +30,18 @@ using upstream_route_t = std::functionsend_timeo = timeout; } - void set_receive_timeout(int timeout) { this->receive_timeo = timeout; } - void set_keep_alive(int timeout) { this->keep_alive_timeo = timeout; } + /* All in milleseconds. timeout == -1 for unlimited. */ + void set_send_timeout(int timeout) { this->send_timeo = timeout; } + void set_receive_timeout(int timeout) { this->receive_timeo = timeout; } + void set_keep_alive(int timeout) { this->keep_alive_timeo = timeout; } ... } ~~~ diff --git a/docs/about-timer.md b/docs/about-timer.md index c71e8f17..643c4397 100644 --- a/docs/about-timer.md +++ b/docs/about-timer.md @@ -1,6 +1,5 @@ # 关于定时器 -和网络任务,计算任务或文件任务一样,定时器是我们框架中的一种基础任务。 定时器的作用是不占线程的等待一个确定时间,同样通过callback来通知定时器到期。 # 定时器的创建 diff --git a/docs/about-upstream.md b/docs/about-upstream.md index 27c0f496..4b3dd725 100644 --- a/docs/about-upstream.md +++ b/docs/about-upstream.md @@ -42,25 +42,25 @@ Upstream和域名DNS解析都可以将一组ip配置到一个Host,但是 class UpstreamManager { public: - static int upstream_create_consistent_hash(const std::string& name, - upstream_route_t consitent_hash); - static int upstream_create_weighted_random(const std::string& name, - bool try_another); - static int upstream_create_manual(const std::string& name, - upstream_route_t select, - bool try_another, - upstream_route_t consitent_hash); - static int upstream_delete(const std::string& name); + static int upstream_create_consistent_hash(const std::string& name, + upstream_route_t consitent_hash); + static int upstream_create_weighted_random(const std::string& name, + bool try_another); + static int upstream_create_manual(const std::string& name, + upstream_route_t select, + bool try_another, + upstream_route_t consitent_hash); + static int upstream_delete(const std::string& name); public: - static int upstream_add_server(const std::string& name, - const std::string& address); - static int upstream_add_server(const std::string& name, - const std::string& address, - const struct AddressParams *address_params); - static int upstream_remove_server(const std::string& name, - const std::string& address); - ... + static int upstream_add_server(const std::string& name, + const std::string& address); + static int upstream_add_server(const std::string& name, + const std::string& address, + const struct AddressParams *address_params); + static int upstream_remove_server(const std::string& name, + const std::string& address); + ... } ~~~ @@ -68,8 +68,8 @@ public: 配置一个本地反向代理,将本地发出的my_proxy.name所有请求均匀的打到6个目标server上 ~~~cpp UpstreamManager::upstream_create_weighted_random( - "my_proxy.name", - true);//如果遇到熔断机器,再次尝试直至找到可用或全部熔断 + "my_proxy.name", + true);//如果遇到熔断机器,再次尝试直至找到可用或全部熔断 UpstreamManager::upstream_add_server("my_proxy.name", "192.168.2.100:8081"); UpstreamManager::upstream_add_server("my_proxy.name", "192.168.2.100:8082"); @@ -91,8 +91,8 @@ http_task->start(); 配置一个本地反向代理,将本地发出的weighted.random所有请求按照5/20/1的权重分配打到3个目标server上 ~~~cpp UpstreamManager::upstream_create_weighted_random( - "weighted.random", - false);//如果遇到熔断机器,不再尝试,这种情况下此次请求必定失败 + "weighted.random", + false);//如果遇到熔断机器,不再尝试,这种情况下此次请求必定失败 AddressParams address_params = ADDRESS_PARAMS_DEFAULT; address_params.weight = 5;//权重为5 @@ -112,8 +112,8 @@ http_task->start(); ### 例3 在多个目标中按照框架默认的一致性哈希访问 ~~~cpp UpstreamManager::upstream_create_consistent_hash( - "abc.local", - nullptr);//nullptr代表使用框架默认的一致性哈希函数 + "abc.local", + nullptr);//nullptr代表使用框架默认的一致性哈希函数 UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8081"); UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8082"); @@ -134,21 +134,21 @@ http_task->start(); ### 例4 自定义一致性哈希函数 ~~~cpp UpstreamManager::upstream_create_consistent_hash( - "abc.local", - [](const char *path, const char *query, const char *fragment) -> unsigned int { - unsigned int hash = 0; + "abc.local", + [](const char *path, const char *query, const char *fragment) -> unsigned int { + unsigned int hash = 0; - while (*path) - hash = (hash * 131) + (*path++); + while (*path) + hash = (hash * 131) + (*path++); - while (*query) - hash = (hash * 131) + (*query++); + while (*query) + hash = (hash * 131) + (*query++); - while (*fragment) - hash = (hash * 131) + (*fragment++); + while (*fragment) + hash = (hash * 131) + (*fragment++); - return hash; - }); + return hash; + }); UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8081"); UpstreamManager::upstream_add_server("abc.local", "192.168.2.100:8082"); @@ -166,12 +166,12 @@ http_task->start(); ### 例5 自定义选取策略 ~~~cpp UpstreamManager::upstream_create_manual( - "xyz.cdn", - [](const char *path, const char *query, const char *fragment) -> unsigned int { - return atoi(fragment); - }, - true,//如果选择到已经熔断的目标,将进行二次选取 - nullptr);//nullptr代表二次选取时使用框架默认的一致性哈希函数 + "xyz.cdn", + [](const char *path, const char *query, const char *fragment) -> unsigned int { + return atoi(fragment); + }, + true,//如果选择到已经熔断的目标,将进行二次选取 + nullptr);//nullptr代表二次选取时使用框架默认的一致性哈希函数 UpstreamManager::upstream_add_server("xyz.cdn", "192.168.2.100:8081"); UpstreamManager::upstream_add_server("xyz.cdn", "192.168.2.100:8082"); @@ -191,8 +191,8 @@ http_task->start(); ### 例6 简单的主备模式 ~~~cpp UpstreamManager::upstream_create_weighted_random( - "simple.name", - true);//一主一备这项设什么没区别 + "simple.name", + true);//一主一备这项设什么没区别 AddressParams address_params = ADDRESS_PARAMS_DEFAULT; address_params.server_type = SERVER_TYPE_MASTER; @@ -215,8 +215,8 @@ redis_task->get_req()->set_query("MGET", {"key1", "key2", "key3", "key4"}); ### 例7 主备+一致性哈希+分组 ~~~cpp UpstreamManager::upstream_create_consistent_hash( - "abc.local", - nullptr);//nullptr代表使用框架默认的一致性哈希函数 + "abc.local", + nullptr);//nullptr代表使用框架默认的一致性哈希函数 AddressParams address_params = ADDRESS_PARAMS_DEFAULT; address_params.server_type = SERVER_TYPE_MASTER; @@ -266,42 +266,42 @@ round-robin/weighted-round-robin:视为与[1]等价,暂不提供 ~~~cpp struct EndpointParams { - size_t max_connections; - int connect_timeout; - int response_timeout; - int ssl_connect_timeout; + size_t max_connections; + int connect_timeout; + int response_timeout; + int ssl_connect_timeout; }; static constexpr struct EndpointParams ENDPOINT_PARAMS_DEFAULT = { - .max_connections = 200, - .connect_timeout = 10 * 1000, - .response_timeout = 10 * 1000, - .ssl_connect_timeout = 10 * 1000, + .max_connections = 200, + .connect_timeout = 10 * 1000, + .response_timeout = 10 * 1000, + .ssl_connect_timeout = 10 * 1000, }; struct AddressParams { - struct EndpointParams endpoint_params; - unsigned int dns_ttl_default; - unsigned int dns_ttl_min; - unsigned int max_fails; - unsigned short weight; -#define SERVER_TYPE_MASTER 0 -#define SERVER_TYPE_SLAVE 1 - int server_type; - int group_id; + struct EndpointParams endpoint_params; + unsigned int dns_ttl_default; + unsigned int dns_ttl_min; + unsigned int max_fails; + unsigned short weight; +#define SERVER_TYPE_MASTER 0 +#define SERVER_TYPE_SLAVE 1 + int server_type; + int group_id; }; static constexpr struct AddressParams ADDRESS_PARAMS_DEFAULT = { - .endpoint_params = ENDPOINT_PARAMS_DEFAULT, - .dns_ttl_default = 12 * 3600, - .dns_ttl_min = 180, - .max_fails = 200, - .weight = 1, //only for master of UPSTREAM_WEIGHTED_RANDOM - .server_type = SERVER_TYPE_MASTER, - .group_id = -1, + .endpoint_params = ENDPOINT_PARAMS_DEFAULT, + .dns_ttl_default = 12 * 3600, + .dns_ttl_min = 180, + .max_fails = 200, + .weight = 1, //only for master of UPSTREAM_WEIGHTED_RANDOM + .server_type = SERVER_TYPE_MASTER, + .group_id = -1, }; ~~~ 每个Addreess都可以配置自己的自定义参数: diff --git a/docs/tutorial-01-wget.md b/docs/tutorial-01-wget.md index dde05943..96d7b5d6 100644 --- a/docs/tutorial-01-wget.md +++ b/docs/tutorial-01-wget.md @@ -85,4 +85,5 @@ public: ~~~ 相信这个cursor在使用上应该不会有什么疑惑。 之后一行resp->get_parsed_body()获得response的http body。这个调用在任务成功的状态下,必然返回true,body指向数据区。 -这个调用得到的是原始的http body,不解码chunk编码。如需解码chunk编码,可使用[HttpUtil.h](../src/protocol/HttpUtil.h)里的HttpChunkCursor。 \ No newline at end of file +这个调用得到的是原始的http body,不解码chunk编码。如需解码chunk编码,可使用[HttpUtil.h](../src/protocol/HttpUtil.h)里的HttpChunkCursor。 + diff --git a/docs/tutorial-04-http_echo_server.md b/docs/tutorial-04-http_echo_server.md index 1e5a758a..db73e434 100644 --- a/docs/tutorial-04-http_echo_server.md +++ b/docs/tutorial-04-http_echo_server.md @@ -35,7 +35,7 @@ public: int start(int family, unsigned short port); int start(const char *host, unsigned short port); int start(int family, const char *host, unsigned short port); - int start(const struct sockaddr *bind_addr, socklen_t addrlen); + int start(const struct sockaddr *bind_addr, socklen_t addrlen); /* To start an SSL server */ int start(unsigned short port, const char *cert_file, const char *key_file); @@ -45,8 +45,8 @@ public: const char *cert_file, const char *key_file); int start(int family, const char *host, unsigned short port, const char *cert_file, const char *key_file); - int start(const struct sockaddr *bind_addr, socklen_t addrlen, - const char *cert_file, const char *key_file); + int start(const struct sockaddr *bind_addr, socklen_t addrlen, + const char *cert_file, const char *key_file); /* For graceful restart. */ int serve(int listen_fd); @@ -92,38 +92,38 @@ WFHttpServer::WFServer(http_process_t proc) : ~~~cpp void process(WFHttpTask *server_task) { - protocol::HttpRequest *req = server_task->get_req(); - protocol::HttpResponse *resp = server_task->get_resp(); - long seq = server_task->get_task_seq(); - protocol::HttpHeaderCursor cursor(req); - std::string name; - std::string value; - char buf[8192]; - int len; + protocol::HttpRequest *req = server_task->get_req(); + protocol::HttpResponse *resp = server_task->get_resp(); + long seq = server_task->get_task_seq(); + protocol::HttpHeaderCursor cursor(req); + std::string name; + std::string value; + char buf[8192]; + int len; - /* Set response message body. */ - resp->append_output_body_nocopy("", 6); - len = snprintf(buf, 8192, "

%s %s %s

", req->get_method(), - req->get_request_uri(), req->get_http_version()); - resp->append_output_body(buf, len); + /* Set response message body. */ + resp->append_output_body_nocopy("", 6); + len = snprintf(buf, 8192, "

%s %s %s

", req->get_method(), + req->get_request_uri(), req->get_http_version()); + resp->append_output_body(buf, len); - while (cursor.next(name, value)) - { - len = snprintf(buf, 8192, "

%s: %s

", name.c_str(), value.c_str()); - resp->append_output_body(buf, len); - } + while (cursor.next(name, value)) + { + len = snprintf(buf, 8192, "

%s: %s

", name.c_str(), value.c_str()); + resp->append_output_body(buf, len); + } - resp->append_output_body_nocopy("", 7); + resp->append_output_body_nocopy("", 7); - /* Set status line if you like. */ - resp->set_http_version("HTTP/1.1"); - resp->set_status_code("200"); - resp->set_reason_phrase("OK"); + /* Set status line if you like. */ + resp->set_http_version("HTTP/1.1"); + resp->set_status_code("200"); + resp->set_reason_phrase("OK"); - resp->add_header_pair("Content-Type", "text/html"); - resp->add_header_pair("Server", "Sogou WFHttpServer"); - if (seq == 9) /* no more than 10 requests on the same connection. */ - resp->add_header_pair("Connection", "close"); + resp->add_header_pair("Content-Type", "text/html"); + resp->add_header_pair("Server", "Sogou WFHttpServer"); + if (seq == 9) /* no more than 10 requests on the same connection. */ + resp->add_header_pair("Connection", "close"); // print log ... @@ -147,8 +147,8 @@ public: 函数中另外一个变量seq,通过server_task->get_task_seq()得到,表示该请求是当前连接上的第几次请求,从0开始计。 程序中,完成10次请求之后就强行关闭连接,于是: ~~~cpp - if (seq == 9) /* no more than 10 requests on the same connection. */ - resp->add_header_pair("Connection", "close"); + if (seq == 9) /* no more than 10 requests on the same connection. */ + resp->add_header_pair("Connection", "close"); ~~~ 关闭连接还可以通过task->set_keep_alive()接口来完成,但对于http协议,还是推荐使用设置header的方式。 这个示例中,因为返回的页面很小,我们没有关注回复成功与否。下一个示例http_proxy我们将看到如果获得回复的状态。 diff --git a/docs/tutorial-05-http_proxy.md b/docs/tutorial-05-http_proxy.md index 574a2b57..8583ed35 100644 --- a/docs/tutorial-05-http_proxy.md +++ b/docs/tutorial-05-http_proxy.md @@ -39,12 +39,12 @@ int main(int argc, char *argv[]) ~~~cpp static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT = { - .max_connections = 1000, - .peer_response_timeout = 10 * 1000, - .receive_timeout = -1, - .keep_alive_timeout = 60 * 1000, - .request_size_limit = (size_t)-1, - .ssl_accept_timeout = 10 * 1000, + .max_connections = 1000, + .peer_response_timeout = 10 * 1000, + .receive_timeout = -1, + .keep_alive_timeout = 60 * 1000, + .request_size_limit = (size_t)-1, + .ssl_accept_timeout = 10 * 1000, }; ~~~ max_connections:最大连接数1000,达到上限之后会关闭最久未使用的keep-alive连接。没找到keep-alive连接,则拒绝新连接。 @@ -66,45 +66,45 @@ GET / HTTP/1.1 ~~~cpp void process(WFHttpTask *proxy_task) { - auto *req = proxy_task->get_req(); - SeriesWork *series = series_of(proxy_task); - WFHttpTask *http_task; /* for requesting remote webserver. */ + auto *req = proxy_task->get_req(); + SeriesWork *series = series_of(proxy_task); + WFHttpTask *http_task; /* for requesting remote webserver. */ - tutorial_series_context *context = new tutorial_series_context; - context->url = req->get_request_uri(); - context->proxy_task = proxy_task; + tutorial_series_context *context = new tutorial_series_context; + context->url = req->get_request_uri(); + context->proxy_task = proxy_task; - series->set_context(context); - series->set_callback([](const SeriesWork *series) { - delete (tutorial_series_context *)series->get_context(); - }); + series->set_context(context); + series->set_callback([](const SeriesWork *series) { + delete (tutorial_series_context *)series->get_context(); + }); - http_task = WFTaskFactory::create_http_task(req->get_request_uri(), 0, 0, - http_callback); + http_task = WFTaskFactory::create_http_task(req->get_request_uri(), 0, 0, + http_callback); - const void *body; - size_t len; + const void *body; + size_t len; - /* Copy user's request to the new task's reuqest using std::move() */ - req->set_request_uri(http_task->get_req()->get_request_uri()); - req->get_parsed_body(&body, &len); - req->append_output_body_nocopy(body, len); - *http_task->get_req() = std::move(*req); + /* Copy user's request to the new task's reuqest using std::move() */ + req->set_request_uri(http_task->get_req()->get_request_uri()); + req->get_parsed_body(&body, &len); + req->append_output_body_nocopy(body, len); + *http_task->get_req() = std::move(*req); - /* also, limit the remote webserver response size. */ - http_task->get_resp()->set_size_limit(200 * 1024 * 1024); + /* also, limit the remote webserver response size. */ + http_task->get_resp()->set_size_limit(200 * 1024 * 1024); - *series << http_task; + *series << http_task; } ~~~ 以上是process的全部内容。先解释向web server发送的http请求的构造。 req->get_request_uri()调用得到浏览器请求的完整URL,通过这个URL构建发往server的http任务。 这个http任务重试与重定向都是0,因为重定向是由浏览器处理,遇到302等会重新发请求。 ~~~cpp - req->set_request_uri(http_task->get_req()->get_request_uri()); - req->get_parsed_body(&body, &len); - req->append_output_body_nocopy(body, len); - *http_task->get_req() = std::move(*req); + req->set_request_uri(http_task->get_req()->get_request_uri()); + req->get_parsed_body(&body, &len); + req->append_output_body_nocopy(body, len); + *http_task->get_req() = std::move(*req); ~~~ 上面4个语句,其实是在生成发往web server的http请求。req是我们收到的http请求,我们最终要通过std::move()把它直接移动到新请求上。 第一行实际上就是将request_uri里的http://host:port部分去掉,只保留path之后的部分。 @@ -131,15 +131,15 @@ void process(WFHttpTask *proxy_task) { SeriesWork *series = series_of(proxy_task); ... - tutorial_series_context *context = new tutorial_series_context; - context->url = req->get_request_uri(); - context->proxy_task = proxy_task; + tutorial_series_context *context = new tutorial_series_context; + context->url = req->get_request_uri(); + context->proxy_task = proxy_task; - series->set_context(context); - series->set_callback([](const SeriesWork *series) { - delete (tutorial_series_context *)series->get_context(); - }); - ... + series->set_context(context); + series->set_callback([](const SeriesWork *series) { + delete (tutorial_series_context *)series->get_context(); + }); + ... } ~~~ 之前client的示例中我们说过,任何一个运行中的任务,都处在一个series里,server任务也不例外。 @@ -148,36 +148,36 @@ void process(WFHttpTask *proxy_task) ~~~cpp void http_callback(WFHttpTask *task) { - int state = task->get_state(); - auto *resp = task->get_resp(); - SeriesWork *series = series_of(task); - tutorial_series_context *context = - (tutorial_series_context *)series->get_context(); - auto *proxy_resp = context->proxy_task->get_resp(); + int state = task->get_state(); + auto *resp = task->get_resp(); + SeriesWork *series = series_of(task); + tutorial_series_context *context = + (tutorial_series_context *)series->get_context(); + auto *proxy_resp = context->proxy_task->get_resp(); ... - if (state == WFT_STATE_SUCCESS) - { - const void *body; - size_t len; + if (state == WFT_STATE_SUCCESS) + { + const void *body; + size_t len; - /* set a callback for getting reply status. */ - context->proxy_task->set_callback(reply_callback); + /* set a callback for getting reply status. */ + context->proxy_task->set_callback(reply_callback); - /* Copy the remote webserver's response, to proxy response. */ - if (resp->get_parsed_body(&body, &len)) - resp->append_output_body_nocopy(body, len); - *proxy_resp = std::move(*resp); - ... - } - else - { - // return a "404 Not found" page - ... + /* Copy the remote webserver's response, to proxy response. */ + if (resp->get_parsed_body(&body, &len)) + resp->append_output_body_nocopy(body, len); + *proxy_resp = std::move(*resp); + ... + } + else + { + // return a "404 Not found" page + ... } } ~~~ -我们只关注成功的情况。一切可以从web server得到一个完整http页面,不管什么返回码,都是成功。所有失败的情况,简单返回一个404页面。 +我们只关注成功的情况。一切可以从web server得到一个完整http页面,不管什么返回码,都是成功。所有失败的情况,简单返回一个404页面。 因为返回给用户的数据可能很大,在我们这个示例里,设置为200MB上限。所以,和之前的示例不同,我们需要查看reply成功/失败状态。 http server任务和我们自行创建的http client任务的类型是完全相同的,都是WFHttpTask。不同的是server任务是框架创建的,它的callback初始为空。 server任务的callback和client一样,是在http交互完成之后被调用。所以,对server任务来讲,就是reply完成之后被调用。 diff --git a/docs/tutorial-06-parallel_wget.md b/docs/tutorial-06-parallel_wget.md index a38c16a9..1d4c2c12 100644 --- a/docs/tutorial-06-parallel_wget.md +++ b/docs/tutorial-06-parallel_wget.md @@ -22,14 +22,14 @@ class Workflow { ... public: - static ParallelWork * - create_parallel_work(parallel_callback_t callback); + static ParallelWork * + create_parallel_work(parallel_callback_t callback); - static ParallelWork * - create_parallel_work(SeriesWork *const all_series[], size_t n, - parallel_callback_t callback); + static ParallelWork * + create_parallel_work(SeriesWork *const all_series[], size_t n, + parallel_callback_t callback); - ... + ... } ~~~ 第一个接口创建一个空的并行任务,第二个接口用一个series数组创建并行任务。 @@ -38,34 +38,34 @@ public: ~~~cpp int main(int argc, char *argv[]) { - ParallelWork *pwork = Workflow::create_parallel_work(callback); - SeriesWork *series; - WFHttpTask *task; - HttpRequest *req; - tutorial_series_context *ctx; - int i; + ParallelWork *pwork = Workflow::create_parallel_work(callback); + SeriesWork *series; + WFHttpTask *task; + HttpRequest *req; + tutorial_series_context *ctx; + int i; - for (i = 1; i < argc; i++) - { - std::string url(argv[i]); + for (i = 1; i < argc; i++) + { + std::string url(argv[i]); ... - task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, - [](WFHttpTask *task) - { - // store resp to ctx. - }); + task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, + [](WFHttpTask *task) + { + // store resp to ctx. + }); - req = task->get_req(); - // add some headers. - ... + req = task->get_req(); + // add some headers. + ... - ctx = new tutorial_series_context; - ctx->url = std::move(url); - series = Workflow::create_series_work(task, NULL); - series->set_context(ctx); - pwork->add_series(series); - } - ... + ctx = new tutorial_series_context; + ctx->url = std::move(url); + series = Workflow::create_series_work(task, NULL); + series->set_context(ctx); + pwork->add_series(series); + } + ... } ~~~ 从代码中看到,我们先创建http任务,但http任务并不能直接加入到并行任务里,需要先用它创建一个series。 @@ -76,41 +76,41 @@ int main(int argc, char *argv[]) http任务的callback是一个简单的lambda函数,把抓取结果保存在自己的series context里,以便并行任务获取。 ~~~cpp task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, - [](WFHttpTask *task) + [](WFHttpTask *task) { - tutorial_series_context *ctx = - (tutorial_series_context *)series_of(task)->get_context(); - ctx->state = task->get_state(); - ctx->error = task->get_error(); - ctx->resp = std::move(*task->get_resp()); - }); + tutorial_series_context *ctx = + (tutorial_series_context *)series_of(task)->get_context(); + ctx->state = task->get_state(); + ctx->error = task->get_error(); + ctx->resp = std::move(*task->get_resp()); + }); ~~~ 这个做法是必须的,因为http任务在callback之后就会被回收,我们只能把resp通过std::move()操作移走。 而在并行任务的callback里,我们可以很方便的获得结果: ~~~cpp void callback(ParallelWork *pwork) { - tutorial_series_context *ctx; - const void *body; - size_t size; - size_t i; + tutorial_series_context *ctx; + const void *body; + size_t size; + size_t i; - for (i = 0; i < pwork->size(); i++) - { - ctx = (tutorial_series_context *)pwork->series_at(i)->get_context(); - printf("%s\n", ctx->url.c_str()); - if (ctx->state == WFT_STATE_SUCCESS) - { - ctx->resp.get_parsed_body(&body, &size); - printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : ""); - fwrite(body, 1, size, stdout); - printf("\n"); - } - else - printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error); + for (i = 0; i < pwork->size(); i++) + { + ctx = (tutorial_series_context *)pwork->series_at(i)->get_context(); + printf("%s\n", ctx->url.c_str()); + if (ctx->state == WFT_STATE_SUCCESS) + { + ctx->resp.get_parsed_body(&body, &size); + printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : ""); + fwrite(body, 1, size, stdout); + printf("\n"); + } + else + printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error); - delete ctx; - } + delete ctx; + } } ~~~ 在这里,我们看到ParallelWork的两个新接口,size()和series_at(i),分别获得它的并行series个数,和第i个并行series。 @@ -123,4 +123,5 @@ void callback(ParallelWork *pwork) 并行任务是一种任务,所以并行任务的启动并没有什么特别,可以直接调用start(),也可以用它建立或启动一个series。 在这个示例里,我们启动一个series,在这个series的callback里唤醒主进程,正常退出程序。 -我们也可以在并行任务的callback里唤醒主进程,程序行为上区别不大。但在series callback里唤醒更加规范一点。 \ No newline at end of file +我们也可以在并行任务的callback里唤醒主进程,程序行为上区别不大。但在series callback里唤醒更加规范一点。 + diff --git a/docs/tutorial-08-matrix_multiply.md b/docs/tutorial-08-matrix_multiply.md index 5f5edb36..a4ece141 100644 --- a/docs/tutorial-08-matrix_multiply.md +++ b/docs/tutorial-08-matrix_multiply.md @@ -51,15 +51,15 @@ using Matrix = std::vector>; struct MMInput { - Matrix a; - Matrix b; + Matrix a; + Matrix b; }; struct MMOutput { - int error; - size_t m, n, k; - Matrix c; + int error; + size_t m, n, k; + Matrix c; }; void matrix_multiply(const MMInput *in, MMOutput *out) @@ -80,12 +80,12 @@ template class WFThreadTaskFactory { private: - using T = WFThreadTask; + using T = WFThreadTask; public: - static T *create_thread_task(const std::string& queue_name, - std::function routine, - std::function callback); + static T *create_thread_task(const std::string& queue_name, + std::function routine, + std::function callback); ... }; ~~~ @@ -101,15 +101,15 @@ using namespace algorithm; int main() { typedef WFThreadTaskFactory MMFactory; - MMTask *task = MMFactory::create_thread_task("matrix_multiply_task", - matrix_multiply, - callback); + MMTask *task = MMFactory::create_thread_task("matrix_multiply_task", + matrix_multiply, + callback); - MMInput *input = task->get_input(); + MMInput *input = task->get_input(); - input->a = {{1, 2, 3}, {4, 5, 6}}; - input->b = {{7, 8}, {9, 10}, {11, 12}}; - ... + input->a = {{1, 2, 3}, {4, 5, 6}}; + input->b = {{7, 8}, {9, 10}, {11, 12}}; + ... } ~~~ 产生了task之后,通过get_input()接口得到输入数据的指针。这个可以类比网络任务的get_req()。 @@ -117,22 +117,22 @@ int main() ~~~cpp void callback(MMTask *task) // MMtask = WFThreadTask { - MMInput *input = task->get_input(); - MMOutput *output = task->get_output(); + MMInput *input = task->get_input(); + MMOutput *output = task->get_output(); - assert(task->get_state() == WFT_STATE_SUCCESS); + assert(task->get_state() == WFT_STATE_SUCCESS); - if (output->error) - printf("Error: %d %s\n", output->error, strerror(output->error)); - else - { - printf("Matrix A\n"); - print_matrix(input->a, output->m, output->k); - printf("Matrix B\n"); - print_matrix(input->b, output->k, output->n); - printf("Matrix A * Matrix B =>\n"); - print_matrix(output->c, output->m, output->n); - } + if (output->error) + printf("Error: %d %s\n", output->error, strerror(output->error)); + else + { + printf("Matrix A\n"); + print_matrix(input->a, output->m, output->k); + printf("Matrix B\n"); + print_matrix(input->b, output->k, output->n); + printf("Matrix A * Matrix B =>\n"); + print_matrix(output->c, output->m, output->n); + } } ~~~ 普通的计算任务可以忽略失败的可能性,结束状态肯定是SUCCESS。 @@ -153,4 +153,5 @@ HTTP协议的实现上,也只关心序列化反序列化,无需要关心什 同样,用户可以非常简单的定义一个自有协议的server和client。 但在上一个示例里我们看到,我们可以通过算法工厂产生一个并行排序任务,这显然不是通过一个routine就能做到的。 对于网络任务,比如一个kafka任务,可能要经过与多台机器的交互才能得到结果,但用户来讲是完全透明的。 -所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。 \ No newline at end of file +所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。 + diff --git a/docs/tutorial-09-http_file_server.md b/docs/tutorial-09-http_file_server.md index cd4d42f6..d94cf6cc 100644 --- a/docs/tutorial-09-http_file_server.md +++ b/docs/tutorial-09-http_file_server.md @@ -50,27 +50,27 @@ void process(WFHttpTask *server_task, const char *root) // generate abs path. ... - int fd = open(abs_path.c_str(), O_RDONLY); - if (fd >= 0) - { - size_t size = lseek(fd, 0, SEEK_END); - void *buf = malloc(size); /* As an example, assert(buf != NULL); */ - WFFileIOTask *pread_task; + int fd = open(abs_path.c_str(), O_RDONLY); + if (fd >= 0) + { + size_t size = lseek(fd, 0, SEEK_END); + void *buf = malloc(size); /* As an example, assert(buf != NULL); */ + WFFileIOTask *pread_task; - pread_task = WFTaskFactory::create_pread_task(fd, buf, size, 0, - pread_callback); - /* To implement a more complicated server, please use series' context - * instead of tasks' user_data to pass/store internal data. */ - pread_task->user_data = resp; /* pass resp pointer to pread task. */ - server_task->user_data = buf; /* to free() in callback() */ - server_task->set_callback([](WFHttpTask *t){ free(t->user_data); }); - series_of(server_task)->push_back(pread_task); - } - else - { - resp->set_status_code("404"); - resp->append_output_body("404 Not Found."); - } + pread_task = WFTaskFactory::create_pread_task(fd, buf, size, 0, + pread_callback); + /* To implement a more complicated server, please use series' context + * instead of tasks' user_data to pass/store internal data. */ + pread_task->user_data = resp; /* pass resp pointer to pread task. */ + server_task->user_data = buf; /* to free() in callback() */ + server_task->set_callback([](WFHttpTask *t){ free(t->user_data); }); + series_of(server_task)->push_back(pread_task); + } + else + { + resp->set_status_code("404"); + resp->append_output_body("404 Not Found."); + } } ~~~ 与http_proxy产生一个新的http client任务不同,这里我们通过factory产生了一个pread任务。 @@ -78,10 +78,10 @@ void process(WFHttpTask *server_task, const char *root) ~~~cpp struct FileIOArg { - int fd; - void *buf; - size_t count; - off_t offset; + int fd; + void *buf; + size_t count; + off_t offset; }; ... @@ -93,11 +93,11 @@ class WFTaskFactory { public: ... - static WFFileIOTask *create_pread_task(int fd, void *buf, size_t count, off_t offset, - fio_callback_t callback); + static WFFileIOTask *create_pread_task(int fd, void *buf, size_t count, off_t offset, + fio_callback_t callback); - static WFFileIOTask *create_pwrite_task(int fd, void *buf, size_t count, off_t offset, - fio_callback_t callback); + static WFFileIOTask *create_pwrite_task(int fd, void *buf, size_t count, off_t offset, + fio_callback_t callback); ... } ~~~ @@ -113,18 +113,18 @@ using namespace protocol; void pread_callback(WFFileIOTask *task) { - FileIOArg *arg = task->get_arg(); - long ret = task->get_retval(); - HttpResponse *resp = (HttpResponse *)task->user_data; + FileIOArg *arg = task->get_arg(); + long ret = task->get_retval(); + HttpResponse *resp = (HttpResponse *)task->user_data; - close(arg->fd); - if (ret < 0) - { - resp->set_status_code("503"); - resp->append_output_body("503 Internal Server Error."); - } - else /* Use '_nocopy' carefully. */ - resp->append_output_body_nocopy(arg->buf, ret); + close(arg->fd); + if (ret < 0) + { + resp->set_status_code("503"); + resp->append_output_body("503 Internal Server Error."); + } + else /* Use '_nocopy' carefully. */ + resp->append_output_body_nocopy(arg->buf, ret); } ~~~ 文件任务的get_arg()得到输入参数,这里是FileIOArg结构。 diff --git a/docs/tutorial-10-user_defined_protocol.md b/docs/tutorial-10-user_defined_protocol.md index aa046be7..244d2207 100644 --- a/docs/tutorial-10-user_defined_protocol.md +++ b/docs/tutorial-10-user_defined_protocol.md @@ -86,65 +86,65 @@ namespace protocol int TutorialMessage::encode(struct iovec vectors[], int max/*max==8192*/) { - uint32_t n = htonl(this->body_size); + uint32_t n = htonl(this->body_size); - memcpy(this->head, &n, 4); - vectors[0].iov_base = this->head; - vectors[0].iov_len = 4; - vectors[1].iov_base = this->body; - vectors[1].iov_len = this->body_size; + memcpy(this->head, &n, 4); + vectors[0].iov_base = this->head; + vectors[0].iov_len = 4; + vectors[1].iov_base = this->body; + vectors[1].iov_len = this->body_size; - return 2; /* return the number of vectors used, no more then max. */ + return 2; /* return the number of vectors used, no more then max. */ } int TutorialMessage::append(const void *buf, size_t size) { - if (this->head_received < 4) - { - size_t head_left; - void *p; + if (this->head_received < 4) + { + size_t head_left; + void *p; - p = &this->head[head_received]; - head_left = 4 - this->head_received; - if (size < 4 - this->head_received) - { - memcpy(p, buf, size); - this->head_received += size; - return 0; - } + p = &this->head[head_received]; + head_left = 4 - this->head_received; + if (size < 4 - this->head_received) + { + memcpy(p, buf, size); + this->head_received += size; + return 0; + } - memcpy(p, buf, head_left); - size -= head_left; - buf = (const char *)buf + head_left; + memcpy(p, buf, head_left); + size -= head_left; + buf = (const char *)buf + head_left; - p = this->head; - this->body_size = ntohl(*(uint32_t *)p); - if (this->body_size > this->size_limit) - { - errno = EMSGSIZE; - return -1; - } + p = this->head; + this->body_size = ntohl(*(uint32_t *)p); + if (this->body_size > this->size_limit) + { + errno = EMSGSIZE; + return -1; + } - this->body = (char *)malloc(this->body_size); - if (!this->body) - return -1; + this->body = (char *)malloc(this->body_size); + if (!this->body) + return -1; - this->body_received = 0; - } + this->body_received = 0; + } - size_t body_left = this->body_size - this->body_received; + size_t body_left = this->body_size - this->body_received; - if (size > body_left) - { - errno = EBADMSG; - return -1; - } + if (size > body_left) + { + errno = EBADMSG; + return -1; + } - memcpy(this->body, buf, body_left); - if (size < body_left) - return 0; + memcpy(this->body, buf, body_left); + if (size < body_left) + return 0; - return 1; + return 1; } } @@ -193,25 +193,25 @@ template class WFNetworkTaskFactory { private: - using T = WFNetworkTask; + using T = WFNetworkTask; public: - static T *create_client_task(TransportType type, - const std::string& host, - unsigned short port, - int retry_max, - std::function callback); + static T *create_client_task(TransportType type, + const std::string& host, + unsigned short port, + int retry_max, + std::function callback); - static T *create_client_task(TransportType type, - const std::string& url, - int retry_max, - std::function callback); + static T *create_client_task(TransportType type, + const std::string& url, + int retry_max, + std::function callback); - static T *create_client_task(TransportType type, - const URI& uri, - int retry_max, - std::function callback); - ... + static T *create_client_task(TransportType type, + const URI& uri, + int retry_max, + std::function callback); + ... }; ~~~ 其中,TransportType指定传输层协议,目前可选的值包括TT_TCP,TT_UDP,TT_SCTP和TT_TCP_SSL。 @@ -223,18 +223,18 @@ using namespace protocol; class MyFactory : public WFTaskFactory { public: - static WFTutorialTask *create_tutorial_task(const std::string& host, - unsigned short port, - int retry_max, - tutorial_callback_t callback) - { - using NTF = WFNetworkTaskFactory; - WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port, - retry_max, - std::move(callback)); - task->set_keep_alive(30 * 1000); - return task; - } + static WFTutorialTask *create_tutorial_task(const std::string& host, + unsigned short port, + int retry_max, + tutorial_callback_t callback) + { + using NTF = WFNetworkTaskFactory; + WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port, + retry_max, + std::move(callback)); + task->set_keep_alive(30 * 1000); + return task; + } }; ~~~ 可以看到我们用了WFNetworkTaskFactory类来创建client任务。