//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/native/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

49  
namespace boost::corosio::detail {
49  
namespace boost::corosio::detail {
50  

50  

51  
struct epoll_op;
51  
struct epoll_op;
52  
struct descriptor_state;
52  
struct descriptor_state;
53  
namespace epoll {
53  
namespace epoll {
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55  
} // namespace epoll
55  
} // namespace epoll
56  

56  

57  
/** Linux scheduler using epoll for I/O multiplexing.
57  
/** Linux scheduler using epoll for I/O multiplexing.
58  

58  

59  
    This scheduler implements the scheduler interface using Linux epoll
59  
    This scheduler implements the scheduler interface using Linux epoll
60  
    for efficient I/O event notification. It uses a single reactor model
60  
    for efficient I/O event notification. It uses a single reactor model
61  
    where one thread runs epoll_wait while other threads
61  
    where one thread runs epoll_wait while other threads
62  
    wait on a condition variable for handler work. This design provides:
62  
    wait on a condition variable for handler work. This design provides:
63  

63  

64  
    - Handler parallelism: N posted handlers can execute on N threads
64  
    - Handler parallelism: N posted handlers can execute on N threads
65  
    - No thundering herd: condition_variable wakes exactly one thread
65  
    - No thundering herd: condition_variable wakes exactly one thread
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
67  

67  

68  
    When threads call run(), they first try to execute queued handlers.
68  
    When threads call run(), they first try to execute queued handlers.
69  
    If the queue is empty and no reactor is running, one thread becomes
69  
    If the queue is empty and no reactor is running, one thread becomes
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
71  
    variable until handlers are available.
71  
    variable until handlers are available.
72  

72  

73  
    @par Thread Safety
73  
    @par Thread Safety
74  
    All public member functions are thread-safe.
74  
    All public member functions are thread-safe.
75  
*/
75  
*/
76  
class BOOST_COROSIO_DECL epoll_scheduler final
76  
class BOOST_COROSIO_DECL epoll_scheduler final
77  
    : public native_scheduler
77  
    : public native_scheduler
78  
    , public capy::execution_context::service
78  
    , public capy::execution_context::service
79  
{
79  
{
80  
public:
80  
public:
81  
    using key_type = scheduler;
81  
    using key_type = scheduler;
82  

82  

83  
    /** Construct the scheduler.
83  
    /** Construct the scheduler.
84  

84  

85  
        Creates an epoll instance, eventfd for reactor interruption,
85  
        Creates an epoll instance, eventfd for reactor interruption,
86  
        and timerfd for kernel-managed timer expiry.
86  
        and timerfd for kernel-managed timer expiry.
87  

87  

88  
        @param ctx Reference to the owning execution_context.
88  
        @param ctx Reference to the owning execution_context.
89  
        @param concurrency_hint Hint for expected thread count (unused).
89  
        @param concurrency_hint Hint for expected thread count (unused).
90  
    */
90  
    */
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92  

92  

93  
    /// Destroy the scheduler.
93  
    /// Destroy the scheduler.
94  
    ~epoll_scheduler() override;
94  
    ~epoll_scheduler() override;
95  

95  

96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98  

98  

99  
    void shutdown() override;
99  
    void shutdown() override;
100  
    void post(std::coroutine_handle<> h) const override;
100  
    void post(std::coroutine_handle<> h) const override;
101  
    void post(scheduler_op* h) const override;
101  
    void post(scheduler_op* h) const override;
102  
    bool running_in_this_thread() const noexcept override;
102  
    bool running_in_this_thread() const noexcept override;
103  
    void stop() override;
103  
    void stop() override;
104  
    bool stopped() const noexcept override;
104  
    bool stopped() const noexcept override;
105  
    void restart() override;
105  
    void restart() override;
106  
    std::size_t run() override;
106  
    std::size_t run() override;
107  
    std::size_t run_one() override;
107  
    std::size_t run_one() override;
108  
    std::size_t wait_one(long usec) override;
108  
    std::size_t wait_one(long usec) override;
109  
    std::size_t poll() override;
109  
    std::size_t poll() override;
110  
    std::size_t poll_one() override;
110  
    std::size_t poll_one() override;
111  

111  

112  
    /** Return the epoll file descriptor.
112  
    /** Return the epoll file descriptor.
113  

113  

114  
        Used by socket services to register file descriptors
114  
        Used by socket services to register file descriptors
115  
        for I/O event notification.
115  
        for I/O event notification.
116  

116  

117  
        @return The epoll file descriptor.
117  
        @return The epoll file descriptor.
118  
    */
118  
    */
119  
    int epoll_fd() const noexcept
119  
    int epoll_fd() const noexcept
120  
    {
120  
    {
121  
        return epoll_fd_;
121  
        return epoll_fd_;
122  
    }
122  
    }
123  

123  

124  
    /** Reset the thread's inline completion budget.
124  
    /** Reset the thread's inline completion budget.
125  

125  

126  
        Called at the start of each posted completion handler to
126  
        Called at the start of each posted completion handler to
127  
        grant a fresh budget for speculative inline completions.
127  
        grant a fresh budget for speculative inline completions.
128  
    */
128  
    */
129  
    void reset_inline_budget() const noexcept;
129  
    void reset_inline_budget() const noexcept;
130  

130  

131  
    /** Consume one unit of inline budget if available.
131  
    /** Consume one unit of inline budget if available.
132  

132  

133  
        @return True if budget was available and consumed.
133  
        @return True if budget was available and consumed.
134  
    */
134  
    */
135  
    bool try_consume_inline_budget() const noexcept;
135  
    bool try_consume_inline_budget() const noexcept;
136  

136  

137  
    /** Register a descriptor for persistent monitoring.
137  
    /** Register a descriptor for persistent monitoring.
138  

138  

139  
        The fd is registered once and stays registered until explicitly
139  
        The fd is registered once and stays registered until explicitly
140  
        deregistered. Events are dispatched via descriptor_state which
140  
        deregistered. Events are dispatched via descriptor_state which
141  
        tracks pending read/write/connect operations.
141  
        tracks pending read/write/connect operations.
142  

142  

143  
        @param fd The file descriptor to register.
143  
        @param fd The file descriptor to register.
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145  
    */
145  
    */
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
147  

147  

 
148 +
    /** Ensure EPOLLOUT is registered for a descriptor.
 
149 +

 
150 +
        No-op if the write interest is already registered. Called lazily
 
151 +
        before the first write or connect operation that needs async
 
152 +
        notification. Uses EPOLL_CTL_MOD to add EPOLLOUT to the
 
153 +
        existing registration.
 
154 +

 
155 +
        @param fd The file descriptor.
 
156 +
        @param desc The descriptor_state that owns the fd.
 
157 +
    */
 
158 +
    void ensure_write_events(int fd, descriptor_state* desc) const;
 
159 +

148  
    /** Deregister a persistently registered descriptor.
160  
    /** Deregister a persistently registered descriptor.
149  

161  

150  
        @param fd The file descriptor to deregister.
162  
        @param fd The file descriptor to deregister.
151  
    */
163  
    */
152  
    void deregister_descriptor(int fd) const;
164  
    void deregister_descriptor(int fd) const;
153  

165  

154  
    void work_started() noexcept override;
166  
    void work_started() noexcept override;
155  
    void work_finished() noexcept override;
167  
    void work_finished() noexcept override;
156  

168  

157  
    /** Offset a forthcoming work_finished from work_cleanup.
169  
    /** Offset a forthcoming work_finished from work_cleanup.
158  

170  

159  
        Called by descriptor_state when all I/O returned EAGAIN and no
171  
        Called by descriptor_state when all I/O returned EAGAIN and no
160  
        handler will be executed. Must be called from a scheduler thread.
172  
        handler will be executed. Must be called from a scheduler thread.
161  
    */
173  
    */
162  
    void compensating_work_started() const noexcept;
174  
    void compensating_work_started() const noexcept;
163  

175  

164  
    /** Drain work from thread context's private queue to global queue.
176  
    /** Drain work from thread context's private queue to global queue.
165  

177  

166  
        Called by thread_context_guard destructor when a thread exits run().
178  
        Called by thread_context_guard destructor when a thread exits run().
167  
        Transfers pending work to the global queue under mutex protection.
179  
        Transfers pending work to the global queue under mutex protection.
168  

180  

169  
        @param queue The private queue to drain.
181  
        @param queue The private queue to drain.
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
182  
        @param count Item count for wakeup decisions (wakes other threads if positive).
171  
    */
183  
    */
172  
    void drain_thread_queue(op_queue& queue, long count) const;
184  
    void drain_thread_queue(op_queue& queue, long count) const;
173  

185  

174  
    /** Post completed operations for deferred invocation.
186  
    /** Post completed operations for deferred invocation.
175  

187  

176  
        If called from a thread running this scheduler, operations go to
188  
        If called from a thread running this scheduler, operations go to
177  
        the thread's private queue (fast path). Otherwise, operations are
189  
        the thread's private queue (fast path). Otherwise, operations are
178  
        added to the global queue under mutex and a waiter is signaled.
190  
        added to the global queue under mutex and a waiter is signaled.
179  

191  

180  
        @par Preconditions
192  
        @par Preconditions
181  
        work_started() must have been called for each operation.
193  
        work_started() must have been called for each operation.
182  

194  

183  
        @param ops Queue of operations to post.
195  
        @param ops Queue of operations to post.
184  
    */
196  
    */
185  
    void post_deferred_completions(op_queue& ops) const;
197  
    void post_deferred_completions(op_queue& ops) const;
186  

198  

187  
private:
199  
private:
188  
    struct work_cleanup
200  
    struct work_cleanup
189  
    {
201  
    {
190  
        epoll_scheduler* scheduler;
202  
        epoll_scheduler* scheduler;
191  
        std::unique_lock<std::mutex>* lock;
203  
        std::unique_lock<std::mutex>* lock;
192  
        epoll::scheduler_context* ctx;
204  
        epoll::scheduler_context* ctx;
193  
        ~work_cleanup();
205  
        ~work_cleanup();
194  
    };
206  
    };
195  

207  

196  
    struct task_cleanup
208  
    struct task_cleanup
197  
    {
209  
    {
198  
        epoll_scheduler const* scheduler;
210  
        epoll_scheduler const* scheduler;
199  
        std::unique_lock<std::mutex>* lock;
211  
        std::unique_lock<std::mutex>* lock;
200  
        epoll::scheduler_context* ctx;
212  
        epoll::scheduler_context* ctx;
201  
        ~task_cleanup();
213  
        ~task_cleanup();
202  
    };
214  
    };
203  

215  

204  
    std::size_t do_one(
216  
    std::size_t do_one(
205  
        std::unique_lock<std::mutex>& lock,
217  
        std::unique_lock<std::mutex>& lock,
206  
        long timeout_us,
218  
        long timeout_us,
207  
        epoll::scheduler_context* ctx);
219  
        epoll::scheduler_context* ctx);
208  
    void
220  
    void
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
221  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
222  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
211  
    void interrupt_reactor() const;
223  
    void interrupt_reactor() const;
212  
    void update_timerfd() const;
224  
    void update_timerfd() const;
213  

225  

214  
    /** Set the signaled state and wake all waiting threads.
226  
    /** Set the signaled state and wake all waiting threads.
215  

227  

216  
        @par Preconditions
228  
        @par Preconditions
217  
        Mutex must be held.
229  
        Mutex must be held.
218  

230  

219  
        @param lock The held mutex lock.
231  
        @param lock The held mutex lock.
220  
    */
232  
    */
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
233  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
222  

234  

223  
    /** Set the signaled state and wake one waiter if any exist.
235  
    /** Set the signaled state and wake one waiter if any exist.
224  

236  

225  
        Only unlocks and signals if at least one thread is waiting.
237  
        Only unlocks and signals if at least one thread is waiting.
226  
        Use this when the caller needs to perform a fallback action
238  
        Use this when the caller needs to perform a fallback action
227  
        (such as interrupting the reactor) when no waiters exist.
239  
        (such as interrupting the reactor) when no waiters exist.
228  

240  

229  
        @par Preconditions
241  
        @par Preconditions
230  
        Mutex must be held.
242  
        Mutex must be held.
231  

243  

232  
        @param lock The held mutex lock.
244  
        @param lock The held mutex lock.
233  

245  

234  
        @return `true` if unlocked and signaled, `false` if lock still held.
246  
        @return `true` if unlocked and signaled, `false` if lock still held.
235  
    */
247  
    */
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
248  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237  

249  

238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
250  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
239  

251  

240  
        Always unlocks the mutex. Use this when the caller will release
252  
        Always unlocks the mutex. Use this when the caller will release
241  
        the lock regardless of whether a waiter exists.
253  
        the lock regardless of whether a waiter exists.
242  

254  

243  
        @par Preconditions
255  
        @par Preconditions
244  
        Mutex must be held.
256  
        Mutex must be held.
245  

257  

246  
        @param lock The held mutex lock.
258  
        @param lock The held mutex lock.
247  

259  

248  
        @return `true` if a waiter was signaled, `false` otherwise.
260  
        @return `true` if a waiter was signaled, `false` otherwise.
249  
    */
261  
    */
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
262  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251  

263  

252  
    /** Clear the signaled state before waiting.
264  
    /** Clear the signaled state before waiting.
253  

265  

254  
        @par Preconditions
266  
        @par Preconditions
255  
        Mutex must be held.
267  
        Mutex must be held.
256  
    */
268  
    */
257  
    void clear_signal() const;
269  
    void clear_signal() const;
258  

270  

259  
    /** Block until the signaled state is set.
271  
    /** Block until the signaled state is set.
260  

272  

261  
        Returns immediately if already signaled (fast-path). Otherwise
273  
        Returns immediately if already signaled (fast-path). Otherwise
262  
        increments the waiter count, waits on the condition variable,
274  
        increments the waiter count, waits on the condition variable,
263  
        and decrements the waiter count upon waking.
275  
        and decrements the waiter count upon waking.
264  

276  

265  
        @par Preconditions
277  
        @par Preconditions
266  
        Mutex must be held.
278  
        Mutex must be held.
267  

279  

268  
        @param lock The held mutex lock.
280  
        @param lock The held mutex lock.
269  
    */
281  
    */
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
282  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271  

283  

272  
    /** Block until signaled or timeout expires.
284  
    /** Block until signaled or timeout expires.
273  

285  

274  
        @par Preconditions
286  
        @par Preconditions
275  
        Mutex must be held.
287  
        Mutex must be held.
276  

288  

277  
        @param lock The held mutex lock.
289  
        @param lock The held mutex lock.
278  
        @param timeout_us Maximum time to wait in microseconds.
290  
        @param timeout_us Maximum time to wait in microseconds.
279  
    */
291  
    */
280  
    void wait_for_signal_for(
292  
    void wait_for_signal_for(
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
293  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
282  

294  

283  
    int epoll_fd_;
295  
    int epoll_fd_;
284  
    int event_fd_; // for interrupting reactor
296  
    int event_fd_; // for interrupting reactor
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
297  
    int timer_fd_; // timerfd for kernel-managed timer expiry
286  
    mutable std::mutex mutex_;
298  
    mutable std::mutex mutex_;
287  
    mutable std::condition_variable cond_;
299  
    mutable std::condition_variable cond_;
288  
    mutable op_queue completed_ops_;
300  
    mutable op_queue completed_ops_;
289  
    mutable std::atomic<long> outstanding_work_;
301  
    mutable std::atomic<long> outstanding_work_;
290  
    bool stopped_;
302  
    bool stopped_;
291  

303  

292  
    // True while a thread is blocked in epoll_wait. Used by
304  
    // True while a thread is blocked in epoll_wait. Used by
293  
    // wake_one_thread_and_unlock and work_finished to know when
305  
    // wake_one_thread_and_unlock and work_finished to know when
294  
    // an eventfd interrupt is needed instead of a condvar signal.
306  
    // an eventfd interrupt is needed instead of a condvar signal.
295  
    mutable std::atomic<bool> task_running_{false};
307  
    mutable std::atomic<bool> task_running_{false};
296  

308  

297  
    // True when the reactor has been told to do a non-blocking poll
309  
    // True when the reactor has been told to do a non-blocking poll
298  
    // (more handlers queued or poll mode). Prevents redundant eventfd
310  
    // (more handlers queued or poll mode). Prevents redundant eventfd
299  
    // writes and controls the epoll_wait timeout.
311  
    // writes and controls the epoll_wait timeout.
300  
    mutable bool task_interrupted_ = false;
312  
    mutable bool task_interrupted_ = false;
301  

313  

302  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
314  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
303  
    mutable std::size_t state_ = 0;
315  
    mutable std::size_t state_ = 0;
304  

316  

305  
    // Edge-triggered eventfd state
317  
    // Edge-triggered eventfd state
306  
    mutable std::atomic<bool> eventfd_armed_{false};
318  
    mutable std::atomic<bool> eventfd_armed_{false};
307  

319  

308  
    // Set when the earliest timer changes; flushed before epoll_wait
320  
    // Set when the earliest timer changes; flushed before epoll_wait
309  
    // blocks. Avoids timerfd_settime syscalls for timers that are
321  
    // blocks. Avoids timerfd_settime syscalls for timers that are
310  
    // scheduled then cancelled without being waited on.
322  
    // scheduled then cancelled without being waited on.
311  
    mutable std::atomic<bool> timerfd_stale_{false};
323  
    mutable std::atomic<bool> timerfd_stale_{false};
312  

324  

313  
    // Sentinel operation for interleaving reactor runs with handler execution.
325  
    // Sentinel operation for interleaving reactor runs with handler execution.
314  
    // Ensures the reactor runs periodically even when handlers are continuously
326  
    // Ensures the reactor runs periodically even when handlers are continuously
315  
    // posted, preventing starvation of I/O events, timers, and signals.
327  
    // posted, preventing starvation of I/O events, timers, and signals.
316  
    struct task_op final : scheduler_op
328  
    struct task_op final : scheduler_op
317  
    {
329  
    {
318  
        void operator()() override {}
330  
        void operator()() override {}
319  
        void destroy() override {}
331  
        void destroy() override {}
320  
    };
332  
    };
321  
    task_op task_op_;
333  
    task_op task_op_;
322  
};
334  
};
323  

335  

324  
//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
    epoll Scheduler - Single Reactor Model
    ======================================

    This scheduler uses a thread coordination strategy to provide handler
    parallelism and avoid the thundering herd problem.
    Instead of all threads blocking on epoll_wait(), one thread becomes the
    "reactor" while others wait on a condition variable for handler work.

    Thread Model
    ------------
    - ONE thread runs epoll_wait() at a time (the reactor thread)
    - OTHER threads wait on cond_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()
    - This matches Windows IOCP semantics where N posted items wake N threads

    Event Loop Structure (do_one)
    -----------------------------
    1. Lock mutex, try to pop handler from queue
    2. If got handler: execute it (unlocked), return
    3. If queue empty and no reactor running: become reactor
       - Run epoll_wait (unlocked), queue I/O completions, loop back
    4. If queue empty and reactor running: wait on condvar for work

    The task_running_ flag ensures only one thread owns epoll_wait().
    After the reactor queues I/O completions, it loops back to try getting
    a handler, giving priority to handler execution over more I/O polling.

    Signaling State (state_)
    ------------------------
    The state_ variable encodes two pieces of information:
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
    - Upper bits: waiter count (each waiter adds 2 before blocking)

    This allows efficient coordination:
    - Signalers only call notify when waiters exist (state_ > 1)
    - Waiters check if already signaled before blocking (fast-path)

    Wake Coordination (wake_one_thread_and_unlock)
    ----------------------------------------------
    When posting work:
    - If waiters exist (state_ > 1): signal and notify_one()
    - Else if reactor running: interrupt via eventfd write
    - Else: no-op (thread will find work when it checks queue)

    This avoids waking threads unnecessarily. With cascading wakes,
    each handler execution wakes at most one additional thread if
    more work exists in the queue.

    Work Counting
    -------------
    outstanding_work_ tracks pending operations. When it hits zero, run()
    returns. Each operation increments on start, decrements on completion.

    Timer Integration
    -----------------
    Timers are handled by timer_service. The reactor adjusts epoll_wait
    timeout to wake for the nearest timer expiry. When a new timer is
    scheduled earlier than current, timer_service calls interrupt_reactor()
    to re-evaluate the timeout.
*/
391  

403  

392  
namespace epoll {
404  
namespace epoll {
393  

405  

394  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
406  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
395  
{
407  
{
396  
    epoll_scheduler const* key;
408  
    epoll_scheduler const* key;
397  
    scheduler_context* next;
409  
    scheduler_context* next;
398  
    op_queue private_queue;
410  
    op_queue private_queue;
399  
    long private_outstanding_work;
411  
    long private_outstanding_work;
400  
    int inline_budget;
412  
    int inline_budget;
401  
    int inline_budget_max;
413  
    int inline_budget_max;
402  
    bool unassisted;
414  
    bool unassisted;
403  

415  

404  
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
416  
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
405  
        : key(k)
417  
        : key(k)
406  
        , next(n)
418  
        , next(n)
407  
        , private_outstanding_work(0)
419  
        , private_outstanding_work(0)
408  
        , inline_budget(0)
420  
        , inline_budget(0)
409  
        , inline_budget_max(2)
421  
        , inline_budget_max(2)
410  
        , unassisted(false)
422  
        , unassisted(false)
411  
    {
423  
    {
412  
    }
424  
    }
413  
};
425  
};
414  

426  

415  
// Per-thread stack of scheduler frames; one entry per nested run()
// invocation on this thread.
inline thread_local_ptr<scheduler_context> context_stack;
416  

428  

417  
struct thread_context_guard
429  
struct thread_context_guard
418  
{
430  
{
419  
    scheduler_context frame_;
431  
    scheduler_context frame_;
420  

432  

421  
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
433  
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
422  
        : frame_(ctx, context_stack.get())
434  
        : frame_(ctx, context_stack.get())
423  
    {
435  
    {
424  
        context_stack.set(&frame_);
436  
        context_stack.set(&frame_);
425  
    }
437  
    }
426  

438  

427  
    ~thread_context_guard() noexcept
439  
    ~thread_context_guard() noexcept
428  
    {
440  
    {
429  
        if (!frame_.private_queue.empty())
441  
        if (!frame_.private_queue.empty())
430  
            frame_.key->drain_thread_queue(
442  
            frame_.key->drain_thread_queue(
431  
                frame_.private_queue, frame_.private_outstanding_work);
443  
                frame_.private_queue, frame_.private_outstanding_work);
432  
        context_stack.set(frame_.next);
444  
        context_stack.set(frame_.next);
433  
    }
445  
    }
434  
};
446  
};
435  

447  

436  
// Locate this thread's frame for the given scheduler, or nullptr if
// the current thread is not running inside it.
inline scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    auto* c = context_stack.get();
    while (c != nullptr && c->key != self)
        c = c->next;
    return c;
}
444  

456  

445  
} // namespace epoll
457  
} // namespace epoll
446  

458  

447  
inline void
459  
inline void
448  
epoll_scheduler::reset_inline_budget() const noexcept
460  
epoll_scheduler::reset_inline_budget() const noexcept
449  
{
461  
{
450  
    if (auto* ctx = epoll::find_context(this))
462  
    if (auto* ctx = epoll::find_context(this))
451  
    {
463  
    {
452  
        // Cap when no other thread absorbed queued work. A moderate
464  
        // Cap when no other thread absorbed queued work. A moderate
453  
        // cap (4) amortizes scheduling for small buffers while avoiding
465  
        // cap (4) amortizes scheduling for small buffers while avoiding
454  
        // bursty I/O that fills socket buffers and stalls large transfers.
466  
        // bursty I/O that fills socket buffers and stalls large transfers.
455  
        if (ctx->unassisted)
467  
        if (ctx->unassisted)
456  
        {
468  
        {
457  
            ctx->inline_budget_max = 4;
469  
            ctx->inline_budget_max = 4;
458  
            ctx->inline_budget     = 4;
470  
            ctx->inline_budget     = 4;
459  
            return;
471  
            return;
460  
        }
472  
        }
461  
        // Ramp up when previous cycle fully consumed budget.
473  
        // Ramp up when previous cycle fully consumed budget.
462  
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
474  
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
463  
        if (ctx->inline_budget == 0)
475  
        if (ctx->inline_budget == 0)
464  
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
476  
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
465  
        else if (ctx->inline_budget < ctx->inline_budget_max)
477  
        else if (ctx->inline_budget < ctx->inline_budget_max)
466  
            ctx->inline_budget_max = 2;
478  
            ctx->inline_budget_max = 2;
467  
        ctx->inline_budget = ctx->inline_budget_max;
479  
        ctx->inline_budget = ctx->inline_budget_max;
468  
    }
480  
    }
469  
}
481  
}
470  

482  

471  
inline bool
483  
inline bool
472  
epoll_scheduler::try_consume_inline_budget() const noexcept
484  
epoll_scheduler::try_consume_inline_budget() const noexcept
473  
{
485  
{
474  
    if (auto* ctx = epoll::find_context(this))
486  
    if (auto* ctx = epoll::find_context(this))
475  
    {
487  
    {
476  
        if (ctx->inline_budget > 0)
488  
        if (ctx->inline_budget > 0)
477  
        {
489  
        {
478  
            --ctx->inline_budget;
490  
            --ctx->inline_budget;
479  
            return true;
491  
            return true;
480  
        }
492  
        }
481  
    }
493  
    }
482  
    return false;
494  
    return false;
483  
}
495  
}
484  

496  

485  
inline void
497  
inline void
486  
descriptor_state::operator()()
498  
descriptor_state::operator()()
487  
{
499  
{
488  
    is_enqueued_.store(false, std::memory_order_relaxed);
500  
    is_enqueued_.store(false, std::memory_order_relaxed);
489  

501  

490  
    // Take ownership of impl ref set by close_socket() to prevent
502  
    // Take ownership of impl ref set by close_socket() to prevent
491  
    // the owning impl from being freed while we're executing
503  
    // the owning impl from being freed while we're executing
492  
    auto prevent_impl_destruction = std::move(impl_ref_);
504  
    auto prevent_impl_destruction = std::move(impl_ref_);
493  

505  

494  
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
506  
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
495  
    if (ev == 0)
507  
    if (ev == 0)
496  
    {
508  
    {
497  
        scheduler_->compensating_work_started();
509  
        scheduler_->compensating_work_started();
498  
        return;
510  
        return;
499  
    }
511  
    }
500  

512  

501  
    op_queue local_ops;
513  
    op_queue local_ops;
502  

514  

503  
    int err = 0;
515  
    int err = 0;
504  
    if (ev & EPOLLERR)
516  
    if (ev & EPOLLERR)
505  
    {
517  
    {
506  
        socklen_t len = sizeof(err);
518  
        socklen_t len = sizeof(err);
507  
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
519  
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
508  
            err = errno;
520  
            err = errno;
509  
        if (err == 0)
521  
        if (err == 0)
510  
            err = EIO;
522  
            err = EIO;
511  
    }
523  
    }
512  

524  

513  
    {
525  
    {
514  
        std::lock_guard lock(mutex);
526  
        std::lock_guard lock(mutex);
515  
        if (ev & EPOLLIN)
527  
        if (ev & EPOLLIN)
516  
        {
528  
        {
517  
            if (read_op)
529  
            if (read_op)
518  
            {
530  
            {
519  
                auto* rd = read_op;
531  
                auto* rd = read_op;
520  
                if (err)
532  
                if (err)
521  
                    rd->complete(err, 0);
533  
                    rd->complete(err, 0);
522  
                else
534  
                else
523  
                    rd->perform_io();
535  
                    rd->perform_io();
524  

536  

525  
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
537  
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
526  
                {
538  
                {
527  
                    rd->errn = 0;
539  
                    rd->errn = 0;
528  
                }
540  
                }
529  
                else
541  
                else
530  
                {
542  
                {
531  
                    read_op = nullptr;
543  
                    read_op = nullptr;
532  
                    local_ops.push(rd);
544  
                    local_ops.push(rd);
533  
                }
545  
                }
534  
            }
546  
            }
535  
            else
547  
            else
536  
            {
548  
            {
537  
                read_ready = true;
549  
                read_ready = true;
538  
            }
550  
            }
539  
        }
551  
        }
540  
        if (ev & EPOLLOUT)
552  
        if (ev & EPOLLOUT)
541  
        {
553  
        {
542  
            bool had_write_op = (connect_op || write_op);
554  
            bool had_write_op = (connect_op || write_op);
543  
            if (connect_op)
555  
            if (connect_op)
544  
            {
556  
            {
545  
                auto* cn = connect_op;
557  
                auto* cn = connect_op;
546  
                if (err)
558  
                if (err)
547  
                    cn->complete(err, 0);
559  
                    cn->complete(err, 0);
548  
                else
560  
                else
549  
                    cn->perform_io();
561  
                    cn->perform_io();
550 -
                connect_op = nullptr;
562 +

551 -
                local_ops.push(cn);
563 +
                if (cn->errn == EAGAIN || cn->errn == EWOULDBLOCK)
 
564 +
                {
 
565 +
                    cn->errn = 0;
 
566 +
                }
 
567 +
                else
 
568 +
                {
 
569 +
                    connect_op = nullptr;
 
570 +
                    local_ops.push(cn);
 
571 +
                }
552  
            }
572  
            }
553  
            if (write_op)
573  
            if (write_op)
554  
            {
574  
            {
555  
                auto* wr = write_op;
575  
                auto* wr = write_op;
556  
                if (err)
576  
                if (err)
557  
                    wr->complete(err, 0);
577  
                    wr->complete(err, 0);
558  
                else
578  
                else
559  
                    wr->perform_io();
579  
                    wr->perform_io();
560  

580  

561  
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
581  
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
562  
                {
582  
                {
563  
                    wr->errn = 0;
583  
                    wr->errn = 0;
564  
                }
584  
                }
565  
                else
585  
                else
566  
                {
586  
                {
567  
                    write_op = nullptr;
587  
                    write_op = nullptr;
568  
                    local_ops.push(wr);
588  
                    local_ops.push(wr);
569  
                }
589  
                }
570  
            }
590  
            }
571  
            if (!had_write_op)
591  
            if (!had_write_op)
572  
                write_ready = true;
592  
                write_ready = true;
573  
        }
593  
        }
574  
        if (err)
594  
        if (err)
575  
        {
595  
        {
576  
            if (read_op)
596  
            if (read_op)
577  
            {
597  
            {
578  
                read_op->complete(err, 0);
598  
                read_op->complete(err, 0);
579  
                local_ops.push(std::exchange(read_op, nullptr));
599  
                local_ops.push(std::exchange(read_op, nullptr));
580  
            }
600  
            }
581  
            if (write_op)
601  
            if (write_op)
582  
            {
602  
            {
583  
                write_op->complete(err, 0);
603  
                write_op->complete(err, 0);
584  
                local_ops.push(std::exchange(write_op, nullptr));
604  
                local_ops.push(std::exchange(write_op, nullptr));
585  
            }
605  
            }
586  
            if (connect_op)
606  
            if (connect_op)
587  
            {
607  
            {
588  
                connect_op->complete(err, 0);
608  
                connect_op->complete(err, 0);
589  
                local_ops.push(std::exchange(connect_op, nullptr));
609  
                local_ops.push(std::exchange(connect_op, nullptr));
590  
            }
610  
            }
591  
        }
611  
        }
592  
    }
612  
    }
593  

613  

594  
    // Execute first handler inline — the scheduler's work_cleanup
614  
    // Execute first handler inline — the scheduler's work_cleanup
595  
    // accounts for this as the "consumed" work item
615  
    // accounts for this as the "consumed" work item
596  
    scheduler_op* first = local_ops.pop();
616  
    scheduler_op* first = local_ops.pop();
597  
    if (first)
617  
    if (first)
598  
    {
618  
    {
599  
        scheduler_->post_deferred_completions(local_ops);
619  
        scheduler_->post_deferred_completions(local_ops);
600  
        (*first)();
620  
        (*first)();
601  
    }
621  
    }
602  
    else
622  
    else
603  
    {
623  
    {
604  
        scheduler_->compensating_work_started();
624  
        scheduler_->compensating_work_started();
605  
    }
625  
    }
606  
}
626  
}
607  

627  

608  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
628  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
609  
    : epoll_fd_(-1)
629  
    : epoll_fd_(-1)
610  
    , event_fd_(-1)
630  
    , event_fd_(-1)
611  
    , timer_fd_(-1)
631  
    , timer_fd_(-1)
612  
    , outstanding_work_(0)
632  
    , outstanding_work_(0)
613  
    , stopped_(false)
633  
    , stopped_(false)
614  
    , task_running_{false}
634  
    , task_running_{false}
615  
    , task_interrupted_(false)
635  
    , task_interrupted_(false)
616  
    , state_(0)
636  
    , state_(0)
617  
{
637  
{
618  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
638  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
619  
    if (epoll_fd_ < 0)
639  
    if (epoll_fd_ < 0)
620  
        detail::throw_system_error(make_err(errno), "epoll_create1");
640  
        detail::throw_system_error(make_err(errno), "epoll_create1");
621  

641  

622  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
642  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
623  
    if (event_fd_ < 0)
643  
    if (event_fd_ < 0)
624  
    {
644  
    {
625  
        int errn = errno;
645  
        int errn = errno;
626  
        ::close(epoll_fd_);
646  
        ::close(epoll_fd_);
627  
        detail::throw_system_error(make_err(errn), "eventfd");
647  
        detail::throw_system_error(make_err(errn), "eventfd");
628  
    }
648  
    }
629  

649  

630  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
650  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
631  
    if (timer_fd_ < 0)
651  
    if (timer_fd_ < 0)
632  
    {
652  
    {
633  
        int errn = errno;
653  
        int errn = errno;
634  
        ::close(event_fd_);
654  
        ::close(event_fd_);
635  
        ::close(epoll_fd_);
655  
        ::close(epoll_fd_);
636  
        detail::throw_system_error(make_err(errn), "timerfd_create");
656  
        detail::throw_system_error(make_err(errn), "timerfd_create");
637  
    }
657  
    }
638  

658  

639  
    epoll_event ev{};
659  
    epoll_event ev{};
640  
    ev.events   = EPOLLIN | EPOLLET;
660  
    ev.events   = EPOLLIN | EPOLLET;
641  
    ev.data.ptr = nullptr;
661  
    ev.data.ptr = nullptr;
642  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
662  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
643  
    {
663  
    {
644  
        int errn = errno;
664  
        int errn = errno;
645  
        ::close(timer_fd_);
665  
        ::close(timer_fd_);
646  
        ::close(event_fd_);
666  
        ::close(event_fd_);
647  
        ::close(epoll_fd_);
667  
        ::close(epoll_fd_);
648  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
668  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
649  
    }
669  
    }
650  

670  

651  
    epoll_event timer_ev{};
671  
    epoll_event timer_ev{};
652  
    timer_ev.events   = EPOLLIN | EPOLLERR;
672  
    timer_ev.events   = EPOLLIN | EPOLLERR;
653  
    timer_ev.data.ptr = &timer_fd_;
673  
    timer_ev.data.ptr = &timer_fd_;
654  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
674  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
655  
    {
675  
    {
656  
        int errn = errno;
676  
        int errn = errno;
657  
        ::close(timer_fd_);
677  
        ::close(timer_fd_);
658  
        ::close(event_fd_);
678  
        ::close(event_fd_);
659  
        ::close(epoll_fd_);
679  
        ::close(epoll_fd_);
660  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
680  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
661  
    }
681  
    }
662  

682  

663  
    timer_svc_ = &get_timer_service(ctx, *this);
683  
    timer_svc_ = &get_timer_service(ctx, *this);
664  
    timer_svc_->set_on_earliest_changed(
684  
    timer_svc_->set_on_earliest_changed(
665  
        timer_service::callback(this, [](void* p) {
685  
        timer_service::callback(this, [](void* p) {
666  
            auto* self = static_cast<epoll_scheduler*>(p);
686  
            auto* self = static_cast<epoll_scheduler*>(p);
667  
            self->timerfd_stale_.store(true, std::memory_order_release);
687  
            self->timerfd_stale_.store(true, std::memory_order_release);
668  
            if (self->task_running_.load(std::memory_order_acquire))
688  
            if (self->task_running_.load(std::memory_order_acquire))
669  
                self->interrupt_reactor();
689  
                self->interrupt_reactor();
670  
        }));
690  
        }));
671  

691  

672  
    // Initialize resolver service
692  
    // Initialize resolver service
673  
    get_resolver_service(ctx, *this);
693  
    get_resolver_service(ctx, *this);
674  

694  

675  
    // Initialize signal service
695  
    // Initialize signal service
676  
    get_signal_service(ctx, *this);
696  
    get_signal_service(ctx, *this);
677  

697  

678  
    // Push task sentinel to interleave reactor runs with handler execution
698  
    // Push task sentinel to interleave reactor runs with handler execution
679  
    completed_ops_.push(&task_op_);
699  
    completed_ops_.push(&task_op_);
680  
}
700  
}
681  

701  

682  
// Release the kernel resources in reverse order of acquisition;
// -1 marks a descriptor that was never successfully opened.
inline epoll_scheduler::~epoll_scheduler()
{
    for (int fd : {timer_fd_, event_fd_, epoll_fd_})
        if (fd >= 0)
            ::close(fd);
}
691  

711  

692  
inline void
712  
inline void
693  
epoll_scheduler::shutdown()
713  
epoll_scheduler::shutdown()
694  
{
714  
{
695  
    {
715  
    {
696  
        std::unique_lock lock(mutex_);
716  
        std::unique_lock lock(mutex_);
697  

717  

698  
        while (auto* h = completed_ops_.pop())
718  
        while (auto* h = completed_ops_.pop())
699  
        {
719  
        {
700  
            if (h == &task_op_)
720  
            if (h == &task_op_)
701  
                continue;
721  
                continue;
702  
            lock.unlock();
722  
            lock.unlock();
703  
            h->destroy();
723  
            h->destroy();
704  
            lock.lock();
724  
            lock.lock();
705  
        }
725  
        }
706  

726  

707  
        signal_all(lock);
727  
        signal_all(lock);
708  
    }
728  
    }
709  

729  

710  
    if (event_fd_ >= 0)
730  
    if (event_fd_ >= 0)
711  
        interrupt_reactor();
731  
        interrupt_reactor();
712  
}
732  
}
713  

733  

714  
inline void
734  
inline void
715  
epoll_scheduler::post(std::coroutine_handle<> h) const
735  
epoll_scheduler::post(std::coroutine_handle<> h) const
716  
{
736  
{
717  
    struct post_handler final : scheduler_op
737  
    struct post_handler final : scheduler_op
718  
    {
738  
    {
719  
        std::coroutine_handle<> h_;
739  
        std::coroutine_handle<> h_;
720  

740  

721  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
741  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
722  

742  

723  
        ~post_handler() override = default;
743  
        ~post_handler() override = default;
724  

744  

725  
        void operator()() override
745  
        void operator()() override
726  
        {
746  
        {
727  
            auto h = h_;
747  
            auto h = h_;
728  
            delete this;
748  
            delete this;
729  
            h.resume();
749  
            h.resume();
730  
        }
750  
        }
731  

751  

732  
        void destroy() override
752  
        void destroy() override
733  
        {
753  
        {
734  
            auto h = h_;
754  
            auto h = h_;
735  
            delete this;
755  
            delete this;
736  
            h.destroy();
756  
            h.destroy();
737  
        }
757  
        }
738  
    };
758  
    };
739  

759  

740  
    auto ph = std::make_unique<post_handler>(h);
760  
    auto ph = std::make_unique<post_handler>(h);
741  

761  

742  
    // Fast path: same thread posts to private queue
762  
    // Fast path: same thread posts to private queue
743  
    // Only count locally; work_cleanup batches to global counter
763  
    // Only count locally; work_cleanup batches to global counter
744  
    if (auto* ctx = epoll::find_context(this))
764  
    if (auto* ctx = epoll::find_context(this))
745  
    {
765  
    {
746  
        ++ctx->private_outstanding_work;
766  
        ++ctx->private_outstanding_work;
747  
        ctx->private_queue.push(ph.release());
767  
        ctx->private_queue.push(ph.release());
748  
        return;
768  
        return;
749  
    }
769  
    }
750  

770  

751  
    // Slow path: cross-thread post requires mutex
771  
    // Slow path: cross-thread post requires mutex
752  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
772  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
753  

773  

754  
    std::unique_lock lock(mutex_);
774  
    std::unique_lock lock(mutex_);
755  
    completed_ops_.push(ph.release());
775  
    completed_ops_.push(ph.release());
756  
    wake_one_thread_and_unlock(lock);
776  
    wake_one_thread_and_unlock(lock);
757  
}
777  
}
758  

778  

759  
inline void
779  
inline void
760  
epoll_scheduler::post(scheduler_op* h) const
780  
epoll_scheduler::post(scheduler_op* h) const
761  
{
781  
{
762  
    // Fast path: same thread posts to private queue
782  
    // Fast path: same thread posts to private queue
763  
    // Only count locally; work_cleanup batches to global counter
783  
    // Only count locally; work_cleanup batches to global counter
764  
    if (auto* ctx = epoll::find_context(this))
784  
    if (auto* ctx = epoll::find_context(this))
765  
    {
785  
    {
766  
        ++ctx->private_outstanding_work;
786  
        ++ctx->private_outstanding_work;
767  
        ctx->private_queue.push(h);
787  
        ctx->private_queue.push(h);
768  
        return;
788  
        return;
769  
    }
789  
    }
770  

790  

771  
    // Slow path: cross-thread post requires mutex
791  
    // Slow path: cross-thread post requires mutex
772  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
792  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
773  

793  

774  
    std::unique_lock lock(mutex_);
794  
    std::unique_lock lock(mutex_);
775  
    completed_ops_.push(h);
795  
    completed_ops_.push(h);
776  
    wake_one_thread_and_unlock(lock);
796  
    wake_one_thread_and_unlock(lock);
777  
}
797  
}
778  

798  

779  
inline bool
799  
inline bool
780  
epoll_scheduler::running_in_this_thread() const noexcept
800  
epoll_scheduler::running_in_this_thread() const noexcept
781  
{
801  
{
782  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
802  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
783  
        if (c->key == this)
803  
        if (c->key == this)
784  
            return true;
804  
            return true;
785  
    return false;
805  
    return false;
786  
}
806  
}
787  

807  

788  
inline void
808  
inline void
789  
epoll_scheduler::stop()
809  
epoll_scheduler::stop()
790  
{
810  
{
791  
    std::unique_lock lock(mutex_);
811  
    std::unique_lock lock(mutex_);
792  
    if (!stopped_)
812  
    if (!stopped_)
793  
    {
813  
    {
794  
        stopped_ = true;
814  
        stopped_ = true;
795  
        signal_all(lock);
815  
        signal_all(lock);
796  
        interrupt_reactor();
816  
        interrupt_reactor();
797  
    }
817  
    }
798  
}
818  
}
799  

819  

800  
inline bool
820  
inline bool
801  
epoll_scheduler::stopped() const noexcept
821  
epoll_scheduler::stopped() const noexcept
802  
{
822  
{
803  
    std::unique_lock lock(mutex_);
823  
    std::unique_lock lock(mutex_);
804  
    return stopped_;
824  
    return stopped_;
805  
}
825  
}
806  

826  

807  
inline void
827  
inline void
808  
epoll_scheduler::restart()
828  
epoll_scheduler::restart()
809  
{
829  
{
810  
    std::unique_lock lock(mutex_);
830  
    std::unique_lock lock(mutex_);
811  
    stopped_ = false;
831  
    stopped_ = false;
812  
}
832  
}
813  

833  

814  
inline std::size_t
834  
inline std::size_t
815  
epoll_scheduler::run()
835  
epoll_scheduler::run()
816  
{
836  
{
817  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
837  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
818  
    {
838  
    {
819  
        stop();
839  
        stop();
820  
        return 0;
840  
        return 0;
821  
    }
841  
    }
822  

842  

823  
    epoll::thread_context_guard ctx(this);
843  
    epoll::thread_context_guard ctx(this);
824  
    std::unique_lock lock(mutex_);
844  
    std::unique_lock lock(mutex_);
825  

845  

826  
    std::size_t n = 0;
846  
    std::size_t n = 0;
827  
    for (;;)
847  
    for (;;)
828  
    {
848  
    {
829  
        if (!do_one(lock, -1, &ctx.frame_))
849  
        if (!do_one(lock, -1, &ctx.frame_))
830  
            break;
850  
            break;
831  
        if (n != (std::numeric_limits<std::size_t>::max)())
851  
        if (n != (std::numeric_limits<std::size_t>::max)())
832  
            ++n;
852  
            ++n;
833  
        if (!lock.owns_lock())
853  
        if (!lock.owns_lock())
834  
            lock.lock();
854  
            lock.lock();
835  
    }
855  
    }
836  
    return n;
856  
    return n;
837  
}
857  
}
838  

858  

839  
inline std::size_t
859  
inline std::size_t
840  
epoll_scheduler::run_one()
860  
epoll_scheduler::run_one()
841  
{
861  
{
842  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
862  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
843  
    {
863  
    {
844  
        stop();
864  
        stop();
845  
        return 0;
865  
        return 0;
846  
    }
866  
    }
847  

867  

848  
    epoll::thread_context_guard ctx(this);
868  
    epoll::thread_context_guard ctx(this);
849  
    std::unique_lock lock(mutex_);
869  
    std::unique_lock lock(mutex_);
850  
    return do_one(lock, -1, &ctx.frame_);
870  
    return do_one(lock, -1, &ctx.frame_);
851  
}
871  
}
852  

872  

853  
inline std::size_t
873  
inline std::size_t
854  
epoll_scheduler::wait_one(long usec)
874  
epoll_scheduler::wait_one(long usec)
855  
{
875  
{
856  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
876  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
857  
    {
877  
    {
858  
        stop();
878  
        stop();
859  
        return 0;
879  
        return 0;
860  
    }
880  
    }
861  

881  

862  
    epoll::thread_context_guard ctx(this);
882  
    epoll::thread_context_guard ctx(this);
863  
    std::unique_lock lock(mutex_);
883  
    std::unique_lock lock(mutex_);
864  
    return do_one(lock, usec, &ctx.frame_);
884  
    return do_one(lock, usec, &ctx.frame_);
865  
}
885  
}
866  

886  

867  
inline std::size_t
887  
inline std::size_t
868  
epoll_scheduler::poll()
888  
epoll_scheduler::poll()
869  
{
889  
{
870  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
890  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
871  
    {
891  
    {
872  
        stop();
892  
        stop();
873  
        return 0;
893  
        return 0;
874  
    }
894  
    }
875  

895  

876  
    epoll::thread_context_guard ctx(this);
896  
    epoll::thread_context_guard ctx(this);
877  
    std::unique_lock lock(mutex_);
897  
    std::unique_lock lock(mutex_);
878  

898  

879  
    std::size_t n = 0;
899  
    std::size_t n = 0;
880  
    for (;;)
900  
    for (;;)
881  
    {
901  
    {
882  
        if (!do_one(lock, 0, &ctx.frame_))
902  
        if (!do_one(lock, 0, &ctx.frame_))
883  
            break;
903  
            break;
884  
        if (n != (std::numeric_limits<std::size_t>::max)())
904  
        if (n != (std::numeric_limits<std::size_t>::max)())
885  
            ++n;
905  
            ++n;
886  
        if (!lock.owns_lock())
906  
        if (!lock.owns_lock())
887  
            lock.lock();
907  
            lock.lock();
888  
    }
908  
    }
889  
    return n;
909  
    return n;
890  
}
910  
}
891  

911  

892  
inline std::size_t
912  
inline std::size_t
893  
epoll_scheduler::poll_one()
913  
epoll_scheduler::poll_one()
894  
{
914  
{
895  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
915  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
896  
    {
916  
    {
897  
        stop();
917  
        stop();
898  
        return 0;
918  
        return 0;
899  
    }
919  
    }
900  

920  

901  
    epoll::thread_context_guard ctx(this);
921  
    epoll::thread_context_guard ctx(this);
902  
    std::unique_lock lock(mutex_);
922  
    std::unique_lock lock(mutex_);
903  
    return do_one(lock, 0, &ctx.frame_);
923  
    return do_one(lock, 0, &ctx.frame_);
904  
}
924  
}
905  

925  

906  
// Add a descriptor to the epoll set in edge-triggered mode and reset
// its cached readiness state.
inline void
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    // Only register read interest upfront. EPOLLOUT is deferred until
    // the first write/connect op to avoid a spurious write-ready event
    // on freshly opened (always-writable) sockets.
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    // Remember the registered mask so ensure_write_events() can tell
    // whether EPOLLOUT still needs to be added later.
    desc->registered_events = ev.events;
    desc->fd                = fd;
    desc->scheduler_        = this;

    // Clear readiness under the descriptor mutex so state from any
    // earlier registration cannot leak into this one.
    std::lock_guard lock(desc->mutex);
    desc->read_ready  = false;
    desc->write_ready = false;
}

// Lazily add EPOLLOUT interest the first time a write/connect
// operation needs it. No-op when EPOLLOUT is already registered.
inline void
epoll_scheduler::ensure_write_events(int fd, descriptor_state* desc) const
{
    std::lock_guard lock(desc->mutex);
    if (desc->registered_events & EPOLLOUT)
        return;

    epoll_event ev{};
    ev.events   = desc->registered_events | EPOLLOUT;
    ev.data.ptr = desc;
    // NOTE(review): a failed EPOLL_CTL_MOD is silently ignored here,
    // leaving registered_events unchanged — presumably the pending op
    // surfaces the error through its own I/O path; confirm.
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == 0)
        desc->registered_events = ev.events;
}
924  

961  

925  
// Remove a descriptor from the epoll set. The epoll_ctl result is
// deliberately ignored — best-effort removal (e.g. the fd may already
// have been closed, which would make EPOLL_CTL_DEL fail harmlessly).
inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
930  

967  

931  
// Record one newly started unit of work. Relaxed ordering: the count
// is only decremented with acq_rel in work_finished(), which provides
// the synchronization when it reaches zero.
inline void
epoll_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
936  

973  

937  
// Record completion of one unit of work and stop the scheduler when
// the outstanding count drops to zero.
inline void
epoll_scheduler::work_finished() noexcept
{
    // fetch_sub returns the previous value: 1 means this was the last
    // outstanding item.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
943  

980  

944  
inline void
981  
inline void
945  
epoll_scheduler::compensating_work_started() const noexcept
982  
epoll_scheduler::compensating_work_started() const noexcept
946  
{
983  
{
947  
    auto* ctx = epoll::find_context(this);
984  
    auto* ctx = epoll::find_context(this);
948  
    if (ctx)
985  
    if (ctx)
949  
        ++ctx->private_outstanding_work;
986  
        ++ctx->private_outstanding_work;
950  
}
987  
}
951  

988  

952  
inline void
989  
inline void
953  
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
990  
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
954  
{
991  
{
955  
    // Note: outstanding_work_ was already incremented when posting
992  
    // Note: outstanding_work_ was already incremented when posting
956  
    std::unique_lock lock(mutex_);
993  
    std::unique_lock lock(mutex_);
957  
    completed_ops_.splice(queue);
994  
    completed_ops_.splice(queue);
958  
    if (count > 0)
995  
    if (count > 0)
959  
        maybe_unlock_and_signal_one(lock);
996  
        maybe_unlock_and_signal_one(lock);
960  
}
997  
}
961  

998  

962  
inline void
999  
inline void
963  
epoll_scheduler::post_deferred_completions(op_queue& ops) const
1000  
epoll_scheduler::post_deferred_completions(op_queue& ops) const
964  
{
1001  
{
965  
    if (ops.empty())
1002  
    if (ops.empty())
966  
        return;
1003  
        return;
967  

1004  

968  
    // Fast path: if on scheduler thread, use private queue
1005  
    // Fast path: if on scheduler thread, use private queue
969  
    if (auto* ctx = epoll::find_context(this))
1006  
    if (auto* ctx = epoll::find_context(this))
970  
    {
1007  
    {
971  
        ctx->private_queue.splice(ops);
1008  
        ctx->private_queue.splice(ops);
972  
        return;
1009  
        return;
973  
    }
1010  
    }
974  

1011  

975  
    // Slow path: add to global queue and wake a thread
1012  
    // Slow path: add to global queue and wake a thread
976  
    std::unique_lock lock(mutex_);
1013  
    std::unique_lock lock(mutex_);
977  
    completed_ops_.splice(ops);
1014  
    completed_ops_.splice(ops);
978  
    wake_one_thread_and_unlock(lock);
1015  
    wake_one_thread_and_unlock(lock);
979  
}
1016  
}
980  

1017  

981  
// Wake the reactor thread out of epoll_wait by writing to the eventfd.
inline void
epoll_scheduler::interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes.
    // The flag is cleared by the reactor when it drains the eventfd
    // (see run_task).
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        std::uint64_t val       = 1;
        // Best-effort: the write result is intentionally discarded.
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
994  

1031  

995  
// Set the signal bit and wake every waiting thread. The caller must
// hold the scheduler mutex (the unnamed lock parameter documents this).
inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    // state_ encoding: bit 0 is the signal flag, the remaining bits
    // count waiters in increments of 2 (see wait_for_signal).
    state_ |= 1;
    cond_.notify_all();
}
1001  

1038  

1002  
// Set the signal bit; if at least one thread is waiting (state_ > 1
// means the waiter-count bits are non-zero), release the lock and wake
// exactly one waiter. Returns true when a waiter was notified (and the
// lock released); false leaves the lock held.
inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        // Unlock before notify so the woken thread can take the mutex
        // immediately.
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
1015  

1052  

1016  
// Like maybe_unlock_and_signal_one(), but always releases the lock.
// Notifies one waiter only when somebody is actually waiting; returns
// whether a waiter was present.
inline bool
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    // Snapshot the waiter bits while still holding the mutex.
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
1026  

1063  

1027  
// Clear the signal bit (bit 0) while leaving the waiter count intact.
inline void
epoll_scheduler::clear_signal() const
{
    state_ &= ~std::size_t(1);
}
1032  

1069  

1033  
// Block until the signal bit is set. The waiter count (held in state_
// in increments of 2) is maintained around each wait so signalling
// code can tell whether a notify is needed.
inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
1043  

1080  

1044  
// Wait for the signal bit with an upper bound of timeout_us
// microseconds. A single timed wait, not a loop: a spurious wakeup
// returns before the timeout elapses.
inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
1055  

1092  

1056  
// Wake one worker: prefer a condvar waiter; failing that, interrupt
// the reactor if it is running and not already flagged. The lock is
// always released before this function returns.
inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        // task_interrupted_ is set while the mutex is still held, then
        // the eventfd write happens outside the lock.
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
1074  

1111  

1075  
// Runs after each handler invocation (see do_one). Reconciles the
// thread-private work counter with the shared outstanding_work_ total
// and flushes the thread-private op queue back to the shared queue.
inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // The handler just executed accounts for one unit of work:
        // produced > 1 publishes the surplus to the shared counter;
        // produced < 1 means the net count dropped, so work_finished()
        // runs (and may trigger stop()); produced == 1 is a wash.
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            // The handler ran with the mutex released; re-acquire it
            // to splice private ops into the shared queue.
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: account for the single handler directly.
        scheduler->work_finished();
    }
}
1098  

