From a375a71bd0178237e11d0c10dd2e5ee7aa121f29 Mon Sep 17 00:00:00 2001 From: alpc62 Date: Wed, 5 Aug 2020 14:25:15 +0800 Subject: [PATCH] pause() replace by WaitGroup --- tutorial/tutorial-01-wget.cc | 12 +++++-- tutorial/tutorial-02-redis_cli.cc | 12 +++++-- tutorial/tutorial-03-wget_to_redis.cc | 21 ++++-------- tutorial/tutorial-04-http_echo_server.cc | 11 ++++-- tutorial/tutorial-05-http_proxy.cc | 11 ++++-- tutorial/tutorial-06-parallel_wget.cc | 21 +++--------- tutorial/tutorial-07-sort_task.cc | 25 ++++---------- tutorial/tutorial-08-matrix_multiply.cc | 21 +++--------- tutorial/tutorial-09-http_file_server.cc | 10 ++++-- .../client.cc | 21 +++--------- .../server.cc | 11 ++++-- tutorial/tutorial-12-mysql_cli.cc | 34 ++++++------------- 12 files changed, 88 insertions(+), 122 deletions(-) diff --git a/tutorial/tutorial-01-wget.cc b/tutorial/tutorial-01-wget.cc index 1aedda7b..e2ef770e 100644 --- a/tutorial/tutorial-01-wget.cc +++ b/tutorial/tutorial-01-wget.cc @@ -17,7 +17,6 @@ */ #include -#include #include #include #include @@ -26,6 +25,7 @@ #include "workflow/HttpMessage.h" #include "workflow/HttpUtil.h" #include "workflow/WFTaskFactory.h" +#include "workflow/WFFacilities.h" #define REDIRECT_MAX 5 #define RETRY_MAX 2 @@ -97,7 +97,12 @@ void wget_callback(WFHttpTask *task) fprintf(stderr, "\nSuccess. Press Ctrl-C to exit.\n"); } -void sig_handler(int signo) { } +static WFFacilities::WaitGroup wait_group(1); + +void sig_handler(int signo) +{ + wait_group.done(); +} int main(int argc, char *argv[]) { @@ -125,7 +130,8 @@ int main(int argc, char *argv[]) req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)"); req->add_header_pair("Connection", "close"); task->start(); - pause(); + + wait_group.wait(); return 0; } diff --git a/tutorial/tutorial-02-redis_cli.cc b/tutorial/tutorial-02-redis_cli.cc index 688cfdc6..bfe3cb9b 100644 --- a/tutorial/tutorial-02-redis_cli.cc +++ b/tutorial/tutorial-02-redis_cli.cc @@ -17,7 +17,6 @@ */ #include -#include #include #include #include @@ -25,6 +24,7 @@ #include #include "workflow/RedisMessage.h" #include "workflow/WFTaskFactory.h" +#include "workflow/WFFacilities.h" #define RETRY_MAX 2 @@ -102,7 +102,12 @@ void redis_callback(WFRedisTask *task) } } -void sig_handler(int signo) { } +static WFFacilities::WaitGroup wait_group(1); + +void sig_handler(int signo) +{ + wait_group.done(); +} int main(int argc, char *argv[]) { @@ -145,7 +150,8 @@ int main(int argc, char *argv[]) * Workflow::start_series_work(task, nullptr) or * Workflow::create_series_work(task, nullptr)->start() */ task->start(); - pause(); + + wait_group.wait(); return 0; } diff --git a/tutorial/tutorial-03-wget_to_redis.cc b/tutorial/tutorial-03-wget_to_redis.cc index 6559e0cd..487f489f 100644 --- a/tutorial/tutorial-03-wget_to_redis.cc +++ b/tutorial/tutorial-03-wget_to_redis.cc @@ -18,18 +18,16 @@ /* Tuturial-03. Store wget result in redis: key=URL, value=Http Body*/ #include -#include #include #include #include #include -#include -#include #include "workflow/HttpMessage.h" #include "workflow/HttpUtil.h" #include "workflow/RedisMessage.h" #include "workflow/Workflow.h" #include "workflow/WFTaskFactory.h" +#include "workflow/WFFacilities.h" using namespace protocol; @@ -110,9 +108,6 @@ void http_callback(WFHttpTask *task) int main(int argc, char *argv[]) { - std::mutex mutex; - std::condition_variable cond; - bool finished = false; WFHttpTask *http_task; if (argc != 3) @@ -152,7 +147,9 @@ int main(int argc, char *argv[]) /* no more than 30 seconds receiving http response. */ http_task->set_receive_timeout(30 * 1000); - auto series_callback = [&mutex, &cond, &finished](const SeriesWork *series) + WFFacilities::WaitGroup wait_group(1); + + auto series_callback = [&wait_group](const SeriesWork *series) { tutorial_series_context *context = (tutorial_series_context *) series->get_context(); @@ -163,10 +160,7 @@ int main(int argc, char *argv[]) fprintf(stderr, "Series finished. failed!\n"); /* signal the main() to terminate */ - mutex.lock(); - finished = true; - cond.notify_one(); - mutex.unlock(); + wait_group.done(); }; /* Create a series */ @@ -175,10 +169,7 @@ int main(int argc, char *argv[]) series->set_context(&context); series->start(); - std::unique_lock lock(mutex); - while (!finished) - cond.wait(lock); - lock.unlock(); + wait_group.wait(); return 0; } diff --git a/tutorial/tutorial-04-http_echo_server.cc b/tutorial/tutorial-04-http_echo_server.cc index eb00ed3f..2cbd6f70 100644 --- a/tutorial/tutorial-04-http_echo_server.cc +++ b/tutorial/tutorial-04-http_echo_server.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -30,6 +29,7 @@ #include "workflow/HttpUtil.h" #include "workflow/WFServer.h" #include "workflow/WFHttpServer.h" +#include "workflow/WFFacilities.h" void process(WFHttpTask *server_task) { @@ -92,7 +92,12 @@ void process(WFHttpTask *server_task) addrstr, port, seq); } -void sig_handler(int signo) { } +static WFFacilities::WaitGroup wait_group(1); + +void sig_handler(int signo) +{ + wait_group.done(); +} int main(int argc, char *argv[]) { @@ -110,7 +115,7 @@ int main(int argc, char *argv[]) port = atoi(argv[1]); if (server.start(port) == 0) { - pause(); + wait_group.wait(); server.stop(); } else diff --git a/tutorial/tutorial-05-http_proxy.cc b/tutorial/tutorial-05-http_proxy.cc index c008e52d..769a72c5 100644 --- a/tutorial/tutorial-05-http_proxy.cc +++ b/tutorial/tutorial-05-http_proxy.cc @@ -17,7 +17,6 @@ */ #include -#include #include #include #include @@ -25,6 +24,7 @@ #include "workflow/HttpMessage.h" #include "workflow/HttpUtil.h" #include "workflow/WFHttpServer.h" +#include "workflow/WFFacilities.h" struct tutorial_series_context { @@ -137,7 +137,12 @@ void process(WFHttpTask *proxy_task) *series << http_task; } -void sig_handler(int signo) { } +static WFFacilities::WaitGroup wait_group(1); + +void sig_handler(int signo) +{ + wait_group.done(); +} int main(int argc, char *argv[]) { @@ -160,7 +165,7 @@ int main(int argc, char *argv[]) if (server.start(port) == 0) { - pause(); + wait_group.wait(); server.stop(); } else diff --git a/tutorial/tutorial-06-parallel_wget.cc b/tutorial/tutorial-06-parallel_wget.cc index 8053a5ba..ec9afeba 100644 --- a/tutorial/tutorial-06-parallel_wget.cc +++ b/tutorial/tutorial-06-parallel_wget.cc @@ -20,12 +20,11 @@ #include #include #include -#include -#include #include "workflow/Workflow.h" #include "workflow/WFTaskFactory.h" #include "workflow/HttpMessage.h" #include "workflow/HttpUtil.h" +#include "workflow/WFFacilities.h" using namespace protocol; @@ -106,23 +105,13 @@ int main(int argc, char *argv[]) pwork->add_series(series); } - std::mutex mutex; - std::condition_variable cond; - bool finished = false; + WFFacilities::WaitGroup wait_group(1); - Workflow::start_series_work(pwork, - [&mutex, &cond, &finished](const SeriesWork *) - { - mutex.lock(); - finished = true; - cond.notify_one(); - mutex.unlock(); + Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) { + wait_group.done(); }); - std::unique_lock lock(mutex); - while (!finished) - cond.wait(lock); - lock.unlock(); + wait_group.wait(); return 0; } diff --git a/tutorial/tutorial-07-sort_task.cc b/tutorial/tutorial-07-sort_task.cc index 1aaa6b31..ee965ad5 100644 --- a/tutorial/tutorial-07-sort_task.cc +++ b/tutorial/tutorial-07-sort_task.cc @@ -18,17 +18,15 @@ #include #include -#include -#include #include "workflow/WFTaskFactory.h" - -bool use_parallel_sort = false; -bool finished = false; -std::mutex mutex; -std::condition_variable cond; +#include "workflow/WFFacilities.h" using namespace algorithm; +static WFFacilities::WaitGroup wait_group(1); + +bool use_parallel_sort = false; + void callback(WFSortTask *task) { /* Sort task's input and output are identical. */ @@ -60,12 +58,7 @@ void callback(WFSortTask *task) printf("Sort reversely:\n"); } else - { - mutex.lock(); - finished = true; - cond.notify_one(); - mutex.unlock(); - } + wait_group.done(); } int main(int argc, char *argv[]) @@ -112,11 +105,7 @@ int main(int argc, char *argv[]) printf("Sort result:\n"); task->start(); - std::unique_lock lock(mutex); - while (!finished) - cond.wait(lock); - lock.unlock(); - + wait_group.wait(); free(array); return 0; } diff --git a/tutorial/tutorial-08-matrix_multiply.cc b/tutorial/tutorial-08-matrix_multiply.cc index 3bdab060..07a30180 100644 --- a/tutorial/tutorial-08-matrix_multiply.cc +++ b/tutorial/tutorial-08-matrix_multiply.cc @@ -21,9 +21,8 @@ #include #include #include -#include -#include #include "workflow/WFTaskFactory.h" +#include "workflow/WFFacilities.h" namespace algorithm { @@ -146,23 +145,13 @@ int main() input->a = {{1, 2, 3}, {4, 5, 6}}; input->b = {{7, 8}, {9, 10}, {11, 12}}; - std::mutex mutex; - std::condition_variable cond; - bool finished = false; + WFFacilities::WaitGroup wait_group(1); - Workflow::start_series_work(task, - [&mutex, &cond, &finished](const SeriesWork *) - { - mutex.lock(); - finished = true; - cond.notify_one(); - mutex.unlock(); + Workflow::start_series_work(task, [&wait_group](const SeriesWork *) { + wait_group.done(); }); - std::unique_lock lock(mutex); - while (!finished) - cond.wait(lock); - lock.unlock(); + wait_group.wait(); return 0; } diff --git a/tutorial/tutorial-09-http_file_server.cc b/tutorial/tutorial-09-http_file_server.cc index 151ac087..f4d4722c 100644 --- a/tutorial/tutorial-09-http_file_server.cc +++ b/tutorial/tutorial-09-http_file_server.cc @@ -27,6 +27,7 @@ #include "workflow/WFHttpServer.h" #include "workflow/WFTaskFactory.h" #include "workflow/Workflow.h" +#include "workflow/WFFacilities.h" using namespace protocol; @@ -87,7 +88,12 @@ void process(WFHttpTask *server_task, const char *root) } } -void sig_handler(int signo) { } +static WFFacilities::WaitGroup wait_group(1); + +void sig_handler(int signo) +{ + wait_group.done(); +} int main(int argc, char *argv[]) { @@ -113,7 +119,7 @@ int main(int argc, char *argv[]) if (ret == 0) { - pause(); + wait_group.wait(); server.stop(); } else diff --git a/tutorial/tutorial-10-user_defined_protocol/client.cc b/tutorial/tutorial-10-user_defined_protocol/client.cc index 98faaee9..9ea70fbb 100644 --- a/tutorial/tutorial-10-user_defined_protocol/client.cc +++ b/tutorial/tutorial-10-user_defined_protocol/client.cc @@ -18,10 +18,9 @@ #include #include -#include -#include #include "workflow/Workflow.h" #include "workflow/WFTaskFactory.h" +#include "workflow/WFFacilities.h" #include "message.h" using WFTutorialTask = WFNetworkTaskget_resp()->set_size_limit(4 * 1024); - Workflow::start_series_work(task, [&mutex, &cond, &finished](const SeriesWork *) - { - mutex.lock(); - finished = true; - cond.notify_one(); - mutex.unlock(); + Workflow::start_series_work(task, [&wait_group](const SeriesWork *) { + wait_group.done(); }); - std::unique_lock lock(mutex); - while (!finished) - cond.wait(lock); - - lock.unlock(); + wait_group.wait(); return 0; } diff --git a/tutorial/tutorial-10-user_defined_protocol/server.cc b/tutorial/tutorial-10-user_defined_protocol/server.cc index 0166c9e1..874b9ca2 100644 --- a/tutorial/tutorial-10-user_defined_protocol/server.cc +++ b/tutorial/tutorial-10-user_defined_protocol/server.cc @@ -20,10 +20,10 @@ #include #include #include -#include #include "workflow/Workflow.h" #include "workflow/WFTaskFactory.h" #include "workflow/WFServer.h" +#include "workflow/WFFacilities.h" #include "message.h" using WFTutorialTask = WFNetworkTaskset_message_body(body, size); } -void sig_handler(int signo) { } +static WFFacilities::WaitGroup wait_group(1); + +void sig_handler(int signo) +{ + wait_group.done(); +} int main(int argc, char *argv[]) { @@ -70,7 +75,7 @@ int main(int argc, char *argv[]) if (server.start(AF_INET6, port) == 0 || server.start(AF_INET, port) == 0) { - pause(); + wait_group.wait(); server.stop(); } else diff --git a/tutorial/tutorial-12-mysql_cli.cc b/tutorial/tutorial-12-mysql_cli.cc index d071c336..733cc4f1 100644 --- a/tutorial/tutorial-12-mysql_cli.cc +++ b/tutorial/tutorial-12-mysql_cli.cc @@ -20,25 +20,19 @@ #include #include #include +#include #include #include -#include -#include - -#include #include "workflow/Workflow.h" #include "workflow/WFTaskFactory.h" #include "workflow/MySQLResult.h" +#include "workflow/WFFacilities.h" using namespace protocol; #define RETRY_MAX 0 -std::mutex mutex; -std::condition_variable cond; - -bool task_finished; -bool stop_flag; +volatile bool stop_flag; void mysql_callback(WFMySQLTask *task); @@ -213,15 +207,6 @@ void mysql_callback(WFMySQLTask *task) return; } -void series_callback(const SeriesWork *series) -{ - /* signal the main() to continue */ - mutex.lock(); - task_finished = true; - cond.notify_one(); - mutex.unlock(); -} - static void sighandler(int signo) { stop_flag = true; @@ -252,15 +237,16 @@ int main(int argc, char *argv[]) task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback); task->get_req()->set_query(query); - task_finished = false; - SeriesWork *series = Workflow::create_series_work(task, series_callback); + WFFacilities::WaitGroup wait_group(1); + SeriesWork *series = Workflow::create_series_work(task, + [&wait_group](const SeriesWork *series) { + wait_group.done(); + }); + series->set_context(&url); series->start(); - std::unique_lock lock(mutex); - while (task_finished == false) - cond.wait(lock); - lock.unlock(); + wait_group.wait(); return 0; }