mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
Add WFMessageQueue
This commit is contained in:
1
BUILD
1
BUILD
@@ -45,6 +45,7 @@ cc_library(
|
||||
'src/factory/FileTaskImpl.cc',
|
||||
'src/factory/WFGraphTask.cc',
|
||||
'src/factory/WFResourcePool.cc',
|
||||
'src/factory/WFMessageQueue.cc',
|
||||
'src/factory/WFTaskFactory.cc',
|
||||
'src/factory/Workflow.cc',
|
||||
'src/manager/DnsCache.cc',
|
||||
|
||||
@@ -90,6 +90,7 @@ set(INCLUDE_HEADERS
|
||||
src/factory/Workflow.h
|
||||
src/factory/WFOperator.h
|
||||
src/factory/WFResourcePool.h
|
||||
src/factory/WFMessageQueue.h
|
||||
src/nameservice/WFNameService.h
|
||||
src/nameservice/WFDnsResolver.h
|
||||
src/nameservice/WFServiceGovernance.h
|
||||
|
||||
@@ -8,6 +8,7 @@ set(SRC
|
||||
Workflow.cc
|
||||
HttpTaskImpl.cc
|
||||
WFResourcePool.cc
|
||||
WFMessageQueue.cc
|
||||
FileTaskImpl.cc
|
||||
)
|
||||
|
||||
|
||||
94
src/factory/WFMessageQueue.cc
Normal file
94
src/factory/WFMessageQueue.cc
Normal file
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
Copyright (c) 2022 Sogou, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
Authors: Xie Han (xiehan@sogou-inc.com)
|
||||
*/
|
||||
|
||||
#include "list.h"
|
||||
#include "WFTask.h"
|
||||
#include "WFMessageQueue.h"
|
||||
|
||||
class __MQConditional : public WFConditional
|
||||
{
|
||||
public:
|
||||
struct list_head list;
|
||||
struct WFMessageQueue::Data *data;
|
||||
|
||||
public:
|
||||
virtual void dispatch();
|
||||
virtual void signal(void *msg) { }
|
||||
|
||||
public:
|
||||
__MQConditional(SubTask *task, void **msgbuf,
|
||||
struct WFMessageQueue::Data *data) :
|
||||
WFConditional(task, msgbuf)
|
||||
{
|
||||
this->data = data;
|
||||
}
|
||||
|
||||
__MQConditional(SubTask *task,
|
||||
struct WFMessageQueue::Data *data) :
|
||||
WFConditional(task)
|
||||
{
|
||||
this->data = data;
|
||||
}
|
||||
};
|
||||
|
||||
void __MQConditional::dispatch()
|
||||
{
|
||||
struct WFMessageQueue::Data *data = this->data;
|
||||
|
||||
data->mutex.lock();
|
||||
if (!list_empty(&data->msg_list))
|
||||
this->WFConditional::signal(data->pop());
|
||||
else
|
||||
list_add_tail(&this->list, &data->wait_list);
|
||||
|
||||
data->mutex.unlock();
|
||||
this->WFConditional::dispatch();
|
||||
}
|
||||
|
||||
WFConditional *WFMessageQueue::get(SubTask *task, void **msgbuf)
|
||||
{
|
||||
return new __MQConditional(task, msgbuf, &this->data);
|
||||
}
|
||||
|
||||
WFConditional *WFMessageQueue::get(SubTask *task)
|
||||
{
|
||||
return new __MQConditional(task, &this->data);
|
||||
}
|
||||
|
||||
void WFMessageQueue::post(void *msg)
|
||||
{
|
||||
struct WFMessageQueue::Data *data = &this->data;
|
||||
WFConditional *cond;
|
||||
|
||||
data->mutex.lock();
|
||||
if (!list_empty(&data->wait_list))
|
||||
{
|
||||
cond = list_entry(data->wait_list.next, __MQConditional, list);
|
||||
list_del(data->wait_list.next);
|
||||
}
|
||||
else
|
||||
{
|
||||
cond = NULL;
|
||||
this->push(msg);
|
||||
}
|
||||
|
||||
data->mutex.unlock();
|
||||
if (cond)
|
||||
cond->WFConditional::signal(msg);
|
||||
}
|
||||
|
||||
88
src/factory/WFMessageQueue.h
Normal file
88
src/factory/WFMessageQueue.h
Normal file
@@ -0,0 +1,88 @@
|
||||
/*
|
||||
Copyright (c) 2022 Sogou, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
Authors: Xie Han (xiehan@sogou-inc.com)
|
||||
*/
|
||||
|
||||
#ifndef _WFMESSAGEQUEUE_H_
|
||||
#define _WFMESSAGEQUEUE_H_
|
||||
|
||||
#include <mutex>
|
||||
#include "list.h"
|
||||
#include "WFTask.h"
|
||||
|
||||
class WFMessageQueue
|
||||
{
|
||||
public:
|
||||
WFConditional *get(SubTask *task, void **msgbuf);
|
||||
WFConditional *get(SubTask *task);
|
||||
void post(void *msg);
|
||||
|
||||
public:
|
||||
struct Data
|
||||
{
|
||||
void *pop() { return this->queue->pop(); }
|
||||
void push(void *msg) { this->queue->push(msg); }
|
||||
|
||||
struct list_head msg_list;
|
||||
struct list_head wait_list;
|
||||
std::mutex mutex;
|
||||
WFMessageQueue *queue;
|
||||
};
|
||||
|
||||
protected:
|
||||
struct MessageEntry
|
||||
{
|
||||
struct list_head list;
|
||||
void *msg;
|
||||
};
|
||||
|
||||
protected:
|
||||
virtual void *pop()
|
||||
{
|
||||
struct MessageEntry *entry;
|
||||
void *msg;
|
||||
|
||||
entry = list_entry(this->data.msg_list.next, struct MessageEntry, list);
|
||||
list_del(&entry->list);
|
||||
msg = entry->msg;
|
||||
delete entry;
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
virtual void push(void *msg)
|
||||
{
|
||||
struct MessageEntry *entry = new struct MessageEntry;
|
||||
entry->msg = msg;
|
||||
list_add_tail(&entry->list, &this->data.msg_list);
|
||||
}
|
||||
|
||||
protected:
|
||||
struct Data data;
|
||||
|
||||
public:
|
||||
WFMessageQueue()
|
||||
{
|
||||
INIT_LIST_HEAD(&this->data.msg_list);
|
||||
INIT_LIST_HEAD(&this->data.wait_list);
|
||||
this->data.queue = this;
|
||||
}
|
||||
|
||||
virtual ~WFMessageQueue() { }
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user