LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 82.1 % 106 87 19
Test Date: 2026-03-11 17:45:15 Functions: 85.0 % 20 17 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/corosio/io/io_object.hpp>
      19                 : #include <boost/corosio/endpoint.hpp>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <coroutine>
      22                 : #include <boost/capy/error.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <boost/corosio/native/detail/make_err.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/scheduler_op.hpp>
      28                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      29                 : 
      30                 : #include <unistd.h>
      31                 : #include <errno.h>
      32                 : 
      33                 : #include <atomic>
      34                 : #include <cstddef>
      35                 : #include <memory>
      36                 : #include <mutex>
      37                 : #include <optional>
      38                 : #include <stop_token>
      39                 : 
      40                 : #include <netinet/in.h>
      41                 : #include <poll.h>
      42                 : #include <sys/socket.h>
      43                 : #include <sys/uio.h>
      44                 : 
      45                 : /*
      46                 :     epoll Operation State
      47                 :     =====================
      48                 : 
      49                 :     Each async I/O operation has a corresponding epoll_op-derived struct that
      50                 :     holds the operation's state while it's in flight. The socket impl owns
      51                 :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      52                 :     operation of each type can be pending per socket at a time.
      53                 : 
      54                 :     Persistent Registration
      55                 :     -----------------------
      56                 :     File descriptors are registered with epoll once (via descriptor_state) and
      57                 :     stay registered until closed. The descriptor_state tracks which operations
      58                 :     are pending (read_op, write_op, connect_op). When an event arrives, the
      59                 :     reactor dispatches to the appropriate pending operation.
      60                 : 
      61                 :     Impl Lifetime Management
      62                 :     ------------------------
      63                 :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      64                 :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      65                 :     member holds a shared_ptr to the impl, keeping it alive until the op
      66                 :     completes. This is set by cancel() and cleared in operator() after the
      67                 :     coroutine is resumed.
      68                 : 
      69                 :     EOF Detection
      70                 :     -------------
      71                 :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      72                 :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      73                 : 
      74                 :     SIGPIPE Prevention
      75                 :     ------------------
      76                 :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      77                 :     SIGPIPE when the peer has closed.
      78                 : */
      79                 : 
      80                 : namespace boost::corosio::detail {
      81                 : 
      82                 : // Forward declarations
      83                 : class epoll_socket;
      84                 : class epoll_acceptor;
      85                 : struct epoll_op;
      86                 : 
      87                 : // Forward declaration
      88                 : class epoll_scheduler;
      89                 : 
      90                 : /** Per-descriptor state for persistent epoll registration.
      91                 : 
      92                 :     Tracks pending operations for a file descriptor. The fd is registered
      93                 :     once with epoll and stays registered until closed.
      94                 : 
      95                 :     This struct extends scheduler_op to support deferred I/O processing.
      96                 :     When epoll events arrive, the reactor sets ready_events_ and queues
      97                 :     this descriptor for processing. When popped from the scheduler queue,
      98                 :     operator() performs the actual I/O and queues completion handlers.
      99                 : 
     100                 :     @par Deferred I/O Model
     101                 :     The reactor no longer performs I/O directly. Instead:
     102                 :     1. Reactor sets ready_events_ and queues descriptor_state
     103                 :     2. Scheduler pops descriptor_state and calls operator()
     104                 :     3. operator() performs I/O under mutex and queues completions
     105                 : 
     106                 :     This eliminates per-descriptor mutex locking from the reactor hot path.
     107                 : 
     108                 :     @par Thread Safety
     109                 :     The mutex protects operation pointers and ready flags during I/O.
     110                 :     ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
     111                 : */
     112                 : struct descriptor_state final : scheduler_op
     113                 : {
     114                 :     std::mutex mutex; // guards the members marked "Protected by mutex" below
     115                 : 
     116                 :     // Protected by mutex. One slot per operation kind; nullptr (the default) means no op is set.
     117                 :     epoll_op* read_op    = nullptr;
     118                 :     epoll_op* write_op   = nullptr;
     119                 :     epoll_op* connect_op = nullptr;
     120                 : 
     121                 :     // Caches edge events that arrived before an op was registered
     122                 :     bool read_ready  = false;
     123                 :     bool write_ready = false;
     124                 : 
     125                 :     // Deferred cancellation: set by cancel() when the target op is not
     126                 :     // parked (e.g. completing inline via speculative I/O). Checked when
     127                 :     // the next op parks; if set, the op is immediately self-cancelled.
     128                 :     // This matches IOCP semantics where CancelIoEx always succeeds.
     129                 :     bool read_cancel_pending    = false;
     130                 :     bool write_cancel_pending   = false;
     131                 :     bool connect_cancel_pending = false;
     132                 : 
     133                 :     // Protected by mutex (written by register_descriptor and ensure_write_events)
     134                 :     std::uint32_t registered_events = 0; // NOTE(review): presumably the epoll event mask registered for fd - confirm
     135                 :     int fd                          = -1; // -1 is the default, i.e. no descriptor stored yet
     136                 : 
     137                 :     // For deferred I/O - set by reactor, read by scheduler
     138                 :     std::atomic<std::uint32_t> ready_events_{0}; // accumulated by add_ready_events()
     139                 :     std::atomic<bool> is_enqueued_{false};
     140                 :     epoll_scheduler const* scheduler_ = nullptr; // NOTE(review): not used in this header; presumably the owning scheduler - confirm
     141                 : 
     142                 :     // Prevents impl destruction while this descriptor_state is queued.
     143                 :     // Set by close_socket() when is_enqueued_ is true, cleared by operator().
     144                 :     std::shared_ptr<void> impl_ref_; // also released by destroy()
     145                 : 
     146                 :     /// Add ready events atomically: ORs `ev` into ready_events_ (relaxed), keeping bits already set.
     147 HIT       43896 :     void add_ready_events(std::uint32_t ev) noexcept
     148                 :     {
     149           43896 :         ready_events_.fetch_or(ev, std::memory_order_relaxed);
     150           43896 :     }
     151                 : 
     152                 :     /// Perform deferred I/O and queue completions. Declared only; the definition is not in this header.
     153                 :     void operator()() override;
     154                 : 
     155                 :     /// Destroy without invoking.
     156                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     157                 :     /// the self-referential cycle set by close_socket().
     158              25 :     void destroy() override
     159                 :     {
     160              25 :         impl_ref_.reset(); // the only action taken: no I/O, no completion is queued
     161              25 :     }
     162                 : };
     163                 : 
     164                 : struct epoll_op : scheduler_op // state shared by the connect/read/write/accept ops defined below
     165                 : {
     166                 :     struct canceller // callable type stored in stop_cb (std::stop_callback<canceller>)
     167                 :     {
     168                 :         epoll_op* op; // the op this canceller was created for (start() passes `this`)
     169                 :         void operator()() const noexcept; // declared only; the definition is not in this header
     170                 :     };
     171                 : 
     172                 :     std::coroutine_handle<> h; // NOTE(review): presumably the coroutine resumed when the op finishes - confirm
     173                 :     capy::executor_ref ex; // NOTE(review): presumably the executor used to resume h - confirm
     174                 :     std::error_code* ec_out = nullptr; // NOTE(review): presumably where the final error is written, if non-null - confirm
     175                 :     std::size_t* bytes_out  = nullptr; // NOTE(review): presumably where the byte count is written, if non-null - confirm
     176                 : 
     177                 :     int fd                        = -1; // descriptor the perform_io() overrides operate on
     178                 :     int errn                      = 0;  // errno-style result recorded by complete()
     179                 :     std::size_t bytes_transferred = 0;  // byte count recorded by complete()
     180                 : 
     181                 :     std::atomic<bool> cancelled{false}; // raised by request_cancel(); cleared by reset() and start()
     182                 :     std::optional<std::stop_callback<canceller>> stop_cb; // engaged by start() only when the token can be stopped
     183                 : 
     184                 :     // Prevents use-after-free when socket is closed with pending ops.
     185                 :     // See "Impl Lifetime Management" in file header.
     186                 :     std::shared_ptr<void> impl_ptr;
     187                 : 
     188                 :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     189                 :     // When stop is requested, we call back to the impl to perform actual I/O cancellation.
     190                 :     epoll_socket* socket_impl_     = nullptr; // each start() overload sets exactly one of these two
     191                 :     epoll_acceptor* acceptor_impl_ = nullptr; // and nulls the other
     192                 : 
     193           37106 :     epoll_op() = default;
     194                 : 
     195          263199 :     void reset() noexcept // clears per-operation state; h, ex, ec_out, bytes_out and stop_cb are left untouched
     196                 :     {
     197          263199 :         fd                = -1;
     198          263199 :         errn              = 0;
     199          263199 :         bytes_transferred = 0;
     200          263199 :         cancelled.store(false, std::memory_order_relaxed);
     201          263199 :         impl_ptr.reset();
     202          263199 :         socket_impl_   = nullptr;
     203          263199 :         acceptor_impl_ = nullptr;
     204          263199 :     }
     205                 : 
     206                 :     // Defined in sockets.cpp where epoll_socket is complete
     207                 :     void operator()() override;
     208                 : 
     209           25450 :     virtual bool is_read_operation() const noexcept // false by default; epoll_read_op overrides it
     210                 :     {
     211           25450 :         return false;
     212                 :     }
     213                 :     virtual void cancel() noexcept = 0; // every concrete op must supply its own cancellation
     214                 : 
     215 MIS           0 :     void destroy() override // drops the stop callback and the impl reference; nothing else is done
     216                 :     {
     217               0 :         stop_cb.reset();
     218               0 :         impl_ptr.reset();
     219               0 :     }
     220                 : 
     221 HIT      111964 :     void request_cancel() noexcept // only raises the flag (release store); performs no I/O itself
     222                 :     {
     223          111964 :         cancelled.store(true, std::memory_order_release);
     224          111964 :     }
     225                 : 
     226           55184 :     void start(std::stop_token const& token, epoll_socket* impl) // arm the op for a socket impl
     227                 :     {
     228           55184 :         cancelled.store(false, std::memory_order_release);
     229           55184 :         stop_cb.reset(); // discard any callback left from a previous use
     230           55184 :         socket_impl_   = impl;
     231           55184 :         acceptor_impl_ = nullptr;
     232                 : 
     233           55184 :         if (token.stop_possible()) // no callback is registered for a token that can never be stopped
     234              99 :             stop_cb.emplace(token, canceller{this});
     235           55184 :     }
     236                 : 
     237            4102 :     void start(std::stop_token const& token, epoll_acceptor* impl) // same as above, for an acceptor impl
     238                 :     {
     239            4102 :         cancelled.store(false, std::memory_order_release);
     240            4102 :         stop_cb.reset(); // discard any callback left from a previous use
     241            4102 :         socket_impl_   = nullptr;
     242            4102 :         acceptor_impl_ = impl;
     243                 : 
     244            4102 :         if (token.stop_possible()) // no callback is registered for a token that can never be stopped
     245               9 :             stop_cb.emplace(token, canceller{this});
     246            4102 :     }
     247                 : 
     248           59222 :     void complete(int err, std::size_t bytes) noexcept // records the outcome only; resumes and notifies nothing
     249                 :     {
     250           59222 :         errn              = err;
     251           59222 :         bytes_transferred = bytes;
     252           59222 :     }
     253                 : 
     254 MIS           0 :     virtual void perform_io() noexcept {} // no-op default; every derived op in this file overrides it
     255                 : };
     256                 : 
     257                 : struct epoll_connect_op final : epoll_op // state for one connect operation
     258                 : {
     259                 :     endpoint target_endpoint; // NOTE(review): presumably the peer address being connected to - confirm
     260                 : 
     261 HIT        4095 :     void reset() noexcept // not virtual: hides epoll_op::reset() and calls it first
     262                 :     {
     263            4095 :         epoll_op::reset();
     264            4095 :         target_endpoint = endpoint{};
     265            4095 :     }
     266                 : 
     267            4093 :     void perform_io() noexcept override // records the connect result through complete(); bytes are always 0
     268                 :     {
     269                 :         // Guard against spurious write-ready events: a zero-timeout
     270                 :         // poll confirms the fd is actually writable (connect finished).
     271            4093 :         pollfd pfd{};
     272            4093 :         pfd.fd     = fd;
     273            4093 :         pfd.events = POLLOUT;
     274            4093 :         if (::poll(&pfd, 1, 0) == 0) // 0: nothing ready; any other return (including -1) continues to the SO_ERROR query
     275                 :         {
     276 MIS           0 :             complete(EAGAIN, 0); // not writable yet: record EAGAIN
     277               0 :             return;
     278                 :         }
     279                 : 
     280 HIT        4093 :         int err       = 0;
     281            4093 :         socklen_t len = sizeof(err);
     282            4093 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) // fetch the socket's pending error
     283 MIS           0 :             err = errno; // the query itself failed: report its errno instead
     284 HIT        4093 :         complete(err, 0); // err (0 when no error is pending) becomes the op's errn
     285                 :     }
     286                 : 
     287                 :     // Defined in sockets.cpp where epoll_socket is complete
     288                 :     void operator()() override;
     289                 :     void cancel() noexcept override;
     290                 : };
     291                 : 
     292                 : struct epoll_read_op final : epoll_op // state for one scatter read (readv)
     293                 : {
     294                 :     static constexpr std::size_t max_buffers = 16; // capacity of iovecs
     295                 :     iovec iovecs[max_buffers];
     296                 :     int iovec_count        = 0; // number of leading iovecs entries handed to readv()
     297                 :     bool empty_buffer_read = false; // see "EOF Detection" in the file header
     298                 : 
     299           25436 :     bool is_read_operation() const noexcept override // true unless this was a read into an empty buffer
     300                 :     {
     301           25436 :         return !empty_buffer_read;
     302                 :     }
     303                 : 
     304          127600 :     void reset() noexcept // not virtual: hides epoll_op::reset() and calls it first; iovecs contents are not cleared
     305                 :     {
     306          127600 :         epoll_op::reset();
     307          127600 :         iovec_count       = 0;
     308          127600 :         empty_buffer_read = false;
     309          127600 :     }
     310                 : 
     311             144 :     void perform_io() noexcept override // one readv() attempt; the outcome is recorded through complete()
     312                 :     {
     313                 :         ssize_t n;
     314                 :         do
     315                 :         {
     316             144 :             n = ::readv(fd, iovecs, iovec_count);
     317                 :         }
     318             144 :         while (n < 0 && errno == EINTR); // retry only when the call was interrupted
     319                 : 
     320             144 :         if (n >= 0)
     321               4 :             complete(0, static_cast<std::size_t>(n)); // n == 0 is recorded as 0 bytes with no error; EOF is not decided here
     322                 :         else
     323             140 :             complete(errno, 0); // every other failure is passed through as errno
     324             144 :     }
     325                 : 
     326                 :     void cancel() noexcept override;
     327                 : };
     328                 : 
     329                 : struct epoll_write_op final : epoll_op // state for one gather write (sendmsg)
     330                 : {
     331                 :     static constexpr std::size_t max_buffers = 16; // capacity of iovecs
     332                 :     iovec iovecs[max_buffers];
     333                 :     int iovec_count = 0; // number of leading iovecs entries handed to sendmsg()
     334                 : 
     335          127402 :     void reset() noexcept // not virtual: hides epoll_op::reset() and calls it first; iovecs contents are not cleared
     336                 :     {
     337          127402 :         epoll_op::reset();
     338          127402 :         iovec_count = 0;
     339          127402 :     }
     340                 : 
     341 MIS           0 :     void perform_io() noexcept override // one sendmsg() attempt; the outcome is recorded through complete()
     342                 :     {
     343               0 :         msghdr msg{}; // value-initialized: only the iovec fields are filled in below
     344               0 :         msg.msg_iov    = iovecs;
     345               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     346                 : 
     347                 :         ssize_t n;
     348                 :         do
     349                 :         {
     350               0 :             n = ::sendmsg(fd, &msg, MSG_NOSIGNAL); // MSG_NOSIGNAL: see "SIGPIPE Prevention" in the file header
     351                 :         }
     352               0 :         while (n < 0 && errno == EINTR); // retry only when the call was interrupted
     353                 : 
     354               0 :         if (n >= 0)
     355               0 :             complete(0, static_cast<std::size_t>(n)); // n may be less than the total buffer size; no retry is done here
     356                 :         else
     357               0 :             complete(errno, 0); // every other failure is passed through as errno
     358               0 :     }
     359                 : 
     360                 :     void cancel() noexcept override;
     361                 : };
     362                 : 
     363                 : struct epoll_accept_op final : epoll_op // state for one accept operation
     364                 : {
     365                 :     int accepted_fd                      = -1; // set by perform_io() on success; -1 by default and after reset()
     366                 :     io_object::implementation** impl_out = nullptr; // NOTE(review): presumably receives the accepted socket's impl - confirm
     367                 :     sockaddr_storage peer_storage{}; // address buffer passed to accept4()
     368                 : 
     369 HIT        4102 :     void reset() noexcept // not virtual: hides epoll_op::reset() and calls it first
     370                 :     {
     371            4102 :         epoll_op::reset();
     372            4102 :         accepted_fd  = -1;
     373            4102 :         impl_out     = nullptr;
     374            4102 :         peer_storage = {};
     375            4102 :     }
     376                 : 
     377            4091 :     void perform_io() noexcept override // one accept4() attempt; the outcome is recorded through complete()
     378                 :     {
     379            4091 :         socklen_t addrlen = sizeof(peer_storage); // in/out length argument; the resulting value is not kept
     380                 :         int new_fd;
     381                 :         do
     382                 :         {
     383            8182 :             new_fd = ::accept4(
     384            4091 :                 fd, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
     385                 :                 SOCK_NONBLOCK | SOCK_CLOEXEC); // the accepted socket starts non-blocking and close-on-exec
     386                 :         }
     387            4091 :         while (new_fd < 0 && errno == EINTR); // retry only when the call was interrupted
     388                 : 
     389            4091 :         if (new_fd >= 0)
     390                 :         {
     391            4091 :             accepted_fd = new_fd;
     392            4091 :             complete(0, 0); // success: no error, and bytes are always 0 for accept
     393                 :         }
     394                 :         else
     395                 :         {
     396 MIS           0 :             complete(errno, 0); // accepted_fd is left unchanged on failure
     397                 :         }
     398 HIT        4091 :     }
     399                 : 
     400                 :     // Defined in acceptors.cpp where epoll_acceptor is complete
     401                 :     void operator()() override;
     402                 :     void cancel() noexcept override;
     403                 : };
     404                 : 
     405                 : } // namespace boost::corosio::detail
     406                 : 
     407                 : #endif // BOOST_COROSIO_HAS_EPOLL
     408                 : 
     409                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
        

Generated by: LCOV version 2.3