1135  

1099  
// Runs when the reactor task returns (see run_task). Publishes any
// thread-private work count and ops accumulated while the reactor ran.
inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        // run_task may exit with the mutex either held or released.
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}
1118  

1155  

1119  
inline void
1156  
inline void
1120  
epoll_scheduler::update_timerfd() const
1157  
epoll_scheduler::update_timerfd() const
1121  
{
1158  
{
1122  
    auto nearest = timer_svc_->nearest_expiry();
1159  
    auto nearest = timer_svc_->nearest_expiry();
1123  

1160  

1124  
    itimerspec ts{};
1161  
    itimerspec ts{};
1125  
    int flags = 0;
1162  
    int flags = 0;
1126  

1163  

1127  
    if (nearest == timer_service::time_point::max())
1164  
    if (nearest == timer_service::time_point::max())
1128  
    {
1165  
    {
1129  
        // No timers - disarm by setting to 0 (relative)
1166  
        // No timers - disarm by setting to 0 (relative)
1130  
    }
1167  
    }
1131  
    else
1168  
    else
1132  
    {
1169  
    {
1133  
        auto now = std::chrono::steady_clock::now();
1170  
        auto now = std::chrono::steady_clock::now();
1134  
        if (nearest <= now)
1171  
        if (nearest <= now)
1135  
        {
1172  
        {
1136  
            // Use 1ns instead of 0 - zero disarms the timerfd
1173  
            // Use 1ns instead of 0 - zero disarms the timerfd
1137  
            ts.it_value.tv_nsec = 1;
1174  
            ts.it_value.tv_nsec = 1;
1138  
        }
1175  
        }
1139  
        else
1176  
        else
1140  
        {
1177  
        {
1141  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1178  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1142  
                            nearest - now)
1179  
                            nearest - now)
1143  
                            .count();
1180  
                            .count();
1144  
            ts.it_value.tv_sec  = nsec / 1000000000;
1181  
            ts.it_value.tv_sec  = nsec / 1000000000;
1145  
            ts.it_value.tv_nsec = nsec % 1000000000;
1182  
            ts.it_value.tv_nsec = nsec % 1000000000;
1146  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1183  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1147  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1184  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1148  
                ts.it_value.tv_nsec = 1;
1185  
                ts.it_value.tv_nsec = 1;
1149  
        }
1186  
        }
1150  
    }
1187  
    }
