add WFResourcePool

This commit is contained in:
holmes1412
2021-07-21 14:47:18 +08:00
parent 8f12e1f45a
commit c28469a398
5 changed files with 170 additions and 1 deletions

View File

@@ -68,7 +68,7 @@ if (WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions -Wno-invalid-offsetof")
endif ()
add_subdirectory(src)

View File

@@ -88,6 +88,7 @@ set(INCLUDE_HEADERS
src/factory/WFAlgoTaskFactory.inl
src/factory/Workflow.h
src/factory/WFOperator.h
src/factory/WFResourcePool.h
src/nameservice/WFNameService.h
src/nameservice/WFDnsResolver.h
src/nameservice/WFServiceGovernance.h

View File

@@ -7,6 +7,7 @@ set(SRC
WFTaskFactory.cc
Workflow.cc
HttpTaskImpl.cc
WFResourcePool.cc
)
if (NOT MYSQL STREQUAL "n")

View File

@@ -0,0 +1,83 @@
/*
Copyright (c) 2021 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Li Yingxin (liyingxin@sogou-inc.com)
Xie Han (xiehan@sogou-inc.com)
*/
#include "list.h"
#include "WFTask.h"
#include "WFResourcePool.h"
class __WFConditional : public WFConditional
{
public:
struct list_head list;
struct WFResourcePool::Data *data;
public:
virtual void dispatch();
virtual void signal(void *res) { }
public:
__WFConditional(SubTask *task, void **pres,
struct WFResourcePool::Data *data) :
WFConditional(task, pres)
{
this->data = data;
}
};
void __WFConditional::dispatch()
{
struct WFResourcePool::Data *data = this->data;
data->mutex.lock();
if (--data->value >= 0)
this->WFConditional::signal(data->pop());
else
list_add_tail(&this->list, &data->wait_list);
data->mutex.unlock();
this->WFConditional::dispatch();
}
WFConditional *WFResourcePool::get(SubTask *task, void **pres)
{
return new __WFConditional(task, pres, &this->data);
}
void WFResourcePool::post(void *res)
{
struct WFResourcePool::Data *data = &this->data;
WFConditional *cond;
data->mutex.lock();
if (++data->value <= 0)
{
cond = list_entry(data->wait_list.next, __WFConditional, list);
list_del(data->wait_list.next);
}
else
{
cond = NULL;
data->push(res);
}
data->mutex.unlock();
if (cond)
cond->WFConditional::signal(res);
}

View File

@@ -0,0 +1,84 @@
/*
Copyright (c) 2021 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Li Yingxin (liyingxin@sogou-inc.com)
Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _WFRESOURCEPOOL_H_
#define _WFRESOURCEPOOL_H_
#include <mutex>
#include <atomic>
#include "list.h"
#include "WFTask.h"
class WFResourcePool
{
public:
WFConditional *get(SubTask *task, void **pres);
void post(void *res);
public:
struct Data
{
void **res;
std::atomic<int> value;
std::atomic<int> index;
struct list_head wait_list;
std::mutex mutex;
WFResourcePool *ptr;
virtual void *pop()
{
return this->ptr->pop();
}
virtual void push(void *res)
{
ptr->push(res);
}
};
private:
virtual void *pop()
{
return this->data.res[this->data.index++];
}
virtual void push(void *res)
{
this->data.res[--this->data.index] = res;
}
private:
struct Data data;
public:
WFResourcePool(void **res, int n)
{
this->data.ptr = this;
this->data.res = new void *[n];
memcpy(this->data.res, res, n * sizeof(void *));
this->data.value = n;
this->data.index = 0;
INIT_LIST_HEAD(&this->data.wait_list);
}
virtual ~WFResourcePool() { delete[] this->data.res; }
};
#endif