include/boost/corosio/native/detail/epoll/epoll_op.hpp

82.1% Lines (87/106) 85.0% List of functions (17/20)
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::descriptor_state::add_ready_events(unsigned int) :147 0 100.0% boost::corosio::detail::descriptor_state::destroy() :158 0 100.0% boost::corosio::detail::epoll_op::epoll_op() :193 0 100.0% boost::corosio::detail::epoll_op::reset() :195 0 100.0% boost::corosio::detail::epoll_op::is_read_operation() const :209 0 100.0% boost::corosio::detail::epoll_op::destroy() :215 0 0.0% boost::corosio::detail::epoll_op::request_cancel() :221 0 100.0% boost::corosio::detail::epoll_op::start(std::stop_token const&, boost::corosio::detail::epoll_socket*) :226 0 100.0% boost::corosio::detail::epoll_op::start(std::stop_token const&, boost::corosio::detail::epoll_acceptor*) :237 0 100.0% boost::corosio::detail::epoll_op::complete(int, unsigned long) :248 0 100.0% boost::corosio::detail::epoll_op::perform_io() :254 0 0.0% boost::corosio::detail::epoll_connect_op::reset() :261 0 100.0% boost::corosio::detail::epoll_connect_op::perform_io() :267 0 75.0% boost::corosio::detail::epoll_read_op::is_read_operation() const :299 0 100.0% boost::corosio::detail::epoll_read_op::reset() :304 0 100.0% boost::corosio::detail::epoll_read_op::perform_io() :311 0 100.0% boost::corosio::detail::epoll_write_op::reset() :335 0 100.0% boost::corosio::detail::epoll_write_op::perform_io() :341 0 0.0% boost::corosio::detail::epoll_accept_op::reset() :369 0 100.0% boost::corosio::detail::epoll_accept_op::perform_io() :377 0 90.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/scheduler_op.hpp>
28 #include <boost/corosio/native/detail/endpoint_convert.hpp>
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <poll.h>
42 #include <sys/socket.h>
43 #include <sys/uio.h>
44
45 /*
46 epoll Operation State
47 =====================
48
49 Each async I/O operation has a corresponding epoll_op-derived struct that
50 holds the operation's state while it's in flight. The socket impl owns
51 fixed slots for each operation type (conn_, rd_, wr_), so only one
52 operation of each type can be pending per socket at a time.
53
54 Persistent Registration
55 -----------------------
56 File descriptors are registered with epoll once (via descriptor_state) and
57 stay registered until closed. The descriptor_state tracks which operations
58 are pending (read_op, write_op, connect_op). When an event arrives, the
59 reactor dispatches to the appropriate pending operation.
60
61 Impl Lifetime Management
62 ------------------------
63 When cancel() posts an op to the scheduler's ready queue, the socket impl
64 might be destroyed before the scheduler processes the op. The `impl_ptr`
65 member holds a shared_ptr to the impl, keeping it alive until the op
66 completes. This is set by cancel() and cleared in operator() after the
67 coroutine is resumed.
68
69 EOF Detection
70 -------------
71 For reads, 0 bytes with no error means EOF. But an empty user buffer also
72 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
73
74 SIGPIPE Prevention
75 ------------------
76 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
77 SIGPIPE when the peer has closed.
78 */
79
80 namespace boost::corosio::detail {
81
// Types that the declarations below refer to only through pointers.
// Their complete definitions are provided elsewhere (epoll_op further
// down in this file, the others in other translation units/headers).
class epoll_scheduler;
class epoll_socket;
class epoll_acceptor;
struct epoll_op;
89
90 /** Per-descriptor state for persistent epoll registration.
91
92 Tracks pending operations for a file descriptor. The fd is registered
93 once with epoll and stays registered until closed.
94
95 This struct extends scheduler_op to support deferred I/O processing.
96 When epoll events arrive, the reactor sets ready_events and queues
97 this descriptor for processing. When popped from the scheduler queue,
98 operator() performs the actual I/O and queues completion handlers.
99
100 @par Deferred I/O Model
101 The reactor no longer performs I/O directly. Instead:
102 1. Reactor sets ready_events and queues descriptor_state
103 2. Scheduler pops descriptor_state and calls operator()
104 3. operator() performs I/O under mutex and queues completions
105
106 This eliminates per-descriptor mutex locking from the reactor hot path.
107
108 @par Thread Safety
109 The mutex protects operation pointers and ready flags during I/O.
110 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
111 */
112 struct descriptor_state final : scheduler_op
113 {
114 std::mutex mutex;
115
116 // Protected by mutex
117 epoll_op* read_op = nullptr;
118 epoll_op* write_op = nullptr;
119 epoll_op* connect_op = nullptr;
120
121 // Caches edge events that arrived before an op was registered
122 bool read_ready = false;
123 bool write_ready = false;
124
125 // Deferred cancellation: set by cancel() when the target op is not
126 // parked (e.g. completing inline via speculative I/O). Checked when
127 // the next op parks; if set, the op is immediately self-cancelled.
128 // This matches IOCP semantics where CancelIoEx always succeeds.
129 bool read_cancel_pending = false;
130 bool write_cancel_pending = false;
131 bool connect_cancel_pending = false;
132
133 // Protected by mutex (written by register_descriptor and ensure_write_events)
134 std::uint32_t registered_events = 0;
135 int fd = -1;
136
137 // For deferred I/O - set by reactor, read by scheduler
138 std::atomic<std::uint32_t> ready_events_{0};
139 std::atomic<bool> is_enqueued_{false};
140 epoll_scheduler const* scheduler_ = nullptr;
141
142 // Prevents impl destruction while this descriptor_state is queued.
143 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
144 std::shared_ptr<void> impl_ref_;
145
146 /// Add ready events atomically.
147 43896x void add_ready_events(std::uint32_t ev) noexcept
148 {
149 43896x ready_events_.fetch_or(ev, std::memory_order_relaxed);
150 43896x }
151
152 /// Perform deferred I/O and queue completions.
153 void operator()() override;
154
155 /// Destroy without invoking.
156 /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
157 /// the self-referential cycle set by close_socket().
158 25x void destroy() override
159 {
160 25x impl_ref_.reset();
161 25x }
162 };
163
164 struct epoll_op : scheduler_op
165 {
166 struct canceller
167 {
168 epoll_op* op;
169 void operator()() const noexcept;
170 };
171
172 std::coroutine_handle<> h;
173 capy::executor_ref ex;
174 std::error_code* ec_out = nullptr;
175 std::size_t* bytes_out = nullptr;
176
177 int fd = -1;
178 int errn = 0;
179 std::size_t bytes_transferred = 0;
180
181 std::atomic<bool> cancelled{false};
182 std::optional<std::stop_callback<canceller>> stop_cb;
183
184 // Prevents use-after-free when socket is closed with pending ops.
185 // See "Impl Lifetime Management" in file header.
186 std::shared_ptr<void> impl_ptr;
187
188 // For stop_token cancellation - pointer to owning socket/acceptor impl.
189 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
190 epoll_socket* socket_impl_ = nullptr;
191 epoll_acceptor* acceptor_impl_ = nullptr;
192
193 37106x epoll_op() = default;
194
195 263199x void reset() noexcept
196 {
197 263199x fd = -1;
198 263199x errn = 0;
199 263199x bytes_transferred = 0;
200 263199x cancelled.store(false, std::memory_order_relaxed);
201 263199x impl_ptr.reset();
202 263199x socket_impl_ = nullptr;
203 263199x acceptor_impl_ = nullptr;
204 263199x }
205
206 // Defined in sockets.cpp where epoll_socket is complete
207 void operator()() override;
208
209 25450x virtual bool is_read_operation() const noexcept
210 {
211 25450x return false;
212 }
213 virtual void cancel() noexcept = 0;
214
215 void destroy() override
216 {
217 stop_cb.reset();
218 impl_ptr.reset();
219 }
220
221 111964x void request_cancel() noexcept
222 {
223 111964x cancelled.store(true, std::memory_order_release);
224 111964x }
225
226 55184x void start(std::stop_token const& token, epoll_socket* impl)
227 {
228 55184x cancelled.store(false, std::memory_order_release);
229 55184x stop_cb.reset();
230 55184x socket_impl_ = impl;
231 55184x acceptor_impl_ = nullptr;
232
233 55184x if (token.stop_possible())
234 99x stop_cb.emplace(token, canceller{this});
235 55184x }
236
237 4102x void start(std::stop_token const& token, epoll_acceptor* impl)
238 {
239 4102x cancelled.store(false, std::memory_order_release);
240 4102x stop_cb.reset();
241 4102x socket_impl_ = nullptr;
242 4102x acceptor_impl_ = impl;
243
244 4102x if (token.stop_possible())
245 9x stop_cb.emplace(token, canceller{this});
246 4102x }
247
248 59222x void complete(int err, std::size_t bytes) noexcept
249 {
250 59222x errn = err;
251 59222x bytes_transferred = bytes;
252 59222x }
253
254 virtual void perform_io() noexcept {}
255 };
256
257 struct epoll_connect_op final : epoll_op
258 {
259 endpoint target_endpoint;
260
261 4095x void reset() noexcept
262 {
263 4095x epoll_op::reset();
264 4095x target_endpoint = endpoint{};
265 4095x }
266
267 4093x void perform_io() noexcept override
268 {
269 // Guard against spurious write-ready events: a zero-timeout
270 // poll confirms the fd is actually writable (connect finished).
271 4093x pollfd pfd{};
272 4093x pfd.fd = fd;
273 4093x pfd.events = POLLOUT;
274 4093x if (::poll(&pfd, 1, 0) == 0)
275 {
276 complete(EAGAIN, 0);
277 return;
278 }
279
280 4093x int err = 0;
281 4093x socklen_t len = sizeof(err);
282 4093x if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
283 err = errno;
284 4093x complete(err, 0);
285 }
286
287 // Defined in sockets.cpp where epoll_socket is complete
288 void operator()() override;
289 void cancel() noexcept override;
290 };
291
292 struct epoll_read_op final : epoll_op
293 {
294 static constexpr std::size_t max_buffers = 16;
295 iovec iovecs[max_buffers];
296 int iovec_count = 0;
297 bool empty_buffer_read = false;
298
299 25436x bool is_read_operation() const noexcept override
300 {
301 25436x return !empty_buffer_read;
302 }
303
304 127600x void reset() noexcept
305 {
306 127600x epoll_op::reset();
307 127600x iovec_count = 0;
308 127600x empty_buffer_read = false;
309 127600x }
310
311 144x void perform_io() noexcept override
312 {
313 ssize_t n;
314 do
315 {
316 144x n = ::readv(fd, iovecs, iovec_count);
317 }
318 144x while (n < 0 && errno == EINTR);
319
320 144x if (n >= 0)
321 4x complete(0, static_cast<std::size_t>(n));
322 else
323 140x complete(errno, 0);
324 144x }
325
326 void cancel() noexcept override;
327 };
328
329 struct epoll_write_op final : epoll_op
330 {
331 static constexpr std::size_t max_buffers = 16;
332 iovec iovecs[max_buffers];
333 int iovec_count = 0;
334
335 127402x void reset() noexcept
336 {
337 127402x epoll_op::reset();
338 127402x iovec_count = 0;
339 127402x }
340
341 void perform_io() noexcept override
342 {
343 msghdr msg{};
344 msg.msg_iov = iovecs;
345 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
346
347 ssize_t n;
348 do
349 {
350 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
351 }
352 while (n < 0 && errno == EINTR);
353
354 if (n >= 0)
355 complete(0, static_cast<std::size_t>(n));
356 else
357 complete(errno, 0);
358 }
359
360 void cancel() noexcept override;
361 };
362
363 struct epoll_accept_op final : epoll_op
364 {
365 int accepted_fd = -1;
366 io_object::implementation** impl_out = nullptr;
367 sockaddr_storage peer_storage{};
368
369 4102x void reset() noexcept
370 {
371 4102x epoll_op::reset();
372 4102x accepted_fd = -1;
373 4102x impl_out = nullptr;
374 4102x peer_storage = {};
375 4102x }
376
377 4091x void perform_io() noexcept override
378 {
379 4091x socklen_t addrlen = sizeof(peer_storage);
380 int new_fd;
381 do
382 {
383 8182x new_fd = ::accept4(
384 4091x fd, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
385 SOCK_NONBLOCK | SOCK_CLOEXEC);
386 }
387 4091x while (new_fd < 0 && errno == EINTR);
388
389 4091x if (new_fd >= 0)
390 {
391 4091x accepted_fd = new_fd;
392 4091x complete(0, 0);
393 }
394 else
395 {
396 complete(errno, 0);
397 }
398 4091x }
399
400 // Defined in acceptors.cpp where epoll_acceptor is complete
401 void operator()() override;
402 void cancel() noexcept override;
403 };
404
405 } // namespace boost::corosio::detail
406
407 #endif // BOOST_COROSIO_HAS_EPOLL
408
409 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
410