TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_stream.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/io_result.hpp>
22 :
23 : #include <concepts>
24 : #include <coroutine>
25 : #include <cstddef>
26 : #include <exception>
27 : #include <new>
28 : #include <span>
29 : #include <stop_token>
30 : #include <system_error>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any WriteStream.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref WriteStream concept, enabling runtime polymorphism for
40 : write operations. It uses cached awaitable storage to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the stream.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to stream must outlive this wrapper.
48 :
49 : @par Awaitable Preallocation
50 : The constructor preallocates storage for the type-erased awaitable.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Immediate Completion
56 : Operations complete immediately without suspending when the
57 : buffer sequence is empty, or when the underlying stream's
58 : awaitable reports readiness via `await_ready`.
59 :
60 : @par Thread Safety
61 : Not thread-safe. Concurrent operations on the same wrapper
62 : are undefined behavior.
63 :
64 : @par Example
65 : @code
66 : // Owning - takes ownership of the stream
67 : any_write_stream stream(socket{ioc});
68 :
69 : // Reference - wraps without ownership
70 : socket sock(ioc);
71 : any_write_stream stream(&sock);
72 :
73 : const_buffer buf(data, size);
74 : auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75 : @endcode
76 :
77 : @see any_read_stream, any_stream, WriteStream
78 : */
79 : class any_write_stream
80 : {
81 : struct vtable;
82 :
83 : template<WriteStream S>
84 : struct vtable_for_impl;
85 :
86 : // ordered for cache line coherence
87 : void* stream_ = nullptr;
88 : vtable const* vt_ = nullptr;
89 : void* cached_awaitable_ = nullptr;
90 : void* storage_ = nullptr;
91 : bool awaitable_active_ = false;
92 :
93 : public:
94 : /** Destructor.
95 :
96 : Destroys the owned stream (if any) and releases the cached
97 : awaitable storage.
98 : */
99 : ~any_write_stream();
100 :
101 : /** Default constructor.
102 :
103 : Constructs an empty wrapper. Operations on a default-constructed
104 : wrapper result in undefined behavior.
105 : */
106 HIT 1 : any_write_stream() = default;
107 :
108 : /** Non-copyable.
109 :
110 : The awaitable cache is per-instance and cannot be shared.
111 : */
112 : any_write_stream(any_write_stream const&) = delete;
113 : any_write_stream& operator=(any_write_stream const&) = delete;
114 :
115 : /** Move constructor.
116 :
117 : Transfers ownership of the wrapped stream (if owned) and
118 : cached awaitable storage from `other`. After the move, `other` is
119 : in a default-constructed state.
120 :
121 : @param other The wrapper to move from.
122 : */
123 2 : any_write_stream(any_write_stream&& other) noexcept
124 2 : : stream_(std::exchange(other.stream_, nullptr))
125 2 : , vt_(std::exchange(other.vt_, nullptr))
126 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 2 : , storage_(std::exchange(other.storage_, nullptr))
128 2 : , awaitable_active_(std::exchange(other.awaitable_active_, false))
129 : {
130 2 : }
131 :
132 : /** Move assignment operator.
133 :
134 : Destroys any owned stream and releases existing resources,
135 : then transfers ownership from `other`.
136 :
137 : @param other The wrapper to move from.
138 : @return Reference to this wrapper.
139 : */
140 : any_write_stream&
141 : operator=(any_write_stream&& other) noexcept;
142 :
143 : /** Construct by taking ownership of a WriteStream.
144 :
145 : Allocates storage and moves the stream into this wrapper.
146 : The wrapper owns the stream and will destroy it.
147 :
148 : @param s The stream to take ownership of.
149 : */
150 : template<WriteStream S>
151 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152 : any_write_stream(S s);
153 :
154 : /** Construct by wrapping a WriteStream without ownership.
155 :
156 : Wraps the given stream by pointer. The stream must remain
157 : valid for the lifetime of this wrapper.
158 :
159 : @param s Pointer to the stream to wrap.
160 : */
161 : template<WriteStream S>
162 : any_write_stream(S* s);
163 :
164 : /** Check if the wrapper contains a valid stream.
165 :
166 : @return `true` if wrapping a stream, `false` if default-constructed
167 : or moved-from.
168 : */
169 : bool
170 21 : has_value() const noexcept
171 : {
172 21 : return stream_ != nullptr;
173 : }
174 :
175 : /** Check if the wrapper contains a valid stream.
176 :
177 : @return `true` if wrapping a stream, `false` if default-constructed
178 : or moved-from.
179 : */
180 : explicit
181 3 : operator bool() const noexcept
182 : {
183 3 : return has_value();
184 : }
185 :
186 : /** Initiate an asynchronous write operation.
187 :
188 : Writes data from the provided buffer sequence. The operation
189 : completes when at least one byte has been written, or an error
190 : occurs.
191 :
192 : @param buffers The buffer sequence containing data to write.
193 : Passed by value to ensure the sequence lives in the
194 : coroutine frame across suspension points.
195 :
196 : @return An awaitable yielding `(error_code,std::size_t)`.
197 :
198 : @par Immediate Completion
199 : The operation completes immediately without suspending
200 : the calling coroutine when:
201 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202 : @li The underlying stream's awaitable reports immediate
203 : readiness via `await_ready`.
204 :
205 : @note This is a partial operation and may not process the
206 : entire buffer sequence. Use the composed @ref write algorithm
207 : for guaranteed complete transfer.
208 :
209 : @par Preconditions
210 : The wrapper must contain a valid stream (`has_value() == true`).
211 : */
212 : template<ConstBufferSequence CB>
213 : auto
214 : write_some(CB buffers);
215 :
216 : protected:
217 : /** Rebind to a new stream after move.
218 :
219 : Updates the internal pointer to reference a new stream object.
220 : Used by owning wrappers after move assignment when the owned
221 : object has moved to a new location.
222 :
223 : @param new_stream The new stream to bind to. Must be the same
224 : type as the original stream.
225 :
226 : @note Terminates if called with a stream of different type
227 : than the original.
228 : */
229 : template<WriteStream S>
230 : void
231 : rebind(S& new_stream) noexcept
232 : {
233 : if(vt_ != &vtable_for_impl<S>::value)
234 : std::terminate();
235 : stream_ = &new_stream;
236 : }
237 : };
238 :
239 : //----------------------------------------------------------
240 :
241 : struct any_write_stream::vtable
242 : {
243 : // ordered by call frequency for cache line coherence
244 : void (*construct_awaitable)(
245 : void* stream,
246 : void* storage,
247 : std::span<const_buffer const> buffers);
248 : bool (*await_ready)(void*);
249 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
250 : io_result<std::size_t> (*await_resume)(void*);
251 : void (*destroy_awaitable)(void*) noexcept;
252 : std::size_t awaitable_size;
253 : std::size_t awaitable_align;
254 : void (*destroy)(void*) noexcept;
255 : };
256 :
257 : template<WriteStream S>
258 : struct any_write_stream::vtable_for_impl
259 : {
260 : using Awaitable = decltype(std::declval<S&>().write_some(
261 : std::span<const_buffer const>{}));
262 :
263 : static void
264 1 : do_destroy_impl(void* stream) noexcept
265 : {
266 1 : static_cast<S*>(stream)->~S();
267 1 : }
268 :
269 : static void
270 75 : construct_awaitable_impl(
271 : void* stream,
272 : void* storage,
273 : std::span<const_buffer const> buffers)
274 : {
275 75 : auto& s = *static_cast<S*>(stream);
276 75 : ::new(storage) Awaitable(s.write_some(buffers));
277 75 : }
278 :
279 : static constexpr vtable value = {
280 : &construct_awaitable_impl,
281 75 : +[](void* p) {
282 75 : return static_cast<Awaitable*>(p)->await_ready();
283 : },
284 2 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
285 2 : return detail::call_await_suspend(
286 2 : static_cast<Awaitable*>(p), h, env);
287 : },
288 73 : +[](void* p) {
289 73 : return static_cast<Awaitable*>(p)->await_resume();
290 : },
291 77 : +[](void* p) noexcept {
292 12 : static_cast<Awaitable*>(p)->~Awaitable();
293 : },
294 : sizeof(Awaitable),
295 : alignof(Awaitable),
296 : &do_destroy_impl
297 : };
298 : };
299 :
300 : //----------------------------------------------------------
301 :
302 : inline
303 95 : any_write_stream::~any_write_stream()
304 : {
305 95 : if(storage_)
306 : {
307 1 : vt_->destroy(stream_);
308 1 : ::operator delete(storage_);
309 : }
310 95 : if(cached_awaitable_)
311 : {
312 85 : if(awaitable_active_)
313 1 : vt_->destroy_awaitable(cached_awaitable_);
314 85 : ::operator delete(cached_awaitable_);
315 : }
316 95 : }
317 :
318 : inline any_write_stream&
319 5 : any_write_stream::operator=(any_write_stream&& other) noexcept
320 : {
321 5 : if(this != &other)
322 : {
323 5 : if(storage_)
324 : {
325 MIS 0 : vt_->destroy(stream_);
326 0 : ::operator delete(storage_);
327 : }
328 HIT 5 : if(cached_awaitable_)
329 : {
330 2 : if(awaitable_active_)
331 1 : vt_->destroy_awaitable(cached_awaitable_);
332 2 : ::operator delete(cached_awaitable_);
333 : }
334 5 : stream_ = std::exchange(other.stream_, nullptr);
335 5 : vt_ = std::exchange(other.vt_, nullptr);
336 5 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
337 5 : storage_ = std::exchange(other.storage_, nullptr);
338 5 : awaitable_active_ = std::exchange(other.awaitable_active_, false);
339 : }
340 5 : return *this;
341 : }
342 :
343 : template<WriteStream S>
344 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
345 1 : any_write_stream::any_write_stream(S s)
346 1 : : vt_(&vtable_for_impl<S>::value)
347 : {
348 : struct guard {
349 : any_write_stream* self;
350 : bool committed = false;
351 1 : ~guard() {
352 1 : if(!committed && self->storage_) {
353 MIS 0 : self->vt_->destroy(self->stream_);
354 0 : ::operator delete(self->storage_);
355 0 : self->storage_ = nullptr;
356 0 : self->stream_ = nullptr;
357 : }
358 HIT 1 : }
359 1 : } g{this};
360 :
361 1 : storage_ = ::operator new(sizeof(S));
362 1 : stream_ = ::new(storage_) S(std::move(s));
363 :
364 : // Preallocate the awaitable storage
365 1 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
366 :
367 1 : g.committed = true;
368 1 : }
369 :
370 : template<WriteStream S>
371 86 : any_write_stream::any_write_stream(S* s)
372 86 : : stream_(s)
373 86 : , vt_(&vtable_for_impl<S>::value)
374 : {
375 : // Preallocate the awaitable storage
376 86 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
377 86 : }
378 :
379 : //----------------------------------------------------------
380 :
381 : template<ConstBufferSequence CB>
382 : auto
383 79 : any_write_stream::write_some(CB buffers)
384 : {
385 : struct awaitable
386 : {
387 : any_write_stream* self_;
388 : const_buffer_array<detail::max_iovec_> ba_;
389 :
390 79 : awaitable(
391 : any_write_stream* self,
392 : CB const& buffers) noexcept
393 79 : : self_(self)
394 79 : , ba_(buffers)
395 : {
396 79 : }
397 :
398 : bool
399 79 : await_ready() const noexcept
400 : {
401 79 : return ba_.to_span().empty();
402 : }
403 :
404 : std::coroutine_handle<>
405 75 : await_suspend(std::coroutine_handle<> h, io_env const* env)
406 : {
407 75 : self_->vt_->construct_awaitable(
408 75 : self_->stream_,
409 75 : self_->cached_awaitable_,
410 75 : ba_.to_span());
411 75 : self_->awaitable_active_ = true;
412 :
413 75 : if(self_->vt_->await_ready(self_->cached_awaitable_))
414 73 : return h;
415 :
416 2 : return self_->vt_->await_suspend(
417 2 : self_->cached_awaitable_, h, env);
418 : }
419 :
420 : io_result<std::size_t>
421 77 : await_resume()
422 : {
423 77 : if(!self_->awaitable_active_)
424 4 : return {{}, 0};
425 : struct guard {
426 : any_write_stream* self;
427 73 : ~guard() {
428 73 : self->vt_->destroy_awaitable(self->cached_awaitable_);
429 73 : self->awaitable_active_ = false;
430 73 : }
431 73 : } g{self_};
432 73 : return self_->vt_->await_resume(
433 73 : self_->cached_awaitable_);
434 73 : }
435 : };
436 79 : return awaitable{this, buffers};
437 : }
438 :
439 : } // namespace capy
440 : } // namespace boost
441 :
442 : #endif
|