include/boost/corosio/native/detail/select/select_op.hpp

74.4% Lines (96/129) 84.2% List of functions (16/19)
f(x) Functions (19)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_op::select_op() :139 0 100.0% boost::corosio::detail::select_op::reset() :141 0 100.0% boost::corosio::detail::select_op::operator()() :154 0 100.0% boost::corosio::detail::select_op::is_read_operation() const :180 0 100.0% boost::corosio::detail::select_op::destroy() :186 0 0.0% boost::corosio::detail::select_op::request_cancel() :192 0 100.0% boost::corosio::detail::select_op::start(std::stop_token const&, boost::corosio::detail::select_socket*) :208 0 100.0% boost::corosio::detail::select_op::start(std::stop_token const&, boost::corosio::detail::select_acceptor*) :219 0 87.5% boost::corosio::detail::select_op::complete(int, unsigned long) :230 0 100.0% boost::corosio::detail::select_op::perform_io() :236 0 0.0% boost::corosio::detail::select_connect_op::reset() :243 0 100.0% boost::corosio::detail::select_connect_op::perform_io() :249 0 85.7% boost::corosio::detail::select_read_op::is_read_operation() const :271 0 100.0% boost::corosio::detail::select_read_op::reset() :276 0 100.0% boost::corosio::detail::select_read_op::perform_io() :283 0 83.3% boost::corosio::detail::select_write_op::reset() :301 0 100.0% boost::corosio::detail::select_write_op::perform_io() :307 0 0.0% boost::corosio::detail::select_accept_op::reset() :329 0 100.0% boost::corosio::detail::select_accept_op::perform_io() :337 0 42.9%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/scheduler_op.hpp>
28 #include <boost/corosio/native/detail/endpoint_convert.hpp>
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <fcntl.h>
33
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/select.h>
42 #include <sys/socket.h>
43 #include <sys/uio.h>
44
45 /*
46 select Operation State
47 ======================
48
49 Each async I/O operation has a corresponding select_op-derived struct that
50 holds the operation's state while it's in flight. The socket impl owns
51 fixed slots for each operation type (conn_, rd_, wr_), so only one
52 operation of each type can be pending per socket at a time.
53
54 This mirrors the epoll_op design for consistency across backends.
55
56 Completion vs Cancellation Race
57 -------------------------------
58 The `registered` atomic uses a tri-state (unregistered, registering,
59 registered) to handle two races: (1) between register_fd() and the
60 reactor seeing an event, and (2) between reactor completion and cancel().
61
62 The registering state closes the window where an event could arrive
63 after register_fd() but before the op was marked registered. The reactor
64 and cancel() both treat registering the same as registered when claiming.
65
66 Whoever atomically exchanges to unregistered "claims" the operation
67 and is responsible for completing it. The loser sees unregistered and
68 does nothing. The initiating thread uses compare_exchange to transition
69 from registering to registered; if this fails, the reactor or cancel
70 already claimed the op.
71
72 Impl Lifetime Management
73 ------------------------
74 When cancel() posts an op to the scheduler's ready queue, the socket impl
75 might be destroyed before the scheduler processes the op. The `impl_ptr`
76 member holds a shared_ptr to the impl, keeping it alive until the op
77 completes.
78
79 EOF Detection
80 -------------
81 For reads, 0 bytes with no error means EOF. But an empty user buffer also
82 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
83
84 SIGPIPE Prevention
85 ------------------
86 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
87 SIGPIPE when the peer has closed.
88 */
89
90 namespace boost::corosio::detail {
91
92 // Forward declarations for cancellation support
93 class select_socket;
94 class select_acceptor;
95
/** Registration state for async operations.

    Tri-state enum to handle the race between register_fd() and
    run_reactor() seeing an event. Setting `registering` before
    calling register_fd() ensures events delivered during the
    registration window are not dropped.

    The underlying type is std::uint8_t (from <cstdint>) so the value
    fits in a single byte when stored in a std::atomic.
*/
enum class select_registration_state : std::uint8_t
{
    unregistered, ///< Not registered with reactor
    registering,  ///< register_fd() called, not yet confirmed
    registered    ///< Fully registered, ready for events
};
109
110 struct select_op : scheduler_op
111 {
112 struct canceller
113 {
114 select_op* op;
115 void operator()() const noexcept;
116 };
117
118 std::coroutine_handle<> h;
119 capy::executor_ref ex;
120 std::error_code* ec_out = nullptr;
121 std::size_t* bytes_out = nullptr;
122
123 int fd = -1;
124 int errn = 0;
125 std::size_t bytes_transferred = 0;
126
127 std::atomic<bool> cancelled{false};
128 std::atomic<select_registration_state> registered{
129 select_registration_state::unregistered};
130 std::optional<std::stop_callback<canceller>> stop_cb;
131
132 // Prevents use-after-free when socket is closed with pending ops.
133 std::shared_ptr<void> impl_ptr;
134
135 // For stop_token cancellation - pointer to owning socket/acceptor impl.
136 select_socket* socket_impl_ = nullptr;
137 select_acceptor* acceptor_impl_ = nullptr;
138
139 30130x select_op() = default;
140
141 153312x void reset() noexcept
142 {
143 153312x fd = -1;
144 153312x errn = 0;
145 153312x bytes_transferred = 0;
146 153312x cancelled.store(false, std::memory_order_relaxed);
147 153312x registered.store(
148 select_registration_state::unregistered, std::memory_order_relaxed);
149 153312x impl_ptr.reset();
150 153312x socket_impl_ = nullptr;
151 153312x acceptor_impl_ = nullptr;
152 153312x }
153
154 146642x void operator()() override
155 {
156 146642x stop_cb.reset();
157
158 146642x if (ec_out)
159 {
160 146642x if (cancelled.load(std::memory_order_acquire))
161 196x *ec_out = capy::error::canceled;
162 146446x else if (errn != 0)
163 1x *ec_out = make_err(errn);
164 146445x else if (is_read_operation() && bytes_transferred == 0)
165 5x *ec_out = capy::error::eof;
166 else
167 146440x *ec_out = {};
168 }
169
170 146642x if (bytes_out)
171 146642x *bytes_out = bytes_transferred;
172
173 // Move to stack before destroying the frame
174 146642x capy::executor_ref saved_ex(ex);
175 146642x std::coroutine_handle<> saved_h(h);
176 146642x impl_ptr.reset();
177 146642x dispatch_coro(saved_ex, saved_h).resume();
178 146642x }
179
180 73236x virtual bool is_read_operation() const noexcept
181 {
182 73236x return false;
183 }
184 virtual void cancel() noexcept = 0;
185
186 void destroy() override
187 {
188 stop_cb.reset();
189 impl_ptr.reset();
190 }
191
192 91087x void request_cancel() noexcept
193 {
194 91087x cancelled.store(true, std::memory_order_release);
195 91087x }
196
197 void start(std::stop_token const& token)
198 {
199 cancelled.store(false, std::memory_order_release);
200 stop_cb.reset();
201 socket_impl_ = nullptr;
202 acceptor_impl_ = nullptr;
203
204 if (token.stop_possible())
205 stop_cb.emplace(token, canceller{this});
206 }
207
208 149977x void start(std::stop_token const& token, select_socket* impl)
209 {
210 149977x cancelled.store(false, std::memory_order_release);
211 149977x stop_cb.reset();
212 149977x socket_impl_ = impl;
213 149977x acceptor_impl_ = nullptr;
214
215 149977x if (token.stop_possible())
216 99x stop_cb.emplace(token, canceller{this});
217 149977x }
218
219 3335x void start(std::stop_token const& token, select_acceptor* impl)
220 {
221 3335x cancelled.store(false, std::memory_order_release);
222 3335x stop_cb.reset();
223 3335x socket_impl_ = nullptr;
224 3335x acceptor_impl_ = impl;
225
226 3335x if (token.stop_possible())
227 stop_cb.emplace(token, canceller{this});
228 3335x }
229
230 153153x void complete(int err, std::size_t bytes) noexcept
231 {
232 153153x errn = err;
233 153153x bytes_transferred = bytes;
234 153153x }
235
236 virtual void perform_io() noexcept {}
237 };
238
239 struct select_connect_op final : select_op
240 {
241 endpoint target_endpoint;
242
243 3335x void reset() noexcept
244 {
245 3335x select_op::reset();
246 3335x target_endpoint = endpoint{};
247 3335x }
248
249 3335x void perform_io() noexcept override
250 {
251 // connect() completion status is retrieved via SO_ERROR, not return value
252 3335x int err = 0;
253 3335x socklen_t len = sizeof(err);
254 3335x if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
255 err = errno;
256 3335x complete(err, 0);
257 3335x }
258
259 // Defined in sockets.cpp where select_socket is complete
260 void operator()() override;
261 void cancel() noexcept override;
262 };
263
264 struct select_read_op final : select_op
265 {
266 static constexpr std::size_t max_buffers = 16;
267 iovec iovecs[max_buffers];
268 int iovec_count = 0;
269 bool empty_buffer_read = false;
270
271 73209x bool is_read_operation() const noexcept override
272 {
273 73209x return !empty_buffer_read;
274 }
275
276 73401x void reset() noexcept
277 {
278 73401x select_op::reset();
279 73401x iovec_count = 0;
280 73401x empty_buffer_read = false;
281 73401x }
282
283 120x void perform_io() noexcept override
284 {
285 120x ssize_t n = ::readv(fd, iovecs, iovec_count);
286 120x if (n >= 0)
287 120x complete(0, static_cast<std::size_t>(n));
288 else
289 complete(errno, 0);
290 120x }
291
292 void cancel() noexcept override;
293 };
294
295 struct select_write_op final : select_op
296 {
297 static constexpr std::size_t max_buffers = 16;
298 iovec iovecs[max_buffers];
299 int iovec_count = 0;
300
301 73241x void reset() noexcept
302 {
303 73241x select_op::reset();
304 73241x iovec_count = 0;
305 73241x }
306
307 void perform_io() noexcept override
308 {
309 msghdr msg{};
310 msg.msg_iov = iovecs;
311 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312
313 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 if (n >= 0)
315 complete(0, static_cast<std::size_t>(n));
316 else
317 complete(errno, 0);
318 }
319
320 void cancel() noexcept override;
321 };
322
323 struct select_accept_op final : select_op
324 {
325 int accepted_fd = -1;
326 io_object::implementation* peer_impl = nullptr;
327 io_object::implementation** impl_out = nullptr;
328
329 3335x void reset() noexcept
330 {
331 3335x select_op::reset();
332 3335x accepted_fd = -1;
333 3335x peer_impl = nullptr;
334 3335x impl_out = nullptr;
335 3335x }
336
337 3330x void perform_io() noexcept override
338 {
339 3330x sockaddr_storage addr_storage{};
340 3330x socklen_t addrlen = sizeof(addr_storage);
341
342 // Note: select backend uses accept() + fcntl instead of accept4()
343 // for broader POSIX compatibility
344 int new_fd =
345 3330x ::accept(fd, reinterpret_cast<sockaddr*>(&addr_storage), &addrlen);
346
347 3330x if (new_fd >= 0)
348 {
349 // Reject fds that exceed select()'s FD_SETSIZE limit.
350 // Better to fail now than during later async operations.
351 3330x if (new_fd >= FD_SETSIZE)
352 {
353 ::close(new_fd);
354 complete(EINVAL, 0);
355 return;
356 }
357
358 // Set non-blocking and close-on-exec flags.
359 // A non-blocking socket is essential for the async reactor;
360 // if we can't configure it, fail rather than risk blocking.
361 3330x int flags = ::fcntl(new_fd, F_GETFL, 0);
362 3330x if (flags == -1)
363 {
364 int err = errno;
365 ::close(new_fd);
366 complete(err, 0);
367 return;
368 }
369
370 3330x if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 {
372 int err = errno;
373 ::close(new_fd);
374 complete(err, 0);
375 return;
376 }
377
378 3330x if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 {
380 int err = errno;
381 ::close(new_fd);
382 complete(err, 0);
383 return;
384 }
385
386 3330x accepted_fd = new_fd;
387 3330x complete(0, 0);
388 }
389 else
390 {
391 complete(errno, 0);
392 }
393 }
394
395 // Defined in acceptors.cpp where select_acceptor is complete
396 void operator()() override;
397 void cancel() noexcept override;
398 };
399
400 } // namespace boost::corosio::detail
401
402 #endif // BOOST_COROSIO_HAS_SELECT
403
404 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
405