LCOV - code coverage report
Current view: top level - capy/io - any_write_stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 94.1 % 101 95 6
Test Date: 2026-03-04 16:33:47 Functions: 91.5 % 59 54 5

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      11                 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_array.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <boost/capy/concept/write_stream.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/io_result.hpp>
      22                 : 
      23                 : #include <concepts>
      24                 : #include <coroutine>
      25                 : #include <cstddef>
      26                 : #include <exception>
      27                 : #include <new>
      28                 : #include <span>
      29                 : #include <stop_token>
      30                 : #include <system_error>
      31                 : #include <utility>
      32                 : 
      33                 : namespace boost {
      34                 : namespace capy {
      35                 : 
      36                 : /** Type-erased wrapper for any WriteStream.
      37                 : 
      38                 :     This class provides type erasure for any type satisfying the
      39                 :     @ref WriteStream concept, enabling runtime polymorphism for
      40                 :     write operations. It uses cached awaitable storage to achieve
      41                 :     zero steady-state allocation after construction.
      42                 : 
      43                 :     The wrapper supports two construction modes:
      44                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      45                 :       allocates storage and owns the stream.
      46                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      47                 :       pointed-to stream must outlive this wrapper.
      48                 : 
      49                 :     @par Awaitable Preallocation
      50                 :     The constructor preallocates storage for the type-erased awaitable.
      51                 :     This reserves all virtual address space at server startup
      52                 :     so memory usage can be measured up front, rather than
      53                 :     allocating piecemeal as traffic arrives.
      54                 : 
      55                 :     @par Immediate Completion
      56                 :     Operations complete immediately without suspending when the
      57                 :     buffer sequence is empty, or when the underlying stream's
      58                 :     awaitable reports readiness via `await_ready`.
      59                 : 
      60                 :     @par Thread Safety
      61                 :     Not thread-safe. Concurrent operations on the same wrapper
      62                 :     are undefined behavior.
      63                 : 
      64                 :     @par Example
      65                 :     @code
      66                 :     // Owning - takes ownership of the stream
      67                 :     any_write_stream stream(socket{ioc});
      68                 : 
      69                 :     // Reference - wraps without ownership
      70                 :     socket sock(ioc);
      71                 :     any_write_stream stream(&sock);
      72                 : 
      73                 :     const_buffer buf(data, size);
      74                 :     auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
      75                 :     @endcode
      76                 : 
      77                 :     @see any_read_stream, any_stream, WriteStream
      78                 : */
      79                 : class any_write_stream
      80                 : {
      81                 :     struct vtable; // table of type-erased operations; defined after the class
      82                 : 
      83                 :     template<WriteStream S>
      84                 :     struct vtable_for_impl; // provides the vtable instance for a concrete stream type S
      85                 : 
      86                 :     // ordered for cache line coherence
      87                 :     void* stream_ = nullptr; // wrapped stream (owned or borrowed); null when the wrapper is empty
      88                 :     vtable const* vt_ = nullptr; // operations for the concrete stream type; null when default-constructed or moved-from
      89                 :     void* cached_awaitable_ = nullptr; // raw storage preallocated by the converting constructors for the stream's awaitable
      90                 :     void* storage_ = nullptr; // heap block holding an owned stream; stays null in reference mode
      91                 :     bool awaitable_active_ = false; // true while an awaitable object is alive inside cached_awaitable_
      92                 : 
      93                 : public:
      94                 :     /** Destructor.
      95                 : 
      96                 :         Destroys the owned stream (if any) and releases the cached
      97                 :         awaitable storage.
                         : 
                         :         A stream wrapped by pointer is not destroyed. If an
                         :         awaitable is still alive in the cache it is destroyed
                         :         before the cache is freed.
      98                 :     */
      99                 :     ~any_write_stream();
     100                 : 
     101                 :     /** Default constructor.
     102                 : 
     103                 :         Constructs an empty wrapper. Operations on a default-constructed
     104                 :         wrapper result in undefined behavior.
                         : 
                         :         `has_value()`, `operator bool`, move operations and
                         :         destruction only inspect the stored pointers and remain
                         :         valid on an empty wrapper.
     105                 :     */
     106 HIT           1 :     any_write_stream() = default;
     107                 : 
     108                 :     /** Non-copyable.
     109                 : 
     110                 :         The awaitable cache is per-instance and cannot be shared.
     111                 :     */
     112                 :     any_write_stream(any_write_stream const&) = delete;
     113                 :     any_write_stream& operator=(any_write_stream const&) = delete;
     114                 : 
     115                 :     /** Move constructor.
     116                 : 
     117                 :         Transfers ownership of the wrapped stream (if owned) and
     118                 :         cached awaitable storage from `other`. After the move, `other` is
     119                 :         in a default-constructed state.
                         : 
                         :         Only pointers and the active flag are exchanged; the
                         :         wrapped stream object itself is not moved or copied.
     120                 : 
     121                 :         @param other The wrapper to move from.
     122                 :     */
     123               2 :     any_write_stream(any_write_stream&& other) noexcept
     124               2 :         : stream_(std::exchange(other.stream_, nullptr))
     125               2 :         , vt_(std::exchange(other.vt_, nullptr))
     126               2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     127               2 :         , storage_(std::exchange(other.storage_, nullptr))
     128               2 :         , awaitable_active_(std::exchange(other.awaitable_active_, false))
     129                 :     {
     130               2 :     }
     131                 : 
     132                 :     /** Move assignment operator.
     133                 : 
     134                 :         Destroys any owned stream and releases existing resources,
     135                 :         then transfers ownership from `other`.
                         : 
                         :         Self-assignment is a no-op. After the call `other` is in a
                         :         default-constructed state.
     136                 : 
     137                 :         @param other The wrapper to move from.
     138                 :         @return Reference to this wrapper.
     139                 :     */
     140                 :     any_write_stream&
     141                 :     operator=(any_write_stream&& other) noexcept;
     142                 : 
     143                 :     /** Construct by taking ownership of a WriteStream.
     144                 : 
     145                 :         Allocates storage and moves the stream into this wrapper.
     146                 :         The wrapper owns the stream and will destroy it.
                         : 
                         :         Storage for the stream's awaitable is preallocated as well,
                         :         so two allocations are made by this constructor.
                         : 
                         :         NOTE(review): both allocations use plain `::operator new`
                         :         with only a size argument, so stream or awaitable types with
                         :         extended alignment look unsupported - confirm.
     147                 : 
     148                 :         @param s The stream to take ownership of.
     149                 :     */
     150                 :     template<WriteStream S>
     151                 :         requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     152                 :     any_write_stream(S s);
     153                 : 
     154                 :     /** Construct by wrapping a WriteStream without ownership.
     155                 : 
     156                 :         Wraps the given stream by pointer. The stream must remain
     157                 :         valid for the lifetime of this wrapper.
                         : 
                         :         Storage for the stream's awaitable is preallocated even in
                         :         this mode. The pointer is stored as given: if `s` is null,
                         :         `has_value()` returns `false`.
     158                 : 
     159                 :         @param s Pointer to the stream to wrap.
     160                 :     */
     161                 :     template<WriteStream S>
     162                 :     any_write_stream(S* s);
     163                 : 
     164                 :     /** Check if the wrapper contains a valid stream.
     165                 : 
     166                 :         @return `true` if wrapping a stream, `false` if default-constructed
     167                 :             or moved-from.
     168                 :     */
     169                 :     bool
     170              21 :     has_value() const noexcept
     171                 :     {
     172              21 :         return stream_ != nullptr;
     173                 :     }
     174                 : 
     175                 :     /** Check if the wrapper contains a valid stream.
                         : 
                         :         Equivalent to calling `has_value()`.
     176                 : 
     177                 :         @return `true` if wrapping a stream, `false` if default-constructed
     178                 :             or moved-from.
     179                 :     */
     180                 :     explicit
     181               3 :     operator bool() const noexcept
     182                 :     {
     183               3 :         return has_value();
     184                 :     }
     185                 : 
     186                 :     /** Initiate an asynchronous write operation.
     187                 : 
     188                 :         Writes data from the provided buffer sequence. The operation
     189                 :         completes when at least one byte has been written, or an error
     190                 :         occurs.
     191                 : 
     192                 :         @param buffers The buffer sequence containing data to write.
     193                 :             Passed by value to ensure the sequence lives in the
     194                 :             coroutine frame across suspension points.
     195                 : 
     196                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     197                 : 
     198                 :         @par Immediate Completion
     199                 :         The operation completes immediately without suspending
     200                 :         the calling coroutine when:
     201                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     202                 :         @li The underlying stream's awaitable reports immediate
     203                 :             readiness via `await_ready`.
     204                 : 
     205                 :         @note This is a partial operation and may not process the
     206                 :         entire buffer sequence. Use the composed @ref write algorithm
     207                 :         for guaranteed complete transfer.
     208                 : 
     209                 :         @par Preconditions
     210                 :         The wrapper must contain a valid stream (`has_value() == true`).
                         :         No other `write_some` operation may be outstanding on this
                         :         wrapper: every operation constructs the stream's awaitable
                         :         in the same cached storage slot.
                         :         The returned awaitable keeps a pointer to this wrapper, so
                         :         the wrapper must not be moved or destroyed until the
                         :         operation has been resumed.
     211                 :     */
     212                 :     template<ConstBufferSequence CB>
     213                 :     auto
     214                 :     write_some(CB buffers);
     215                 : 
     216                 : protected:
     217                 :     /** Rebind to a new stream after move.
     218                 : 
     219                 :         Updates the internal pointer to reference a new stream object.
     220                 :         Used by owning wrappers after move assignment when the owned
     221                 :         object has moved to a new location.
                         : 
                         :         Only the stream pointer changes; the vtable and the cached
                         :         awaitable storage are kept.
     222                 : 
     223                 :         @param new_stream The new stream to bind to. Must be the same
     224                 :             type as the original stream.
     225                 : 
     226                 :         @note Terminates if called with a stream of different type
     227                 :             than the original.
     228                 :     */
     229                 :     template<WriteStream S>
     230                 :     void
     231                 :     rebind(S& new_stream) noexcept
     232                 :     {
     233                 :         if(vt_ != &vtable_for_impl<S>::value) // the vtable address identifies the wrapped stream type
     234                 :             std::terminate();
     235                 :         stream_ = &new_stream;
     236                 :     }
     237                 : };
     238                 : 
     239                 : //----------------------------------------------------------
     240                 : 
     241                 : struct any_write_stream::vtable // function pointers and layout data describing one concrete stream type
     242                 : {
     243                 :     // ordered by call frequency for cache line coherence
     244                 :     void (*construct_awaitable)( // builds the stream's write_some awaitable inside `storage`
     245                 :         void* stream,
     246                 :         void* storage,
     247                 :         std::span<const_buffer const> buffers);
     248                 :     bool (*await_ready)(void*); // argument is the constructed awaitable, not the stream
     249                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); // awaitable, awaiting coroutine, environment
     250                 :     io_result<std::size_t> (*await_resume)(void*); // yields the result of the finished operation
     251                 :     void (*destroy_awaitable)(void*) noexcept; // ends the awaitable's lifetime; its storage is freed separately by the wrapper
     252                 :     std::size_t awaitable_size; // byte size used when the wrapper preallocates the awaitable storage
     253                 :     std::size_t awaitable_align; // NOTE(review): recorded here but not read by the allocation code visible in this file
     254                 :     void (*destroy)(void*) noexcept; // destroys the stream object; the wrapper calls it only when it owns the stream
     255                 : };
     256                 : 
     257                 : template<WriteStream S>
     258                 : struct any_write_stream::vtable_for_impl // builds the constant vtable for the concrete stream type S
     259                 : {
     260                 :     using Awaitable = decltype(std::declval<S&>().write_some( // type returned by S::write_some for a span of const_buffer
     261                 :         std::span<const_buffer const>{}));
     262                 : 
     263                 :     static void
     264               1 :     do_destroy_impl(void* stream) noexcept // `stream` must point at a live S
     265                 :     {
     266               1 :         static_cast<S*>(stream)->~S(); // destructor only; the wrapper frees the memory itself
     267               1 :     }
     268                 : 
     269                 :     static void
     270              75 :     construct_awaitable_impl( // starts a write on the stream and stores its awaitable in `storage`
     271                 :         void* stream,
     272                 :         void* storage,
     273                 :         std::span<const_buffer const> buffers)
     274                 :     {
     275              75 :         auto& s = *static_cast<S*>(stream);
     276              75 :         ::new(storage) Awaitable(s.write_some(buffers)); // placement new: `storage` must be large enough for Awaitable
     277              75 :     }
     278                 : 
     279                 :     static constexpr vtable value = { // initializers follow the member order of vtable
     280                 :         &construct_awaitable_impl,
     281              75 :         +[](void* p) { // await_ready: unary plus converts the captureless lambda to a function pointer
     282              75 :             return static_cast<Awaitable*>(p)->await_ready();
     283                 :         },
     284               2 :         +[](void* p, std::coroutine_handle<> h, io_env const* env) { // await_suspend
     285               2 :             return detail::call_await_suspend( // helper from await_suspend_helper.hpp; not defined in this file
     286               2 :                 static_cast<Awaitable*>(p), h, env);
     287                 :         },
     288              73 :         +[](void* p) { // await_resume
     289              73 :             return static_cast<Awaitable*>(p)->await_resume();
     290                 :         },
     291              77 :         +[](void* p) noexcept { // destroy_awaitable
     292              12 :             static_cast<Awaitable*>(p)->~Awaitable(); // explicit destructor call; the storage stays allocated for reuse
     293                 :         },
     294                 :         sizeof(Awaitable), // awaitable_size
     295                 :         alignof(Awaitable), // awaitable_align
     296                 :         &do_destroy_impl // destroy
     297                 :     };
     298                 : };
     299                 : 
     300                 : //----------------------------------------------------------
     301                 : 
     302                 : inline // destructor: tears down the owned stream (if any), then the cached awaitable storage
     303              95 : any_write_stream::~any_write_stream()
     304                 : {
     305              95 :     if(storage_) // non-null only when the wrapper owns its stream
     306                 :     {
     307               1 :         vt_->destroy(stream_); // run the stream's destructor in place
     308               1 :         ::operator delete(storage_); // then free the raw block that held it
     309                 :     }
     310              95 :     if(cached_awaitable_) // null for default-constructed and moved-from wrappers
     311                 :     {
     312              85 :         if(awaitable_active_) // an awaitable was constructed but never consumed by await_resume
     313               1 :             vt_->destroy_awaitable(cached_awaitable_);
     314              85 :         ::operator delete(cached_awaitable_);
     315                 :     }
     316              95 : }
     317                 : 
     318                 : inline any_write_stream& // move assignment: release current resources, then take over those of `other`
     319               5 : any_write_stream::operator=(any_write_stream&& other) noexcept
     320                 : {
     321               5 :     if(this != &other) // self-assignment leaves the wrapper untouched
     322                 :     {
     323               5 :         if(storage_) // currently owning a stream: destroy it and free its block
     324                 :         {
     325 MIS           0 :             vt_->destroy(stream_);
     326               0 :             ::operator delete(storage_);
     327                 :         }
     328 HIT           5 :         if(cached_awaitable_) // release this wrapper's awaitable cache before adopting the other one
     329                 :         {
     330               2 :             if(awaitable_active_) // a live awaitable must be destroyed before its storage is freed
     331               1 :                 vt_->destroy_awaitable(cached_awaitable_);
     332               2 :             ::operator delete(cached_awaitable_);
     333                 :         }
     334               5 :         stream_ = std::exchange(other.stream_, nullptr); // each exchange resets `other` to its default member value
     335               5 :         vt_ = std::exchange(other.vt_, nullptr);
     336               5 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     337               5 :         storage_ = std::exchange(other.storage_, nullptr);
     338               5 :         awaitable_active_ = std::exchange(other.awaitable_active_, false);
     339                 :     }
     340               5 :     return *this;
     341                 : }
     342                 : 
     343                 : template<WriteStream S>
     344                 :     requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     345               1 : any_write_stream::any_write_stream(S s)
     346               1 :     : vt_(&vtable_for_impl<S>::value)
     347                 : {
     348                 :     struct guard {
     349                 :         any_write_stream* self;
     350                 :         bool committed = false;
     351               1 :         ~guard() {
     352               1 :             if(!committed && self->storage_) {
     353 MIS           0 :                 self->vt_->destroy(self->stream_);
     354               0 :                 ::operator delete(self->storage_);
     355               0 :                 self->storage_ = nullptr;
     356               0 :                 self->stream_ = nullptr;
     357                 :             }
     358 HIT           1 :         }
     359               1 :     } g{this};
     360                 : 
     361               1 :     storage_ = ::operator new(sizeof(S));
     362               1 :     stream_ = ::new(storage_) S(std::move(s));
     363                 : 
     364                 :     // Preallocate the awaitable storage
     365               1 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     366                 : 
     367               1 :     g.committed = true;
     368               1 : }
     369                 : 
     370                 : template<WriteStream S>
     371              86 : any_write_stream::any_write_stream(S* s) // reference mode: the pointer is stored, the stream is not owned
     372              86 :     : stream_(s)
     373              86 :     , vt_(&vtable_for_impl<S>::value)
     374                 : {
     375                 :     // Preallocate the awaitable storage
     376              86 :     cached_awaitable_ = ::operator new(vt_->awaitable_size); // storage_ stays null, so the destructor never destroys *s
     377              86 : }
     378                 : 
     379                 : //----------------------------------------------------------
     380                 : 
     381                 : template<ConstBufferSequence CB>
     382                 : auto
     383              79 : any_write_stream::write_some(CB buffers) // returns a local awaitable type; the stream is not touched until it is awaited
     384                 : {
     385                 :     struct awaitable // outer awaitable handed to the caller; drives the stream's own awaitable through the vtable
     386                 :     {
     387                 :         any_write_stream* self_; // wrapper that started the operation; must stay at this address until resumed
     388                 :         const_buffer_array<detail::max_iovec_> ba_; // built from the caller's buffer sequence; its span is what the stream receives
     389                 : 
     390              79 :         awaitable(
     391                 :             any_write_stream* self,
     392                 :             CB const& buffers) noexcept
     393              79 :             : self_(self)
     394              79 :             , ba_(buffers)
     395                 :         {
     396              79 :         }
     397                 : 
     398                 :         bool
     399              79 :         await_ready() const noexcept
     400                 :         {
     401              79 :             return ba_.to_span().empty(); // nothing to write: skip await_suspend, so no inner awaitable is built
     402                 :         }
     403                 : 
     404                 :         std::coroutine_handle<>
     405              75 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     406                 :         {
     407              75 :             self_->vt_->construct_awaitable( // start the write; the stream's awaitable now lives in the cached storage
     408              75 :                 self_->stream_,
     409              75 :                 self_->cached_awaitable_,
     410              75 :                 ba_.to_span());
     411              75 :             self_->awaitable_active_ = true; // set only after construction succeeded
     412                 : 
     413              75 :             if(self_->vt_->await_ready(self_->cached_awaitable_))
     414              73 :                 return h; // already complete: returning the caller's own handle resumes it right away
     415                 : 
     416               2 :             return self_->vt_->await_suspend( // NOTE(review): if this call or await_ready above throws, the inner awaitable stays active and the next operation would construct over it - confirm whether they can throw
     417               2 :                 self_->cached_awaitable_, h, env);
     418                 :         }
     419                 : 
     420                 :         io_result<std::size_t>
     421              77 :         await_resume()
     422                 :         {
     423              77 :             if(!self_->awaitable_active_) // empty-buffer path: await_suspend never ran
     424               4 :                 return {{}, 0};
     425                 :             struct guard { // destroys the inner awaitable when this scope exits, whether by return or by exception
     426                 :                 any_write_stream* self;
     427              73 :                 ~guard() {
     428              73 :                     self->vt_->destroy_awaitable(self->cached_awaitable_);
     429              73 :                     self->awaitable_active_ = false; // the cached storage is free for the next operation
     430              73 :                 }
     431              73 :             } g{self_};
     432              73 :             return self_->vt_->await_resume( // result is produced first; the guard runs afterwards
     433              73 :                 self_->cached_awaitable_);
     434              73 :         }
     435                 :     };
     436              79 :     return awaitable{this, buffers}; // the awaitable keeps a pointer to this wrapper
     437                 : }
     438                 : 
     439                 : } // namespace capy
     440                 : } // namespace boost
     441                 : 
     442                 : #endif
        

Generated by: LCOV version 2.3