//===--------------------- TaskPool.h ---------------------------*- C++ -*-===// // // The LLVM Compiler Infrastructure // // This file is distributed under the University of Illinois Open Source // License. See LICENSE.TXT for details. // //===----------------------------------------------------------------------===// #ifndef utility_TaskPool_h_ #define utility_TaskPool_h_ #if defined(__cplusplus) && defined(_MSC_VER) && (_HAS_EXCEPTIONS == 0) // Compiling MSVC libraries with _HAS_EXCEPTIONS=0, eliminates most but not all // calls to __uncaught_exception. Unfortunately, it does seem to eliminate // the delcaration of __uncaught_excpeiton. Including ensures that it is // declared. This may not be necessary after MSVC 12. #include #endif #if defined(_MSC_VER) // Due to another bug in MSVC 2013, including will generate hundreds of // warnings in the Concurrency Runtime. This can be removed when we switch to // MSVC 2015 #pragma warning(push) #pragma warning(disable:4062) #endif #include #include #include #include #include #include #include // Global TaskPool class for running tasks in parallel on a set of worker thread created the first // time the task pool is used. The TaskPool provide no gurantee about the order the task will be run // and about what tasks will run in parrallel. None of the task added to the task pool should block // on something (mutex, future, condition variable) what will be set only by the completion of an // other task on the task pool as they may run on the same thread sequentally. class TaskPool { public: // Add a new task to the task pool and return a std::future belonging to the newly created task. // The caller of this function has to wait on the future for this task to complete. template static std::future::type> AddTask(F&& f, Args&&... args); // Run all of the specified tasks on the task pool and wait until all of them are finished // before returning. This method is intended to be used for small number tasks where listing // them as function arguments is acceptable. For running large number of tasks you should use // AddTask for each task and then call wait() on each returned future. template static void RunTasks(T&&... tasks); private: TaskPool() = delete; template struct RunTaskImpl; static void AddTaskImpl(std::function&& task_fn); }; // Wrapper class around the global TaskPool implementation to make it possible to create a set of // tasks and then wait for the tasks to be completed by the WaitForNextCompletedTask call. This // class should be used when WaitForNextCompletedTask is needed because this class add no other // extra functionality to the TaskPool class and it have a very minor performance overhead. template // The return type of the tasks what will be added to this task runner class TaskRunner { public: // Add a task to the task runner what will also add the task to the global TaskPool. The // function doesn't return the std::future for the task because it will be supplied by the // WaitForNextCompletedTask after the task is completed. template void AddTask(F&& f, Args&&... args); // Wait for the next task in this task runner to finish and then return the std::future what // belongs to the finished task. If there is no task in this task runner (neither pending nor // comleted) then this function will return an invalid future. Usually this function should be // called in a loop processing the results of the tasks until it returns an invalid std::future // what means that all task in this task runner is completed. std::future WaitForNextCompletedTask(); // Convenience method to wait for all task in this TaskRunner to finish. Do NOT use this class // just because of this method. Use TaskPool instead and wait for each std::future returned by // AddTask in a loop. void WaitForAllTasks(); private: std::list> m_ready; std::list> m_pending; std::mutex m_mutex; std::condition_variable m_cv; }; template std::future::type> TaskPool::AddTask(F&& f, Args&&... args) { auto task_sp = std::make_shared::type()>>( std::bind(std::forward(f), std::forward(args)...)); AddTaskImpl([task_sp]() { (*task_sp)(); }); return task_sp->get_future(); } template void TaskPool::RunTasks(T&&... tasks) { RunTaskImpl::Run(std::forward(tasks)...); } template struct TaskPool::RunTaskImpl { static void Run(Head&& h, Tail&&... t) { auto f = AddTask(std::forward(h)); RunTaskImpl::Run(std::forward(t)...); f.wait(); } }; template<> struct TaskPool::RunTaskImpl<> { static void Run() {} }; template template void TaskRunner::AddTask(F&& f, Args&&... args) { std::unique_lock lock(m_mutex); auto it = m_pending.emplace(m_pending.end()); *it = std::move(TaskPool::AddTask( [this, it](F f, Args... args) { T&& r = f(std::forward(args)...); std::unique_lock lock(this->m_mutex); this->m_ready.splice(this->m_ready.end(), this->m_pending, it); lock.unlock(); this->m_cv.notify_one(); return r; }, std::forward(f), std::forward(args)...)); } template <> template void TaskRunner::AddTask(F&& f, Args&&... args) { std::unique_lock lock(m_mutex); auto it = m_pending.emplace(m_pending.end()); *it = std::move(TaskPool::AddTask( [this, it](F f, Args... args) { f(std::forward(args)...); std::unique_lock lock(this->m_mutex); this->m_ready.emplace_back(std::move(*it)); this->m_pending.erase(it); lock.unlock(); this->m_cv.notify_one(); }, std::forward(f), std::forward(args)...)); } template std::future TaskRunner::WaitForNextCompletedTask() { std::unique_lock lock(m_mutex); if (m_ready.empty() && m_pending.empty()) return std::future(); // No more tasks if (m_ready.empty()) m_cv.wait(lock, [this](){ return !this->m_ready.empty(); }); std::future res = std::move(m_ready.front()); m_ready.pop_front(); lock.unlock(); res.wait(); return std::move(res); } template void TaskRunner::WaitForAllTasks() { while (WaitForNextCompletedTask().valid()); } #if defined(_MSC_VER) #pragma warning(pop) #endif #endif // #ifndef utility_TaskPool_h_