LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.0 % 90 81 9
Test Date: 2026-03-04 16:33:47 Functions: 88.9 % 18 16 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/detail/intrusive.hpp>
      13                 : #include <boost/capy/test/thread_name.hpp>
      14                 : #include <condition_variable>
      15                 : #include <cstdio>
      16                 : #include <mutex>
      17                 : #include <thread>
      18                 : #include <vector>
      19                 : 
      20                 : /*
      21                 :     Thread pool implementation using a shared work queue.
      22                 : 
      23                 :     Work items are coroutine handles wrapped in intrusive queue nodes, stored
      24                 :     in a single queue protected by a mutex. Worker threads wait on a
      25                 :     condition_variable until work is available or stop is requested.
      26                 : 
      27                 :     Threads are started lazily on first post() via std::call_once to avoid
      28                 :     spawning threads for pools that are constructed but never used. Each
      29                 :     thread is named with a configurable prefix plus index for debugger
      30                 :     visibility.
      31                 : 
      32                 :     Shutdown sequence: stop() sets the stop flag and notifies all threads.
      33                 :     Workers drain the queue before exiting; the destructor joins them and
      34                 :     destroys, without executing, any work still queued afterwards.
      35                 : */
      36                 : 
      37                 : namespace boost {
      38                 : namespace capy {
      39                 : 
      40                 : //------------------------------------------------------------------------------
      41                 : 
      42                 : class thread_pool::impl
      43                 : {
      44                 :     struct work : detail::intrusive_queue<work>::node
      45                 :     {
      46                 :         std::coroutine_handle<> h_;
      47                 : 
      48 HIT         127 :         explicit work(std::coroutine_handle<> h) noexcept
      49             127 :             : h_(h)
      50                 :         {
      51             127 :         }
      52                 : 
      53             127 :         void run()
      54                 :         {
      55             127 :             auto h = h_;
      56             127 :             delete this;
      57             127 :             h.resume();
      58             127 :         }
      59                 : 
      60 MIS           0 :         void destroy()
      61                 :         {
      62               0 :             delete this;
      63               0 :         }
      64                 :     };
      65                 : 
      66                 :     std::mutex mutex_;
      67                 :     std::condition_variable cv_;
      68                 :     detail::intrusive_queue<work> q_;
      69                 :     std::vector<std::thread> threads_;
      70                 :     bool stop_{false};
      71                 :     std::size_t num_threads_;
      72                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      73                 :     std::once_flag start_flag_;
      74                 : 
      75                 : public:
      76 HIT          63 :     ~impl()
      77                 :     {
      78              63 :         stop();
      79             105 :         for(auto& t : threads_)
      80              42 :             if(t.joinable())
      81 MIS           0 :                 t.join();
      82                 : 
      83 HIT          63 :         while(auto* w = q_.pop())
      84 MIS           0 :             w->destroy();
      85 HIT          63 :     }
      86                 : 
      87              63 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
      88              63 :         : num_threads_(num_threads)
      89                 :     {
      90              63 :         if(num_threads_ == 0)
      91               2 :             num_threads_ = std::thread::hardware_concurrency();
      92              63 :         if(num_threads_ == 0)
      93 MIS           0 :             num_threads_ = 1;
      94                 : 
      95                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
      96 HIT          63 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
      97              63 :         thread_name_prefix_[n] = '\0';
      98              63 :     }
      99                 : 
     100                 :     void
     101             127 :     post(std::coroutine_handle<> h)
     102                 :     {
     103             127 :         ensure_started();
     104             127 :         auto* w = new work(h);
     105                 :         {
     106             127 :             std::lock_guard<std::mutex> lock(mutex_);
     107             127 :             q_.push(w);
     108             127 :         }
     109             127 :         cv_.notify_one();
     110             127 :     }
     111                 : 
     112                 :     void
     113              63 :     join() noexcept
     114                 :     {
     115              63 :         stop();
     116             105 :         for(auto& t : threads_)
     117              42 :             if(t.joinable())
     118              42 :                 t.join();
     119              63 :     }
     120                 : 
     121                 :     void
     122             126 :     stop() noexcept
     123                 :     {
     124                 :         {
     125             126 :             std::lock_guard<std::mutex> lock(mutex_);
     126             126 :             stop_ = true;
     127             126 :         }
     128             126 :         cv_.notify_all();
     129             126 :     }
     130                 : 
     131                 : private:
     132                 :     void
     133             127 :     ensure_started()
     134                 :     {
     135             127 :         std::call_once(start_flag_, [this]{
     136              24 :             threads_.reserve(num_threads_);
     137              66 :             for(std::size_t i = 0; i < num_threads_; ++i)
     138              84 :                 threads_.emplace_back([this, i]{ run(i); });
     139              24 :         });
     140             127 :     }
     141                 : 
     142                 :     void
     143              42 :     run(std::size_t index)
     144                 :     {
     145                 :         // Build name; set_current_thread_name truncates to platform limits.
     146                 :         char name[16];
     147              42 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     148              42 :         set_current_thread_name(name);
     149                 : 
     150                 :         for(;;)
     151                 :         {
     152             169 :             work* w = nullptr;
     153                 :             {
     154             169 :                 std::unique_lock<std::mutex> lock(mutex_);
     155             169 :                 cv_.wait(lock, [this]{
     156             343 :                     return !q_.empty() ||
     157             343 :                         stop_;
     158                 :                 });
     159             169 :                 if(stop_ && q_.empty())
     160              84 :                     return;
     161             127 :                 w = q_.pop();
     162             169 :             }
     163             127 :             if(w)
     164             127 :                 w->run();
     165             127 :         }
     166                 :     }
     167                 : };
     168                 : 
     169                 : //------------------------------------------------------------------------------
     170                 : 
     171              63 : thread_pool::
     172                 : ~thread_pool()
     173                 : {
     174              63 :     impl_->join();
     175              63 :     shutdown();
     176              63 :     destroy();
     177              63 :     delete impl_;
     178              63 : }
     179                 : 
     180              63 : thread_pool::
     181              63 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     182              63 :     : impl_(new impl(num_threads, thread_name_prefix))
     183                 : {
     184              63 :     this->set_frame_allocator(std::allocator<void>{});
     185              63 : }
     186                 : 
     187                 : void
     188 MIS           0 : thread_pool::
     189                 : stop() noexcept
     190                 : {
     191               0 :     impl_->stop();
     192               0 : }
     193                 : 
     194                 : //------------------------------------------------------------------------------
     195                 : 
     196                 : thread_pool::executor_type
     197 HIT          63 : thread_pool::
     198                 : get_executor() const noexcept
     199                 : {
     200              63 :     return executor_type(
     201              63 :         const_cast<thread_pool&>(*this));
     202                 : }
     203                 : 
     204                 : void
     205             127 : thread_pool::executor_type::
     206                 : post(std::coroutine_handle<> h) const
     207                 : {
     208             127 :     pool_->impl_->post(h);
     209             127 : }
     210                 : 
     211                 : } // capy
     212                 : } // boost
        

Generated by: LCOV version 2.3