LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 80.6 % 505 407 98
Test Date: 2026-03-11 17:45:15 Functions: 88.2 % 51 45 6

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : 
      20                 : #include <boost/corosio/native/native_scheduler.hpp>
      21                 : #include <boost/corosio/detail/scheduler_op.hpp>
      22                 : 
      23                 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
      24                 : #include <boost/corosio/detail/timer_service.hpp>
      25                 : #include <boost/corosio/native/detail/make_err.hpp>
      26                 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
      27                 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
      28                 : 
      29                 : #include <boost/corosio/detail/except.hpp>
      30                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      31                 : 
      32                 : #include <atomic>
      33                 : #include <chrono>
      34                 : #include <condition_variable>
      35                 : #include <cstddef>
      36                 : #include <cstdint>
      37                 : #include <limits>
      38                 : #include <mutex>
      39                 : #include <utility>
      40                 : 
      41                 : #include <errno.h>
      42                 : #include <fcntl.h>
      43                 : #include <sys/epoll.h>
      44                 : #include <sys/eventfd.h>
      45                 : #include <sys/socket.h>
      46                 : #include <sys/timerfd.h>
      47                 : #include <unistd.h>
      48                 : 
      49                 : namespace boost::corosio::detail {
      50                 : 
      51                 : struct epoll_op;
      52                 : struct descriptor_state;
      53                 : namespace epoll {
      54                 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
      55                 : } // namespace epoll
      56                 : 
      57                 : /** Linux scheduler using epoll for I/O multiplexing.
      58                 : 
      59                 :     This scheduler implements the scheduler interface using Linux epoll
      60                 :     for efficient I/O event notification. It uses a single reactor model
      61                 :     where one thread runs epoll_wait while other threads
      62                 :     wait on a condition variable for handler work. This design provides:
      63                 : 
      64                 :     - Handler parallelism: N posted handlers can execute on N threads
      65                 :     - No thundering herd: condition_variable wakes exactly one thread
      66                 :     - IOCP parity: Behavior matches Windows I/O completion port semantics
      67                 : 
      68                 :     When threads call run(), they first try to execute queued handlers.
      69                 :     If the queue is empty and no reactor is running, one thread becomes
      70                 :     the reactor and runs epoll_wait. Other threads wait on a condition
      71                 :     variable until handlers are available.
      72                 : 
      73                 :     @par Thread Safety
      74                 :     All public member functions are thread-safe.
      75                 : */
      76                 : class BOOST_COROSIO_DECL epoll_scheduler final
      77                 :     : public native_scheduler
      78                 :     , public capy::execution_context::service
      79                 : {
      80                 : public:
      81                 :     using key_type = scheduler;
      82                 : 
      83                 :     /** Construct the scheduler.
      84                 : 
      85                 :         Creates an epoll instance, eventfd for reactor interruption,
      86                 :         and timerfd for kernel-managed timer expiry.
      87                 : 
      88                 :         @param ctx Reference to the owning execution_context.
      89                 :         @param concurrency_hint Hint for expected thread count (unused).
      90                 :     */
      91                 :     epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
      92                 : 
      93                 :     /// Destroy the scheduler.
      94                 :     ~epoll_scheduler() override;
      95                 : 
      96                 :     epoll_scheduler(epoll_scheduler const&)            = delete;
      97                 :     epoll_scheduler& operator=(epoll_scheduler const&) = delete;
      98                 : 
      99                 :     void shutdown() override;
     100                 :     void post(std::coroutine_handle<> h) const override;
     101                 :     void post(scheduler_op* h) const override;
     102                 :     bool running_in_this_thread() const noexcept override;
     103                 :     void stop() override;
     104                 :     bool stopped() const noexcept override;
     105                 :     void restart() override;
     106                 :     std::size_t run() override;
     107                 :     std::size_t run_one() override;
     108                 :     std::size_t wait_one(long usec) override;
     109                 :     std::size_t poll() override;
     110                 :     std::size_t poll_one() override;
     111                 : 
     112                 :     /** Return the epoll file descriptor.
     113                 : 
     114                 :         Used by socket services to register file descriptors
     115                 :         for I/O event notification.
     116                 : 
     117                 :         @return The epoll file descriptor.
     118                 :     */
     119                 :     int epoll_fd() const noexcept
     120                 :     {
     121                 :         return epoll_fd_;
     122                 :     }
     123                 : 
     124                 :     /** Reset the thread's inline completion budget.
     125                 : 
     126                 :         Called at the start of each posted completion handler to
     127                 :         grant a fresh budget for speculative inline completions.
     128                 :     */
     129                 :     void reset_inline_budget() const noexcept;
     130                 : 
     131                 :     /** Consume one unit of inline budget if available.
     132                 : 
     133                 :         @return True if budget was available and consumed.
     134                 :     */
     135                 :     bool try_consume_inline_budget() const noexcept;
     136                 : 
     137                 :     /** Register a descriptor for persistent monitoring.
     138                 : 
     139                 :         The fd is registered once and stays registered until explicitly
     140                 :         deregistered. Events are dispatched via descriptor_state which
     141                 :         tracks pending read/write/connect operations.
     142                 : 
     143                 :         @param fd The file descriptor to register.
     144                 :         @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
     145                 :     */
     146                 :     void register_descriptor(int fd, descriptor_state* desc) const;
     147                 : 
     148                 :     /** Ensure EPOLLOUT is registered for a descriptor.
     149                 : 
     150                 :         No-op if the write interest is already registered. Called lazily
     151                 :         before the first write or connect operation that needs async
     152                 :         notification. Uses EPOLL_CTL_MOD to add EPOLLOUT to the
     153                 :         existing registration.
     154                 : 
     155                 :         @param fd The file descriptor.
     156                 :         @param desc The descriptor_state that owns the fd.
     157                 :     */
     158                 :     void ensure_write_events(int fd, descriptor_state* desc) const;
     159                 : 
     160                 :     /** Deregister a persistently registered descriptor.
     161                 : 
     162                 :         @param fd The file descriptor to deregister.
     163                 :     */
     164                 :     void deregister_descriptor(int fd) const;
     165                 : 
     166                 :     void work_started() noexcept override;
     167                 :     void work_finished() noexcept override;
     168                 : 
     169                 :     /** Offset a forthcoming work_finished from work_cleanup.
     170                 : 
     171                 :         Called by descriptor_state when all I/O returned EAGAIN and no
     172                 :         handler will be executed. Must be called from a scheduler thread.
     173                 :     */
     174                 :     void compensating_work_started() const noexcept;
     175                 : 
     176                 :     /** Drain work from thread context's private queue to global queue.
     177                 : 
     178                 :         Called by thread_context_guard destructor when a thread exits run().
     179                 :         Transfers pending work to the global queue under mutex protection.
     180                 : 
     181                 :         @param queue The private queue to drain.
     182                 :         @param count Item count for wakeup decisions (wakes other threads if positive).
     183                 :     */
     184                 :     void drain_thread_queue(op_queue& queue, long count) const;
     185                 : 
     186                 :     /** Post completed operations for deferred invocation.
     187                 : 
     188                 :         If called from a thread running this scheduler, operations go to
     189                 :         the thread's private queue (fast path). Otherwise, operations are
     190                 :         added to the global queue under mutex and a waiter is signaled.
     191                 : 
     192                 :         @par Preconditions
     193                 :         work_started() must have been called for each operation.
     194                 : 
     195                 :         @param ops Queue of operations to post.
     196                 :     */
     197                 :     void post_deferred_completions(op_queue& ops) const;
     198                 : 
     199                 : private:
     200                 :     struct work_cleanup
     201                 :     {
     202                 :         epoll_scheduler* scheduler;
     203                 :         std::unique_lock<std::mutex>* lock;
     204                 :         epoll::scheduler_context* ctx;
     205                 :         ~work_cleanup();
     206                 :     };
     207                 : 
     208                 :     struct task_cleanup
     209                 :     {
     210                 :         epoll_scheduler const* scheduler;
     211                 :         std::unique_lock<std::mutex>* lock;
     212                 :         epoll::scheduler_context* ctx;
     213                 :         ~task_cleanup();
     214                 :     };
     215                 : 
     216                 :     std::size_t do_one(
     217                 :         std::unique_lock<std::mutex>& lock,
     218                 :         long timeout_us,
     219                 :         epoll::scheduler_context* ctx);
     220                 :     void
     221                 :     run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
     222                 :     void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
     223                 :     void interrupt_reactor() const;
     224                 :     void update_timerfd() const;
     225                 : 
     226                 :     /** Set the signaled state and wake all waiting threads.
     227                 : 
     228                 :         @par Preconditions
     229                 :         Mutex must be held.
     230                 : 
     231                 :         @param lock The held mutex lock.
     232                 :     */
     233                 :     void signal_all(std::unique_lock<std::mutex>& lock) const;
     234                 : 
     235                 :     /** Set the signaled state and wake one waiter if any exist.
     236                 : 
     237                 :         Only unlocks and signals if at least one thread is waiting.
     238                 :         Use this when the caller needs to perform a fallback action
     239                 :         (such as interrupting the reactor) when no waiters exist.
     240                 : 
     241                 :         @par Preconditions
     242                 :         Mutex must be held.
     243                 : 
     244                 :         @param lock The held mutex lock.
     245                 : 
     246                 :         @return `true` if unlocked and signaled, `false` if lock still held.
     247                 :     */
     248                 :     bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
     249                 : 
     250                 :     /** Set the signaled state, unlock, and wake one waiter if any exist.
     251                 : 
     252                 :         Always unlocks the mutex. Use this when the caller will release
     253                 :         the lock regardless of whether a waiter exists.
     254                 : 
     255                 :         @par Preconditions
     256                 :         Mutex must be held.
     257                 : 
     258                 :         @param lock The held mutex lock.
     259                 : 
     260                 :         @return `true` if a waiter was signaled, `false` otherwise.
     261                 :     */
     262                 :     bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
     263                 : 
     264                 :     /** Clear the signaled state before waiting.
     265                 : 
     266                 :         @par Preconditions
     267                 :         Mutex must be held.
     268                 :     */
     269                 :     void clear_signal() const;
     270                 : 
     271                 :     /** Block until the signaled state is set.
     272                 : 
     273                 :         Returns immediately if already signaled (fast-path). Otherwise
     274                 :         increments the waiter count, waits on the condition variable,
     275                 :         and decrements the waiter count upon waking.
     276                 : 
     277                 :         @par Preconditions
     278                 :         Mutex must be held.
     279                 : 
     280                 :         @param lock The held mutex lock.
     281                 :     */
     282                 :     void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
     283                 : 
     284                 :     /** Block until signaled or timeout expires.
     285                 : 
     286                 :         @par Preconditions
     287                 :         Mutex must be held.
     288                 : 
     289                 :         @param lock The held mutex lock.
     290                 :         @param timeout_us Maximum time to wait in microseconds.
     291                 :     */
     292                 :     void wait_for_signal_for(
     293                 :         std::unique_lock<std::mutex>& lock, long timeout_us) const;
     294                 : 
     295                 :     int epoll_fd_;
     296                 :     int event_fd_; // for interrupting reactor
     297                 :     int timer_fd_; // timerfd for kernel-managed timer expiry
     298                 :     mutable std::mutex mutex_;
     299                 :     mutable std::condition_variable cond_;
     300                 :     mutable op_queue completed_ops_;
     301                 :     mutable std::atomic<long> outstanding_work_;
     302                 :     bool stopped_;
     303                 : 
     304                 :     // True while a thread is blocked in epoll_wait. Used by
     305                 :     // wake_one_thread_and_unlock and work_finished to know when
     306                 :     // an eventfd interrupt is needed instead of a condvar signal.
     307                 :     mutable std::atomic<bool> task_running_{false};
     308                 : 
     309                 :     // True when the reactor has been told to do a non-blocking poll
     310                 :     // (more handlers queued or poll mode). Prevents redundant eventfd
     311                 :     // writes and controls the epoll_wait timeout.
     312                 :     mutable bool task_interrupted_ = false;
     313                 : 
     314                 :     // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
     315                 :     mutable std::size_t state_ = 0;
     316                 : 
     317                 :     // Edge-triggered eventfd state
     318                 :     mutable std::atomic<bool> eventfd_armed_{false};
     319                 : 
     320                 :     // Set when the earliest timer changes; flushed before epoll_wait
     321                 :     // blocks. Avoids timerfd_settime syscalls for timers that are
     322                 :     // scheduled then cancelled without being waited on.
     323                 :     mutable std::atomic<bool> timerfd_stale_{false};
     324                 : 
     325                 :     // Sentinel operation for interleaving reactor runs with handler execution.
     326                 :     // Ensures the reactor runs periodically even when handlers are continuously
     327                 :     // posted, preventing starvation of I/O events, timers, and signals.
     328                 :     struct task_op final : scheduler_op
     329                 :     {
     330 MIS           0 :         void operator()() override {}
     331               0 :         void destroy() override {}
     332                 :     };
     333                 :     task_op task_op_;
     334                 : };
     335                 : 
     336                 : //--------------------------------------------------------------------------
     337                 : //
     338                 : // Implementation
     339                 : //
     340                 : //--------------------------------------------------------------------------
     341                 : 
     342                 : /*
     343                 :     epoll Scheduler - Single Reactor Model
     344                 :     ======================================
     345                 : 
     346                 :     This scheduler uses a thread coordination strategy to provide handler
     347                 :     parallelism and avoid the thundering herd problem.
     348                 :     Instead of all threads blocking on epoll_wait(), one thread becomes the
     349                 :     "reactor" while others wait on a condition variable for handler work.
     350                 : 
     351                 :     Thread Model
     352                 :     ------------
     353                 :     - ONE thread runs epoll_wait() at a time (the reactor thread)
     354                 :     - OTHER threads wait on cond_ (condition variable) for handlers
     355                 :     - When work is posted, exactly one waiting thread wakes via notify_one()
     356                 :     - This matches Windows IOCP semantics where N posted items wake N threads
     357                 : 
     358                 :     Event Loop Structure (do_one)
     359                 :     -----------------------------
     360                 :     1. Lock mutex, try to pop handler from queue
     361                 :     2. If got handler: execute it (unlocked), return
     362                 :     3. If queue empty and no reactor running: become reactor
     363                 :        - Run epoll_wait (unlocked), queue I/O completions, loop back
     364                 :     4. If queue empty and reactor running: wait on condvar for work
     365                 : 
     366                 :     The task_running_ flag ensures only one thread owns epoll_wait().
     367                 :     After the reactor queues I/O completions, it loops back to try getting
     368                 :     a handler, giving priority to handler execution over more I/O polling.
     369                 : 
     370                 :     Signaling State (state_)
     371                 :     ------------------------
     372                 :     The state_ variable encodes two pieces of information:
     373                 :     - Bit 0: signaled flag (1 = signaled, persists until cleared)
     374                 :     - Upper bits: waiter count (each waiter adds 2 before blocking)
     375                 : 
     376                 :     This allows efficient coordination:
     377                 :     - Signalers only call notify when waiters exist (state_ > 1)
     378                 :     - Waiters check if already signaled before blocking (fast-path)
     379                 : 
     380                 :     Wake Coordination (wake_one_thread_and_unlock)
     381                 :     ----------------------------------------------
     382                 :     When posting work:
     383                 :     - If waiters exist (state_ > 1): signal and notify_one()
     384                 :     - Else if reactor running: interrupt via eventfd write
     385                 :     - Else: no-op (thread will find work when it checks queue)
     386                 : 
     387                 :     This avoids waking threads unnecessarily. With cascading wakes,
     388                 :     each handler execution wakes at most one additional thread if
     389                 :     more work exists in the queue.
     390                 : 
     391                 :     Work Counting
     392                 :     -------------
     393                 :     outstanding_work_ tracks pending operations. When it hits zero, run()
     394                 :     returns. Each operation increments on start, decrements on completion.
     395                 : 
     396                 :     Timer Integration
     397                 :     -----------------
     398                 :     Timers are handled by timer_service. The reactor adjusts epoll_wait
     399                 :     timeout to wake for the nearest timer expiry. When a new timer is
     400                 :     scheduled earlier than current, timer_service calls interrupt_reactor()
     401                 :     to re-evaluate the timeout.
     402                 : */
     403                 : 
     404                 : namespace epoll {
     405                 : 
     406                 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
     407                 : {
     408                 :     epoll_scheduler const* key;
     409                 :     scheduler_context* next;
     410                 :     op_queue private_queue;
     411                 :     long private_outstanding_work;
     412                 :     int inline_budget;
     413                 :     int inline_budget_max;
     414                 :     bool unassisted;
     415                 : 
     416 HIT         217 :     scheduler_context(epoll_scheduler const* k, scheduler_context* n)
     417             217 :         : key(k)
     418             217 :         , next(n)
     419             217 :         , private_outstanding_work(0)
     420             217 :         , inline_budget(0)
     421             217 :         , inline_budget_max(2)
     422             217 :         , unassisted(false)
     423                 :     {
     424             217 :     }
     425                 : };
     426                 : 
     427                 : inline thread_local_ptr<scheduler_context> context_stack;
     428                 : 
     429                 : struct thread_context_guard
     430                 : {
     431                 :     scheduler_context frame_;
     432                 : 
     433             217 :     explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
     434             217 :         : frame_(ctx, context_stack.get())
     435                 :     {
     436             217 :         context_stack.set(&frame_);
     437             217 :     }
     438                 : 
     439             217 :     ~thread_context_guard() noexcept
     440                 :     {
     441             217 :         if (!frame_.private_queue.empty())
     442 MIS           0 :             frame_.key->drain_thread_queue(
     443               0 :                 frame_.private_queue, frame_.private_outstanding_work);
     444 HIT         217 :         context_stack.set(frame_.next);
     445             217 :     }
     446                 : };
     447                 : 
     448                 : inline scheduler_context*
     449          411440 : find_context(epoll_scheduler const* self) noexcept
     450                 : {
     451          411440 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     452          409705 :         if (c->key == self)
     453          409705 :             return c;
     454            1735 :     return nullptr;
     455                 : }
     456                 : 
     457                 : } // namespace epoll
     458                 : 
     459                 : inline void
     460           59286 : epoll_scheduler::reset_inline_budget() const noexcept
     461                 : {
     462           59286 :     if (auto* ctx = epoll::find_context(this))
     463                 :     {
     464                 :         // Cap when no other thread absorbed queued work. A moderate
     465                 :         // cap (4) amortizes scheduling for small buffers while avoiding
     466                 :         // bursty I/O that fills socket buffers and stalls large transfers.
     467           59286 :         if (ctx->unassisted)
     468                 :         {
     469           59286 :             ctx->inline_budget_max = 4;
     470           59286 :             ctx->inline_budget     = 4;
     471           59286 :             return;
     472                 :         }
     473                 :         // Ramp up when previous cycle fully consumed budget.
     474                 :         // Reset on partial consumption (EAGAIN hit or peer got scheduled).
     475 MIS           0 :         if (ctx->inline_budget == 0)
     476               0 :             ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
     477               0 :         else if (ctx->inline_budget < ctx->inline_budget_max)
     478               0 :             ctx->inline_budget_max = 2;
     479               0 :         ctx->inline_budget = ctx->inline_budget_max;
     480                 :     }
     481                 : }
     482                 : 
     483                 : inline bool
     484 HIT      254803 : epoll_scheduler::try_consume_inline_budget() const noexcept
     485                 : {
     486          254803 :     if (auto* ctx = epoll::find_context(this))
     487                 :     {
     488          254803 :         if (ctx->inline_budget > 0)
     489                 :         {
     490          203913 :             --ctx->inline_budget;
     491          203913 :             return true;
     492                 :         }
     493                 :     }
     494           50890 :     return false;
     495                 : }
     496                 : // Scheduler-op entry point for a descriptor. Consumes the accumulated epoll
                           // event mask (ready_events_) and, under the descriptor mutex, performs or
                           // fails the read / connect / write ops parked on it. Ops that finished are
                           // collected locally; the first is invoked inline and the rest are handed to
                           // the scheduler via post_deferred_completions().
     497                 : inline void
     498           43871 : descriptor_state::operator()()
     499                 : {
     500           43871 :     is_enqueued_.store(false, std::memory_order_relaxed);
     501                 : 
     502                 :     // Take ownership of impl ref set by close_socket() to prevent
     503                 :     // the owning impl from being freed while we're executing
     504           43871 :     auto prevent_impl_destruction = std::move(impl_ref_);
     505                 : 
     506           43871 :     std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
     507           43871 :     if (ev == 0)
     508                 :     {
     509 MIS           0 :         scheduler_->compensating_work_started();
     510               0 :         return;
     511                 :     }
     512                 :     // Ops completed while holding the mutex are gathered here and run after it is dropped
     513 HIT       43871 :     op_queue local_ops;
     514                 :     // On EPOLLERR read SO_ERROR; fall back to errno, then EIO, so err is non-zero
     515           43871 :     int err = 0;
     516           43871 :     if (ev & EPOLLERR)
     517                 :     {
     518               2 :         socklen_t len = sizeof(err);
     519               2 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     520 MIS           0 :             err = errno;
     521 HIT           2 :         if (err == 0)
     522 MIS           0 :             err = EIO;
     523                 :     }
     524                 : 
     525                 :     {
     526 HIT       43871 :         std::lock_guard lock(mutex);
     527           43871 :         if (ev & EPOLLIN)
     528                 :         {
     529           14220 :             if (read_op)
     530                 :             {
     531            4095 :                 auto* rd = read_op;
     532            4095 :                 if (err)
     533 MIS           0 :                     rd->complete(err, 0);
     534                 :                 else
     535 HIT        4095 :                     rd->perform_io();
     536                 :                 // EAGAIN/EWOULDBLOCK: leave the op parked and clear errn; otherwise detach and queue it
     537            4095 :                 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
     538                 :                 {
     539 MIS           0 :                     rd->errn = 0;
     540                 :                 }
     541                 :                 else
     542                 :                 {
     543 HIT        4095 :                     read_op = nullptr;
     544            4095 :                     local_ops.push(rd);
     545                 :                 }
     546                 :             }
     547                 :             else
     548                 :             {
     549           10125 :                 read_ready = true; // no reader parked: record the readiness instead
     550                 :             }
     551                 :         }
     552           43871 :         if (ev & EPOLLOUT)
     553                 :         {
     554           39728 :             bool had_write_op = (connect_op || write_op); // snapshot before the ops below are detached
     555           39728 :             if (connect_op)
     556                 :             {
     557            4095 :                 auto* cn = connect_op;
     558            4095 :                 if (err)
     559               2 :                     cn->complete(err, 0);
     560                 :                 else
     561            4093 :                     cn->perform_io();
     562                 : 
     563            4095 :                 if (cn->errn == EAGAIN || cn->errn == EWOULDBLOCK)
     564                 :                 {
     565 MIS           0 :                     cn->errn = 0;
     566                 :                 }
     567                 :                 else
     568                 :                 {
     569 HIT        4095 :                     connect_op = nullptr;
     570            4095 :                     local_ops.push(cn);
     571                 :                 }
     572                 :             }
     573           39728 :             if (write_op)
     574                 :             {
     575 MIS           0 :                 auto* wr = write_op;
     576               0 :                 if (err)
     577               0 :                     wr->complete(err, 0);
     578                 :                 else
     579               0 :                     wr->perform_io();
     580                 : 
     581               0 :                 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
     582                 :                 {
     583               0 :                     wr->errn = 0;
     584                 :                 }
     585                 :                 else
     586                 :                 {
     587               0 :                     write_op = nullptr;
     588               0 :                     local_ops.push(wr);
     589                 :                 }
     590                 :             }
     591 HIT       39728 :             if (!had_write_op)
     592           35633 :                 write_ready = true;
     593                 :         }
     594           43871 :         if (err) // fail every op still parked, whichever event bits were set
     595                 :         {
     596               2 :             if (read_op)
     597                 :             {
     598 MIS           0 :                 read_op->complete(err, 0);
     599               0 :                 local_ops.push(std::exchange(read_op, nullptr));
     600                 :             }
     601 HIT           2 :             if (write_op)
     602                 :             {
     603 MIS           0 :                 write_op->complete(err, 0);
     604               0 :                 local_ops.push(std::exchange(write_op, nullptr));
     605                 :             }
     606 HIT           2 :             if (connect_op)
     607                 :             {
     608 MIS           0 :                 connect_op->complete(err, 0);
     609               0 :                 local_ops.push(std::exchange(connect_op, nullptr));
     610                 :             }
     611                 :         }
     612 HIT       43871 :     }
     613                 : 
     614                 :     // Execute first handler inline — the scheduler's work_cleanup
     615                 :     // accounts for this as the "consumed" work item
     616           43871 :     scheduler_op* first = local_ops.pop();
     617           43871 :     if (first)
     618                 :     {
     619            8190 :         scheduler_->post_deferred_completions(local_ops);
     620            8190 :         (*first)();
     621                 :     }
     622                 :     else
     623                 :     {
     624           35681 :         scheduler_->compensating_work_started(); // nothing ran: presumably offsets work_cleanup's decrement
     625                 :     }
     626           43871 : }
     627                 : // Creates the epoll instance, an eventfd and a timerfd, adds both fds to the
                           // epoll set, wires the timer service's earliest-changed callback, touches
                           // the resolver and signal services, and queues the task sentinel.
                           // Each failing syscall closes the fds opened so far and reports through
                           // detail::throw_system_error. The int parameter is unnamed and unused.
                           // NOTE(review): if any of the get_*_service calls below can throw, the
                           // three fds look like they would leak (the destructor does not run for a
                           // partially constructed object) - confirm.
     628             244 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
     629             244 :     : epoll_fd_(-1)
     630             244 :     , event_fd_(-1)
     631             244 :     , timer_fd_(-1)
     632             244 :     , outstanding_work_(0)
     633             244 :     , stopped_(false)
     634             244 :     , task_running_{false}
     635             244 :     , task_interrupted_(false)
     636             488 :     , state_(0)
     637                 : {
     638             244 :     epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
     639             244 :     if (epoll_fd_ < 0)
     640 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_create1");
     641                 : 
     642 HIT         244 :     event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
     643             244 :     if (event_fd_ < 0)
     644                 :     {
     645 MIS           0 :         int errn = errno; // save before ::close() can overwrite errno
     646               0 :         ::close(epoll_fd_);
     647               0 :         detail::throw_system_error(make_err(errn), "eventfd");
     648                 :     }
     649                 : 
     650 HIT         244 :     timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
     651             244 :     if (timer_fd_ < 0)
     652                 :     {
     653 MIS           0 :         int errn = errno;
     654               0 :         ::close(event_fd_);
     655               0 :         ::close(epoll_fd_);
     656               0 :         detail::throw_system_error(make_err(errn), "timerfd_create");
     657                 :     }
     658                 :     // eventfd: edge-triggered, tagged with a null data pointer
     659 HIT         244 :     epoll_event ev{};
     660             244 :     ev.events   = EPOLLIN | EPOLLET;
     661             244 :     ev.data.ptr = nullptr;
     662             244 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
     663                 :     {
     664 MIS           0 :         int errn = errno;
     665               0 :         ::close(timer_fd_);
     666               0 :         ::close(event_fd_);
     667               0 :         ::close(epoll_fd_);
     668               0 :         detail::throw_system_error(make_err(errn), "epoll_ctl");
     669                 :     }
     670                 :     // timerfd: level-triggered (no EPOLLET), tagged with the address of timer_fd_
     671 HIT         244 :     epoll_event timer_ev{};
     672             244 :     timer_ev.events   = EPOLLIN | EPOLLERR;
     673             244 :     timer_ev.data.ptr = &timer_fd_;
     674             244 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
     675                 :     {
     676 MIS           0 :         int errn = errno;
     677               0 :         ::close(timer_fd_);
     678               0 :         ::close(event_fd_);
     679               0 :         ::close(epoll_fd_);
     680               0 :         detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
     681                 :     }
     682                 :     // When the earliest timer changes: flag the timerfd as stale and, if the task is running, interrupt it
     683 HIT         244 :     timer_svc_ = &get_timer_service(ctx, *this);
     684             244 :     timer_svc_->set_on_earliest_changed(
     685            4557 :         timer_service::callback(this, [](void* p) {
     686            4313 :             auto* self = static_cast<epoll_scheduler*>(p);
     687            4313 :             self->timerfd_stale_.store(true, std::memory_order_release);
     688            4313 :             if (self->task_running_.load(std::memory_order_acquire))
     689 MIS           0 :                 self->interrupt_reactor();
     690 HIT        4313 :         }));
     691                 : 
     692                 :     // Initialize resolver service
     693             244 :     get_resolver_service(ctx, *this);
     694                 : 
     695                 :     // Initialize signal service
     696             244 :     get_signal_service(ctx, *this);
     697                 : 
     698                 :     // Push task sentinel to interleave reactor runs with handler execution
     699             244 :     completed_ops_.push(&task_op_);
     700             244 : }
     701                 : // Closes the timerfd, the eventfd and the epoll fd, in that order, skipping any that is negative.
     702             488 : inline epoll_scheduler::~epoll_scheduler()
     703                 : {
     704             244 :     if (timer_fd_ >= 0)
     705             244 :         ::close(timer_fd_);
     706             244 :     if (event_fd_ >= 0)
     707             244 :         ::close(event_fd_);
     708             244 :     if (epoll_fd_ >= 0)
     709             244 :         ::close(epoll_fd_);
     710             488 : }
     711                 : // Empties completed_ops_, calling destroy() on every queued op except the
                           // task sentinel, then signals all waiters and interrupts the reactor.
     712                 : inline void
     713             244 : epoll_scheduler::shutdown()
     714                 : {
     715                 :     {
     716             244 :         std::unique_lock lock(mutex_);
     717                 :         // mutex_ is released around destroy(); presumably so a destroyed op may call back into the scheduler
     718             523 :         while (auto* h = completed_ops_.pop())
     719                 :         {
     720             279 :             if (h == &task_op_)
     721             244 :                 continue;
     722              35 :             lock.unlock();
     723              35 :             h->destroy();
     724              35 :             lock.lock();
     725             279 :         }
     726                 : 
     727             244 :         signal_all(lock);
     728             244 :     }
     729                 :     // Only touch the eventfd if it was actually created
     730             244 :     if (event_fd_ >= 0)
     731             244 :         interrupt_reactor();
     732             244 : }
     733                 : // Queues a coroutine handle for later resumption. The handle is wrapped in a
                           // heap-allocated post_handler, which goes on the calling thread's private
                           // queue when epoll::find_context() yields a context for this scheduler, and
                           // on completed_ops_ (under mutex_, followed by a wake-up) otherwise.
     734                 : inline void
     735            6203 : epoll_scheduler::post(std::coroutine_handle<> h) const
     736                 : {
     737                 :     struct post_handler final : scheduler_op
     738                 :     {
     739                 :         std::coroutine_handle<> h_;
     740                 : 
     741            6203 :         explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
     742                 : 
     743           12406 :         ~post_handler() override = default;
     744                 :         // Frees this handler, then resumes the coroutine
     745            6197 :         void operator()() override
     746                 :         {
     747            6197 :             auto h = h_; // copy out first: `delete this` invalidates h_
     748            6197 :             delete this;
     749            6197 :             h.resume();
     750            6197 :         }
     751                 :         // Frees this handler, then destroys the coroutine without resuming it
     752               6 :         void destroy() override
     753                 :         {
     754               6 :             auto h = h_;
     755               6 :             delete this;
     756               6 :             h.destroy();
     757               6 :         }
     758                 :     };
     759                 :     // Held by unique_ptr until it is released into one of the queues below
     760            6203 :     auto ph = std::make_unique<post_handler>(h);
     761                 : 
     762                 :     // Fast path: same thread posts to private queue
     763                 :     // Only count locally; work_cleanup batches to global counter
     764            6203 :     if (auto* ctx = epoll::find_context(this))
     765                 :     {
     766            4498 :         ++ctx->private_outstanding_work;
     767            4498 :         ctx->private_queue.push(ph.release());
     768            4498 :         return;
     769                 :     }
     770                 : 
     771                 :     // Slow path: cross-thread post requires mutex
     772            1705 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     773                 : 
     774            1705 :     std::unique_lock lock(mutex_);
     775            1705 :     completed_ops_.push(ph.release());
     776            1705 :     wake_one_thread_and_unlock(lock);
     777            6203 : }
     778                 : // Queues an existing scheduler_op. Same two paths as the coroutine-handle
                           // overload: thread-private queue when a context for this scheduler is
                           // found, otherwise completed_ops_ under mutex_ followed by a wake-up.
     779                 : inline void
     780           55467 : epoll_scheduler::post(scheduler_op* h) const
     781                 : {
     782                 :     // Fast path: same thread posts to private queue
     783                 :     // Only count locally; work_cleanup batches to global counter
     784           55467 :     if (auto* ctx = epoll::find_context(this))
     785                 :     {
     786           55437 :         ++ctx->private_outstanding_work;
     787           55437 :         ctx->private_queue.push(h);
     788           55437 :         return;
     789                 :     }
     790                 : 
     791                 :     // Slow path: cross-thread post requires mutex
     792              30 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     793                 : 
     794              30 :     std::unique_lock lock(mutex_);
     795              30 :     completed_ops_.push(h);
     796              30 :     wake_one_thread_and_unlock(lock);
     797              30 : }
     798                 : // Returns true when any frame reachable from epoll::context_stack has this scheduler as its key.
     799                 : inline bool
     800             736 : epoll_scheduler::running_in_this_thread() const noexcept
     801                 : {
     802             736 :     for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
     803             450 :         if (c->key == this)
     804             450 :             return true;
     805             286 :     return false;
     806                 : }
     807                 : // Sets stopped_ under mutex_. Only the call that flips it from false to true
                           // signals all waiters and interrupts the reactor; later calls do nothing.
     808                 : inline void
     809             222 : epoll_scheduler::stop()
     810                 : {
     811             222 :     std::unique_lock lock(mutex_);
     812             222 :     if (!stopped_)
     813                 :     {
     814             191 :         stopped_ = true;
     815             191 :         signal_all(lock);
     816             191 :         interrupt_reactor();
     817                 :     }
     818             222 : }
     819                 : // Returns the current value of stopped_, read while holding mutex_.
     820                 : inline bool
     821              18 : epoll_scheduler::stopped() const noexcept
     822                 : {
     823              18 :     std::unique_lock lock(mutex_);
     824              36 :     return stopped_;
     825              18 : }
     826                 : // Clears stopped_ under mutex_; nothing else is reset here.
     827                 : inline void
     828              53 : epoll_scheduler::restart()
     829                 : {
     830              53 :     std::unique_lock lock(mutex_);
     831              53 :     stopped_ = false;
     832              53 : }
     833                 : // Calls do_one() with timeout -1 repeatedly until it returns a zero/false
                           // result, and returns how many calls succeeded (saturating at the maximum
                           // std::size_t). If outstanding_work_ is zero on entry, calls stop() and
                           // returns 0 without running anything.
     834                 : inline std::size_t
     835             209 : epoll_scheduler::run()
     836                 : {
     837             418 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     838                 :     {
     839              26 :         stop();
     840              26 :         return 0;
     841                 :     }
     842                 :     // Push a context frame for this scheduler on the calling thread for the duration of the loop
     843             183 :     epoll::thread_context_guard ctx(this);
     844             183 :     std::unique_lock lock(mutex_);
     845                 : 
     846             183 :     std::size_t n = 0;
     847                 :     for (;;)
     848                 :     {
     849          105679 :         if (!do_one(lock, -1, &ctx.frame_))
     850             183 :             break;
     851          105496 :         if (n != (std::numeric_limits<std::size_t>::max)())
     852          105496 :             ++n;
     853          105496 :         if (!lock.owns_lock()) // do_one() may return with the mutex released
     854           49881 :             lock.lock();
     855                 :     }
     856             183 :     return n;
     857             183 : }
     858                 : // Makes a single do_one() call with timeout -1 and returns its result. If
                           // outstanding_work_ is zero on entry, calls stop() and returns 0 instead.
     859                 : inline std::size_t
     860               2 : epoll_scheduler::run_one()
     861                 : {
     862               4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     863                 :     {
     864 MIS           0 :         stop();
     865               0 :         return 0;
     866                 :     }
     867                 : 
     868 HIT           2 :     epoll::thread_context_guard ctx(this);
     869               2 :     std::unique_lock lock(mutex_);
     870               2 :     return do_one(lock, -1, &ctx.frame_);
     871               2 : }
     872                 : // Makes a single do_one() call, forwarding usec as its timeout argument
                           // (presumably microseconds, going by the name - confirm against do_one).
                           // If outstanding_work_ is zero on entry, calls stop() and returns 0 instead.
     873                 : inline std::size_t
     874              34 : epoll_scheduler::wait_one(long usec)
     875                 : {
     876              68 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     877                 :     {
     878               7 :         stop();
     879               7 :         return 0;
     880                 :     }
     881                 : 
     882              27 :     epoll::thread_context_guard ctx(this);
     883              27 :     std::unique_lock lock(mutex_);
     884              27 :     return do_one(lock, usec, &ctx.frame_);
     885              27 : }
     886                 : // Same loop as run(), but passes 0 as the do_one() timeout. Returns the
                           // number of successful do_one() calls (saturating); if outstanding_work_
                           // is zero on entry, calls stop() and returns 0.
     887                 : inline std::size_t
     888               4 : epoll_scheduler::poll()
     889                 : {
     890               8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     891                 :     {
     892               1 :         stop();
     893               1 :         return 0;
     894                 :     }
     895                 : 
     896               3 :     epoll::thread_context_guard ctx(this);
     897               3 :     std::unique_lock lock(mutex_);
     898                 : 
     899               3 :     std::size_t n = 0;
     900                 :     for (;;)
     901                 :     {
     902               7 :         if (!do_one(lock, 0, &ctx.frame_))
     903               3 :             break;
     904               4 :         if (n != (std::numeric_limits<std::size_t>::max)())
     905               4 :             ++n;
     906               4 :         if (!lock.owns_lock()) // do_one() may return with the mutex released
     907               4 :             lock.lock();
     908                 :     }
     909               3 :     return n;
     910               3 : }
     911                 : // Makes a single do_one() call with timeout 0 and returns its result. If
                           // outstanding_work_ is zero on entry, calls stop() and returns 0 instead.
     912                 : inline std::size_t
     913               4 : epoll_scheduler::poll_one()
     914                 : {
     915               8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     916                 :     {
     917               2 :         stop();
     918               2 :         return 0;
     919                 :     }
     920                 : 
     921               2 :     epoll::thread_context_guard ctx(this);
     922               2 :     std::unique_lock lock(mutex_);
     923               2 :     return do_one(lock, 0, &ctx.frame_);
     924               2 : }
     925                 : // Adds fd to the epoll set with desc as its event payload, then records the
                           // registered mask, fd and owning scheduler in desc and clears its cached
                           // readiness flags. Reports an EPOLL_CTL_ADD failure through
                           // detail::throw_system_error.
                           // NOTE(review): desc->fd / desc->scheduler_ are stored after the fd is
                           // already in the epoll set - confirm no event for desc can be dispatched
                           // on another thread before these stores.
     926                 : inline void
     927            8278 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
     928                 : {
     929                 :     // Only register read interest upfront. EPOLLOUT is deferred until
     930                 :     // the first write/connect op to avoid a spurious write-ready event
     931                 :     // on freshly opened (always-writable) sockets.
     932            8278 :     epoll_event ev{};
     933            8278 :     ev.events   = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP;
     934            8278 :     ev.data.ptr = desc;
     935                 : 
     936            8278 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
     937 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
     938                 : 
     939 HIT        8278 :     desc->registered_events = ev.events;
     940            8278 :     desc->fd                = fd;
     941            8278 :     desc->scheduler_        = this;
     942                 :     // The readiness flags are reset under the descriptor's own mutex
     943            8278 :     std::lock_guard lock(desc->mutex);
     944            8278 :     desc->read_ready  = false;
     945            8278 :     desc->write_ready = false;
     946            8278 : }
     947                 : // Adds EPOLLOUT to the descriptor's epoll registration if it is not already
                           // present, holding desc->mutex throughout. registered_events is updated
                           // only when EPOLL_CTL_MOD succeeds.
                           // NOTE(review): a failed EPOLL_CTL_MOD is silently ignored, unlike
                           // register_descriptor which reports the error - confirm this is intended.
     948                 : inline void
     949            4095 : epoll_scheduler::ensure_write_events(int fd, descriptor_state* desc) const
     950                 : {
     951            4095 :     std::lock_guard lock(desc->mutex);
     952            4095 :     if (desc->registered_events & EPOLLOUT)
     953 MIS           0 :         return;
     954                 : 
     955 HIT        4095 :     epoll_event ev{};
     956            4095 :     ev.events   = desc->registered_events | EPOLLOUT;
     957            4095 :     ev.data.ptr = desc;
     958            4095 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == 0)
     959            4095 :         desc->registered_events = ev.events;
     960            4095 : }
     961                 : // Removes fd from the epoll set; the epoll_ctl result is not checked.
     962                 : inline void
     963            8278 : epoll_scheduler::deregister_descriptor(int fd) const
     964                 : {
     965            8278 :     ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
     966            8278 : }
     967                 : // Increments the shared outstanding_work_ counter by one (relaxed ordering).
     968                 : inline void
     969           13478 : epoll_scheduler::work_started() noexcept
     970                 : {
     971           13478 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     972           13478 : }
     973                 : // Decrements outstanding_work_; the call that takes it from 1 to 0 also calls stop().
     974                 : inline void
     975           19512 : epoll_scheduler::work_finished() noexcept
     976                 : {
     977           39024 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     978             184 :         stop();
     979           19512 : }
     980                 : // Increments private_outstanding_work on the calling thread's context for
                           // this scheduler. Does nothing when epoll::find_context() finds no context.
     981                 : inline void
     982           35681 : epoll_scheduler::compensating_work_started() const noexcept
     983                 : {
     984           35681 :     auto* ctx = epoll::find_context(this);
     985           35681 :     if (ctx)
     986           35681 :         ++ctx->private_outstanding_work;
     987           35681 : }
     988                 : // Splices queue into completed_ops_ under mutex_. When count is positive it
                           // then calls maybe_unlock_and_signal_one(), which wakes one waiter if any.
     989                 : inline void
     990 MIS           0 : epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
     991                 : {
     992                 :     // Note: outstanding_work_ was already incremented when posting
     993               0 :     std::unique_lock lock(mutex_);
     994               0 :     completed_ops_.splice(queue);
     995               0 :     if (count > 0)
     996               0 :         maybe_unlock_and_signal_one(lock);
     997               0 : }
     998                 : // Hands a batch of already-completed ops to the scheduler. An empty batch is
                           // a no-op. This function does not touch outstanding_work_ or
                           // private_outstanding_work on either path.
     999                 : inline void
    1000 HIT        8190 : epoll_scheduler::post_deferred_completions(op_queue& ops) const
    1001                 : {
    1002            8190 :     if (ops.empty())
    1003            8190 :         return;
    1004                 : 
    1005                 :     // Fast path: if on scheduler thread, use private queue
    1006 MIS           0 :     if (auto* ctx = epoll::find_context(this))
    1007                 :     {
    1008               0 :         ctx->private_queue.splice(ops);
    1009               0 :         return;
    1010                 :     }
    1011                 : 
    1012                 :     // Slow path: add to global queue and wake a thread
    1013               0 :     std::unique_lock lock(mutex_);
    1014               0 :     completed_ops_.splice(ops);
    1015               0 :     wake_one_thread_and_unlock(lock);
    1016               0 : }
    1017                 : // Writes the 64-bit value 1 to event_fd_. Only the caller that flips
                           // eventfd_armed_ from false to true performs the write, and the write
                           // result is deliberately discarded. The flag is not cleared here.
    1018                 : inline void
    1019 HIT         461 : epoll_scheduler::interrupt_reactor() const
    1020                 : {
    1021                 :     // Only write if not already armed to avoid redundant writes
    1022             461 :     bool expected = false;
    1023             461 :     if (eventfd_armed_.compare_exchange_strong(
    1024                 :             expected, true, std::memory_order_release,
    1025                 :             std::memory_order_relaxed))
    1026                 :     {
    1027             316 :         std::uint64_t val       = 1;
    1028             316 :         [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    1029                 :     }
    1030             461 : }
    1031                 : // Sets the signalled bit (bit 0 of state_) and wakes every thread waiting on
                           // cond_. The lock argument is unused; presumably it documents that the
                           // caller must hold mutex_ - confirm.
    1032                 : inline void
    1033             435 : epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
    1034                 : {
    1035             435 :     state_ |= 1;
    1036             435 :     cond_.notify_all();
    1037             435 : }
    1038                 : // Sets the signalled bit. If state_ is then greater than 1 (waiters add 2 to
                           // state_ while blocked in wait_for_signal*), unlocks, notifies one waiter
                           // and returns true. Otherwise returns false with the lock still held.
    1039                 : inline bool
    1040            1735 : epoll_scheduler::maybe_unlock_and_signal_one(
    1041                 :     std::unique_lock<std::mutex>& lock) const
    1042                 : {
    1043            1735 :     state_ |= 1;
    1044            1735 :     if (state_ > 1)
    1045                 :     {
    1046 MIS           0 :         lock.unlock();
    1047               0 :         cond_.notify_one();
    1048               0 :         return true;
    1049                 :     }
    1050 HIT        1735 :     return false;
    1051                 : }
    1052                 : // Sets the signalled bit and always unlocks. Notifies one waiter only when
                           // state_ showed waiters (value > 1) before unlocking; returns that flag.
    1053                 : inline bool
    1054          134277 : epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
    1055                 : {
    1056          134277 :     state_ |= 1;
    1057          134277 :     bool have_waiters = state_ > 1; // sampled while the lock is still held
    1058          134277 :     lock.unlock();
    1059          134277 :     if (have_waiters)
    1060 MIS           0 :         cond_.notify_one();
    1061 HIT      134277 :     return have_waiters;
    1062                 : }
    1063                 : // Clears the signalled bit (bit 0 of state_), leaving the remaining bits untouched.
    1064                 : inline void
    1065 MIS           0 : epoll_scheduler::clear_signal() const
    1066                 : {
    1067               0 :     state_ &= ~std::size_t(1);
    1068               0 : }
    1069                 : // Blocks on cond_ until the signalled bit (bit 0 of state_) is set. state_ is
                           // raised by 2 for the duration of each wait, which is what the signalling
                           // functions test with `state_ > 1`. The loop re-checks the bit after every
                           // wake-up.
    1070                 : inline void
    1071               0 : epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
    1072                 : {
    1073               0 :     while ((state_ & 1) == 0)
    1074                 :     {
    1075               0 :         state_ += 2;
    1076               0 :         cond_.wait(lock);
    1077               0 :         state_ -= 2;
    1078                 :     }
    1079               0 : }
    1080                 : 
    1081                 : inline void
    1082               0 : epoll_scheduler::wait_for_signal_for(
    1083                 :     std::unique_lock<std::mutex>& lock, long timeout_us) const
    1084                 : {
    1085               0 :     if ((state_ & 1) == 0)
    1086                 :     {
    1087               0 :         state_ += 2;
    1088               0 :         cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
    1089               0 :         state_ -= 2;
    1090                 :     }
    1091               0 : }
    1092                 : // Wakes one consumer and always returns with the lock released. A thread
                           // waiting on cond_ is preferred; failing that, if the task is running and
                           // has not already been interrupted, the reactor is interrupted instead.
    1093                 : inline void
    1094 HIT        1735 : epoll_scheduler::wake_one_thread_and_unlock(
    1095                 :     std::unique_lock<std::mutex>& lock) const
    1096                 : {
    1097            1735 :     if (maybe_unlock_and_signal_one(lock))
    1098 MIS           0 :         return;
    1099                 :     // task_interrupted_ is set while the lock is held; the eventfd write happens after unlocking
    1100 HIT        1735 :     if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    1101                 :     {
    1102              26 :         task_interrupted_ = true;
    1103              26 :         lock.unlock();
    1104              26 :         interrupt_reactor();
    1105                 :     }
    1106                 :     else
    1107                 :     {
    1108            1709 :         lock.unlock();
    1109                 :     }
    1110                 : }
    1111                 : // Reconciles the thread-private work count with outstanding_work_. A count
                           // above 1 adds (count - 1) to the shared counter, a count below 1 calls
                           // work_finished(), and exactly 1 changes nothing. The private count is
                           // then reset, and a non-empty private queue is spliced into completed_ops_
                           // after taking *lock. Without a context it just calls work_finished().
    1112          105531 : inline epoll_scheduler::work_cleanup::~work_cleanup()
    1113                 : {
    1114          105531 :     if (ctx)
    1115                 :     {
    1116          105531 :         long produced = ctx->private_outstanding_work;
    1117          105531 :         if (produced > 1)
    1118               8 :             scheduler->outstanding_work_.fetch_add(
    1119                 :                 produced - 1, std::memory_order_relaxed);
    1120          105523 :         else if (produced < 1)
    1121           14224 :             scheduler->work_finished();
    1122          105531 :         ctx->private_outstanding_work = 0;
    1123                 :         // lock() is called unconditionally here; assumes *lock is not held at this point - TODO confirm
    1124          105531 :         if (!ctx->private_queue.empty())
    1125                 :         {
    1126           55626 :             lock->lock();
    1127           55626 :             scheduler->completed_ops_.splice(ctx->private_queue);
    1128                 :         }
    1129                 :     }
    1130                 :     else
    1131                 :     {
    1132 MIS           0 :         scheduler->work_finished();
    1133                 :     }
    1134 HIT      105531 : }
    1135                 : // Adds the whole thread-private work count to outstanding_work_ (nothing is
                           // subtracted, unlike work_cleanup) and resets it. A non-empty private queue
                           // is spliced into completed_ops_, taking *lock first if it is not already
                           // held. Does nothing without a context.
    1136           74332 : inline epoll_scheduler::task_cleanup::~task_cleanup()
    1137                 : {
    1138           37166 :     if (!ctx)
    1139 MIS           0 :         return;
    1140                 : 
    1141 HIT       37166 :     if (ctx->private_outstanding_work > 0)
    1142                 :     {
    1143            4295 :         scheduler->outstanding_work_.fetch_add(
    1144            4295 :             ctx->private_outstanding_work, std::memory_order_relaxed);
    1145            4295 :         ctx->private_outstanding_work = 0;
    1146                 :     }
    1147                 : 
    1148           37166 :     if (!ctx->private_queue.empty())
    1149                 :     {
    1150            4295 :         if (!lock->owns_lock())
    1151 MIS           0 :             lock->lock();
    1152 HIT        4295 :         scheduler->completed_ops_.splice(ctx->private_queue);
    1153                 :     }
    1154           37166 : }
    1155                 : // Re-arms timer_fd_ to the timer service's nearest expiry as a one-shot
                           // relative timeout (flags stays 0, it_interval stays zero). With no timers
                           // the all-zero itimerspec disarms the fd; an expiry at or before now uses
                           // 1ns, since a zero it_value would disarm it. Reports a timerfd_settime
                           // failure through detail::throw_system_error.
    1156                 : inline void
    1157            8587 : epoll_scheduler::update_timerfd() const
    1158                 : {
    1159            8587 :     auto nearest = timer_svc_->nearest_expiry();
    1160                 : 
    1161            8587 :     itimerspec ts{};
    1162            8587 :     int flags = 0;
    1163                 : 
    1164            8587 :     if (nearest == timer_service::time_point::max())
    1165                 :     {
    1166                 :         // No timers - disarm by setting to 0 (relative)
    1167                 :     }
    1168                 :     else
    1169                 :     {
    1170            8542 :         auto now = std::chrono::steady_clock::now();
    1171            8542 :         if (nearest <= now)
    1172                 :         {
    1173                 :             // Use 1ns instead of 0 - zero disarms the timerfd
    1174             237 :             ts.it_value.tv_nsec = 1;
    1175                 :         }
    1176                 :         else
    1177                 :         {
    1178            8305 :             auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
    1179            8305 :                             nearest - now)
    1180            8305 :                             .count();
    1181            8305 :             ts.it_value.tv_sec  = nsec / 1000000000;
    1182            8305 :             ts.it_value.tv_nsec = nsec % 1000000000;
    1183                 :             // Ensure non-zero to avoid disarming if duration rounds to 0
    1184            8305 :             if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
    1185 MIS           0 :                 ts.it_value.tv_nsec = 1;
    1186                 :         }
    1187                 :     }
    1188                 :     // The previous timer setting is not requested (old_value is nullptr)
    1189 HIT        8587 :     if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
    1190 MIS           0 :         detail::throw_system_error(make_err(errno), "timerfd_settime");
    1191 HIT        8587 : }
    1192                 : 
    1193                 : inline void
    1194           37166 : epoll_scheduler::run_task(
    1195                 :     std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
    1196                 : {
    1197           37166 :     int timeout_ms = task_interrupted_ ? 0 : -1;
    1198                 : 
    1199           37166 :     if (lock.owns_lock())
    1200            8420 :         lock.unlock();
    1201                 : 
    1202           37166 :     task_cleanup on_exit{this, &lock, ctx};
    1203                 : 
    1204                 :     // Flush deferred timerfd programming before blocking
    1205           37166 :     if (timerfd_stale_.exchange(false, std::memory_order_acquire))
    1206            4292 :         update_timerfd();
    1207                 : 
    1208                 :     // Event loop runs without mutex held
    1209                 :     epoll_event events[128];
    1210           37166 :     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
    1211                 : 
    1212           37166 :     if (nfds < 0 && errno != EINTR)
    1213 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_wait");
    1214                 : 
    1215 HIT       37166 :     bool check_timers = false;
    1216           37166 :     op_queue local_ops;
    1217                 : 
    1218                 :     // Process events without holding the mutex
    1219           85429 :     for (int i = 0; i < nfds; ++i)
    1220                 :     {
    1221           48263 :         if (events[i].data.ptr == nullptr)
    1222                 :         {
    1223                 :             std::uint64_t val;
    1224                 :             // Mutex released above; analyzer can't track unlock via ref
    1225                 :             // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
    1226              72 :             [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
    1227              72 :             eventfd_armed_.store(false, std::memory_order_relaxed);
    1228              72 :             continue;
    1229              72 :         }
    1230                 : 
    1231           48191 :         if (events[i].data.ptr == &timer_fd_)
    1232                 :         {
    1233                 :             std::uint64_t expirations;
    1234                 :             // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
    1235                 :             [[maybe_unused]] auto r =
    1236            4295 :                 ::read(timer_fd_, &expirations, sizeof(expirations));
    1237            4295 :             check_timers = true;
    1238            4295 :             continue;
    1239            4295 :         }
    1240                 : 
    1241                 :         // Deferred I/O: just set ready events and enqueue descriptor
    1242                 :         // No per-descriptor mutex locking in reactor hot path!
    1243           43896 :         auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
    1244           43896 :         desc->add_ready_events(events[i].events);
    1245                 : 
    1246                 :         // Only enqueue if not already enqueued
    1247           43896 :         bool expected = false;
    1248           43896 :         if (desc->is_enqueued_.compare_exchange_strong(
    1249                 :                 expected, true, std::memory_order_release,
    1250                 :                 std::memory_order_relaxed))
    1251                 :         {
    1252           43896 :             local_ops.push(desc);
    1253                 :         }
    1254                 :     }
    1255                 : 
    1256                 :     // Process timers only when timerfd fires
    1257           37166 :     if (check_timers)
    1258                 :     {
    1259            4295 :         timer_svc_->process_expired();
    1260            4295 :         update_timerfd();
    1261                 :     }
    1262                 : 
    1263           37166 :     lock.lock();
    1264                 : 
    1265           37166 :     if (!local_ops.empty())
    1266           28214 :         completed_ops_.splice(local_ops);
    1267           37166 : }
    1268                 : 
    1269                 : inline std::size_t
    1270          105717 : epoll_scheduler::do_one(
    1271                 :     std::unique_lock<std::mutex>& lock,
    1272                 :     long timeout_us,
    1273                 :     epoll::scheduler_context* ctx)
    1274                 : {
    1275                 :     for (;;)
    1276                 :     {
    1277          142883 :         if (stopped_)
    1278             183 :             return 0;
    1279                 : 
    1280          142700 :         scheduler_op* op = completed_ops_.pop();
    1281                 : 
    1282                 :         // Handle reactor sentinel - time to poll for I/O
    1283          142700 :         if (op == &task_op_)
    1284                 :         {
    1285           37168 :             bool more_handlers = !completed_ops_.empty();
    1286                 : 
    1287                 :             // Nothing to run the reactor for: no pending work to wait on,
    1288                 :             // or caller requested a non-blocking poll
    1289           45590 :             if (!more_handlers &&
    1290           16844 :                 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
    1291                 :                  timeout_us == 0))
    1292                 :             {
    1293               2 :                 completed_ops_.push(&task_op_);
    1294               2 :                 return 0;
    1295                 :             }
    1296                 : 
    1297           37166 :             task_interrupted_ = more_handlers || timeout_us == 0;
    1298           37166 :             task_running_.store(true, std::memory_order_release);
    1299                 : 
    1300           37166 :             if (more_handlers)
    1301           28746 :                 unlock_and_signal_one(lock);
    1302                 : 
    1303           37166 :             run_task(lock, ctx);
    1304                 : 
    1305           37166 :             task_running_.store(false, std::memory_order_relaxed);
    1306           37166 :             completed_ops_.push(&task_op_);
    1307           37166 :             continue;
    1308           37166 :         }
    1309                 : 
    1310                 :         // Handle operation
    1311          105532 :         if (op != nullptr)
    1312                 :         {
    1313          105531 :             bool more = !completed_ops_.empty();
    1314                 : 
    1315          105531 :             if (more)
    1316          105531 :                 ctx->unassisted = !unlock_and_signal_one(lock);
    1317                 :             else
    1318                 :             {
    1319 MIS           0 :                 ctx->unassisted = false;
    1320               0 :                 lock.unlock();
    1321                 :             }
    1322                 : 
    1323 HIT      105531 :             work_cleanup on_exit{this, &lock, ctx};
    1324                 : 
    1325          105531 :             (*op)();
    1326          105531 :             return 1;
    1327          105531 :         }
    1328                 : 
    1329                 :         // No pending work to wait on, or caller requested non-blocking poll
    1330               2 :         if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
    1331                 :             timeout_us == 0)
    1332               1 :             return 0;
    1333                 : 
    1334 MIS           0 :         clear_signal();
    1335               0 :         if (timeout_us < 0)
    1336               0 :             wait_for_signal(lock);
    1337                 :         else
    1338               0 :             wait_for_signal_for(lock, timeout_us);
    1339 HIT       37166 :     }
    1340                 : }
    1341                 : 
    1342                 : } // namespace boost::corosio::detail
    1343                 : 
    1344                 : #endif // BOOST_COROSIO_HAS_EPOLL
    1345                 : 
    1346                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
        

Generated by: LCOV version 2.3