LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_socket_service.hpp (source / functions)
Test: coverage_remapped.info
Test Date: 2026-03-11 17:45:15

            Coverage  Total  Hit  Missed
Lines:        80.8 %    417  337      80
Functions:    90.6 %     32   29       3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      23                 : 
      24                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      25                 : #include <boost/corosio/native/detail/make_err.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/except.hpp>
      28                 : #include <boost/capy/buffers.hpp>
      29                 : 
      30                 : #include <coroutine>
      31                 : #include <mutex>
      32                 : #include <unordered_map>
      33                 : #include <utility>
      34                 : 
      35                 : #include <errno.h>
      36                 : #include <netinet/in.h>
      37                 : #include <netinet/tcp.h>
      38                 : #include <sys/epoll.h>
      39                 : #include <sys/socket.h>
      40                 : #include <unistd.h>
      41                 : 
      42                 : /*
      43                 :     epoll Socket Implementation
      44                 :     ===========================
      45                 : 
      46                 :     Each I/O operation follows the same pattern:
      47                 :       1. Try the syscall immediately (non-blocking socket)
      48                 :       2. If it succeeds or fails with a real error, post to completion queue
      49                 :       3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
      50                 : 
      51                 :     This "try first" approach avoids unnecessary epoll round-trips for
      52                 :     operations that can complete immediately (common for small reads/writes
      53                 :     on fast local connections).
      54                 : 
      55                 :     One-Shot Registration
      56                 :     ---------------------
      57                 :     We use one-shot epoll registration: each operation registers, waits for
      58                 :     one event, then unregisters. This simplifies the state machine since we
      59                 :     don't need to track whether an fd is currently registered or handle
      60                 :     re-arming. The tradeoff is slightly more epoll_ctl calls, but the
      61                 :     simplicity is worth it.
      62                 : 
      63                 :     Cancellation
      64                 :     ------------
      65                 :     See op.hpp for the completion/cancellation race handling via the
      66                 :     `registered` atomic. cancel() must complete pending operations (post
      67                 :     them with cancelled flag) so coroutines waiting on them can resume.
      68                 :     close_socket() calls cancel() first to ensure this.
      69                 : 
      70                 :     Impl Lifetime with shared_ptr
      71                 :     -----------------------------
      72                 :     Socket impls use enable_shared_from_this. The service owns impls via
      73                 :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      74                 :     removal. When a user calls close(), we call cancel() which posts pending
      75                 :     ops to the scheduler.
      76                 : 
      77                 :     CRITICAL: The posted ops must keep the impl alive until they complete.
      78                 :     Otherwise the scheduler would process a freed op (use-after-free). The
      79                 :     cancel() method captures shared_from_this() into op.impl_ptr before
      80                 :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      81                 :     to be destroyed if no other references exist.
      82                 : 
      83                 :     Service Ownership
      84                 :     -----------------
      85                 :     epoll_socket_service owns all socket impls. destroy_impl() removes the
      86                 :     shared_ptr from the map, but the impl may survive if ops still hold
      87                 :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      88                 :     in-flight ops will complete and release their refs.
      89                 : */
      90                 : 
      91                 : namespace boost::corosio::detail {
      92                 : 
      93                 : /** State for epoll socket service. */
      94                 : class epoll_socket_state
      95                 : {
      96                 : public:
      97 HIT         244 :     explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
      98                 :     {
      99             244 :     }
     100                 : 
     101                 :     epoll_scheduler& sched_;
     102                 :     std::mutex mutex_;
     103                 :     intrusive_list<epoll_socket> socket_list_;
     104                 :     std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
     105                 :         socket_ptrs_;
     106                 : };
     107                 : 
     108                 : /** epoll socket service implementation.
     109                 : 
     110                 :     Inherits from socket_service to enable runtime polymorphism.
     111                 :     Uses key_type = socket_service for service lookup.
     112                 : */
     113                 : class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
     114                 : {
     115                 : public:
     116                 :     explicit epoll_socket_service(capy::execution_context& ctx);
     117                 :     ~epoll_socket_service() override;
     118                 : 
     119                 :     epoll_socket_service(epoll_socket_service const&)            = delete;
     120                 :     epoll_socket_service& operator=(epoll_socket_service const&) = delete;
     121                 : 
     122                 :     void shutdown() override;
     123                 : 
     124                 :     io_object::implementation* construct() override;
     125                 :     void destroy(io_object::implementation*) override;
     126                 :     void close(io_object::handle&) override;
     127                 :     std::error_code open_socket(
     128                 :         tcp_socket::implementation& impl,
     129                 :         int family,
     130                 :         int type,
     131                 :         int protocol) override;
     132                 : 
     133          330486 :     epoll_scheduler& scheduler() const noexcept
     134                 :     {
     135          330486 :         return state_->sched_;
     136                 :     }
     137                 :     void post(epoll_op* op);
     138                 :     void work_started() noexcept;
     139                 :     void work_finished() noexcept;
     140                 : 
     141                 : private:
     142                 :     std::unique_ptr<epoll_socket_state> state_;
     143                 : };
     144                 : 
     145                 : //--------------------------------------------------------------------------
     146                 : //
     147                 : // Implementation
     148                 : //
     149                 : //--------------------------------------------------------------------------
     150                 : 
     151                 : // Register an op with the reactor, handling cached edge events.
     152                 : // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
     153                 : inline void
     154            4294 : epoll_socket::register_op(
     155                 :     epoll_op& op,
     156                 :     epoll_op*& desc_slot,
     157                 :     bool& ready_flag,
     158                 :     bool& cancel_flag) noexcept
     159                 : {
     160            4294 :     svc_.work_started();
     161                 : 
     162            4294 :     std::lock_guard lock(desc_state_.mutex);
     163            4294 :     bool io_done = false;
     164            4294 :     if (ready_flag)
     165                 :     {
     166             140 :         ready_flag = false;
     167             140 :         op.perform_io();
     168             140 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     169             140 :         if (!io_done)
     170             140 :             op.errn = 0;
     171                 :     }
     172                 : 
     173            4294 :     if (cancel_flag)
     174                 :     {
     175              93 :         cancel_flag = false;
     176              93 :         op.cancelled.store(true, std::memory_order_relaxed);
     177                 :     }
     178                 : 
     179            4294 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     180                 :     {
     181              93 :         svc_.post(&op);
     182              93 :         svc_.work_finished();
     183                 :     }
     184                 :     else
     185                 :     {
     186            4201 :         desc_slot = &op;
     187                 :     }
     188            4294 : }
     189                 : 
     190                 : inline void
     191             104 : epoll_op::canceller::operator()() const noexcept
     192                 : {
     193             104 :     op->cancel();
     194             104 : }
     195                 : 
     196                 : inline void
     197 MIS           0 : epoll_connect_op::cancel() noexcept
     198                 : {
     199               0 :     if (socket_impl_)
     200               0 :         socket_impl_->cancel_single_op(*this);
     201                 :     else
     202               0 :         request_cancel();
     203               0 : }
     204                 : 
     205                 : inline void
     206 HIT          98 : epoll_read_op::cancel() noexcept
     207                 : {
     208              98 :     if (socket_impl_)
     209              98 :         socket_impl_->cancel_single_op(*this);
     210                 :     else
     211 MIS           0 :         request_cancel();
     212 HIT          98 : }
     213                 : 
     214                 : inline void
     215 MIS           0 : epoll_write_op::cancel() noexcept
     216                 : {
     217               0 :     if (socket_impl_)
     218               0 :         socket_impl_->cancel_single_op(*this);
     219                 :     else
     220               0 :         request_cancel();
     221               0 : }
     222                 : 
     223                 : inline void
     224 HIT       51089 : epoll_op::operator()()
     225                 : {
     226           51089 :     stop_cb.reset();
     227                 : 
     228           51089 :     socket_impl_->svc_.scheduler().reset_inline_budget();
     229                 : 
     230           51089 :     if (cancelled.load(std::memory_order_acquire))
     231             203 :         *ec_out = capy::error::canceled;
     232           50886 :     else if (errn != 0)
     233 MIS           0 :         *ec_out = make_err(errn);
     234 HIT       50886 :     else if (is_read_operation() && bytes_transferred == 0)
     235 MIS           0 :         *ec_out = capy::error::eof;
     236                 :     else
     237 HIT       50886 :         *ec_out = {};
     238                 : 
     239           51089 :     *bytes_out = bytes_transferred;
     240                 : 
     241                 :     // Move to stack before resuming coroutine. The coroutine might close
     242                 :     // the socket, releasing the last wrapper ref. If impl_ptr were the
     243                 :     // last ref and we destroyed it while still in operator(), we'd have
     244                 :     // use-after-free. Moving to local ensures destruction happens at
     245                 :     // function exit, after all member accesses are complete.
     246           51089 :     capy::executor_ref saved_ex(ex);
     247           51089 :     std::coroutine_handle<> saved_h(h);
     248           51089 :     auto prevent_premature_destruction = std::move(impl_ptr);
     249           51089 :     dispatch_coro(saved_ex, saved_h).resume();
     250           51089 : }
     251                 : 
     252                 : inline void
     253            4095 : epoll_connect_op::operator()()
     254                 : {
     255            4095 :     stop_cb.reset();
     256                 : 
     257            4095 :     socket_impl_->svc_.scheduler().reset_inline_budget();
     258                 : 
     259            4095 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     260                 : 
     261                 :     // Cache endpoints on successful connect
     262            4095 :     if (success && socket_impl_)
     263                 :     {
     264            4093 :         endpoint local_ep;
     265            4093 :         sockaddr_storage local_storage{};
     266            4093 :         socklen_t local_len = sizeof(local_storage);
     267            4093 :         if (::getsockname(
     268            4093 :                 fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     269                 :             0)
     270            4093 :             local_ep = from_sockaddr(local_storage);
     271            4093 :         static_cast<epoll_socket*>(socket_impl_)
     272            4093 :             ->set_endpoints(local_ep, target_endpoint);
     273                 :     }
     274                 : 
     275            4095 :     if (cancelled.load(std::memory_order_acquire))
     276 MIS           0 :         *ec_out = capy::error::canceled;
     277 HIT        4095 :     else if (errn != 0)
     278               2 :         *ec_out = make_err(errn);
     279                 :     else
     280            4093 :         *ec_out = {};
     281                 : 
     282                 :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     283            4095 :     capy::executor_ref saved_ex(ex);
     284            4095 :     std::coroutine_handle<> saved_h(h);
     285            4095 :     auto prevent_premature_destruction = std::move(impl_ptr);
     286            4095 :     dispatch_coro(saved_ex, saved_h).resume();
     287            4095 : }
     288                 : 
     289           12342 : inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
     290           12342 :     : svc_(svc)
     291                 : {
     292           12342 : }
     293                 : 
     294           12342 : inline epoll_socket::~epoll_socket() = default;
     295                 : 
     296                 : inline std::coroutine_handle<>
     297            4095 : epoll_socket::connect(
     298                 :     std::coroutine_handle<> h,
     299                 :     capy::executor_ref ex,
     300                 :     endpoint ep,
     301                 :     std::stop_token token,
     302                 :     std::error_code* ec)
     303                 : {
     304            4095 :     auto& op = conn_;
     305                 : 
     306            4095 :     sockaddr_storage storage{};
     307                 :     socklen_t addrlen =
     308            4095 :         detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
     309            4095 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     310                 : 
     311            4095 :     if (result == 0)
     312                 :     {
     313 MIS           0 :         sockaddr_storage local_storage{};
     314               0 :         socklen_t local_len = sizeof(local_storage);
     315               0 :         if (::getsockname(
     316               0 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     317                 :             0)
     318               0 :             local_endpoint_ = detail::from_sockaddr(local_storage);
     319               0 :         remote_endpoint_ = ep;
     320                 :     }
     321                 : 
     322 HIT        4095 :     if (result == 0 || errno != EINPROGRESS)
     323                 :     {
     324 MIS           0 :         int err = (result < 0) ? errno : 0;
     325               0 :         if (svc_.scheduler().try_consume_inline_budget())
     326                 :         {
     327               0 :             *ec = err ? make_err(err) : std::error_code{};
     328               0 :             return dispatch_coro(ex, h);
     329                 :         }
     330               0 :         op.reset();
     331               0 :         op.h               = h;
     332               0 :         op.ex              = ex;
     333               0 :         op.ec_out          = ec;
     334               0 :         op.fd              = fd_;
     335               0 :         op.target_endpoint = ep;
     336               0 :         op.start(token, this);
     337               0 :         op.impl_ptr = shared_from_this();
     338               0 :         op.complete(err, 0);
     339               0 :         svc_.post(&op);
     340               0 :         return std::noop_coroutine();
     341                 :     }
     342                 : 
     343                 :     // EINPROGRESS — register with reactor
     344 HIT        4095 :     svc_.scheduler().ensure_write_events(fd_, &desc_state_);
     345                 : 
     346            4095 :     op.reset();
     347            4095 :     op.h               = h;
     348            4095 :     op.ex              = ex;
     349            4095 :     op.ec_out          = ec;
     350            4095 :     op.fd              = fd_;
     351            4095 :     op.target_endpoint = ep;
     352            4095 :     op.start(token, this);
     353            4095 :     op.impl_ptr = shared_from_this();
     354                 : 
     355            4095 :     register_op(
     356            4095 :         op, desc_state_.connect_op, desc_state_.write_ready,
     357            4095 :         desc_state_.connect_cancel_pending);
     358            4095 :     return std::noop_coroutine();
     359                 : }
     360                 : 
     361                 : inline std::coroutine_handle<>
     362          127600 : epoll_socket::read_some(
     363                 :     std::coroutine_handle<> h,
     364                 :     capy::executor_ref ex,
     365                 :     buffer_param param,
     366                 :     std::stop_token token,
     367                 :     std::error_code* ec,
     368                 :     std::size_t* bytes_out)
     369                 : {
     370          127600 :     auto& op = rd_;
     371          127600 :     op.reset();
     372                 : 
     373          127600 :     capy::mutable_buffer bufs[epoll_read_op::max_buffers];
     374          127600 :     op.iovec_count =
     375          127600 :         static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
     376                 : 
     377          127600 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     378                 :     {
     379               1 :         op.empty_buffer_read = true;
     380               1 :         op.h                 = h;
     381               1 :         op.ex                = ex;
     382               1 :         op.ec_out            = ec;
     383               1 :         op.bytes_out         = bytes_out;
     384               1 :         op.start(token, this);
     385               1 :         op.impl_ptr = shared_from_this();
     386               1 :         op.complete(0, 0);
     387               1 :         svc_.post(&op);
     388               1 :         return std::noop_coroutine();
     389                 :     }
     390                 : 
     391          255198 :     for (int i = 0; i < op.iovec_count; ++i)
     392                 :     {
     393          127599 :         op.iovecs[i].iov_base = bufs[i].data();
     394          127599 :         op.iovecs[i].iov_len  = bufs[i].size();
     395                 :     }
     396                 : 
     397                 :     // Speculative read
     398                 :     ssize_t n;
     399                 :     do
     400                 :     {
     401          127599 :         n = ::readv(fd_, op.iovecs, op.iovec_count);
     402                 :     }
     403          127599 :     while (n < 0 && errno == EINTR);
     404                 : 
     405          127599 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     406                 :     {
     407          127400 :         int err    = (n < 0) ? errno : 0;
     408          127400 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     409                 : 
     410          127400 :         if (svc_.scheduler().try_consume_inline_budget())
     411                 :         {
     412          101965 :             if (err)
     413 MIS           0 :                 *ec = make_err(err);
     414 HIT      101965 :             else if (n == 0)
     415               5 :                 *ec = capy::error::eof;
     416                 :             else
     417          101960 :                 *ec = {};
     418          101965 :             *bytes_out = bytes;
     419          101965 :             return dispatch_coro(ex, h);
     420                 :         }
     421           25435 :         op.h         = h;
     422           25435 :         op.ex        = ex;
     423           25435 :         op.ec_out    = ec;
     424           25435 :         op.bytes_out = bytes_out;
     425           25435 :         op.start(token, this);
     426           25435 :         op.impl_ptr = shared_from_this();
     427           25435 :         op.complete(err, bytes);
     428           25435 :         svc_.post(&op);
     429           25435 :         return std::noop_coroutine();
     430                 :     }
     431                 : 
     432                 :     // EAGAIN — register with reactor
     433             199 :     op.h         = h;
     434             199 :     op.ex        = ex;
     435             199 :     op.ec_out    = ec;
     436             199 :     op.bytes_out = bytes_out;
     437             199 :     op.fd        = fd_;
     438             199 :     op.start(token, this);
     439             199 :     op.impl_ptr = shared_from_this();
     440                 : 
     441             199 :     register_op(
     442             199 :         op, desc_state_.read_op, desc_state_.read_ready,
     443             199 :         desc_state_.read_cancel_pending);
     444             199 :     return std::noop_coroutine();
     445                 : }
     446                 : 
     447                 : inline std::coroutine_handle<>
     448          127402 : epoll_socket::write_some(
     449                 :     std::coroutine_handle<> h,
     450                 :     capy::executor_ref ex,
     451                 :     buffer_param param,
     452                 :     std::stop_token token,
     453                 :     std::error_code* ec,
     454                 :     std::size_t* bytes_out)
     455                 : {
     456          127402 :     auto& op = wr_;
     457          127402 :     op.reset();
     458                 : 
     459          127402 :     capy::mutable_buffer bufs[epoll_write_op::max_buffers];
     460          127402 :     op.iovec_count =
     461          127402 :         static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
     462                 : 
     463          127402 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     464                 :     {
     465               1 :         op.h         = h;
     466               1 :         op.ex        = ex;
     467               1 :         op.ec_out    = ec;
     468               1 :         op.bytes_out = bytes_out;
     469               1 :         op.start(token, this);
     470               1 :         op.impl_ptr = shared_from_this();
     471               1 :         op.complete(0, 0);
     472               1 :         svc_.post(&op);
     473               1 :         return std::noop_coroutine();
     474                 :     }
     475                 : 
     476          254802 :     for (int i = 0; i < op.iovec_count; ++i)
     477                 :     {
     478          127401 :         op.iovecs[i].iov_base = bufs[i].data();
     479          127401 :         op.iovecs[i].iov_len  = bufs[i].size();
     480                 :     }
     481                 : 
     482                 :     // Speculative write
     483          127401 :     msghdr msg{};
     484          127401 :     msg.msg_iov    = op.iovecs;
     485          127401 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     486                 : 
     487                 :     ssize_t n;
     488                 :     do
     489                 :     {
     490          127401 :         n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     491                 :     }
     492          127401 :     while (n < 0 && errno == EINTR);
     493                 : 
     494          127401 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     495                 :     {
     496          127401 :         int err    = (n < 0) ? errno : 0;
     497          127401 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     498                 : 
     499          127401 :         if (svc_.scheduler().try_consume_inline_budget())
     500                 :         {
     501          101948 :             *ec        = err ? make_err(err) : std::error_code{};
     502          101948 :             *bytes_out = bytes;
     503          101948 :             return dispatch_coro(ex, h);
     504                 :         }
     505           25453 :         op.h         = h;
     506           25453 :         op.ex        = ex;
     507           25453 :         op.ec_out    = ec;
     508           25453 :         op.bytes_out = bytes_out;
     509           25453 :         op.start(token, this);
     510           25453 :         op.impl_ptr = shared_from_this();
     511           25453 :         op.complete(err, bytes);
     512           25453 :         svc_.post(&op);
     513           25453 :         return std::noop_coroutine();
     514                 :     }
     515                 : 
     516                 :     // EAGAIN — register with reactor
     517 MIS           0 :     svc_.scheduler().ensure_write_events(fd_, &desc_state_);
     518                 : 
     519               0 :     op.h         = h;
     520               0 :     op.ex        = ex;
     521               0 :     op.ec_out    = ec;
     522               0 :     op.bytes_out = bytes_out;
     523               0 :     op.fd        = fd_;
     524               0 :     op.start(token, this);
     525               0 :     op.impl_ptr = shared_from_this();
     526                 : 
     527               0 :     register_op(
     528               0 :         op, desc_state_.write_op, desc_state_.write_ready,
     529               0 :         desc_state_.write_cancel_pending);
     530               0 :     return std::noop_coroutine();
     531                 : }
     532                 : 
     533                 : inline std::error_code
     534 HIT           3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
     535                 : {
     536                 :     int how;
     537               3 :     switch (what)
     538                 :     {
     539               1 :     case tcp_socket::shutdown_receive:
     540               1 :         how = SHUT_RD;
     541               1 :         break;
     542               1 :     case tcp_socket::shutdown_send:
     543               1 :         how = SHUT_WR;
     544               1 :         break;
     545               1 :     case tcp_socket::shutdown_both:
     546               1 :         how = SHUT_RDWR;
     547               1 :         break;
     548 MIS           0 :     default:
     549               0 :         return make_err(EINVAL);
     550                 :     }
     551 HIT           3 :     if (::shutdown(fd_, how) != 0)
     552 MIS           0 :         return make_err(errno);
     553 HIT           3 :     return {};
     554                 : }
     555                 : 
     556                 : inline std::error_code
     557              32 : epoll_socket::set_option(
     558                 :     int level, int optname, void const* data, std::size_t size) noexcept
     559                 : {
     560              32 :     if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
     561                 :         0)
     562 MIS           0 :         return make_err(errno);
     563 HIT          32 :     return {};
     564                 : }
     565                 : 
     566                 : inline std::error_code
     567              31 : epoll_socket::get_option(
     568                 :     int level, int optname, void* data, std::size_t* size) const noexcept
     569                 : {
     570              31 :     socklen_t len = static_cast<socklen_t>(*size);
     571              31 :     if (::getsockopt(fd_, level, optname, data, &len) != 0)
     572 MIS           0 :         return make_err(errno);
     573 HIT          31 :     *size = static_cast<std::size_t>(len);
     574              31 :     return {};
     575                 : }
     576                 : 
     577                 : inline void
     578             183 : epoll_socket::cancel() noexcept
     579                 : {
     580             183 :     auto self = weak_from_this().lock();
     581             183 :     if (!self)
     582 MIS           0 :         return;
     583                 : 
     584 HIT         183 :     conn_.request_cancel();
     585             183 :     rd_.request_cancel();
     586             183 :     wr_.request_cancel();
     587                 : 
     588             183 :     epoll_op* conn_claimed = nullptr;
     589             183 :     epoll_op* rd_claimed   = nullptr;
     590             183 :     epoll_op* wr_claimed   = nullptr;
     591                 :     {
     592             183 :         std::lock_guard lock(desc_state_.mutex);
     593             183 :         if (desc_state_.connect_op == &conn_)
     594 MIS           0 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     595                 :         else
     596 HIT         183 :             desc_state_.connect_cancel_pending = true;
     597             183 :         if (desc_state_.read_op == &rd_)
     598               3 :             rd_claimed = std::exchange(desc_state_.read_op, nullptr);
     599                 :         else
     600             180 :             desc_state_.read_cancel_pending = true;
     601             183 :         if (desc_state_.write_op == &wr_)
     602 MIS           0 :             wr_claimed = std::exchange(desc_state_.write_op, nullptr);
     603                 :         else
     604 HIT         183 :             desc_state_.write_cancel_pending = true;
     605             183 :     }
     606                 : 
     607             183 :     if (conn_claimed)
     608                 :     {
     609 MIS           0 :         conn_.impl_ptr = self;
     610               0 :         svc_.post(&conn_);
     611               0 :         svc_.work_finished();
     612                 :     }
     613 HIT         183 :     if (rd_claimed)
     614                 :     {
     615               3 :         rd_.impl_ptr = self;
     616               3 :         svc_.post(&rd_);
     617               3 :         svc_.work_finished();
     618                 :     }
     619             183 :     if (wr_claimed)
     620                 :     {
     621 MIS           0 :         wr_.impl_ptr = self;
     622               0 :         svc_.post(&wr_);
     623               0 :         svc_.work_finished();
     624                 :     }
     625 HIT         183 : }
     626                 : 
     627                 : inline void
     628              98 : epoll_socket::cancel_single_op(epoll_op& op) noexcept
     629                 : {
     630              98 :     auto self = weak_from_this().lock();
     631              98 :     if (!self)
     632 MIS           0 :         return;
     633                 : 
     634 HIT          98 :     op.request_cancel();
     635                 : 
     636              98 :     epoll_op** desc_op_ptr = nullptr;
     637              98 :     if (&op == &conn_)
     638 MIS           0 :         desc_op_ptr = &desc_state_.connect_op;
     639 HIT          98 :     else if (&op == &rd_)
     640              98 :         desc_op_ptr = &desc_state_.read_op;
     641 MIS           0 :     else if (&op == &wr_)
     642               0 :         desc_op_ptr = &desc_state_.write_op;
     643                 : 
     644 HIT          98 :     if (desc_op_ptr)
     645                 :     {
     646              98 :         epoll_op* claimed = nullptr;
     647                 :         {
     648              98 :             std::lock_guard lock(desc_state_.mutex);
     649              98 :             if (*desc_op_ptr == &op)
     650              98 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     651 MIS           0 :             else if (&op == &conn_)
     652               0 :                 desc_state_.connect_cancel_pending = true;
     653               0 :             else if (&op == &rd_)
     654               0 :                 desc_state_.read_cancel_pending = true;
     655               0 :             else if (&op == &wr_)
     656               0 :                 desc_state_.write_cancel_pending = true;
     657 HIT          98 :         }
     658              98 :         if (claimed)
     659                 :         {
     660              98 :             op.impl_ptr = self;
     661              98 :             svc_.post(&op);
     662              98 :             svc_.work_finished();
     663                 :         }
     664                 :     }
     665              98 : }
     666                 : 
     667                 : inline void
     668           36997 : epoll_socket::close_socket() noexcept
     669                 : {
     670           36997 :     auto self = weak_from_this().lock();
     671           36997 :     if (self)
     672                 :     {
     673           36997 :         conn_.request_cancel();
     674           36997 :         rd_.request_cancel();
     675           36997 :         wr_.request_cancel();
     676                 : 
     677           36997 :         epoll_op* conn_claimed = nullptr;
     678           36997 :         epoll_op* rd_claimed   = nullptr;
     679           36997 :         epoll_op* wr_claimed   = nullptr;
     680                 :         {
     681           36997 :             std::lock_guard lock(desc_state_.mutex);
     682           36997 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     683           36997 :             rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
     684           36997 :             wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
     685           36997 :             desc_state_.read_ready             = false;
     686           36997 :             desc_state_.write_ready            = false;
     687           36997 :             desc_state_.read_cancel_pending    = false;
     688           36997 :             desc_state_.write_cancel_pending   = false;
     689           36997 :             desc_state_.connect_cancel_pending = false;
     690           36997 :         }
     691                 : 
     692           36997 :         if (conn_claimed)
     693                 :         {
     694 MIS           0 :             conn_.impl_ptr = self;
     695               0 :             svc_.post(&conn_);
     696               0 :             svc_.work_finished();
     697                 :         }
     698 HIT       36997 :         if (rd_claimed)
     699                 :         {
     700               1 :             rd_.impl_ptr = self;
     701               1 :             svc_.post(&rd_);
     702               1 :             svc_.work_finished();
     703                 :         }
     704           36997 :         if (wr_claimed)
     705                 :         {
     706 MIS           0 :             wr_.impl_ptr = self;
     707               0 :             svc_.post(&wr_);
     708               0 :             svc_.work_finished();
     709                 :         }
     710                 : 
     711 HIT       36997 :         if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     712              82 :             desc_state_.impl_ref_ = self;
     713                 :     }
     714                 : 
     715           36997 :     if (fd_ >= 0)
     716                 :     {
     717            8203 :         if (desc_state_.registered_events != 0)
     718            8203 :             svc_.scheduler().deregister_descriptor(fd_);
     719            8203 :         ::close(fd_);
     720            8203 :         fd_ = -1;
     721                 :     }
     722                 : 
     723           36997 :     desc_state_.fd                = -1;
     724           36997 :     desc_state_.registered_events = 0;
     725                 : 
     726           36997 :     local_endpoint_  = endpoint{};
     727           36997 :     remote_endpoint_ = endpoint{};
     728           36997 : }
     729                 : 
     730             244 : inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
     731             244 :     : state_(
     732                 :           std::make_unique<epoll_socket_state>(
     733             244 :               ctx.use_service<epoll_scheduler>()))
     734                 : {
     735             244 : }
     736                 : 
     737             488 : inline epoll_socket_service::~epoll_socket_service() {}
     738                 : 
     739                 : inline void
     740             244 : epoll_socket_service::shutdown()
     741                 : {
     742             244 :     std::lock_guard lock(state_->mutex_);
     743                 : 
     744             244 :     while (auto* impl = state_->socket_list_.pop_front())
     745 MIS           0 :         impl->close_socket();
     746                 : 
     747                 :     // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
     748                 :     // drains completed_ops_, calling destroy() on each queued op. If we
     749                 :     // released our shared_ptrs now, an epoll_op::destroy() could free the
     750                 :     // last ref to an impl whose embedded descriptor_state is still linked
     751                 :     // in the queue — use-after-free on the next pop(). Letting ~state_
     752                 :     // release the ptrs (during service destruction, after scheduler
     753                 :     // shutdown) keeps every impl alive until all ops have been drained.
     754 HIT         244 : }
     755                 : 
     756                 : inline io_object::implementation*
     757           12342 : epoll_socket_service::construct()
     758                 : {
     759           12342 :     auto impl = std::make_shared<epoll_socket>(*this);
     760           12342 :     auto* raw = impl.get();
     761                 : 
     762                 :     {
     763           12342 :         std::lock_guard lock(state_->mutex_);
     764           12342 :         state_->socket_list_.push_back(raw);
     765           12342 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     766           12342 :     }
     767                 : 
     768           12342 :     return raw;
     769           12342 : }
     770                 : 
     771                 : inline void
     772           12342 : epoll_socket_service::destroy(io_object::implementation* impl)
     773                 : {
     774           12342 :     auto* epoll_impl = static_cast<epoll_socket*>(impl);
     775           12342 :     epoll_impl->close_socket();
     776           12342 :     std::lock_guard lock(state_->mutex_);
     777           12342 :     state_->socket_list_.remove(epoll_impl);
     778           12342 :     state_->socket_ptrs_.erase(epoll_impl);
     779           12342 : }
     780                 : 
     781                 : inline std::error_code
     782            4110 : epoll_socket_service::open_socket(
     783                 :     tcp_socket::implementation& impl, int family, int type, int protocol)
     784                 : {
     785            4110 :     auto* epoll_impl = static_cast<epoll_socket*>(&impl);
     786            4110 :     epoll_impl->close_socket();
     787                 : 
     788            4110 :     int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
     789            4110 :     if (fd < 0)
     790 MIS           0 :         return make_err(errno);
     791                 : 
     792 HIT        4110 :     if (family == AF_INET6)
     793                 :     {
     794               5 :         int one = 1;
     795               5 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
     796                 :     }
     797                 : 
     798            4110 :     epoll_impl->fd_ = fd;
     799                 : 
     800                 :     // Register fd with epoll (edge-triggered mode)
     801            4110 :     epoll_impl->desc_state_.fd = fd;
     802                 :     {
     803            4110 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     804            4110 :         epoll_impl->desc_state_.read_op    = nullptr;
     805            4110 :         epoll_impl->desc_state_.write_op   = nullptr;
     806            4110 :         epoll_impl->desc_state_.connect_op = nullptr;
     807            4110 :     }
     808            4110 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     809                 : 
     810            4110 :     return {};
     811                 : }
     812                 : 
     813                 : inline void
     814           20545 : epoll_socket_service::close(io_object::handle& h)
     815                 : {
     816           20545 :     static_cast<epoll_socket*>(h.get())->close_socket();
     817           20545 : }
     818                 : 
     819                 : inline void
     820           51085 : epoll_socket_service::post(epoll_op* op)
     821                 : {
     822           51085 :     state_->sched_.post(op);
     823           51085 : }
     824                 : 
     825                 : inline void
     826            4294 : epoll_socket_service::work_started() noexcept
     827                 : {
     828            4294 :     state_->sched_.work_started();
     829            4294 : }
     830                 : 
     831                 : inline void
     832             195 : epoll_socket_service::work_finished() noexcept
     833                 : {
     834             195 :     state_->sched_.work_finished();
     835             195 : }
     836                 : 
     837                 : } // namespace boost::corosio::detail
     838                 : 
     839                 : #endif // BOOST_COROSIO_HAS_EPOLL
     840                 : 
     841                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3