Add named mailbox task.

This commit is contained in:
Xie Han
2023-10-17 19:16:43 +08:00
parent f4d3c3923a
commit d99327c3f9
2 changed files with 211 additions and 26 deletions

View File

@@ -284,18 +284,18 @@ void __NamedTimerMap::cancel(const std::string& name, size_t max)
}
}
WFTimerTask *WFTaskFactory::create_timer_task(const std::string& timer_name,
WFTimerTask *WFTaskFactory::create_timer_task(const std::string& name,
time_t seconds, long nanoseconds,
timer_callback_t callback)
{
return __timer_map.create(timer_name, seconds, nanoseconds,
return __timer_map.create(name, seconds, nanoseconds,
WFGlobal::get_scheduler(),
std::move(callback));
}
void WFTaskFactory::cancel_by_name(const std::string& timer_name, size_t max)
void WFTaskFactory::cancel_by_name(const std::string& name, size_t max)
{
__timer_map.cancel(timer_name, max);
__timer_map.cancel(name, max);
}
/****************** Named Counter ******************/
@@ -452,26 +452,197 @@ void __NamedCounterMap::count(CounterList *counters,
task->WFCounterTask::count();
}
WFCounterTask *WFTaskFactory::create_counter_task(const std::string& counter_name,
WFCounterTask *WFTaskFactory::create_counter_task(const std::string& name,
unsigned int target_value,
counter_callback_t callback)
{
return __counter_map.create(counter_name, target_value, std::move(callback));
return __counter_map.create(name, target_value, std::move(callback));
}
void WFTaskFactory::count_by_name(const std::string& counter_name, unsigned int n)
void WFTaskFactory::count_by_name(const std::string& name, unsigned int n)
{
__counter_map.count_n(counter_name, n);
__counter_map.count_n(name, n);
}
/****************** Named Mailbox ******************/
class __WFNamedMailboxTask;
struct __mailbox_node
{
struct list_head list;
__WFNamedMailboxTask *task;
};
static class __NamedMailboxMap
{
public:
using MailboxList = struct __NamedObjectList<struct __mailbox_node>;
public:
WFMailboxTask *create(const std::string& name, void **mailbox,
mailbox_callback_t&& cb);
WFMailboxTask *create(const std::string& name, mailbox_callback_t&& cb);
void send(const std::string& name, void *msg, size_t max);
void send(MailboxList *mailboxes, struct __mailbox_node *node, void *msg);
void remove(MailboxList *mailboxes, struct __mailbox_node *node)
{
mutex_.lock();
mailboxes->del(node, &root_);
mutex_.unlock();
}
private:
void send_max_locked(MailboxList *mailboxes, void *msg, size_t max,
struct list_head *task_list);
struct rb_root root_;
std::mutex mutex_;
public:
__NamedMailboxMap()
{
root_.rb_node = NULL;
}
} __mailbox_map;
class __WFNamedMailboxTask : public WFMailboxTask
{
public:
__WFNamedMailboxTask(void **mailbox, mailbox_callback_t&& cb) :
WFMailboxTask(mailbox, std::move(cb))
{
node_.task = this;
}
__WFNamedMailboxTask(mailbox_callback_t&& cb) :
WFMailboxTask(std::move(cb))
{
node_.task = this;
}
void push_to(__NamedMailboxMap::MailboxList *mailboxes)
{
mailboxes->push_back(&node_);
mailboxes_ = mailboxes;
}
virtual void send(void *msg)
{
__mailbox_map.send(mailboxes_, &node_, msg);
}
virtual ~__WFNamedMailboxTask()
{
if (!this->flag)
__mailbox_map.remove(mailboxes_, &node_);
}
private:
struct __mailbox_node node_;
__NamedMailboxMap::MailboxList *mailboxes_;
};
WFMailboxTask *__NamedMailboxMap::create(const std::string& name,
void **mailbox,
mailbox_callback_t&& cb)
{
auto *task = new __WFNamedMailboxTask(mailbox, std::move(cb));
mutex_.lock();
task->push_to(__get_object_list<MailboxList>(name, &root_, true));
mutex_.unlock();
return task;
}
WFMailboxTask *__NamedMailboxMap::create(const std::string& name,
mailbox_callback_t&& cb)
{
auto *task = new __WFNamedMailboxTask(std::move(cb));
mutex_.lock();
task->push_to(__get_object_list<MailboxList>(name, &root_, true));
mutex_.unlock();
return task;
}
void __NamedMailboxMap::send_max_locked(MailboxList *mailboxes,
void *msg, size_t max,
struct list_head *task_list)
{
if (max == (size_t)-1)
list_splice(&mailboxes->head, task_list);
else
{
do
{
if (max == 0)
return;
list_move_tail(mailboxes->head.next, task_list);
max--;
} while (!mailboxes->empty());
}
rb_erase(&mailboxes->rb, &root_);
delete mailboxes;
}
void __NamedMailboxMap::send(const std::string& name, void *msg, size_t max)
{
LIST_HEAD(task_list);
struct __mailbox_node *node;
MailboxList *mailboxes;
mutex_.lock();
mailboxes = __get_object_list<MailboxList>(name, &root_, false);
if (mailboxes)
send_max_locked(mailboxes, msg, max, &task_list);
mutex_.unlock();
while (!list_empty(&task_list))
{
node = list_entry(task_list.next, struct __mailbox_node, list);
list_del(&node->list);
node->task->WFMailboxTask::send(msg);
}
}
void __NamedMailboxMap::send(MailboxList *mailboxes,
struct __mailbox_node *node,
void *msg)
{
mutex_.lock();
mailboxes->del(node, &root_);
mutex_.unlock();
node->task->WFMailboxTask::send(msg);
}
WFMailboxTask *WFTaskFactory::create_mailbox_task(const std::string& name,
void **mailbox,
mailbox_callback_t callback)
{
return __mailbox_map.create(name, mailbox, std::move(callback));
}
WFMailboxTask *WFTaskFactory::create_mailbox_task(const std::string& name,
mailbox_callback_t callback)
{
return __mailbox_map.create(name, std::move(callback));
}
void WFTaskFactory::send_by_name(const std::string& name, void *msg, size_t max)
{
__mailbox_map.send(name, msg, max);
}
/****************** Named Conditional ******************/
class __WFNamedCondtional;
class __WFNamedConditional;
struct __conditional_node
{
struct list_head list;
__WFNamedCondtional *cond;
__WFNamedConditional *cond;
};
static class __NamedConditionalMap
@@ -508,16 +679,16 @@ public:
}
} __conditional_map;
class __WFNamedCondtional : public WFConditional
class __WFNamedConditional : public WFConditional
{
public:
__WFNamedCondtional(SubTask *task, void **msgbuf) :
__WFNamedConditional(SubTask *task, void **msgbuf) :
WFConditional(task, msgbuf)
{
node_.cond = this;
}
__WFNamedCondtional(SubTask *task) :
__WFNamedConditional(SubTask *task) :
WFConditional(task)
{
node_.cond = this;
@@ -534,7 +705,7 @@ public:
__conditional_map.signal(conds_, &node_, msg);
}
virtual ~__WFNamedCondtional()
virtual ~__WFNamedConditional()
{
if (!this->flag)
__conditional_map.remove(conds_, &node_);
@@ -548,7 +719,7 @@ private:
WFConditional *__NamedConditionalMap::create(const std::string& name,
SubTask *task, void **msgbuf)
{
auto *cond = new __WFNamedCondtional(task, msgbuf);
auto *cond = new __WFNamedConditional(task, msgbuf);
mutex_.lock();
cond->push_to(__get_object_list<ConditionalList>(name, &root_, true));
mutex_.unlock();
@@ -558,7 +729,7 @@ WFConditional *__NamedConditionalMap::create(const std::string& name,
WFConditional *__NamedConditionalMap::create(const std::string& name,
SubTask *task)
{
auto *cond = new __WFNamedCondtional(task);
auto *cond = new __WFNamedConditional(task);
mutex_.lock();
cond->push_to(__get_object_list<ConditionalList>(name, &root_, true));
mutex_.unlock();
@@ -617,22 +788,22 @@ void __NamedConditionalMap::signal(ConditionalList *conds,
node->cond->WFConditional::signal(msg);
}
WFConditional *WFTaskFactory::create_conditional(const std::string& cond_name,
WFConditional *WFTaskFactory::create_conditional(const std::string& name,
SubTask *task, void **msgbuf)
{
return __conditional_map.create(cond_name, task, msgbuf);
return __conditional_map.create(name, task, msgbuf);
}
WFConditional *WFTaskFactory::create_conditional(const std::string& cond_name,
WFConditional *WFTaskFactory::create_conditional(const std::string& name,
SubTask *task)
{
return __conditional_map.create(cond_name, task);
return __conditional_map.create(name, task);
}
void WFTaskFactory::signal_by_name(const std::string& cond_name, void *msg,
void WFTaskFactory::signal_by_name(const std::string& name, void *msg,
size_t max)
{
__conditional_map.signal(cond_name, msg, max);
__conditional_map.signal(name, msg, max);
}
/**************** Timed Go Task *****************/

View File

@@ -238,10 +238,7 @@ public:
static WFTimerTask *create_timer_task(unsigned int microseconds,
timer_callback_t callback);
/* Counter is like semaphore. The callback of counter is called when
* 'count' operations reach target_value & after the task is started.
* It's perfectly legal to call 'count' before the task is started. */
public:
/* Create an unnamed counter. Call counter->count() directly.
* NOTE: never call count() exceeding target_value. */
static WFCounterTask *create_counter_task(unsigned int target_value,
@@ -282,6 +279,23 @@ public:
return new WFMailboxTask(std::move(callback));
}
static WFMailboxTask *create_mailbox_task(const std::string& mailbox_name,
void **mailbox,
mailbox_callback_t callback);
static WFMailboxTask *create_mailbox_task(const std::string& mailbox_name,
mailbox_callback_t callback);
/* The 'msg' will be sent to the all mailbox tasks under the name, and
* would be lost if no task matched. */
static void send_by_name(const std::string& mailbox_name, void *msg)
{
WFTaskFactory::send_by_name(mailbox_name, msg, (size_t)-1);
}
static void send_by_name(const std::string& mailbox_name, void *msg,
size_t max);
public:
static WFConditional *create_conditional(SubTask *task, void **msgbuf)
{