Rigs of Rods
Soft-body Physics Simulation
ThreadPool.h
Go to the documentation of this file.
1 /*
2 This source file is part of Rigs of Rods
3 Copyright 2016 Fabian Killus
4 
5 For more information, see http://www.rigsofrods.org/
6 
7 Rigs of Rods is free software: you can redistribute it and/or modify
8 it under the terms of the GNU General Public License version 3, as
9 published by the Free Software Foundation.
10 
11 Rigs of Rods is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15 
16 You should have received a copy of the GNU General Public License
17 along with Rigs of Rods. If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #pragma once
21 
#include "Application.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
33 
34 namespace RoR {
35 
/// Handle for a task executed by ThreadPool.
///
/// A Task wraps the callable to run together with the synchronization state needed
/// to wait for its completion. It can only be constructed by the friend class
/// ThreadPool and is neither copyable nor movable.
class Task
{
    friend class ThreadPool;

public:
    /// Block the current thread and wait for the associated task to finish.
    void join() const
    {
        // Wait until the task's m_is_finished flag is set to true by the ThreadPool instance.
        // Three possible scenarios:
        //  1) Execution of the task has not started yet
        //     - locks m_task_mutex
        //     - m_is_finished is false
        //     - therefore unlocks m_task_mutex again and waits until signaled by the thread pool
        //     - then m_is_finished is true (spurious wakeups are filtered by the wait predicate)
        //  2) Task is being executed
        //     - blocks on m_task_mutex, which the worker thread holds while the task runs
        //     - m_is_finished is true once the lock has been acquired
        //  3) Task has already finished execution
        //     - locks m_task_mutex
        //     - m_is_finished is true, so the wait returns immediately
        std::unique_lock<std::mutex> lock(m_task_mutex);
        m_finish_cv.wait(lock, [this]{ return m_is_finished; });
    }

private:
    // Only constructible by friend class ThreadPool.
    // The callable is taken by value and moved into place to avoid a second copy.
    explicit Task(std::function<void()> task_func) : m_task_func(std::move(task_func)) {}
    Task(const Task &) = delete;
    Task & operator=(const Task &) = delete;

    bool m_is_finished = false;                   //!< Indicates whether the task execution has finished.
    mutable std::condition_variable m_finish_cv;  //!< Used to signal the current thread when the task has finished.
    mutable std::mutex m_task_mutex;              //!< Mutex which is locked while the task is running.
    const std::function<void()> m_task_func;      //!< Callable object which implements the task to execute.
};
80 
105 class ThreadPool {
106 public:
108  {
109  // Create general-purpose thread pool
110  int logical_cores = std::thread::hardware_concurrency();
111 
112  int num_threads = App::app_num_workers->getInt();
113  if (num_threads < 1 || num_threads > logical_cores)
114  {
115  num_threads = Ogre::Math::Clamp(logical_cores - 1, 1, 8);
116  App::app_num_workers->setVal(num_threads);
117  }
118 
119  RoR::LogFormat("[RoR|ThreadPool] Found %d logical CPU cores, creating %d worker threads",
120  logical_cores, num_threads);
121 
122  return new ThreadPool(num_threads);
123  }
124 
129  ThreadPool(int num_threads)
130  {
131  ROR_ASSERT(num_threads > 0);
132 
133  // Generic function (to be run on a separate thread) within which submitted tasks
134  // are executed. It implements an endless loop (only returning when the ThreadPool
135  // instance itself is destructed) which constantly checks the task queue, grabbing
136  // and executing the frontmost task while the queue is not empty.
137  auto thread_body = [this]{
138  while (true) {
139  // Get next task from queue (synchronized access via taskqueue_mutex).
140  // If the queue is empty wait until either
141  // - being signaled about an available task.
142  // - the terminate flag is true (i.e. the ThreadPool instance is being destructed).
143  // In this case return from the running thread.
144  std::unique_lock<std::mutex> queue_lock(m_taskqueue_mutex);
145  while (m_taskqueue.empty()) {
146  if (m_terminate.load()) { return; }
147  m_task_available_cv.wait(queue_lock);
148  }
149  const auto current_task = m_taskqueue.front();
150  m_taskqueue.pop();
151  queue_lock.unlock();
152 
153  // Execute the actual task and signal the associated Task instance when finished.
154  {
155  std::lock_guard<std::mutex> task_lock(current_task->m_task_mutex);
156  current_task->m_task_func();
157  current_task->m_is_finished = true;
158  }
159  current_task->m_finish_cv.notify_all();
160  }
161  };
162 
163  // Launch the specified number of threads
164  for (int i = 0; i < num_threads; ++i) {
165  m_threads.emplace_back(thread_body);
166  }
167  }
168 
170  // Indicate termination and signal potential waiting threads to wake up.
171  // Then wait for all threads to finish their work and return properly.
172  m_terminate = true;
173  m_task_available_cv.notify_all();
174  for (auto &t : m_threads) { t.join(); }
175  }
176 
178  std::shared_ptr<Task> RunTask(const std::function<void()> &task_func) {
179  // Wrap provided task callable object in task handle. Then append it to the task queue and
180  // notify a waiting worker thread (if any) about the newly available task
181  auto task = std::shared_ptr<Task>(new Task(task_func));
182  {
183  std::lock_guard<std::mutex> lock(m_taskqueue_mutex);
184  m_taskqueue.push(task);
185  }
186  m_task_available_cv.notify_one();
187 
188  // Return task handle for later synchronization
189  return task;
190  }
191 
193  void Parallelize(const std::vector<std::function<void()>> &task_funcs)
194  {
195  if (task_funcs.empty()) return;
196 
197  // Launch all provided tasks (except for the first) in parallel and store the associated handles
198  auto it = begin(task_funcs);
199  const auto first_task = it++;
200  std::vector<std::shared_ptr<Task>> handles;
201  for(; it != end(task_funcs); ++it)
202  {
203  handles.push_back(RunTask(*it));
204  }
205 
206  // Run the first task locally on the current thread
207  (*first_task)();
208 
209  // Synchronize, i.e. wait for all parallelized tasks to complete
210  for(const auto &h : handles) { h->join(); }
211  }
212 
213  std::atomic_bool m_terminate{false};
214  std::vector<std::thread> m_threads;
215  std::queue<std::shared_ptr<Task>> m_taskqueue;
216  std::mutex m_taskqueue_mutex;
217  std::condition_variable m_task_available_cv;
218 };
219 
220 } // namespace RoR
ROR_ASSERT
#define ROR_ASSERT(_EXPR)
Definition: Application.h:40
RoR::ThreadPool::m_threads
std::vector< std::thread > m_threads
Collection of worker threads to run tasks.
Definition: ThreadPool.h:214
RoR::ThreadPool::m_taskqueue
std::queue< std::shared_ptr< Task > > m_taskqueue
Queue of submitted tasks pending for execution.
Definition: ThreadPool.h:215
RoR::Task::m_is_finished
bool m_is_finished
Indicates whether the task execution has finished.
Definition: ThreadPool.h:75
RoR::App::app_num_workers
CVar * app_num_workers
Definition: Application.cpp:84
RoR::LogFormat
void LogFormat(const char *format,...)
Improved logging utility. Uses fixed 2Kb buffer.
Definition: Application.cpp:424
RoR::Task::Task
Task(std::function< void()> task_func)
Definition: ThreadPool.h:71
RoR::Task::join
void join() const
Block the current thread and wait for the associated task to finish.
Definition: ThreadPool.h:49
RoR::Task::operator=
Task & operator=(Task &)=delete
RoR::ThreadPool
Facilitates execution of (small) tasks on separate threads.
Definition: ThreadPool.h:105
Application.h
Central state/object manager and communications hub.
RoR::ThreadPool::~ThreadPool
~ThreadPool()
Definition: ThreadPool.h:169
RoR::Task::m_task_func
const std::function< void()> m_task_func
Callable object which implements the task to execute.
Definition: ThreadPool.h:78
RoR::ThreadPool::m_taskqueue_mutex
std::mutex m_taskqueue_mutex
Protects task queue from concurrent access.
Definition: ThreadPool.h:216
RoR::ThreadPool::RunTask
std::shared_ptr< Task > RunTask(const std::function< void()> &task_func)
Submit new asynchronous task to thread pool and return Task handle to allow for synchronization.
Definition: ThreadPool.h:178
RoR::CVar::setVal
void setVal(T val)
Definition: CVar.h:72
RoR::Task
Handle for a task executed by ThreadPool.
Definition: ThreadPool.h:44
RoR::ThreadPool::m_terminate
std::atomic_bool m_terminate
Indicates destruction of ThreadPool instance to worker threads.
Definition: ThreadPool.h:213
RoR::CVar::getInt
int getInt() const
Definition: CVar.h:97
RoR::ThreadPool::ThreadPool
ThreadPool(int num_threads)
Construct thread pool and launch worker threads.
Definition: ThreadPool.h:129
RoR::ThreadPool::m_task_available_cv
std::condition_variable m_task_available_cv
Used to signal threads that a new task was submitted and is ready to run.
Definition: ThreadPool.h:217
RoR::Task::m_task_mutex
std::mutex m_task_mutex
Mutex which is locked while the task is running.
Definition: ThreadPool.h:77
RoR::ThreadPool::Parallelize
void Parallelize(const std::vector< std::function< void()>> &task_funcs)
Run collection of tasks in parallel and wait until all have finished.
Definition: ThreadPool.h:193
RoR
Definition: AppContext.h:36
RoR::Task::m_finish_cv
std::condition_variable m_finish_cv
Used to signal the current thread when the task has finished.
Definition: ThreadPool.h:76
RoR::ThreadPool::DetectNumWorkersAndCreate
static ThreadPool * DetectNumWorkersAndCreate()
Definition: ThreadPool.h:107