TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 :
24 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/except.hpp>
28 : #include <boost/capy/buffers.hpp>
29 :
30 : #include <coroutine>
31 : #include <mutex>
32 : #include <unordered_map>
33 : #include <utility>
34 :
35 : #include <errno.h>
36 : #include <netinet/in.h>
37 : #include <netinet/tcp.h>
38 : #include <sys/epoll.h>
39 : #include <sys/socket.h>
40 : #include <unistd.h>
41 :
42 : /*
43 : epoll Socket Implementation
44 : ===========================
45 :
46 : Each I/O operation follows the same pattern:
47 : 1. Try the syscall immediately (non-blocking socket)
48 : 2. If it succeeds or fails with a real error, post to completion queue
49 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50 :
51 : This "try first" approach avoids unnecessary epoll round-trips for
52 : operations that can complete immediately (common for small reads/writes
53 : on fast local connections).
54 :
55 : One-Shot Registration
56 : ---------------------
57 : We use one-shot epoll registration: each operation registers, waits for
58 : one event, then unregisters. This simplifies the state machine since we
59 : don't need to track whether an fd is currently registered or handle
60 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 : simplicity is worth it.
62 :
63 : Cancellation
64 : ------------
65 : See op.hpp for the completion/cancellation race handling via the
66 : `registered` atomic. cancel() must complete pending operations (post
67 : them with cancelled flag) so coroutines waiting on them can resume.
68 : close_socket() calls cancel() first to ensure this.
69 :
70 : Impl Lifetime with shared_ptr
71 : -----------------------------
72 : Socket impls use enable_shared_from_this. The service owns impls via
73 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 : removal. When a user calls close(), we call cancel() which posts pending
75 : ops to the scheduler.
76 :
77 : CRITICAL: The posted ops must keep the impl alive until they complete.
78 : Otherwise the scheduler would process a freed op (use-after-free). The
79 : cancel() method captures shared_from_this() into op.impl_ptr before
80 : posting. When the op completes, impl_ptr is cleared, allowing the impl
81 : to be destroyed if no other references exist.
82 :
83 : Service Ownership
84 : -----------------
85 : epoll_socket_service owns all socket impls. destroy_impl() removes the
86 : shared_ptr from the map, but the impl may survive if ops still hold
87 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 : in-flight ops will complete and release their refs.
89 : */
90 :
91 : namespace boost::corosio::detail {
92 :
/** State for epoll socket service.

    Bundles the scheduler reference and the registry of all live socket
    impls so the service object itself stays trivially movable behind a
    unique_ptr. mutex_ guards socket_list_ and socket_ptrs_.
*/
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    epoll_scheduler& sched_;  // reactor used to post completions / track work
    std::mutex mutex_;        // guards socket_list_ and socket_ptrs_
    intrusive_list<epoll_socket> socket_list_;  // all live impls, for shutdown()
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;  // owning refs keyed by raw pointer for O(1) erase
};
107 :
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Close every socket still registered (see file comment on ownership).
    void shutdown() override;

    /// Allocate a new epoll_socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close an impl's socket and drop the service's owning ref to it.
    void destroy(io_object::implementation*) override;
    /// Close the socket held by `h` without destroying the impl.
    void close(io_object::handle&) override;
    /// Open a fresh non-blocking fd for `impl` and register it with epoll.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// The epoll scheduler this service posts completions to.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed op on the scheduler.
    void post(epoll_op* op);
    /// Increment the scheduler's outstanding-work count.
    void work_started() noexcept;
    /// Decrement the scheduler's outstanding-work count.
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
144 :
145 : //--------------------------------------------------------------------------
146 : //
147 : // Implementation
148 : //
149 : //--------------------------------------------------------------------------
150 :
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Account for the pending operation before it can possibly complete.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge-triggered readiness event arrived before we parked the
        // op. Consume the cached flag and retry the I/O now — epoll will
        // not repeat the edge.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0;  // still not ready: clear so the op parks cleanly
    }

    if (cancel_flag)
    {
        // A cancel raced ahead of registration; absorb it into the op.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or cancelled) right away: post for completion and
        // retire the work count taken above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor claims it when the fd becomes ready.
        desc_slot = &op;
    }
}
189 :
190 : inline void
191 104 : epoll_op::canceller::operator()() const noexcept
192 : {
193 104 : op->cancel();
194 104 : }
195 :
196 : inline void
197 MIS 0 : epoll_connect_op::cancel() noexcept
198 : {
199 0 : if (socket_impl_)
200 0 : socket_impl_->cancel_single_op(*this);
201 : else
202 0 : request_cancel();
203 0 : }
204 :
205 : inline void
206 HIT 98 : epoll_read_op::cancel() noexcept
207 : {
208 98 : if (socket_impl_)
209 98 : socket_impl_->cancel_single_op(*this);
210 : else
211 MIS 0 : request_cancel();
212 HIT 98 : }
213 :
214 : inline void
215 MIS 0 : epoll_write_op::cancel() noexcept
216 : {
217 0 : if (socket_impl_)
218 0 : socket_impl_->cancel_single_op(*this);
219 : else
220 0 : request_cancel();
221 0 : }
222 :
// Completion handler: translate the op's recorded result into
// (*ec_out, *bytes_out), then resume the awaiting coroutine on its
// executor. Statement order here is lifetime-critical — see below.
inline void
epoll_op::operator()()
{
    // Drop the stop-token callback first so a late stop request cannot
    // re-enter cancel() while we complete.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    // Cancellation takes precedence over any errno the op recorded.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;  // completed read of zero bytes -> eof
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
251 :
// Connect completion handler: cache the connection's endpoints on
// success, set *ec_out, and resume the awaiting coroutine.
inline void
epoll_connect_op::operator()()
{
    // Drop the stop-token callback before completing (see epoll_op).
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // Best-effort: if getsockname fails, the local endpoint stays
        // default-constructed.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over any recorded errno.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
288 :
// Bind the impl to its owning service; all I/O entry points below reach
// the scheduler through svc_.
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline epoll_socket::~epoll_socket() = default;
295 :
// Begin an asynchronous connect to `ep`.
// Tries ::connect() speculatively; only EINPROGRESS goes through the
// reactor. Returns the coroutine to resume next (noop when parked).
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Connected synchronously: cache both endpoints right away.
        // getsockname failure leaves local_endpoint_ default-constructed.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Immediate success or hard failure — no reactor wait needed.
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Budget permits resuming the caller inline.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue instead.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive while queued
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    svc_.scheduler().ensure_write_events(fd_, &desc_state_);

    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();  // keep impl alive while parked

    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
360 :
// Read into the caller's buffers, trying readv() speculatively first.
// Completes inline, through the scheduler queue, or by parking on the
// reactor when the socket is not yet readable.
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length read: complete immediately with 0 bytes, no error.
        // empty_buffer_read marks the op so the zero-byte completion is
        // handled specially — NOTE(review): presumably it suppresses the
        // eof mapping in epoll_op::operator(); confirm in op.hpp.
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);  // restart if interrupted by a signal

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Finished immediately: data, EOF (n == 0), or a hard error.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the caller inline, skipping the completion queue.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;  // zero-byte read of non-empty buffer
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Inline budget exhausted: route through the scheduler queue.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive while queued
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();  // keep impl alive while parked

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
446 :
// Write the caller's buffers, trying sendmsg() speculatively first.
// MSG_NOSIGNAL suppresses SIGPIPE on a peer-closed connection.
// Completes inline, through the scheduler queue, or by parking on the
// reactor when the socket's send buffer is full.
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length write: complete immediately with 0 bytes, no error.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);  // restart if interrupted by a signal

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Finished immediately: some bytes written, or a hard error.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the caller inline, skipping the completion queue.
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Inline budget exhausted: route through the scheduler queue.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive while queued
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    svc_.scheduler().ensure_write_events(fd_, &desc_state_);

    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();  // keep impl alive while parked

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
532 :
533 : inline std::error_code
534 HIT 3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
535 : {
536 : int how;
537 3 : switch (what)
538 : {
539 1 : case tcp_socket::shutdown_receive:
540 1 : how = SHUT_RD;
541 1 : break;
542 1 : case tcp_socket::shutdown_send:
543 1 : how = SHUT_WR;
544 1 : break;
545 1 : case tcp_socket::shutdown_both:
546 1 : how = SHUT_RDWR;
547 1 : break;
548 MIS 0 : default:
549 0 : return make_err(EINVAL);
550 : }
551 HIT 3 : if (::shutdown(fd_, how) != 0)
552 MIS 0 : return make_err(errno);
553 HIT 3 : return {};
554 : }
555 :
556 : inline std::error_code
557 32 : epoll_socket::set_option(
558 : int level, int optname, void const* data, std::size_t size) noexcept
559 : {
560 32 : if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
561 : 0)
562 MIS 0 : return make_err(errno);
563 HIT 32 : return {};
564 : }
565 :
566 : inline std::error_code
567 31 : epoll_socket::get_option(
568 : int level, int optname, void* data, std::size_t* size) const noexcept
569 : {
570 31 : socklen_t len = static_cast<socklen_t>(*size);
571 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
572 MIS 0 : return make_err(errno);
573 HIT 31 : *size = static_cast<std::size_t>(len);
574 31 : return {};
575 : }
576 :
// Cancel every pending operation on this socket. Ops parked in their
// descriptor slots are claimed and posted so waiting coroutines resume
// with `canceled`; ops not currently parked get a deferred cancel flag
// that register_op() observes (see file comment on cancellation).
inline void
epoll_socket::cancel() noexcept
{
    // Bail out if the impl has already passed its shared-ptr lifetime.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        // For each slot: claim the parked op, or leave a pending-cancel
        // flag for an op racing through register_op() and not yet parked.
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock. impl_ptr pins this impl until
    // the posted op runs; work_finished() balances the work_started()
    // taken when the op was registered.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
626 :
// Cancel exactly one of this socket's ops (invoked by the per-op
// cancel() overrides via the stop-token path). Mirrors cancel() but
// touches only the slot that belongs to `op`.
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // Bail out if the impl has already passed its shared-ptr lifetime.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Find which descriptor slot this op occupies, if any.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Claim the parked op, or record a pending cancel for an op
            // racing through register_op() (see register_op()).
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // impl_ptr pins this impl until the posted op runs;
            // work_finished() balances register_op()'s work_started().
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
666 :
// Close the socket: cancel and flush every parked op, drop the reactor
// registration, close the fd, and reset cached descriptor/endpoint state.
// Safe to call repeatedly (fd_ < 0 paths are no-ops).
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally claim all slots and clear cached readiness
            // and pending-cancel flags — the fd is going away.
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops so their coroutines resume with `canceled`.
        // impl_ptr keeps this impl alive until each posted op has run.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued inside the scheduler,
        // pin the impl until that queue entry drains.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
729 :
730 244 : inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
731 244 : : state_(
732 : std::make_unique<epoll_socket_state>(
733 244 : ctx.use_service<epoll_scheduler>()))
734 : {
735 244 : }
736 :
737 488 : inline epoll_socket_service::~epoll_socket_service() {}
738 :
// Close every socket still registered with the service. close_socket()
// cancels each impl's pending ops and posts them for completion.
inline void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
755 :
756 : inline io_object::implementation*
757 12342 : epoll_socket_service::construct()
758 : {
759 12342 : auto impl = std::make_shared<epoll_socket>(*this);
760 12342 : auto* raw = impl.get();
761 :
762 : {
763 12342 : std::lock_guard lock(state_->mutex_);
764 12342 : state_->socket_list_.push_back(raw);
765 12342 : state_->socket_ptrs_.emplace(raw, std::move(impl));
766 12342 : }
767 :
768 12342 : return raw;
769 12342 : }
770 :
771 : inline void
772 12342 : epoll_socket_service::destroy(io_object::implementation* impl)
773 : {
774 12342 : auto* epoll_impl = static_cast<epoll_socket*>(impl);
775 12342 : epoll_impl->close_socket();
776 12342 : std::lock_guard lock(state_->mutex_);
777 12342 : state_->socket_list_.remove(epoll_impl);
778 12342 : state_->socket_ptrs_.erase(epoll_impl);
779 12342 : }
780 :
// (Re)open the impl's fd: close any previous socket, create a fresh
// non-blocking close-on-exec fd, and register it with the reactor.
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl, int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    epoll_impl->close_socket();  // drop any fd from an earlier open

    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // Restrict v6 sockets to IPv6 traffic only; best-effort — the
        // setsockopt result is ignored.
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear stale op slots before the reactor can deliver events.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
812 :
813 : inline void
814 20545 : epoll_socket_service::close(io_object::handle& h)
815 : {
816 20545 : static_cast<epoll_socket*>(h.get())->close_socket();
817 20545 : }
818 :
819 : inline void
820 51085 : epoll_socket_service::post(epoll_op* op)
821 : {
822 51085 : state_->sched_.post(op);
823 51085 : }
824 :
825 : inline void
826 4294 : epoll_socket_service::work_started() noexcept
827 : {
828 4294 : state_->sched_.work_started();
829 4294 : }
830 :
831 : inline void
832 195 : epoll_socket_service::work_finished() noexcept
833 : {
834 195 : state_->sched_.work_finished();
835 195 : }
836 :
837 : } // namespace boost::corosio::detail
838 :
839 : #endif // BOOST_COROSIO_HAS_EPOLL
840 :
841 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
|