pause() replace by WaitGroup

This commit is contained in:
alpc62
2020-08-05 14:25:15 +08:00
parent ee2b426e2f
commit a375a71bd0
12 changed files with 88 additions and 122 deletions

View File

@@ -17,7 +17,6 @@
*/
#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
@@ -26,6 +25,7 @@
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#define REDIRECT_MAX 5
#define RETRY_MAX 2
@@ -97,7 +97,12 @@ void wget_callback(WFHttpTask *task)
fprintf(stderr, "\nSuccess. Press Ctrl-C to exit.\n");
}
void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo)
{
wait_group.done();
}
int main(int argc, char *argv[])
{
@@ -125,7 +130,8 @@ int main(int argc, char *argv[])
req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
req->add_header_pair("Connection", "close");
task->start();
pause();
wait_group.wait();
return 0;
}

View File

@@ -17,7 +17,6 @@
*/
#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
@@ -25,6 +24,7 @@
#include <string>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#define RETRY_MAX 2
@@ -102,7 +102,12 @@ void redis_callback(WFRedisTask *task)
}
}
void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo)
{
wait_group.done();
}
int main(int argc, char *argv[])
{
@@ -145,7 +150,8 @@ int main(int argc, char *argv[])
* Workflow::start_series_work(task, nullptr) or
* Workflow::create_series_work(task, nullptr)->start() */
task->start();
pause();
wait_group.wait();
return 0;
}

View File

@@ -18,18 +18,16 @@
/* Tuturial-03. Store wget result in redis: key=URL, value=Http Body*/
#include <netdb.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include <mutex>
#include <condition_variable>
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/RedisMessage.h"
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
using namespace protocol;
@@ -110,9 +108,6 @@ void http_callback(WFHttpTask *task)
int main(int argc, char *argv[])
{
std::mutex mutex;
std::condition_variable cond;
bool finished = false;
WFHttpTask *http_task;
if (argc != 3)
@@ -152,7 +147,9 @@ int main(int argc, char *argv[])
/* no more than 30 seconds receiving http response. */
http_task->set_receive_timeout(30 * 1000);
auto series_callback = [&mutex, &cond, &finished](const SeriesWork *series)
WFFacilities::WaitGroup wait_group(1);
auto series_callback = [&wait_group](const SeriesWork *series)
{
tutorial_series_context *context = (tutorial_series_context *)
series->get_context();
@@ -163,10 +160,7 @@ int main(int argc, char *argv[])
fprintf(stderr, "Series finished. failed!\n");
/* signal the main() to terminate */
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
wait_group.done();
};
/* Create a series */
@@ -175,10 +169,7 @@ int main(int argc, char *argv[])
series->set_context(&context);
series->start();
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

View File

@@ -21,7 +21,6 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -30,6 +29,7 @@
#include "workflow/HttpUtil.h"
#include "workflow/WFServer.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFFacilities.h"
void process(WFHttpTask *server_task)
{
@@ -92,7 +92,12 @@ void process(WFHttpTask *server_task)
addrstr, port, seq);
}
void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo)
{
wait_group.done();
}
int main(int argc, char *argv[])
{
@@ -110,7 +115,7 @@ int main(int argc, char *argv[])
port = atoi(argv[1]);
if (server.start(port) == 0)
{
pause();
wait_group.wait();
server.stop();
}
else

View File

@@ -17,7 +17,6 @@
*/
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <utility>
@@ -25,6 +24,7 @@
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFFacilities.h"
struct tutorial_series_context
{
@@ -137,7 +137,12 @@ void process(WFHttpTask *proxy_task)
*series << http_task;
}
void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo)
{
wait_group.done();
}
int main(int argc, char *argv[])
{
@@ -160,7 +165,7 @@ int main(int argc, char *argv[])
if (server.start(port) == 0)
{
pause();
wait_group.wait();
server.stop();
}
else

View File

@@ -20,12 +20,11 @@
#include <string.h>
#include <utility>
#include <string>
#include <mutex>
#include <condition_variable>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFFacilities.h"
using namespace protocol;
@@ -106,23 +105,13 @@ int main(int argc, char *argv[])
pwork->add_series(series);
}
std::mutex mutex;
std::condition_variable cond;
bool finished = false;
WFFacilities::WaitGroup wait_group(1);
Workflow::start_series_work(pwork,
[&mutex, &cond, &finished](const SeriesWork *)
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
wait_group.done();
});
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

View File

