Merge branch 'master' of https://github.com/sogou/workflow into nossl

This commit is contained in:
Xie Han
2025-01-03 22:49:20 +08:00
5 changed files with 36 additions and 44 deletions

View File

@@ -58,12 +58,13 @@ public:
static WFConditional *create_conditional(const std::string& cond_name, SubTask *task, void **msgbuf);
static int signal_by_name(const std::string& cond_name, void *msg);
static int signal_by_name(const std::string& cond_name, void *msg, size_t max);
static int signal_by_name(const std::string& cond_name, void *const msg[], size_t max);
template<typename T>
static int signal_by_name(const std::string& cond_name, T *const msg[], size_t max);
};
~~~
我们看到与普通条件任务唯一区别是命名条件任务创建时需要传入一个cond_name。
而signal_by_name()接口默认将msg发送到所有在这个名称上等待的条件任务将它们全部唤醒。
也可以通过max参数指定唤醒的最大任务数。此时msg还可以是一个数组可给不同的条件任务发送不同的消息。
也可以通过max参数指定唤醒的最大任务数。此时msg还可以是一个指针数组,可给不同的条件任务发送不同的消息。
任何一个signal_by_name的重载函数其返回值都是表示实际唤醒的条件任务个数。
这就相当于实现了观察者模式。

View File

@@ -308,7 +308,7 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
snprintf(name, 64, "%p.cgroup", member);
member->mutex.unlock();
WFTaskFactory::signal_by_name(name, (void *)NULL, max);
WFTaskFactory::signal_by_name(name, NULL, max);
}
else
{

View File

@@ -707,6 +707,7 @@ int WFTaskFactory::send_by_name(const std::string& name, void *msg,
return __mailbox_map.send(name, &msg, max, 0);
}
template<>
int WFTaskFactory::send_by_name(const std::string& name, void *const msg[],
size_t max)
{
@@ -903,6 +904,7 @@ int WFTaskFactory::signal_by_name(const std::string& name, void *msg,
return __conditional_map.signal(name, &msg, max, 0);
}
template<>
int WFTaskFactory::signal_by_name(const std::string& name, void *const msg[],
size_t max)
{

View File

@@ -23,7 +23,6 @@
#include <sys/types.h>
#include <sys/uio.h>
#include <time.h>
#include <stdint.h>
#include <utility>
#include <functional>
#include "URIParser.h"
@@ -253,8 +252,7 @@ public:
/* Count by a counter's name. When count_by_name(), it's safe to count
* exceeding target_value. When multiple counters share a same name,
* this operation will be performed on the first created. If no counter
* matches the name, nothing is performed. */
* this operation will be performed on the first created. */
static int count_by_name(const std::string& counter_name)
{
return WFTaskFactory::count_by_name(counter_name, 1);
@@ -295,7 +293,8 @@ public:
static int send_by_name(const std::string& mailbox_name, void *msg,
size_t max);
static int send_by_name(const std::string& mailbox_name, void *const msg[],
template<typename T>
static int send_by_name(const std::string& mailbox_name, T *const msg[],
size_t max);
public:
@@ -330,7 +329,8 @@ public:
static int signal_by_name(const std::string& cond_name, void *msg,
size_t max);
static int signal_by_name(const std::string& cond_name, void *const msg[],
template<typename T>
static int signal_by_name(const std::string& cond_name, T *const msg[],
size_t max);
public:
@@ -408,37 +408,6 @@ public:
task->sub_series()->set_last_task(last);
return task;
}
private:
/* Some compilers don't declare 'nullptr_t' although required by C++11. */
using nullptr_t = std::nullptr_t;
public:
/* The following functions are for overload resolution only. */
static int send_by_name(const std::string& mailbox_name, intptr_t msg,
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *)msg, max);
}
static int send_by_name(const std::string& mailbox_name, nullptr_t msg,
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *)0, max);
}
static int signal_by_name(const std::string& cond_name, intptr_t msg,
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *)msg, max);
}
static int signal_by_name(const std::string& cond_name, nullptr_t msg,
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *)0, max);
}
};
template<class REQ, class RESP>

View File

@@ -62,6 +62,26 @@ WFTaskFactory::create_dynamic_task(dynamic_create_t create)
return new __WFDynamicTask(std::move(create));
}
template<>
int WFTaskFactory::send_by_name(const std::string&, void *const *, size_t);
template<typename T>
int WFTaskFactory::send_by_name(const std::string& mailbox_name, T *const msg[],
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *const *)msg, max);
}
template<>
int WFTaskFactory::signal_by_name(const std::string&, void *const *, size_t);
template<typename T>
int WFTaskFactory::signal_by_name(const std::string& cond_name, T *const msg[],
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *const *)msg, max);
}
template<class REQ, class RESP, typename CTX = bool>
class WFComplexClientTask : public WFClientTask<REQ, RESP>
{
@@ -680,7 +700,7 @@ void WFTaskFactory::reset_go_task(WFGoTask *task, FUNC&& func, ARGS&&... args)
template<> inline
WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFGoTask(WFGlobal::get_exec_queue(queue_name),
WFGlobal::get_compute_executor(),
@@ -690,7 +710,7 @@ WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
template<> inline
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFTimedGoTask(seconds, nanoseconds,
WFGlobal::get_exec_queue(queue_name),
@@ -700,7 +720,7 @@ WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
template<> inline
WFGoTask *WFTaskFactory::create_go_task(ExecQueue *queue, Executor *executor,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFGoTask(queue, executor, nullptr);
}
@@ -708,13 +728,13 @@ WFGoTask *WFTaskFactory::create_go_task(ExecQueue *queue, Executor *executor,
template<> inline
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
ExecQueue *queue, Executor *executor,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFTimedGoTask(seconds, nanoseconds, queue, executor, nullptr);
}
template<> inline
void WFTaskFactory::reset_go_task(WFGoTask *task, nullptr_t&& func)
void WFTaskFactory::reset_go_task(WFGoTask *task, std::nullptr_t&&)
{
((__WFGoTask *)task)->set_go_func(nullptr);
}