Add WFRmoveTask and WFUniqueTask.

This commit is contained in:
XieHan
2023-06-04 02:35:26 +08:00
parent ca8812f6a5
commit 6fbcbe7fcf
2 changed files with 127 additions and 0 deletions

View File

@@ -73,6 +73,35 @@ struct ShuffleOutput
T *last;
};
template<typename T>
struct RemoveInput
{
T *first;
T *last;
T value;
};
template<typename T>
struct RemoveOutput
{
T *first;
T *last;
};
template<typename T>
struct UniqueInput
{
T *first;
T *last;
};
template<typename T>
struct UniqueOutput
{
T *first;
T *last;
};
template<typename KEY = std::string, typename VAL = std::string>
using ReduceInput = std::vector<std::pair<KEY, VAL>>;
@@ -99,6 +128,18 @@ using WFShuffleTask = WFThreadTask<algorithm::ShuffleInput<T>,
template<typename T>
using shuffle_callback_t = std::function<void (WFShuffleTask<T> *)>;
template<typename T>
using WFRemoveTask = WFThreadTask<algorithm::RemoveInput<T>,
algorithm::RemoveOutput<T>>;
template<typename T>
using remove_callback_t = std::function<void (WFRemoveTask<T> *)>;
template<typename T>
using WFUniqueTask = WFThreadTask<algorithm::UniqueInput<T>,
algorithm::UniqueOutput<T>>;
template<typename T>
using unique_callback_t = std::function<void (WFUniqueTask<T> *)>;
template<typename KEY = std::string, typename VAL = std::string>
using WFReduceTask = WFThreadTask<algorithm::ReduceInput<KEY, VAL>,
algorithm::ReduceOutput<KEY, VAL>>;
@@ -156,6 +197,17 @@ public:
URBG generator,
CB callback);
template<typename T, class CB = remove_callback_t<T>>
static WFRemoveTask<T> *create_remove_task(const std::string& queue_name,
T *first, T *last,
T value,
CB callback);
template<typename T, class CB = unique_callback_t<T>>
static WFUniqueTask<T> *create_unique_task(const std::string& queue_name,
T *first, T *last,
CB callback);
template<typename KEY = std::string, typename VAL = std::string,
class RED = algorithm::reduce_function_t<KEY, VAL>,
class CB = reduce_callback_t<KEY, VAL>>

View File

@@ -502,6 +502,81 @@ WFShuffleTask<T> *WFAlgoTaskFactory::create_shuffle_task(const std::string& name
std::move(callback));
}
/****************** Remove ******************/
template<typename T>
class __WFRemoveTask : public WFRemoveTask<T>
{
protected:
virtual void execute()
{
this->output.last = std::remove(this->input.first, this->input.last,
this->input.value);
this->output.first = this->input.first;
}
public:
__WFRemoveTask(ExecQueue *queue, Executor *executor,
T *first, T *last, T&& value,
remove_callback_t<T>&& cb) :
WFRemoveTask<T>(queue, executor, std::move(cb))
{
this->input.first = first;
this->input.last = last;
this->input.value = std::move(value);
this->output.first = NULL;
this->output.last = NULL;
}
};
template<typename T, class CB>
WFRemoveTask<T> *WFAlgoTaskFactory::create_remove_task(const std::string& name,
T *first, T *last,
T value,
CB callback)
{
return new __WFRemoveTask<T>(WFGlobal::get_exec_queue(name),
WFGlobal::get_compute_executor(),
first, last, std::move(value),
std::move(callback));
}
/****************** Unique ******************/
template<typename T>
class __WFUniqueTask : public WFUniqueTask<T>
{
protected:
virtual void execute()
{
this->output.last = std::unique(this->input.first, this->input.last);
this->output.first = this->input.first;
}
public:
__WFUniqueTask(ExecQueue *queue, Executor *executor,
T *first, T *last,
unique_callback_t<T>&& cb) :
WFUniqueTask<T>(queue, executor, std::move(cb))
{
this->input.first = first;
this->input.last = last;
this->output.first = NULL;
this->output.last = NULL;
}
};
template<typename T, class CB>
WFUniqueTask<T> *WFAlgoTaskFactory::create_unique_task(const std::string& name,
T *first, T *last,
CB callback)
{
return new __WFUniqueTask<T>(WFGlobal::get_exec_queue(name),
WFGlobal::get_compute_executor(),
first, last,
std::move(callback));
}
/****************** MapReduce ******************/
template<typename KEY, typename VAL>