src/ex/thread_pool.cpp

90.0% Lines (81/90) 88.9% Functions (16/18)
src/ex/thread_pool.cpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <condition_variable>
15 #include <cstdio>
16 #include <mutex>
17 #include <thread>
18 #include <vector>
19
20 /*
21 Thread pool implementation using a shared work queue.
22
23 Work items are coroutine handles wrapped in intrusive list nodes, stored
24 in a single queue protected by a mutex. Worker threads wait on a
25 condition_variable until work is available or stop is requested.
26
27 Threads are started lazily on first post() via std::call_once to avoid
28 spawning threads for pools that are constructed but never used. Each
29 thread is named with a configurable prefix plus index for debugger
30 visibility.
31
32 Shutdown sequence: stop() sets the stop flag and notifies all threads.
33 Workers keep draining the queue and exit once it is empty; the destructor
34 then joins them and destroys any work still queued without executing it.
35 */
36
37 namespace boost {
38 namespace capy {
39
40 //------------------------------------------------------------------------------
41
42 class thread_pool::impl
43 {
44 struct work : detail::intrusive_queue<work>::node
45 {
46 std::coroutine_handle<> h_;
47
48 127 explicit work(std::coroutine_handle<> h) noexcept
49 127 : h_(h)
50 {
51 127 }
52
53 127 void run()
54 {
55 127 auto h = h_;
56 127 delete this;
57 127 h.resume();
58 127 }
59
60 void destroy()
61 {
62 delete this;
63 }
64 };
65
66 std::mutex mutex_;
67 std::condition_variable cv_;
68 detail::intrusive_queue<work> q_;
69 std::vector<std::thread> threads_;
70 bool stop_{false};
71 std::size_t num_threads_;
72 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
73 std::once_flag start_flag_;
74
75 public:
76 63 ~impl()
77 {
78 63 stop();
79 105 for(auto& t : threads_)
80 42 if(t.joinable())
81 t.join();
82
83 63 while(auto* w = q_.pop())
84 w->destroy();
85 63 }
86
87 63 impl(std::size_t num_threads, std::string_view thread_name_prefix)
88 63 : num_threads_(num_threads)
89 {
90 63 if(num_threads_ == 0)
91 2 num_threads_ = std::thread::hardware_concurrency();
92 63 if(num_threads_ == 0)
93 num_threads_ = 1;
94
95 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
96 63 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
97 63 thread_name_prefix_[n] = '\0';
98 63 }
99
100 void
101 127 post(std::coroutine_handle<> h)
102 {
103 127 ensure_started();
104 127 auto* w = new work(h);
105 {
106 127 std::lock_guard<std::mutex> lock(mutex_);
107 127 q_.push(w);
108 127 }
109 127 cv_.notify_one();
110 127 }
111
112 void
113 63 join() noexcept
114 {
115 63 stop();
116 105 for(auto& t : threads_)
117 42 if(t.joinable())
118 42 t.join();
119 63 }
120
121 void
122 126 stop() noexcept
123 {
124 {
125 126 std::lock_guard<std::mutex> lock(mutex_);
126 126 stop_ = true;
127 126 }
128 126 cv_.notify_all();
129 126 }
130
131 private:
132 void
133 127 ensure_started()
134 {
135 127 std::call_once(start_flag_, [this]{
136 24 threads_.reserve(num_threads_);
137 66 for(std::size_t i = 0; i < num_threads_; ++i)
138 84 threads_.emplace_back([this, i]{ run(i); });
139 24 });
140 127 }
141
142 void
143 42 run(std::size_t index)
144 {
145 // Build name; set_current_thread_name truncates to platform limits.
146 char name[16];
147 42 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
148 42 set_current_thread_name(name);
149
150 for(;;)
151 {
152 169 work* w = nullptr;
153 {
154 169 std::unique_lock<std::mutex> lock(mutex_);
155 169 cv_.wait(lock, [this]{
156 343 return !q_.empty() ||
157 343 stop_;
158 });
159 169 if(stop_ && q_.empty())
160 84 return;
161 127 w = q_.pop();
162 169 }
163 127 if(w)
164 127 w->run();
165 127 }
166 }
167 };
168
169 //------------------------------------------------------------------------------
170
171 63 thread_pool::
172 ~thread_pool()
173 {
174 63 impl_->join();
175 63 shutdown();
176 63 destroy();
177 63 delete impl_;
178 63 }
179
180 63 thread_pool::
181 63 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
182 63 : impl_(new impl(num_threads, thread_name_prefix))
183 {
184 63 this->set_frame_allocator(std::allocator<void>{});
185 63 }
186
187 void
188 thread_pool::
189 stop() noexcept
190 {
191 impl_->stop();
192 }
193
194 //------------------------------------------------------------------------------
195
196 thread_pool::executor_type
197 63 thread_pool::
198 get_executor() const noexcept
199 {
200 63 return executor_type(
201 63 const_cast<thread_pool&>(*this));
202 }
203
204 void
205 127 thread_pool::executor_type::
206 post(std::coroutine_handle<> h) const
207 {
208 127 pool_->impl_->post(h);
209 127 }
210
211 } // capy
212 } // boost
213