diff --git a/src/factory/WFAlgoTaskFactory.h b/src/factory/WFAlgoTaskFactory.h index af8e0936..bbeda0af 100644 --- a/src/factory/WFAlgoTaskFactory.h +++ b/src/factory/WFAlgoTaskFactory.h @@ -73,6 +73,35 @@ struct ShuffleOutput T *last; }; +template +struct RemoveInput +{ + T *first; + T *last; + T value; +}; + +template +struct RemoveOutput +{ + T *first; + T *last; +}; + +template +struct UniqueInput +{ + T *first; + T *last; +}; + +template +struct UniqueOutput +{ + T *first; + T *last; +}; + template using ReduceInput = std::vector>; @@ -99,6 +128,18 @@ using WFShuffleTask = WFThreadTask, template using shuffle_callback_t = std::function *)>; +template +using WFRemoveTask = WFThreadTask, + algorithm::RemoveOutput>; +template +using remove_callback_t = std::function *)>; + +template +using WFUniqueTask = WFThreadTask, + algorithm::UniqueOutput>; +template +using unique_callback_t = std::function *)>; + template using WFReduceTask = WFThreadTask, algorithm::ReduceOutput>; @@ -156,6 +197,17 @@ public: URBG generator, CB callback); + template> + static WFRemoveTask *create_remove_task(const std::string& queue_name, + T *first, T *last, + T value, + CB callback); + + template> + static WFUniqueTask *create_unique_task(const std::string& queue_name, + T *first, T *last, + CB callback); + template, class CB = reduce_callback_t> diff --git a/src/factory/WFAlgoTaskFactory.inl b/src/factory/WFAlgoTaskFactory.inl index 05c4a4a8..fec80d47 100644 --- a/src/factory/WFAlgoTaskFactory.inl +++ b/src/factory/WFAlgoTaskFactory.inl @@ -502,6 +502,81 @@ WFShuffleTask *WFAlgoTaskFactory::create_shuffle_task(const std::string& name std::move(callback)); } +/****************** Remove ******************/ + +template +class __WFRemoveTask : public WFRemoveTask +{ +protected: + virtual void execute() + { + this->output.last = std::remove(this->input.first, this->input.last, + this->input.value); + this->output.first = this->input.first; + } + +public: + __WFRemoveTask(ExecQueue *queue, Executor *executor, + T *first, T *last, T&& value, + remove_callback_t&& cb) : + WFRemoveTask(queue, executor, std::move(cb)) + { + this->input.first = first; + this->input.last = last; + this->input.value = std::move(value); + this->output.first = NULL; + this->output.last = NULL; + } +}; + +template +WFRemoveTask *WFAlgoTaskFactory::create_remove_task(const std::string& name, + T *first, T *last, + T value, + CB callback) +{ + return new __WFRemoveTask(WFGlobal::get_exec_queue(name), + WFGlobal::get_compute_executor(), + first, last, std::move(value), + std::move(callback)); +} + +/****************** Unique ******************/ + +template +class __WFUniqueTask : public WFUniqueTask +{ +protected: + virtual void execute() + { + this->output.last = std::unique(this->input.first, this->input.last); + this->output.first = this->input.first; + } + +public: + __WFUniqueTask(ExecQueue *queue, Executor *executor, + T *first, T *last, + unique_callback_t&& cb) : + WFUniqueTask(queue, executor, std::move(cb)) + { + this->input.first = first; + this->input.last = last; + this->output.first = NULL; + this->output.last = NULL; + } +}; + +template +WFUniqueTask *WFAlgoTaskFactory::create_unique_task(const std::string& name, + T *first, T *last, + CB callback) +{ + return new __WFUniqueTask(WFGlobal::get_exec_queue(name), + WFGlobal::get_compute_executor(), + first, last, + std::move(callback)); +} + /****************** MapReduce ******************/ template