include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

80.8% Lines (337/417) 93.3% List of functions (28/30)
f(x) Functions (30)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_socket_state::epoll_socket_state(boost::corosio::detail::epoll_scheduler&) :97 0 100.0% boost::corosio::detail::epoll_socket_service::scheduler() const :133 0 100.0% boost::corosio::detail::epoll_socket::register_op(boost::corosio::detail::epoll_op&, boost::corosio::detail::epoll_op*&, bool&, bool&) :154 0 100.0% boost::corosio::detail::epoll_op::canceller::operator()() const :191 0 100.0% boost::corosio::detail::epoll_connect_op::cancel() :197 0 0.0% boost::corosio::detail::epoll_read_op::cancel() :206 0 80.0% boost::corosio::detail::epoll_write_op::cancel() :215 0 0.0% boost::corosio::detail::epoll_op::operator()() :224 0 87.5% boost::corosio::detail::epoll_connect_op::operator()() :253 0 95.7% boost::corosio::detail::epoll_socket::epoll_socket(boost::corosio::detail::epoll_socket_service&) :289 0 100.0% boost::corosio::detail::epoll_socket::~epoll_socket() :294 0 100.0% boost::corosio::detail::epoll_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :297 0 48.8% boost::corosio::detail::epoll_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :362 0 98.1% boost::corosio::detail::epoll_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :448 0 76.9% boost::corosio::detail::epoll_socket::shutdown(boost::corosio::tcp_socket::shutdown_type) :534 0 81.2% boost::corosio::detail::epoll_socket::set_option(int, int, void const*, unsigned long) :557 0 75.0% boost::corosio::detail::epoll_socket::get_option(int, int, void*, unsigned long*) const :567 0 83.3% boost::corosio::detail::epoll_socket::cancel() :578 0 73.5% boost::corosio::detail::epoll_socket::cancel_single_op(boost::corosio::detail::epoll_op&) :628 0 65.5% 
boost::corosio::detail::epoll_socket::close_socket() :668 0 86.0% boost::corosio::detail::epoll_socket_service::epoll_socket_service(boost::capy::execution_context&) :730 0 100.0% boost::corosio::detail::epoll_socket_service::~epoll_socket_service() :737 0 100.0% boost::corosio::detail::epoll_socket_service::shutdown() :740 0 80.0% boost::corosio::detail::epoll_socket_service::construct() :757 0 100.0% boost::corosio::detail::epoll_socket_service::destroy(boost::corosio::io_object::implementation*) :772 0 100.0% boost::corosio::detail::epoll_socket_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :782 0 94.4% boost::corosio::detail::epoll_socket_service::close(boost::corosio::io_object::handle&) :814 0 100.0% boost::corosio::detail::epoll_socket_service::post(boost::corosio::detail::epoll_op*) :820 0 100.0% boost::corosio::detail::epoll_socket_service::work_started() :826 0 100.0% boost::corosio::detail::epoll_socket_service::work_finished() :832 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
/** State for epoll socket service. */
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    // Scheduler used to post completions and account outstanding work.
    epoll_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    // Intrusive list of all live socket impls; drained by shutdown().
    intrusive_list<epoll_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup and removal
    // (see "Impl Lifetime with shared_ptr" in the file header comment).
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;
};
107
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Force-close all live sockets during context shutdown.
    void shutdown() override;

    /// Allocate a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close the impl's socket and release the service's ref to it.
    void destroy(io_object::implementation*) override;
    /// Close the underlying socket for the given handle.
    void close(io_object::handle&) override;
    /// Create a non-blocking fd and register it with the reactor.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// The scheduler driving this service's I/O.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed op for invocation by the scheduler.
    void post(epoll_op* op);
    /// Forward outstanding-work accounting to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
