include/boost/capy/io/any_write_stream.hpp

94.1% Lines (95/101) 85.7% Functions (30/35)
include/boost/capy/io/any_write_stream.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <exception>
27 #include <new>
28 #include <span>
29 #include <stop_token>
30 #include <system_error>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any WriteStream.
37
38 This class provides type erasure for any type satisfying the
39 @ref WriteStream concept, enabling runtime polymorphism for
40 write operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the stream.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to stream must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Immediate Completion
56 Operations complete immediately without suspending when the
57 buffer sequence is empty, or when the underlying stream's
58 awaitable reports readiness via `await_ready`.
59
60 @par Thread Safety
61 Not thread-safe. Concurrent operations on the same wrapper
62 are undefined behavior.
63
64 @par Example
65 @code
66 // Owning - takes ownership of the stream
67 any_write_stream stream(socket{ioc});
68
69 // Reference - wraps without ownership
70 socket sock(ioc);
71 any_write_stream stream(&sock);
72
73 const_buffer buf(data, size);
74 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75 @endcode
76
77 @see any_read_stream, any_stream, WriteStream
78 */
79 class any_write_stream
80 {
81 struct vtable;
82
83 template<WriteStream S>
84 struct vtable_for_impl;
85
86 // ordered for cache line coherence
87 void* stream_ = nullptr;
88 vtable const* vt_ = nullptr;
89 void* cached_awaitable_ = nullptr;
90 void* storage_ = nullptr;
91 bool awaitable_active_ = false;
92
93 public:
94 /** Destructor.
95
96 Destroys the owned stream (if any) and releases the cached
97 awaitable storage.
98 */
99 ~any_write_stream();
100
101 /** Default constructor.
102
103 Constructs an empty wrapper. Operations on a default-constructed
104 wrapper result in undefined behavior.
105 */
106 1 any_write_stream() = default;
107
108 /** Non-copyable.
109
110 The awaitable cache is per-instance and cannot be shared.
111 */
112 any_write_stream(any_write_stream const&) = delete;
113 any_write_stream& operator=(any_write_stream const&) = delete;
114
115 /** Move constructor.
116
117 Transfers ownership of the wrapped stream (if owned) and
118 cached awaitable storage from `other`. After the move, `other` is
119 in a default-constructed state.
120
121 @param other The wrapper to move from.
122 */
123 2 any_write_stream(any_write_stream&& other) noexcept
124 2 : stream_(std::exchange(other.stream_, nullptr))
125 2 , vt_(std::exchange(other.vt_, nullptr))
126 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 2 , storage_(std::exchange(other.storage_, nullptr))
128 2 , awaitable_active_(std::exchange(other.awaitable_active_, false))
129 {
130 2 }
131
132 /** Move assignment operator.
133
134 Destroys any owned stream and releases existing resources,
135 then transfers ownership from `other`.
136
137 @param other The wrapper to move from.
138 @return Reference to this wrapper.
139 */
140 any_write_stream&
141 operator=(any_write_stream&& other) noexcept;
142
143 /** Construct by taking ownership of a WriteStream.
144
145 Allocates storage and moves the stream into this wrapper.
146 The wrapper owns the stream and will destroy it.
147
148 @param s The stream to take ownership of.
149 */
150 template<WriteStream S>
151 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152 any_write_stream(S s);
153
154 /** Construct by wrapping a WriteStream without ownership.
155
156 Wraps the given stream by pointer. The stream must remain
157 valid for the lifetime of this wrapper.
158
159 @param s Pointer to the stream to wrap.
160 */
161 template<WriteStream S>
162 any_write_stream(S* s);
163
164 /** Check if the wrapper contains a valid stream.
165
166 @return `true` if wrapping a stream, `false` if default-constructed
167 or moved-from.
168 */
169 bool
170 21 has_value() const noexcept
171 {
172 21 return stream_ != nullptr;
173 }
174
175 /** Check if the wrapper contains a valid stream.
176
177 @return `true` if wrapping a stream, `false` if default-constructed
178 or moved-from.
179 */
180 explicit
181 3 operator bool() const noexcept
182 {
183 3 return has_value();
184 }
185
186 /** Initiate an asynchronous write operation.
187
188 Writes data from the provided buffer sequence. The operation
189 completes when at least one byte has been written, or an error
190 occurs.
191
192 @param buffers The buffer sequence containing data to write.
193 Passed by value to ensure the sequence lives in the
194 coroutine frame across suspension points.
195
196 @return An awaitable yielding `(error_code,std::size_t)`.
197
198 @par Immediate Completion
199 The operation completes immediately without suspending
200 the calling coroutine when:
201 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202 @li The underlying stream's awaitable reports immediate
203 readiness via `await_ready`.
204
205 @note This is a partial operation and may not process the
206 entire buffer sequence. Use the composed @ref write algorithm
207 for guaranteed complete transfer.
208
209 @par Preconditions
210 The wrapper must contain a valid stream (`has_value() == true`).
211 */
212 template<ConstBufferSequence CB>
213 auto
214 write_some(CB buffers);
215
216 protected:
217 /** Rebind to a new stream after move.
218
219 Updates the internal pointer to reference a new stream object.
220 Used by owning wrappers after move assignment when the owned
221 object has moved to a new location.
222
223 @param new_stream The new stream to bind to. Must be the same
224 type as the original stream.
225
226 @note Terminates if called with a stream of different type
227 than the original.
228 */
229 template<WriteStream S>
230 void
231 rebind(S& new_stream) noexcept
232 {
233 if(vt_ != &vtable_for_impl<S>::value)
234 std::terminate();
235 stream_ = &new_stream;
236 }
237 };
238
239 //----------------------------------------------------------
240
241 struct any_write_stream::vtable
242 {
243 // ordered by call frequency for cache line coherence
244 void (*construct_awaitable)(
245 void* stream,
246 void* storage,
247 std::span<const_buffer const> buffers);
248 bool (*await_ready)(void*);
249 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
250 io_result<std::size_t> (*await_resume)(void*);
251 void (*destroy_awaitable)(void*) noexcept;
252 std::size_t awaitable_size;
253 std::size_t awaitable_align;
254 void (*destroy)(void*) noexcept;
255 };
256
257 template<WriteStream S>
258 struct any_write_stream::vtable_for_impl
259 {
260 using Awaitable = decltype(std::declval<S&>().write_some(
261 std::span<const_buffer const>{}));
262
263 static void
264 1 do_destroy_impl(void* stream) noexcept
265 {
266 1 static_cast<S*>(stream)->~S();
267 1 }
268
269 static void
270 75 construct_awaitable_impl(
271 void* stream,
272 void* storage,
273 std::span<const_buffer const> buffers)
274 {
275 75 auto& s = *static_cast<S*>(stream);
276 75 ::new(storage) Awaitable(s.write_some(buffers));
277 75 }
278
279 static constexpr vtable value = {
280 &construct_awaitable_impl,
281 75 +[](void* p) {
282 75 return static_cast<Awaitable*>(p)->await_ready();
283 },
284 2 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
285 2 return detail::call_await_suspend(
286 2 static_cast<Awaitable*>(p), h, env);
287 },
288 73 +[](void* p) {
289 73 return static_cast<Awaitable*>(p)->await_resume();
290 },
291 77 +[](void* p) noexcept {
292 12 static_cast<Awaitable*>(p)->~Awaitable();
293 },
294 sizeof(Awaitable),
295 alignof(Awaitable),
296 &do_destroy_impl
297 };
298 };
299
300 //----------------------------------------------------------
301
302 inline
303 95 any_write_stream::~any_write_stream()
304 {
305 95 if(storage_)
306 {
307 1 vt_->destroy(stream_);
308 1 ::operator delete(storage_);
309 }
310 95 if(cached_awaitable_)
311 {
312 85 if(awaitable_active_)
313 1 vt_->destroy_awaitable(cached_awaitable_);
314 85 ::operator delete(cached_awaitable_);
315 }
316 95 }
317
318 inline any_write_stream&
319 5 any_write_stream::operator=(any_write_stream&& other) noexcept
320 {
321 5 if(this != &other)
322 {
323 5 if(storage_)
324 {
325 vt_->destroy(stream_);
326 ::operator delete(storage_);
327 }
328 5 if(cached_awaitable_)
329 {
330 2 if(awaitable_active_)
331 1 vt_->destroy_awaitable(cached_awaitable_);
332 2 ::operator delete(cached_awaitable_);
333 }
334 5 stream_ = std::exchange(other.stream_, nullptr);
335 5 vt_ = std::exchange(other.vt_, nullptr);
336 5 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
337 5 storage_ = std::exchange(other.storage_, nullptr);
338 5 awaitable_active_ = std::exchange(other.awaitable_active_, false);
339 }
340 5 return *this;
341 }
342
343 template<WriteStream S>
344 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
345 1 any_write_stream::any_write_stream(S s)
346 1 : vt_(&vtable_for_impl<S>::value)
347 {
348 struct guard {
349 any_write_stream* self;
350 bool committed = false;
351 1 ~guard() {
352 1 if(!committed && self->storage_) {
353 self->vt_->destroy(self->stream_);
354 ::operator delete(self->storage_);
355 self->storage_ = nullptr;
356 self->stream_ = nullptr;
357 }
358 1 }
359 1 } g{this};
360
361 1 storage_ = ::operator new(sizeof(S));
362 1 stream_ = ::new(storage_) S(std::move(s));
363
364 // Preallocate the awaitable storage
365 1 cached_awaitable_ = ::operator new(vt_->awaitable_size);
366
367 1 g.committed = true;
368 1 }
369
370 template<WriteStream S>
371 86 any_write_stream::any_write_stream(S* s)
372 86 : stream_(s)
373 86 , vt_(&vtable_for_impl<S>::value)
374 {
375 // Preallocate the awaitable storage
376 86 cached_awaitable_ = ::operator new(vt_->awaitable_size);
377 86 }
378
379 //----------------------------------------------------------
380
381 template<ConstBufferSequence CB>
382 auto
383 79 any_write_stream::write_some(CB buffers)
384 {
385 struct awaitable
386 {
387 any_write_stream* self_;
388 const_buffer_array<detail::max_iovec_> ba_;
389
390 79 awaitable(
391 any_write_stream* self,
392 CB const& buffers) noexcept
393 79 : self_(self)
394 79 , ba_(buffers)
395 {
396 79 }
397
398 bool
399 79 await_ready() const noexcept
400 {
401 79 return ba_.to_span().empty();
402 }
403
404 std::coroutine_handle<>
405 75 await_suspend(std::coroutine_handle<> h, io_env const* env)
406 {
407 75 self_->vt_->construct_awaitable(
408 75 self_->stream_,
409 75 self_->cached_awaitable_,
410 75 ba_.to_span());
411 75 self_->awaitable_active_ = true;
412
413 75 if(self_->vt_->await_ready(self_->cached_awaitable_))
414 73 return h;
415
416 2 return self_->vt_->await_suspend(
417 2 self_->cached_awaitable_, h, env);
418 }
419
420 io_result<std::size_t>
421 77 await_resume()
422 {
423 77 if(!self_->awaitable_active_)
424 4 return {{}, 0};
425 struct guard {
426 any_write_stream* self;
427 73 ~guard() {
428 73 self->vt_->destroy_awaitable(self->cached_awaitable_);
429 73 self->awaitable_active_ = false;
430 73 }
431 73 } g{self_};
432 73 return self_->vt_->await_resume(
433 73 self_->cached_awaitable_);
434 73 }
435 };
436 79 return awaitable{this, buffers};
437 }
438
439 } // namespace capy
440 } // namespace boost
441
442 #endif
443