Rigs of Rods 2023.09
Soft-body Physics Simulation
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Loading...
Searching...
No Matches
ThreadPool.h
Go to the documentation of this file.
1/*
2This source file is part of Rigs of Rods
3Copyright 2016 Fabian Killus
4
5For more information, see http://www.rigsofrods.org/
6
7Rigs of Rods is free software: you can redistribute it and/or modify
8it under the terms of the GNU General Public License version 3, as
9published by the Free Software Foundation.
10
11Rigs of Rods is distributed in the hope that it will be useful,
12but WITHOUT ANY WARRANTY; without even the implied warranty of
13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14GNU General Public License for more details.
15
16You should have received a copy of the GNU General Public License
17along with Rigs of Rods. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#pragma once
21
#include "Application.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
33
34namespace RoR {
35
/// @brief Handle for a task executed by ThreadPool.
///
/// A Task wraps a callable together with the synchronization state needed to wait
/// for its completion. Instances can only be constructed by the friend class
/// ThreadPool; client code only ever calls join() on a handle it was given.
/// Task objects are neither copyable nor movable.
class Task
{
    friend class ThreadPool;

public:
    /// @brief Block the current thread and wait for the associated task to finish.
    void join() const
    {
        // Wait until the task's is_finished property is set to true by the ThreadPool instance.
        // Three possible scenarios:
        // 1) Execution of task has not started yet
        //    - locks task_mutex
        //    - is_finished will be false
        //    - therefore unlocks task_mutex again and waits until signaled by thread pool
        //    - then is_finished will be true (except in case of spurious wakeups, for which the waiting continues)
        // 2) Task is being executed
        //    - will try to lock task_mutex, but has to block
        //    - task_mutex is held by the worker while the task is still running
        //    - is_finished will be true after acquiring the lock
        // 3) Task has already finished execution
        //    - locks task_mutex
        //    - is_finished will be true
        std::unique_lock<std::mutex> lock(m_task_mutex);
        m_finish_cv.wait(lock, [this]{ return m_is_finished; });
    }

private:
    /// Only constructable by friend class ThreadPool.
    /// The callable is taken by value and moved into place to avoid a second copy.
    explicit Task(std::function<void()> task_func) : m_task_func(std::move(task_func)) {}

    // Non-copyable (and therefore also non-movable): the mutex and condition variable
    // members must stay at a fixed address while other threads may be waiting on them.
    Task(const Task &) = delete;
    Task & operator=(const Task &) = delete;

    bool m_is_finished = false;                    ///< Indicates whether the task execution has finished.
    mutable std::condition_variable m_finish_cv;   ///< Used to signal the waiting thread when the task has finished.
    mutable std::mutex m_task_mutex;               ///< Mutex which is locked while the task is running.
    const std::function<void()> m_task_func;       ///< Callable object which implements the task to execute.
};
80
106public:
108 {
109 // Create general-purpose thread pool
110 int logical_cores = std::thread::hardware_concurrency();
111
112 int num_threads = App::app_num_workers->getInt();
113 if (num_threads < 1 || num_threads > logical_cores)
114 {
115 num_threads = Ogre::Math::Clamp(logical_cores - 1, 1, 8);
116 App::app_num_workers->setVal(num_threads);
117 }
118
119 RoR::LogFormat("[RoR|ThreadPool] Found %d logical CPU cores, creating %d worker threads",
120 logical_cores, num_threads);
121
122 return new ThreadPool(num_threads);
123 }
124
129 ThreadPool(int num_threads)
130 {
131 ROR_ASSERT(num_threads > 0);
132
133 // Generic function (to be run on a separate thread) within which submitted tasks
134 // are executed. It implements an endless loop (only returning when the ThreadPool
135 // instance itself is destructed) which constantly checks the task queue, grabbing
136 // and executing the frontmost task while the queue is not empty.
137 auto thread_body = [this]{
138 while (true) {
139 // Get next task from queue (synchronized access via taskqueue_mutex).
140 // If the queue is empty wait until either
141 // - being signaled about an available task.
142 // - the terminate flag is true (i.e. the ThreadPool instance is being destructed).
143 // In this case return from the running thread.
144 std::unique_lock<std::mutex> queue_lock(m_taskqueue_mutex);
145 while (m_taskqueue.empty()) {
146 if (m_terminate.load()) { return; }
147 m_task_available_cv.wait(queue_lock);
148 }
149 const auto current_task = m_taskqueue.front();
150 m_taskqueue.pop();
151 queue_lock.unlock();
152
153 // Execute the actual task and signal the associated Task instance when finished.
154 {
155 std::lock_guard<std::mutex> task_lock(current_task->m_task_mutex);
156 current_task->m_task_func();
157 current_task->m_is_finished = true;
158 }
159 current_task->m_finish_cv.notify_all();
160 }
161 };
162
163 // Launch the specified number of threads
164 for (int i = 0; i < num_threads; ++i) {
165 m_threads.emplace_back(thread_body);
166 }
167 }
168
170 // Indicate termination and signal potential waiting threads to wake up.
171 // Then wait for all threads to finish their work and return properly.
172 m_terminate = true;
173 m_task_available_cv.notify_all();
174 for (auto &t : m_threads) { t.join(); }
175 }
176
178 std::shared_ptr<Task> RunTask(const std::function<void()> &task_func) {
179 // Wrap provided task callable object in task handle. Then append it to the task queue and
180 // notify a waiting worker thread (if any) about the newly available task
181 auto task = std::shared_ptr<Task>(new Task(task_func));
182 {
183 std::lock_guard<std::mutex> lock(m_taskqueue_mutex);
184 m_taskqueue.push(task);
185 }
186 m_task_available_cv.notify_one();
187
188 // Return task handle for later synchronization
189 return task;
190 }
191
193 void Parallelize(const std::vector<std::function<void()>> &task_funcs)
194 {
195 if (task_funcs.empty()) return;
196
197 // Launch all provided tasks (except for the first) in parallel and store the associated handles
198 auto it = begin(task_funcs);
199 const auto first_task = it++;
200 std::vector<std::shared_ptr<Task>> handles;
201 for(; it != end(task_funcs); ++it)
202 {
203 handles.push_back(RunTask(*it));
204 }
205
206 // Run the first task locally on the current thread
207 (*first_task)();
208
209 // Synchronize, i.e. wait for all parallelized tasks to complete
210 for(const auto &h : handles) { h->join(); }
211 }
212
213 std::atomic_bool m_terminate{false};
214 std::vector<std::thread> m_threads;
215 std::queue<std::shared_ptr<Task>> m_taskqueue;
216 std::mutex m_taskqueue_mutex;
217 std::condition_variable m_task_available_cv;
218};
219
220} // namespace RoR
Central state/object manager and communications hub.
#define ROR_ASSERT(_EXPR)
Definition Application.h:40
void setVal(T val)
Definition CVar.h:72
int getInt() const
Definition CVar.h:97
Handle for a task executed by ThreadPool.
Definition ThreadPool.h:45
std::condition_variable m_finish_cv
Used to signal the current thread when the task has finished.
Definition ThreadPool.h:76
bool m_is_finished
Indicates whether the task execution has finished.
Definition ThreadPool.h:75
void join() const
Block the current thread and wait for the associated task to finish.
Definition ThreadPool.h:49
Task & operator=(Task &)=delete
std::mutex m_task_mutex
Mutex which is locked while the task is running.
Definition ThreadPool.h:77
const std::function< void()> m_task_func
Callable object which implements the task to execute.
Definition ThreadPool.h:78
Task(Task &)=delete
Task(std::function< void()> task_func)
Definition ThreadPool.h:71
Facilitates execution of (small) tasks on separate threads.
Definition ThreadPool.h:105
static ThreadPool * DetectNumWorkersAndCreate()
Definition ThreadPool.h:107
std::shared_ptr< Task > RunTask(const std::function< void()> &task_func)
Submit new asynchronous task to thread pool and return Task handle to allow for synchronization.
Definition ThreadPool.h:178
void Parallelize(const std::vector< std::function< void()> > &task_funcs)
Run collection of tasks in parallel and wait until all have finished.
Definition ThreadPool.h:193
std::atomic_bool m_terminate
Indicates destruction of ThreadPool instance to worker threads.
Definition ThreadPool.h:213
std::queue< std::shared_ptr< Task > > m_taskqueue
Queue of submitted tasks pending for execution.
Definition ThreadPool.h:215
std::mutex m_taskqueue_mutex
Protects task queue from concurrent access.
Definition ThreadPool.h:216
std::vector< std::thread > m_threads
Collection of worker threads to run tasks.
Definition ThreadPool.h:214
ThreadPool(int num_threads)
Construct thread pool and launch worker threads.
Definition ThreadPool.h:129
std::condition_variable m_task_available_cv
Used to signal threads that a new task was submitted and is ready to run.
Definition ThreadPool.h:217
CVar * app_num_workers
void LogFormat(const char *format,...)
Improved logging utility. Uses fixed 2Kb buffer.