@@ -18,17 +18,15 @@
#include <stdlib.h>
#include <stdio.h>
#include <mutex>
#include <condition_variable>
#include "workflow/WFTaskFactory.h"
bool use_parallel_sort = false;
bool finished = false;
std::mutex mutex;
std::condition_variable cond;
#include "workflow/WFFacilities.h"
using namespace algorithm;
static WFFacilities::WaitGroup wait_group(1);
bool use_parallel_sort = false;
void callback(WFSortTask<int> *task)
{
/* Sort task's input and output are identical. */
@@ -60,12 +58,7 @@ void callback(WFSortTask<int> *task)
printf("Sort reversely:\n");
}
else
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
}
wait_group.done();
}
int main(int argc, char *argv[])
@@ -112,11 +105,7 @@ int main(int argc, char *argv[])
printf("Sort result:\n");
task->start();
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
free(array);
return 0;
}

View File

@@ -21,9 +21,8 @@
#include <errno.h>
#include <string.h>
#include <vector>
#include <mutex>
#include <condition_variable>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
namespace algorithm
{
@@ -146,23 +145,13 @@ int main()
input->a = {{1, 2, 3}, {4, 5, 6}};
input->b = {{7, 8}, {9, 10}, {11, 12}};
std::mutex mutex;
std::condition_variable cond;
bool finished = false;
WFFacilities::WaitGroup wait_group(1);
Workflow::start_series_work(task,
[&mutex, &cond, &finished](const SeriesWork *)
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
Workflow::start_series_work(task, [&wait_group](const SeriesWork *) {
wait_group.done();
});
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

View File

@@ -27,6 +27,7 @@
#include "workflow/WFHttpServer.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/Workflow.h"
#include "workflow/WFFacilities.h"
using namespace protocol;
@@ -87,7 +88,12 @@ void process(WFHttpTask *server_task, const char *root)
}
}
void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo)
{
wait_group.done();
}
int main(int argc, char *argv[])
{
@@ -113,7 +119,7 @@ int main(int argc, char *argv[])
if (ret == 0)
{
pause();
wait_group.wait();
server.stop();
}
else

View File

@@ -18,10 +18,9 @@
#include <string.h>
#include <stdio.h>
#include <mutex>
#include <condition_variable>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#include "message.h"
using WFTutorialTask = WFNetworkTask<protocol::TutorialRequest,
@@ -49,9 +48,6 @@ public:
int main(int argc, char *argv[])
{
std::mutex mutex;
std::condition_variable cond;
bool finished = false;
unsigned short port;
std::string host;
@@ -104,21 +100,14 @@ int main(int argc, char *argv[])
};
/* First request is emtpy. We will ignore the server response. */
WFFacilities::WaitGroup wait_group(1);
WFTutorialTask *task = MyFactory::create_tutorial_task(host, port, 0, callback);
task->get_resp()->set_size_limit(4 * 1024);
Workflow::start_series_work(task, [&mutex, &cond, &finished](const SeriesWork *)
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
Workflow::start_series_work(task, [&wait_group](const SeriesWork *) {
wait_group.done();
});
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

View File

@@ -20,10 +20,10 @@
#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#include <unistd.h>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFServer.h"
#include "workflow/WFFacilities.h"
#include "message.h"
using WFTutorialTask = WFNetworkTask<protocol::TutorialRequest,
@@ -48,7 +48,12 @@ void process(WFTutorialTask *task)
resp->set_message_body(body, size);
}
void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo)
{
wait_group.done();
}
int main(int argc, char *argv[])
{
@@ -70,7 +75,7 @@ int main(int argc, char *argv[])
if (server.start(AF_INET6, port) == 0 ||
server.start(AF_INET, port) == 0)
{
pause();
wait_group.wait();
server.stop();
}
else

View File

@@ -20,25 +20,19 @@
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <vector>
#include <map>
#include <mutex>
#include <condition_variable>
#include <signal.h>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/MySQLResult.h"
#include "workflow/WFFacilities.h"
using namespace protocol;
#define RETRY_MAX 0
std::mutex mutex;
std::condition_variable cond;
bool task_finished;
bool stop_flag;
volatile bool stop_flag;
void mysql_callback(WFMySQLTask *task);
@@ -213,15 +207,6 @@ void mysql_callback(WFMySQLTask *task)
return;
}
void series_callback(const SeriesWork *series)
{
/* signal the main() to continue */
mutex.lock();
task_finished = true;
cond.notify_one();
mutex.unlock();
}
static void sighandler(int signo)
{
stop_flag = true;
@@ -252,15 +237,16 @@ int main(int argc, char *argv[])
task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);
task->get_req()->set_query(query);
task_finished = false;
SeriesWork *series = Workflow::create_series_work(task, series_callback);
WFFacilities::WaitGroup wait_group(1);
SeriesWork *series = Workflow::create_series_work(task,
[&wait_group](const SeriesWork *series) {
wait_group.done();
});
series->set_context(&url);
series->start();
std::unique_lock<std::mutex> lock(mutex);
while (task_finished == false)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}