144
145 //--------------------------------------------------------------------------
146 //
147 // Implementation
148 //
149 //--------------------------------------------------------------------------
150
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Balanced by work_finished() below (immediate completion) or by the
    // path that later claims the parked op.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge event arrived before we could park the op. Consume the
        // cached readiness and retry the I/O under the lock.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0; // still not ready — clear so the op waits cleanly
    }

    if (cancel_flag)
    {
        // A cancel raced ahead of registration; latch it onto the op so
        // it completes as canceled instead of being parked.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or canceled) without waiting: hand to the scheduler.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor claims it on the next event.
        desc_slot = &op;
    }
}
189
190 inline void
191 104x epoll_op::canceller::operator()() const noexcept
192 {
193 104x op->cancel();
194 104x }
195
196 inline void
197 epoll_connect_op::cancel() noexcept
198 {
199 if (socket_impl_)
200 socket_impl_->cancel_single_op(*this);
201 else
202 request_cancel();
203 }
204
205 inline void
206 98x epoll_read_op::cancel() noexcept
207 {
208 98x if (socket_impl_)
209 98x socket_impl_->cancel_single_op(*this);
210 else
211 request_cancel();
212 98x }
213
214 inline void
215 epoll_write_op::cancel() noexcept
216 {
217 if (socket_impl_)
218 socket_impl_->cancel_single_op(*this);
219 else
220 request_cancel();
221 }
222
// Completion handler: invoked by the scheduler after the op finishes.
inline void
epoll_op::operator()()
{
    // Disarm the stop callback first so a late stop request can't race
    // with the completion below.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    // Translate the op's outcome into the caller's error_code slot.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof; // 0-byte read: peer closed
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
251
// Completion handler for an asynchronous connect.
inline void
epoll_connect_op::operator()()
{
    // Disarm the stop callback before touching any other state.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        // Best effort: if getsockname fails, local_ep stays default.
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
288
// Bind the impl to its owning service; the fd is opened later via
// epoll_socket_service::open_socket().
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

// NOTE(review): destructor does not close fd_ itself — teardown is
// expected to go through close_socket(); confirm callers guarantee this.
inline epoll_socket::~epoll_socket() = default;
295
// Start an asynchronous connect to `ep`. Returns the coroutine to resume
// (for inline completion) or noop_coroutine() when completion is posted
// through the scheduler or deferred to the reactor.
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    // Speculative connect on the non-blocking fd.
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Connected immediately — cache both endpoints now.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    // Short-circuit keeps errno unread when result == 0 (getsockname above
    // may have clobbered it).
    if (result == 0 || errno != EINPROGRESS)
    {
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Complete inline: resume the caller directly.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    svc_.scheduler().ensure_write_events(fd_, &desc_state_);

    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect completion is signaled by writability.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
360
// Start an asynchronous scatter read. Tries readv() first; only falls
// back to the reactor on EAGAIN/EWOULDBLOCK (see file header comment).
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Empty buffer: complete immediately with 0 bytes and no error.
        // empty_buffer_read suppresses the eof mapping for the 0-byte read.
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Complete inline: write results and resume the caller.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof; // peer closed the connection
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
446
// Start an asynchronous gather write. Tries sendmsg() first; only falls
// back to the reactor on EAGAIN/EWOULDBLOCK (see file header comment).
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Empty buffer: complete immediately with 0 bytes and no error.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    // MSG_NOSIGNAL suppresses SIGPIPE on a closed peer; the error is
    // reported through errno (EPIPE) instead.
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Complete inline: write results and resume the caller.
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    svc_.scheduler().ensure_write_events(fd_, &desc_state_);

    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
532
533 inline std::error_code
534 3x epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
535 {
536 int how;
537 3x switch (what)
538 {
539 1x case tcp_socket::shutdown_receive:
540 1x how = SHUT_RD;
541 1x break;
542 1x case tcp_socket::shutdown_send:
543 1x how = SHUT_WR;
544 1x break;
545 1x case tcp_socket::shutdown_both:
546 1x how = SHUT_RDWR;
547 1x break;
548 default:
549 return make_err(EINVAL);
550 }
551 3x if (::shutdown(fd_, how) != 0)
552 return make_err(errno);
553 3x return {};
554 }
555
556 inline std::error_code
557 32x epoll_socket::set_option(
558 int level, int optname, void const* data, std::size_t size) noexcept
559 {
560 32x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
561 0)
562 return make_err(errno);
563 32x return {};
564 }
565
566 inline std::error_code
567 31x epoll_socket::get_option(
568 int level, int optname, void* data, std::size_t* size) const noexcept
569 {
570 31x socklen_t len = static_cast<socklen_t>(*size);
571 31x if (::getsockopt(fd_, level, optname, data, &len) != 0)
572 return make_err(errno);
573 31x *size = static_cast<std::size_t>(len);
574 31x return {};
575 }
576
// Cancel all three pending ops (connect/read/write). Claimed ops are
// posted so their awaiting coroutines resume with `canceled`.
inline void
epoll_socket::cancel() noexcept
{
    // If the impl is already being destroyed there is nothing to keep
    // alive and no waiters to resume.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Latch the cancel flag so in-flight completions observe it.
    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        // Claim each op parked in the descriptor. If an op isn't parked
        // yet (registration racing with us), set the pending flag for
        // register_op() to consume instead.
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post each claimed op outside the lock; impl_ptr keeps this impl
    // alive until the op has run (see file header comment).
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
626
// Cancel exactly one of this socket's ops (targeted stop_token cancel).
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // Keep the impl alive for the duration; bail if already destroyed.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Identify which descriptor slot this op would be parked in.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Claim the parked op, or — if registration hasn't happened
            // yet — leave the pending flag for register_op() to consume.
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Resume the waiter with `canceled`; impl_ptr pins the impl.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
666
// Close the socket: cancel and post all pending ops, deregister from
// epoll, close the fd, and reset descriptor/endpoint state.
inline void
epoll_socket::close_socket() noexcept
{
    // weak_from_this() fails during impl destruction; in that case no
    // waiters can exist and we skip straight to closing the fd.
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally claim every parked op and wipe cached
            // readiness/cancel state — the fd is going away.
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops so their coroutines resume with `canceled`;
        // impl_ptr keeps this impl alive until each op runs.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued in the scheduler, pin
        // the impl until that queue entry has been drained.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
729
// Construct the service, acquiring the context's epoll scheduler service
// and stashing it in the heap-allocated state block.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
736
737 488x inline epoll_socket_service::~epoll_socket_service() {}
738
// Service shutdown: force-close every live socket. Runs before the
// scheduler shuts down (services stop in reverse creation order).
inline void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // close_socket() cancels and posts any pending ops so their waiting
    // coroutines can resume.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
755
756 inline io_object::implementation*
757 12342x epoll_socket_service::construct()
758 {
759 12342x auto impl = std::make_shared<epoll_socket>(*this);
760 12342x auto* raw = impl.get();
761
762 {
763 12342x std::lock_guard lock(state_->mutex_);
764 12342x state_->socket_list_.push_back(raw);
765 12342x state_->socket_ptrs_.emplace(raw, std::move(impl));
766 12342x }
767
768 12342x return raw;
769 12342x }
770
771 inline void
772 12342x epoll_socket_service::destroy(io_object::implementation* impl)
773 {
774 12342x auto* epoll_impl = static_cast<epoll_socket*>(impl);
775 12342x epoll_impl->close_socket();
776 12342x std::lock_guard lock(state_->mutex_);
777 12342x state_->socket_list_.remove(epoll_impl);
778 12342x state_->socket_ptrs_.erase(epoll_impl);
779 12342x }
780
// Create a fresh non-blocking, close-on-exec socket fd for the impl and
// register it with the reactor. Returns an error only if socket(2) fails.
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl, int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    // Release any previously opened fd before creating a new one.
    epoll_impl->close_socket();

    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // Restrict v6 sockets to IPv6 only (no v4-mapped addresses).
        // Best effort: the setsockopt result is deliberately ignored.
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
812
813 inline void
814 20545x epoll_socket_service::close(io_object::handle& h)
815 {
816 20545x static_cast<epoll_socket*>(h.get())->close_socket();
817 20545x }
818
819 inline void
820 51085x epoll_socket_service::post(epoll_op* op)
821 {
822 51085x state_->sched_.post(op);
823 51085x }
824
825 inline void
826 4294x epoll_socket_service::work_started() noexcept
827 {
828 4294x state_->sched_.work_started();
829 4294x }
830
831 inline void
832 195x epoll_socket_service::work_finished() noexcept
833 {
834 195x state_->sched_.work_finished();
835 195x }
836
837 } // namespace boost::corosio::detail
838
839 #endif // BOOST_COROSIO_HAS_EPOLL
840
841 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
842