LCOV - code coverage report
Current view: top level - capy/io - any_read_source.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 91.1 % 135 123 12
Test Date: 2026-03-04 16:33:47 Functions: 82.4 % 51 42 9

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      11                 : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_array.hpp>
      17                 : #include <boost/capy/buffers/buffer_param.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <boost/capy/concept/read_source.hpp>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/io_result.hpp>
      22                 : #include <boost/capy/io_task.hpp>
      23                 : 
      24                 : #include <concepts>
      25                 : #include <coroutine>
      26                 : #include <cstddef>
      27                 : #include <exception>
      28                 : #include <new>
      29                 : #include <span>
      30                 : #include <stop_token>
      31                 : #include <system_error>
      32                 : #include <utility>
      33                 : 
      34                 : namespace boost {
      35                 : namespace capy {
      36                 : 
      37                 : /** Type-erased wrapper for any ReadSource.
      38                 : 
      39                 :     This class provides type erasure for any type satisfying the
      40                 :     @ref ReadSource concept, enabling runtime polymorphism for
      41                 :     source read operations. It uses cached awaitable storage to achieve
      42                 :     zero steady-state allocation after construction.
      43                 : 
      44                 :     The wrapper supports two construction modes:
      45                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      46                 :       allocates storage and owns the source.
      47                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      48                 :       pointed-to source must outlive this wrapper.
      49                 : 
      50                 :     @par Awaitable Preallocation
      51                 :     The constructor preallocates storage for the type-erased awaitable.
      52                 :     This reserves all virtual address space at server startup
      53                 :     so memory usage can be measured up front, rather than
      54                 :     allocating piecemeal as traffic arrives.
      55                 : 
      56                 :     @par Immediate Completion
      57                 :     Operations complete immediately without suspending when the
      58                 :     buffer sequence is empty, or when the underlying source's
      59                 :     awaitable reports readiness via `await_ready`.
      60                 : 
      61                 :     @par Thread Safety
      62                 :     Not thread-safe. Concurrent operations on the same wrapper
      63                 :     are undefined behavior.
      64                 : 
      65                 :     @par Example
      66                 :     @code
      67                 :     // Owning - takes ownership of the source
      68                 :     any_read_source rs(some_source{args...});
      69                 : 
      70                 :     // Reference - wraps without ownership
      71                 :     some_source source;
      72                 :     any_read_source rs(&source);
      73                 : 
      74                 :     mutable_buffer buf(data, size);
      75                 :     auto [ec, n] = co_await rs.read(std::span(&buf, 1));
      76                 :     @endcode
      77                 : 
      78                 :     @see any_read_stream, ReadSource
      79                 : */
      80                 : class any_read_source
      81                 : {
      82                 :     struct vtable;
      83                 :     struct awaitable_ops;
      84                 : 
      85                 :     template<ReadSource S>
      86                 :     struct vtable_for_impl;
      87                 : 
      88                 :     void* source_ = nullptr;
      89                 :     vtable const* vt_ = nullptr;
      90                 :     void* cached_awaitable_ = nullptr;
      91                 :     void* storage_ = nullptr;
      92                 :     awaitable_ops const* active_ops_ = nullptr;
      93                 : 
      94                 : public:
      95                 :     /** Destructor.
      96                 : 
      97                 :         Destroys the owned source (if any) and releases the cached
      98                 :         awaitable storage.
      99                 :     */
     100                 :     ~any_read_source();
     101                 : 
     102                 :     /** Default constructor.
     103                 : 
     104                 :         Constructs an empty wrapper. Operations on a default-constructed
     105                 :         wrapper result in undefined behavior.
     106                 :     */
     107                 :     any_read_source() = default;
     108                 : 
     109                 :     /** Non-copyable.
     110                 : 
     111                 :         The awaitable cache is per-instance and cannot be shared.
     112                 :     */
     113                 :     any_read_source(any_read_source const&) = delete;
     114                 :     any_read_source& operator=(any_read_source const&) = delete;
     115                 : 
     116                 :     /** Move constructor.
     117                 : 
     118                 :         Transfers ownership of the wrapped source (if owned) and
     119                 :         cached awaitable storage from `other`. After the move, `other` is
     120                 :         in a default-constructed state.
     121                 : 
     122                 :         @param other The wrapper to move from.
     123                 :     */
     124 HIT           1 :     any_read_source(any_read_source&& other) noexcept
     125               1 :         : source_(std::exchange(other.source_, nullptr))
     126               1 :         , vt_(std::exchange(other.vt_, nullptr))
     127               1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     128               1 :         , storage_(std::exchange(other.storage_, nullptr))
     129               1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     130                 :     {
     131               1 :     }
     132                 : 
     133                 :     /** Move assignment operator.
     134                 : 
     135                 :         Destroys any owned source and releases existing resources,
     136                 :         then transfers ownership from `other`.
     137                 : 
     138                 :         @param other The wrapper to move from.
     139                 :         @return Reference to this wrapper.
     140                 :     */
     141                 :     any_read_source&
     142                 :     operator=(any_read_source&& other) noexcept;
     143                 : 
     144                 :     /** Construct by taking ownership of a ReadSource.
     145                 : 
     146                 :         Allocates storage and moves the source into this wrapper.
     147                 :         The wrapper owns the source and will destroy it.
     148                 : 
     149                 :         @param s The source to take ownership of.
     150                 :     */
     151                 :     template<ReadSource S>
     152                 :         requires (!std::same_as<std::decay_t<S>, any_read_source>)
     153                 :     any_read_source(S s);
     154                 : 
     155                 :     /** Construct by wrapping a ReadSource without ownership.
     156                 : 
     157                 :         Wraps the given source by pointer. The source must remain
     158                 :         valid for the lifetime of this wrapper.
     159                 : 
     160                 :         @param s Pointer to the source to wrap.
     161                 :     */
     162                 :     template<ReadSource S>
     163                 :     any_read_source(S* s);
     164                 : 
     165                 :     /** Check if the wrapper contains a valid source.
     166                 : 
     167                 :         @return `true` if wrapping a source, `false` if default-constructed
     168                 :             or moved-from.
     169                 :     */
     170                 :     bool
     171              27 :     has_value() const noexcept
     172                 :     {
     173              27 :         return source_ != nullptr;
     174                 :     }
     175                 : 
     176                 :     /** Check if the wrapper contains a valid source.
     177                 : 
     178                 :         @return `true` if wrapping a source, `false` if default-constructed
     179                 :             or moved-from.
     180                 :     */
     181                 :     explicit
     182               8 :     operator bool() const noexcept
     183                 :     {
     184               8 :         return has_value();
     185                 :     }
     186                 : 
     187                 :     /** Initiate a partial read operation.
     188                 : 
     189                 :         Reads one or more bytes into the provided buffer sequence.
     190                 :         May fill less than the full sequence.
     191                 : 
     192                 :         @param buffers The buffer sequence to read into.
     193                 : 
     194                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     195                 : 
     196                 :         @par Immediate Completion
     197                 :         The operation completes immediately without suspending
     198                 :         the calling coroutine when:
     199                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     200                 :         @li The underlying source's awaitable reports immediate
     201                 :             readiness via `await_ready`.
     202                 : 
     203                 :         @note This is a partial operation and may not process the
     204                 :         entire buffer sequence. Use @ref read for guaranteed
     205                 :         complete transfer.
     206                 : 
     207                 :         @par Preconditions
     208                 :         The wrapper must contain a valid source (`has_value() == true`).
     209                 :         The caller must not call this function again after a prior
     210                 :         call returned an error (including EOF).
     211                 :     */
     212                 :     template<MutableBufferSequence MB>
     213                 :     auto
     214                 :     read_some(MB buffers);
     215                 : 
     216                 :     /** Initiate a complete read operation.
     217                 : 
     218                 :         Reads data into the provided buffer sequence by forwarding
     219                 :         to the underlying source's `read` operation. Large buffer
     220                 :         sequences are processed in windows, with each window
     221                 :         forwarded as a separate `read` call to the underlying source.
     222                 :         The operation completes when the entire buffer sequence is
     223                 :         filled, end-of-file is reached, or an error occurs.
     224                 : 
     225                 :         @param buffers The buffer sequence to read into.
     226                 : 
     227                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     228                 : 
     229                 :         @par Immediate Completion
     230                 :         The operation completes immediately without suspending
     231                 :         the calling coroutine when:
     232                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     233                 :         @li The underlying source's `read` awaitable reports
     234                 :             immediate readiness via `await_ready`.
     235                 : 
     236                 :         @par Postconditions
     237                 :         Exactly one of the following is true on return:
     238                 :         @li **Success**: `!ec` and `n == buffer_size(buffers)`.
     239                 :             The entire buffer was filled.
     240                 :         @li **End-of-stream or Error**: `ec` and `n` indicates
     241                 :             the number of bytes transferred before the failure.
     242                 : 
     243                 :         @par Preconditions
     244                 :         The wrapper must contain a valid source (`has_value() == true`).
     245                 :         The caller must not call this function again after a prior
     246                 :         call returned an error (including EOF).
     247                 :     */
     248                 :     template<MutableBufferSequence MB>
     249                 :     io_task<std::size_t>
     250                 :     read(MB buffers);
     251                 : 
     252                 : protected:
     253                 :     /** Rebind to a new source after move.
     254                 : 
     255                 :         Updates the internal pointer to reference a new source object.
     256                 :         Used by owning wrappers after move assignment when the owned
     257                 :         object has moved to a new location.
     258                 : 
     259                 :         @param new_source The new source to bind to. Must be the same
     260                 :             type as the original source.
     261                 : 
     262                 :         @note Terminates if called with a source of different type
     263                 :             than the original.
     264                 :     */
     265                 :     template<ReadSource S>
     266                 :     void
     267                 :     rebind(S& new_source) noexcept
     268                 :     {
     269                 :         if(vt_ != &vtable_for_impl<S>::value)
     270                 :             std::terminate();
     271                 :         source_ = &new_source;
     272                 :     }
     273                 : 
     274                 : private:
     275                 :     auto
     276                 :     read_(std::span<mutable_buffer const> buffers);
     277                 : };
     278                 : 
     279                 : //----------------------------------------------------------
     280                 : 
     281                 : // ordered by call sequence for cache line coherence
     282                 : struct any_read_source::awaitable_ops
     283                 : {
     284                 :     bool (*await_ready)(void*);
     285                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     286                 :     io_result<std::size_t> (*await_resume)(void*);
     287                 :     void (*destroy)(void*) noexcept;
     288                 : };
     289                 : 
     290                 : // ordered by call frequency for cache line coherence
     291                 : struct any_read_source::vtable
     292                 : {
     293                 :     awaitable_ops const* (*construct_read_some_awaitable)(
     294                 :         void* source,
     295                 :         void* storage,
     296                 :         std::span<mutable_buffer const> buffers);
     297                 :     awaitable_ops const* (*construct_read_awaitable)(
     298                 :         void* source,
     299                 :         void* storage,
     300                 :         std::span<mutable_buffer const> buffers);
     301                 :     std::size_t awaitable_size;
     302                 :     std::size_t awaitable_align;
     303                 :     void (*destroy)(void*) noexcept;
     304                 : };
     305                 : 
     306                 : template<ReadSource S>
     307                 : struct any_read_source::vtable_for_impl
     308                 : {
     309                 :     using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
     310                 :         std::span<mutable_buffer const>{}));
     311                 :     using ReadAwaitable = decltype(std::declval<S&>().read(
     312                 :         std::span<mutable_buffer const>{}));
     313                 : 
     314                 :     static void
     315               6 :     do_destroy_impl(void* source) noexcept
     316                 :     {
     317               6 :         static_cast<S*>(source)->~S();
     318               6 :     }
     319                 : 
     320                 :     static awaitable_ops const*
     321              52 :     construct_read_some_awaitable_impl(
     322                 :         void* source,
     323                 :         void* storage,
     324                 :         std::span<mutable_buffer const> buffers)
     325                 :     {
     326              52 :         auto& s = *static_cast<S*>(source);
     327              52 :         ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
     328                 : 
     329                 :         static constexpr awaitable_ops ops = {
     330              52 :             +[](void* p) {
     331              52 :                 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
     332                 :             },
     333               2 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     334               2 :                 return detail::call_await_suspend(
     335               2 :                     static_cast<ReadSomeAwaitable*>(p), h, env);
     336                 :             },
     337              50 :             +[](void* p) {
     338              50 :                 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
     339                 :             },
     340              54 :             +[](void* p) noexcept {
     341               2 :                 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
     342                 :             }
     343                 :         };
     344              52 :         return &ops;
     345                 :     }
     346                 : 
     347                 :     static awaitable_ops const*
     348             116 :     construct_read_awaitable_impl(
     349                 :         void* source,
     350                 :         void* storage,
     351                 :         std::span<mutable_buffer const> buffers)
     352                 :     {
     353             116 :         auto& s = *static_cast<S*>(source);
     354             116 :         ::new(storage) ReadAwaitable(s.read(buffers));
     355                 : 
     356                 :         static constexpr awaitable_ops ops = {
     357             116 :             +[](void* p) {
     358             116 :                 return static_cast<ReadAwaitable*>(p)->await_ready();
     359                 :             },
     360 MIS           0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     361               0 :                 return detail::call_await_suspend(
     362               0 :                     static_cast<ReadAwaitable*>(p), h, env);
     363                 :             },
     364 HIT         116 :             +[](void* p) {
     365             116 :                 return static_cast<ReadAwaitable*>(p)->await_resume();
     366                 :             },
     367             116 :             +[](void* p) noexcept {
     368 MIS           0 :                 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
     369                 :             }
     370                 :         };
     371 HIT         116 :         return &ops;
     372                 :     }
     373                 : 
     374                 :     static constexpr std::size_t max_awaitable_size =
     375                 :         sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
     376                 :             ? sizeof(ReadSomeAwaitable)
     377                 :             : sizeof(ReadAwaitable);
     378                 :     static constexpr std::size_t max_awaitable_align =
     379                 :         alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
     380                 :             ? alignof(ReadSomeAwaitable)
     381                 :             : alignof(ReadAwaitable);
     382                 : 
     383                 :     static constexpr vtable value = {
     384                 :         &construct_read_some_awaitable_impl,
     385                 :         &construct_read_awaitable_impl,
     386                 :         max_awaitable_size,
     387                 :         max_awaitable_align,
     388                 :         &do_destroy_impl
     389                 :     };
     390                 : };
     391                 : 
     392                 : //----------------------------------------------------------
     393                 : 
     394                 : inline
     395             145 : any_read_source::~any_read_source()
     396                 : {
     397             145 :     if(storage_)
     398                 :     {
     399               6 :         vt_->destroy(source_);
     400               6 :         ::operator delete(storage_);
     401                 :     }
     402             145 :     if(cached_awaitable_)
     403                 :     {
     404             139 :         if(active_ops_)
     405               1 :             active_ops_->destroy(cached_awaitable_);
     406             139 :         ::operator delete(cached_awaitable_);
     407                 :     }
     408             145 : }
     409                 : 
     410                 : inline any_read_source&
     411               4 : any_read_source::operator=(any_read_source&& other) noexcept
     412                 : {
     413               4 :     if(this != &other)
     414                 :     {
     415               3 :         if(storage_)
     416                 :         {
     417 MIS           0 :             vt_->destroy(source_);
     418               0 :             ::operator delete(storage_);
     419                 :         }
     420 HIT           3 :         if(cached_awaitable_)
     421                 :         {
     422               2 :             if(active_ops_)
     423               1 :                 active_ops_->destroy(cached_awaitable_);
     424               2 :             ::operator delete(cached_awaitable_);
     425                 :         }
     426               3 :         source_ = std::exchange(other.source_, nullptr);
     427               3 :         vt_ = std::exchange(other.vt_, nullptr);
     428               3 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     429               3 :         storage_ = std::exchange(other.storage_, nullptr);
     430               3 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     431                 :     }
     432               4 :     return *this;
     433                 : }
     434                 : 
     435                 : template<ReadSource S>
     436                 :     requires (!std::same_as<std::decay_t<S>, any_read_source>)
     437               6 : any_read_source::any_read_source(S s)
     438               6 :     : vt_(&vtable_for_impl<S>::value)
     439                 : {
     440                 :     struct guard {
     441                 :         any_read_source* self;
     442                 :         bool committed = false;
     443               6 :         ~guard() {
     444               6 :             if(!committed && self->storage_) {
     445 MIS           0 :                 self->vt_->destroy(self->source_);
     446               0 :                 ::operator delete(self->storage_);
     447               0 :                 self->storage_ = nullptr;
     448               0 :                 self->source_ = nullptr;
     449                 :             }
     450 HIT           6 :         }
     451               6 :     } g{this};
     452                 : 
     453               6 :     storage_ = ::operator new(sizeof(S));
     454               6 :     source_ = ::new(storage_) S(std::move(s));
     455                 : 
     456                 :     // Preallocate the awaitable storage
     457               6 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     458                 : 
     459               6 :     g.committed = true;
     460               6 : }
     461                 : 
     462                 : template<ReadSource S>
     463             135 : any_read_source::any_read_source(S* s)
     464             135 :     : source_(s)
     465             135 :     , vt_(&vtable_for_impl<S>::value)
     466                 : {
     467                 :     // Preallocate the awaitable storage
     468             135 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     469             135 : }
     470                 : 
     471                 : //----------------------------------------------------------
     472                 : 
     473                 : template<MutableBufferSequence MB>
     474                 : auto
     475              54 : any_read_source::read_some(MB buffers)
     476                 : {
     477                 :     struct awaitable
     478                 :     {
     479                 :         any_read_source* self_;
     480                 :         mutable_buffer_array<detail::max_iovec_> ba_;
     481                 : 
     482              54 :         awaitable(any_read_source* self, MB const& buffers)
     483              54 :             : self_(self)
     484              54 :             , ba_(buffers)
     485                 :         {
     486              54 :         }
     487                 : 
     488                 :         bool
     489              54 :         await_ready() const noexcept
     490                 :         {
     491              54 :             return ba_.to_span().empty();
     492                 :         }
     493                 : 
     494                 :         std::coroutine_handle<>
     495              52 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     496                 :         {
     497              52 :             self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
     498              52 :                 self_->source_,
     499              52 :                 self_->cached_awaitable_,
     500              52 :                 ba_.to_span());
     501                 : 
     502              52 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     503              50 :                 return h;
     504                 : 
     505               2 :             return self_->active_ops_->await_suspend(
     506               2 :                 self_->cached_awaitable_, h, env);
     507                 :         }
     508                 : 
     509                 :         io_result<std::size_t>
     510              52 :         await_resume()
     511                 :         {
     512              52 :             if(ba_.to_span().empty())
     513               2 :                 return {{}, 0};
     514                 : 
     515                 :             struct guard {
     516                 :                 any_read_source* self;
     517              50 :                 ~guard() {
     518              50 :                     self->active_ops_->destroy(self->cached_awaitable_);
     519              50 :                     self->active_ops_ = nullptr;
     520              50 :                 }
     521              50 :             } g{self_};
     522              50 :             return self_->active_ops_->await_resume(
     523              50 :                 self_->cached_awaitable_);
     524              50 :         }
     525                 :     };
     526              54 :     return awaitable(this, buffers);
     527                 : }
     528                 : 
     529                 : inline auto
     530             116 : any_read_source::read_(std::span<mutable_buffer const> buffers)
     531                 : {
     532                 :     struct awaitable
     533                 :     {
     534                 :         any_read_source* self_;
     535                 :         std::span<mutable_buffer const> buffers_;
     536                 : 
     537                 :         bool
     538             116 :         await_ready() const noexcept
     539                 :         {
     540             116 :             return false;
     541                 :         }
     542                 : 
     543                 :         std::coroutine_handle<>
     544             116 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     545                 :         {
     546             232 :             self_->active_ops_ = self_->vt_->construct_read_awaitable(
     547             116 :                 self_->source_,
     548             116 :                 self_->cached_awaitable_,
     549                 :                 buffers_);
     550                 : 
     551             116 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     552             116 :                 return h;
     553                 : 
     554 MIS           0 :             return self_->active_ops_->await_suspend(
     555               0 :                 self_->cached_awaitable_, h, env);
     556                 :         }
     557                 : 
     558                 :         io_result<std::size_t>
     559 HIT         116 :         await_resume()
     560                 :         {
     561                 :             struct guard {
     562                 :                 any_read_source* self;
     563             116 :                 ~guard() {
     564             116 :                     self->active_ops_->destroy(self->cached_awaitable_);
     565             116 :                     self->active_ops_ = nullptr;
     566             116 :                 }
     567             116 :             } g{self_};
     568             116 :             return self_->active_ops_->await_resume(
     569             200 :                 self_->cached_awaitable_);
     570             116 :         }
     571                 :     };
     572             116 :     return awaitable{this, buffers};
     573                 : }
     574                 : 
     575                 : template<MutableBufferSequence MB>
     576                 : io_task<std::size_t>
     577             110 : any_read_source::read(MB buffers)
     578                 : {
     579                 :     buffer_param bp(buffers);
     580                 :     std::size_t total = 0;
     581                 : 
     582                 :     for(;;)
     583                 :     {
     584                 :         auto bufs = bp.data();
     585                 :         if(bufs.empty())
     586                 :             break;
     587                 : 
     588                 :         auto [ec, n] = co_await read_(bufs);
     589                 :         total += n;
     590                 :         if(ec)
     591                 :             co_return {ec, total};
     592                 :         bp.consume(n);
     593                 :     }
     594                 : 
     595                 :     co_return {{}, total};
     596             220 : }
     597                 : 
     598                 : } // namespace capy
     599                 : } // namespace boost
     600                 : 
     601                 : #endif
        

Generated by: LCOV version 2.3