1151  

1188  

1152  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1189  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1153  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1190  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1154  
}
1191  
}
1155  

1192  

1156  
// One reactor iteration: wait for epoll events (non-blocking when the
// task has been interrupted), translate them into ready descriptors,
// service expired timers, then splice results into the shared queue.
inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    // Interrupted => poll without blocking; otherwise block until an
    // event or eventfd interrupt arrives.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Publishes private work counts / ops even on exception exit.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // A null data pointer tags the eventfd armed by
        // interrupt_reactor(): drain it and clear the armed flag.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // &timer_fd_ tags the timerfd driving the timer service.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1231  

1268  

1232  
// Core dispatch loop shared by run()/poll()/poll_one(): executes at
// most one handler (returning 1) or returns 0 when stopped or idle.
// timeout_us < 0 blocks indefinitely, 0 polls, > 0 bounds the wait.
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Run the reactor non-blocking when other handlers are
            // queued or the caller asked for a poll.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Other handlers are queued: recruit another thread to run
            // them while this one drives the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more work remains, try to hand it off; record whether
            // another thread actually picked it up.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Reconciles work counts / private queue on scope exit,
            // even if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            // Invoke the handler with the mutex released.
            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1304  

1341  

1305  
} // namespace boost::corosio::detail
1342  
} // namespace boost::corosio::detail
1306  

1343  

1307  
#endif // BOOST_COROSIO_HAS_EPOLL
1344  
#endif // BOOST_COROSIO_HAS_EPOLL
1308  

1345  

1309  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1346  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP