Add WFSelectorTask.

This commit is contained in:
Xie Han
2024-04-09 16:45:56 +08:00
parent 9abfe7125f
commit 5ec492fa98
4 changed files with 151 additions and 6 deletions

View File

@@ -485,10 +485,7 @@ public:
}
}
void **get_mailbox() const
{
return this->mailbox;
}
void **get_mailbox() const { return this->mailbox; }
public:
void set_callback(std::function<void (WFMailboxTask *)> cb)
@@ -542,6 +539,99 @@ protected:
virtual ~WFMailboxTask() { }
};
class WFSelectorTask : public WFGenericTask
{
public:
virtual int submit(void *msg)
{
void *tmp = NULL;
int ret = 0;
if (this->message.compare_exchange_strong(tmp, msg) && msg)
{
ret = 1;
if (this->flag.exchange(true))
{
this->state = WFT_STATE_SUCCESS;
this->subtask_done();
}
}
if (--this->nleft == 0)
{
if (!this->message)
{
this->state = WFT_STATE_SYS_ERROR;
this->error = ENOMSG;
this->subtask_done();
}
delete this;
}
return ret;
}
void *get_message() const { return this->message; }
public:
void set_callback(std::function<void (WFSelectorTask *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual void dispatch()
{
if (this->flag.exchange(true))
{
this->state = WFT_STATE_SUCCESS;
this->subtask_done();
}
if (--this->nleft == 0)
{
if (!this->message)
{
this->state = WFT_STATE_SYS_ERROR;
this->error = ENOMSG;
this->subtask_done();
}
delete this;
}
}
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
return series->pop();
}
protected:
std::atomic<void *> message;
std::atomic<bool> flag;
std::atomic<size_t> nleft;
std::function<void (WFSelectorTask *)> callback;
public:
WFSelectorTask(size_t candidates,
std::function<void (WFSelectorTask *)>&& cb) :
message(NULL),
flag(false),
nleft(candidates + 1),
callback(std::move(cb))
{
}
protected:
virtual ~WFSelectorTask() { }
};
class WFConditional : public WFGenericTask
{
public:

View File

@@ -89,9 +89,10 @@ using fsync_callback_t = std::function<void (WFFileSyncTask *)>;
using timer_callback_t = std::function<void (WFTimerTask *)>;
using counter_callback_t = std::function<void (WFCounterTask *)>;
// Mailbox is like counter with data passing
using mailbox_callback_t = std::function<void (WFMailboxTask *)>;
using selector_callback_t = std::function<void (WFSelectorTask *)>;
// Graph (DAG) task.
using graph_callback_t = std::function<void (WFGraphTask *)>;
@@ -296,6 +297,13 @@ public:
static void send_by_name(const std::string& mailbox_name, void *msg,
size_t max);
public:
static WFSelectorTask *create_selector_task(size_t candidates,
selector_callback_t callback)
{
return new WFSelectorTask(candidates, std::move(callback));
}
public:
static WFConditional *create_conditional(SubTask *task, void **msgbuf)
{
@@ -336,7 +344,7 @@ public:
return WFTaskFactory::release_guard(resource_name, NULL);
}
static int release_guard(const std::string& resaource_name, void *msg);
static int release_guard(const std::string& resource_name, void *msg);
static int release_guard_safe(const std::string& resource_name)
{