TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/detail/intrusive.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <condition_variable>
15 : #include <cstdio>
16 : #include <mutex>
17 : #include <thread>
18 : #include <vector>
19 :
20 : /*
21 : Thread pool implementation using a shared work queue.
22 :
23 : Work items are coroutine handles wrapped in intrusive list nodes, stored
24 : in a single queue protected by a mutex. Worker threads wait on a
25 : condition_variable until work is available or stop is requested.
26 :
27 : Threads are started lazily on first post() via std::call_once to avoid
28 : spawning threads for pools that are constructed but never used. Each
29 : thread is named with a configurable prefix plus index for debugger
30 : visibility.
31 :
32 : Shutdown sequence: stop() sets the stop flag and notifies all threads,
33 : then the destructor joins threads and destroys any remaining queued
34 : work without executing it.
35 : */
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : //------------------------------------------------------------------------------
41 :
42 : class thread_pool::impl
43 : {
44 : struct work : detail::intrusive_queue<work>::node
45 : {
46 : std::coroutine_handle<> h_;
47 :
48 HIT 127 : explicit work(std::coroutine_handle<> h) noexcept
49 127 : : h_(h)
50 : {
51 127 : }
52 :
53 127 : void run()
54 : {
55 127 : auto h = h_;
56 127 : delete this;
57 127 : h.resume();
58 127 : }
59 :
60 MIS 0 : void destroy()
61 : {
62 0 : delete this;
63 0 : }
64 : };
65 :
66 : std::mutex mutex_;
67 : std::condition_variable cv_;
68 : detail::intrusive_queue<work> q_;
69 : std::vector<std::thread> threads_;
70 : bool stop_{false};
71 : std::size_t num_threads_;
72 : char thread_name_prefix_[13]{}; // 12 chars max + null terminator
73 : std::once_flag start_flag_;
74 :
75 : public:
76 HIT 63 : ~impl()
77 : {
78 63 : stop();
79 105 : for(auto& t : threads_)
80 42 : if(t.joinable())
81 MIS 0 : t.join();
82 :
83 HIT 63 : while(auto* w = q_.pop())
84 MIS 0 : w->destroy();
85 HIT 63 : }
86 :
87 63 : impl(std::size_t num_threads, std::string_view thread_name_prefix)
88 63 : : num_threads_(num_threads)
89 : {
90 63 : if(num_threads_ == 0)
91 2 : num_threads_ = std::thread::hardware_concurrency();
92 63 : if(num_threads_ == 0)
93 MIS 0 : num_threads_ = 1;
94 :
95 : // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
96 HIT 63 : auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
97 63 : thread_name_prefix_[n] = '\0';
98 63 : }
99 :
100 : void
101 127 : post(std::coroutine_handle<> h)
102 : {
103 127 : ensure_started();
104 127 : auto* w = new work(h);
105 : {
106 127 : std::lock_guard<std::mutex> lock(mutex_);
107 127 : q_.push(w);
108 127 : }
109 127 : cv_.notify_one();
110 127 : }
111 :
112 : void
113 63 : join() noexcept
114 : {
115 63 : stop();
116 105 : for(auto& t : threads_)
117 42 : if(t.joinable())
118 42 : t.join();
119 63 : }
120 :
121 : void
122 126 : stop() noexcept
123 : {
124 : {
125 126 : std::lock_guard<std::mutex> lock(mutex_);
126 126 : stop_ = true;
127 126 : }
128 126 : cv_.notify_all();
129 126 : }
130 :
131 : private:
132 : void
133 127 : ensure_started()
134 : {
135 127 : std::call_once(start_flag_, [this]{
136 24 : threads_.reserve(num_threads_);
137 66 : for(std::size_t i = 0; i < num_threads_; ++i)
138 84 : threads_.emplace_back([this, i]{ run(i); });
139 24 : });
140 127 : }
141 :
142 : void
143 42 : run(std::size_t index)
144 : {
145 : // Build name; set_current_thread_name truncates to platform limits.
146 : char name[16];
147 42 : std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
148 42 : set_current_thread_name(name);
149 :
150 : for(;;)
151 : {
152 169 : work* w = nullptr;
153 : {
154 169 : std::unique_lock<std::mutex> lock(mutex_);
155 169 : cv_.wait(lock, [this]{
156 343 : return !q_.empty() ||
157 343 : stop_;
158 : });
159 169 : if(stop_ && q_.empty())
160 84 : return;
161 127 : w = q_.pop();
162 169 : }
163 127 : if(w)
164 127 : w->run();
165 127 : }
166 : }
167 : };
168 :
169 : //------------------------------------------------------------------------------
170 :
171 63 : thread_pool::
172 : ~thread_pool()
173 : {
174 63 : impl_->join();
175 63 : shutdown();
176 63 : destroy();
177 63 : delete impl_;
178 63 : }
179 :
180 63 : thread_pool::
181 63 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
182 63 : : impl_(new impl(num_threads, thread_name_prefix))
183 : {
184 63 : this->set_frame_allocator(std::allocator<void>{});
185 63 : }
186 :
187 : void
188 MIS 0 : thread_pool::
189 : stop() noexcept
190 : {
191 0 : impl_->stop();
192 0 : }
193 :
194 : //------------------------------------------------------------------------------
195 :
196 : thread_pool::executor_type
197 HIT 63 : thread_pool::
198 : get_executor() const noexcept
199 : {
200 63 : return executor_type(
201 63 : const_cast<thread_pool&>(*this));
202 : }
203 :
204 : void
205 127 : thread_pool::executor_type::
206 : post(std::coroutine_handle<> h) const
207 : {
208 127 : pool_->impl_->post(h);
209 127 : }
210 :
211 : } // capy
212 : } // boost
|