TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <coroutine>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/scheduler_op.hpp>
28 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 :
33 : #include <atomic>
34 : #include <cstddef>
35 : #include <memory>
36 : #include <mutex>
37 : #include <optional>
38 : #include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <poll.h>
42 : #include <sys/socket.h>
43 : #include <sys/uio.h>
44 :
45 : /*
46 : epoll Operation State
47 : =====================
48 :
49 : Each async I/O operation has a corresponding epoll_op-derived struct that
50 : holds the operation's state while it's in flight. The socket impl owns
51 : fixed slots for each operation type (conn_, rd_, wr_), so only one
52 : operation of each type can be pending per socket at a time.
53 :
54 : Persistent Registration
55 : -----------------------
56 : File descriptors are registered with epoll once (via descriptor_state) and
57 : stay registered until closed. The descriptor_state tracks which operations
58 : are pending (read_op, write_op, connect_op). When an event arrives, the
59 : reactor dispatches to the appropriate pending operation.
60 :
61 : Impl Lifetime Management
62 : ------------------------
63 : When cancel() posts an op to the scheduler's ready queue, the socket impl
64 : might be destroyed before the scheduler processes the op. The `impl_ptr`
65 : member holds a shared_ptr to the impl, keeping it alive until the op
66 : completes. This is set by cancel() and cleared in operator() after the
67 : coroutine is resumed.
68 :
69 : EOF Detection
70 : -------------
71 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
72 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
73 :
74 : SIGPIPE Prevention
75 : ------------------
76 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
77 : SIGPIPE when the peer has closed.
78 : */
79 :
80 : namespace boost::corosio::detail {
81 :
// Forward declarations. epoll_op is defined further down in this header;
// the remaining types are only named here (pointers/references suffice).
struct epoll_op;
class epoll_acceptor;
class epoll_scheduler;
class epoll_socket;
89 :
90 : /** Per-descriptor state for persistent epoll registration.
91 :
92 : Tracks pending operations for a file descriptor. The fd is registered
93 : once with epoll and stays registered until closed.
94 :
95 : This struct extends scheduler_op to support deferred I/O processing.
96 : When epoll events arrive, the reactor sets ready_events and queues
97 : this descriptor for processing. When popped from the scheduler queue,
98 : operator() performs the actual I/O and queues completion handlers.
99 :
100 : @par Deferred I/O Model
101 : The reactor no longer performs I/O directly. Instead:
102 : 1. Reactor sets ready_events and queues descriptor_state
103 : 2. Scheduler pops descriptor_state and calls operator()
104 : 3. operator() performs I/O under mutex and queues completions
105 :
106 : This eliminates per-descriptor mutex locking from the reactor hot path.
107 :
108 : @par Thread Safety
109 : The mutex protects operation pointers and ready flags during I/O.
110 : ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
111 : */
112 : struct descriptor_state final : scheduler_op
113 : {
114 : std::mutex mutex;
115 :
116 : // Protected by mutex
117 : epoll_op* read_op = nullptr;
118 : epoll_op* write_op = nullptr;
119 : epoll_op* connect_op = nullptr;
120 :
121 : // Caches edge events that arrived before an op was registered
122 : bool read_ready = false;
123 : bool write_ready = false;
124 :
125 : // Deferred cancellation: set by cancel() when the target op is not
126 : // parked (e.g. completing inline via speculative I/O). Checked when
127 : // the next op parks; if set, the op is immediately self-cancelled.
128 : // This matches IOCP semantics where CancelIoEx always succeeds.
129 : bool read_cancel_pending = false;
130 : bool write_cancel_pending = false;
131 : bool connect_cancel_pending = false;
132 :
133 : // Protected by mutex (written by register_descriptor and ensure_write_events)
134 : std::uint32_t registered_events = 0;
135 : int fd = -1;
136 :
137 : // For deferred I/O - set by reactor, read by scheduler
138 : std::atomic<std::uint32_t> ready_events_{0};
139 : std::atomic<bool> is_enqueued_{false};
140 : epoll_scheduler const* scheduler_ = nullptr;
141 :
142 : // Prevents impl destruction while this descriptor_state is queued.
143 : // Set by close_socket() when is_enqueued_ is true, cleared by operator().
144 : std::shared_ptr<void> impl_ref_;
145 :
146 : /// Add ready events atomically.
147 HIT 43896 : void add_ready_events(std::uint32_t ev) noexcept
148 : {
149 43896 : ready_events_.fetch_or(ev, std::memory_order_relaxed);
150 43896 : }
151 :
152 : /// Perform deferred I/O and queue completions.
153 : void operator()() override;
154 :
155 : /// Destroy without invoking.
156 : /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
157 : /// the self-referential cycle set by close_socket().
158 25 : void destroy() override
159 : {
160 25 : impl_ref_.reset();
161 25 : }
162 : };
163 :
164 : struct epoll_op : scheduler_op
165 : {
166 : struct canceller
167 : {
168 : epoll_op* op;
169 : void operator()() const noexcept;
170 : };
171 :
172 : std::coroutine_handle<> h;
173 : capy::executor_ref ex;
174 : std::error_code* ec_out = nullptr;
175 : std::size_t* bytes_out = nullptr;
176 :
177 : int fd = -1;
178 : int errn = 0;
179 : std::size_t bytes_transferred = 0;
180 :
181 : std::atomic<bool> cancelled{false};
182 : std::optional<std::stop_callback<canceller>> stop_cb;
183 :
184 : // Prevents use-after-free when socket is closed with pending ops.
185 : // See "Impl Lifetime Management" in file header.
186 : std::shared_ptr<void> impl_ptr;
187 :
188 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
189 : // When stop is requested, we call back to the impl to perform actual I/O cancellation.
190 : epoll_socket* socket_impl_ = nullptr;
191 : epoll_acceptor* acceptor_impl_ = nullptr;
192 :
193 37106 : epoll_op() = default;
194 :
195 263199 : void reset() noexcept
196 : {
197 263199 : fd = -1;
198 263199 : errn = 0;
199 263199 : bytes_transferred = 0;
200 263199 : cancelled.store(false, std::memory_order_relaxed);
201 263199 : impl_ptr.reset();
202 263199 : socket_impl_ = nullptr;
203 263199 : acceptor_impl_ = nullptr;
204 263199 : }
205 :
206 : // Defined in sockets.cpp where epoll_socket is complete
207 : void operator()() override;
208 :
209 25450 : virtual bool is_read_operation() const noexcept
210 : {
211 25450 : return false;
212 : }
213 : virtual void cancel() noexcept = 0;
214 :
215 MIS 0 : void destroy() override
216 : {
217 0 : stop_cb.reset();
218 0 : impl_ptr.reset();
219 0 : }
220 :
221 HIT 111964 : void request_cancel() noexcept
222 : {
223 111964 : cancelled.store(true, std::memory_order_release);
224 111964 : }
225 :
226 55184 : void start(std::stop_token const& token, epoll_socket* impl)
227 : {
228 55184 : cancelled.store(false, std::memory_order_release);
229 55184 : stop_cb.reset();
230 55184 : socket_impl_ = impl;
231 55184 : acceptor_impl_ = nullptr;
232 :
233 55184 : if (token.stop_possible())
234 99 : stop_cb.emplace(token, canceller{this});
235 55184 : }
236 :
237 4102 : void start(std::stop_token const& token, epoll_acceptor* impl)
238 : {
239 4102 : cancelled.store(false, std::memory_order_release);
240 4102 : stop_cb.reset();
241 4102 : socket_impl_ = nullptr;
242 4102 : acceptor_impl_ = impl;
243 :
244 4102 : if (token.stop_possible())
245 9 : stop_cb.emplace(token, canceller{this});
246 4102 : }
247 :
248 59222 : void complete(int err, std::size_t bytes) noexcept
249 : {
250 59222 : errn = err;
251 59222 : bytes_transferred = bytes;
252 59222 : }
253 :
254 MIS 0 : virtual void perform_io() noexcept {}
255 : };
256 :
257 : struct epoll_connect_op final : epoll_op
258 : {
259 : endpoint target_endpoint;
260 :
261 HIT 4095 : void reset() noexcept
262 : {
263 4095 : epoll_op::reset();
264 4095 : target_endpoint = endpoint{};
265 4095 : }
266 :
267 4093 : void perform_io() noexcept override
268 : {
269 : // Guard against spurious write-ready events: a zero-timeout
270 : // poll confirms the fd is actually writable (connect finished).
271 4093 : pollfd pfd{};
272 4093 : pfd.fd = fd;
273 4093 : pfd.events = POLLOUT;
274 4093 : if (::poll(&pfd, 1, 0) == 0)
275 : {
276 MIS 0 : complete(EAGAIN, 0);
277 0 : return;
278 : }
279 :
280 HIT 4093 : int err = 0;
281 4093 : socklen_t len = sizeof(err);
282 4093 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
283 MIS 0 : err = errno;
284 HIT 4093 : complete(err, 0);
285 : }
286 :
287 : // Defined in sockets.cpp where epoll_socket is complete
288 : void operator()() override;
289 : void cancel() noexcept override;
290 : };
291 :
292 : struct epoll_read_op final : epoll_op
293 : {
294 : static constexpr std::size_t max_buffers = 16;
295 : iovec iovecs[max_buffers];
296 : int iovec_count = 0;
297 : bool empty_buffer_read = false;
298 :
299 25436 : bool is_read_operation() const noexcept override
300 : {
301 25436 : return !empty_buffer_read;
302 : }
303 :
304 127600 : void reset() noexcept
305 : {
306 127600 : epoll_op::reset();
307 127600 : iovec_count = 0;
308 127600 : empty_buffer_read = false;
309 127600 : }
310 :
311 144 : void perform_io() noexcept override
312 : {
313 : ssize_t n;
314 : do
315 : {
316 144 : n = ::readv(fd, iovecs, iovec_count);
317 : }
318 144 : while (n < 0 && errno == EINTR);
319 :
320 144 : if (n >= 0)
321 4 : complete(0, static_cast<std::size_t>(n));
322 : else
323 140 : complete(errno, 0);
324 144 : }
325 :
326 : void cancel() noexcept override;
327 : };
328 :
329 : struct epoll_write_op final : epoll_op
330 : {
331 : static constexpr std::size_t max_buffers = 16;
332 : iovec iovecs[max_buffers];
333 : int iovec_count = 0;
334 :
335 127402 : void reset() noexcept
336 : {
337 127402 : epoll_op::reset();
338 127402 : iovec_count = 0;
339 127402 : }
340 :
341 MIS 0 : void perform_io() noexcept override
342 : {
343 0 : msghdr msg{};
344 0 : msg.msg_iov = iovecs;
345 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
346 :
347 : ssize_t n;
348 : do
349 : {
350 0 : n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
351 : }
352 0 : while (n < 0 && errno == EINTR);
353 :
354 0 : if (n >= 0)
355 0 : complete(0, static_cast<std::size_t>(n));
356 : else
357 0 : complete(errno, 0);
358 0 : }
359 :
360 : void cancel() noexcept override;
361 : };
362 :
363 : struct epoll_accept_op final : epoll_op
364 : {
365 : int accepted_fd = -1;
366 : io_object::implementation** impl_out = nullptr;
367 : sockaddr_storage peer_storage{};
368 :
369 HIT 4102 : void reset() noexcept
370 : {
371 4102 : epoll_op::reset();
372 4102 : accepted_fd = -1;
373 4102 : impl_out = nullptr;
374 4102 : peer_storage = {};
375 4102 : }
376 :
377 4091 : void perform_io() noexcept override
378 : {
379 4091 : socklen_t addrlen = sizeof(peer_storage);
380 : int new_fd;
381 : do
382 : {
383 8182 : new_fd = ::accept4(
384 4091 : fd, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
385 : SOCK_NONBLOCK | SOCK_CLOEXEC);
386 : }
387 4091 : while (new_fd < 0 && errno == EINTR);
388 :
389 4091 : if (new_fd >= 0)
390 : {
391 4091 : accepted_fd = new_fd;
392 4091 : complete(0, 0);
393 : }
394 : else
395 : {
396 MIS 0 : complete(errno, 0);
397 : }
398 HIT 4091 : }
399 :
400 : // Defined in acceptors.cpp where epoll_acceptor is complete
401 : void operator()() override;
402 : void cancel() noexcept override;
403 : };
404 :
405 : } // namespace boost::corosio::detail
406 :
407 : #endif // BOOST_COROSIO_HAS_EPOLL
408 :
409 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
|