include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

80.6% Lines (407/505) 87.8% List of functions (43/49)
f(x) Functions (49)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_scheduler::task_op::operator()() :330 0 0.0% boost::corosio::detail::epoll_scheduler::task_op::destroy() :331 0 0.0% boost::corosio::detail::epoll::scheduler_context::scheduler_context(boost::corosio::detail::epoll_scheduler const*, boost::corosio::detail::epoll::scheduler_context*) :416 0 100.0% boost::corosio::detail::epoll::thread_context_guard::thread_context_guard(boost::corosio::detail::epoll_scheduler const*) :433 0 100.0% boost::corosio::detail::epoll::thread_context_guard::~thread_context_guard() :439 0 66.7% boost::corosio::detail::epoll::find_context(boost::corosio::detail::epoll_scheduler const*) :449 0 100.0% boost::corosio::detail::epoll_scheduler::reset_inline_budget() const :460 0 54.5% boost::corosio::detail::epoll_scheduler::try_consume_inline_budget() const :484 0 100.0% boost::corosio::detail::descriptor_state::operator()() :498 0 68.2% boost::corosio::detail::epoll_scheduler::epoll_scheduler(boost::capy::execution_context&, int) :628 0 58.1% boost::corosio::detail::epoll_scheduler::epoll_scheduler(boost::capy::execution_context&, int)::{lambda(void*)#1}::operator()(void*) const :685 0 90.0% boost::corosio::detail::epoll_scheduler::~epoll_scheduler() :702 0 100.0% boost::corosio::detail::epoll_scheduler::shutdown() :713 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const :735 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :741 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :743 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :745 0 100.0% boost::corosio::detail::epoll_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :752 0 100.0% 
boost::corosio::detail::epoll_scheduler::post(boost::corosio::detail::scheduler_op*) const :780 0 100.0% boost::corosio::detail::epoll_scheduler::running_in_this_thread() const :800 0 100.0% boost::corosio::detail::epoll_scheduler::stop() :809 0 100.0% boost::corosio::detail::epoll_scheduler::stopped() const :821 0 100.0% boost::corosio::detail::epoll_scheduler::restart() :828 0 100.0% boost::corosio::detail::epoll_scheduler::run() :835 0 100.0% boost::corosio::detail::epoll_scheduler::run_one() :860 0 75.0% boost::corosio::detail::epoll_scheduler::wait_one(long) :874 0 100.0% boost::corosio::detail::epoll_scheduler::poll() :888 0 100.0% boost::corosio::detail::epoll_scheduler::poll_one() :913 0 100.0% boost::corosio::detail::epoll_scheduler::register_descriptor(int, boost::corosio::detail::descriptor_state*) const :927 0 92.3% boost::corosio::detail::epoll_scheduler::ensure_write_events(int, boost::corosio::detail::descriptor_state*) const :949 0 90.0% boost::corosio::detail::epoll_scheduler::deregister_descriptor(int) const :963 0 100.0% boost::corosio::detail::epoll_scheduler::work_started() :969 0 100.0% boost::corosio::detail::epoll_scheduler::work_finished() :975 0 100.0% boost::corosio::detail::epoll_scheduler::compensating_work_started() const :982 0 100.0% boost::corosio::detail::epoll_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :990 0 0.0% boost::corosio::detail::epoll_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :1000 0 30.0% boost::corosio::detail::epoll_scheduler::interrupt_reactor() const :1019 0 100.0% boost::corosio::detail::epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const :1033 0 100.0% boost::corosio::detail::epoll_scheduler::maybe_unlock_and_signal_one(std::unique_lock<std::mutex>&) const :1040 0 57.1% 
boost::corosio::detail::epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>&) const :1054 0 85.7% boost::corosio::detail::epoll_scheduler::clear_signal() const :1065 0 0.0% boost::corosio::detail::epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>&) const :1071 0 0.0% boost::corosio::detail::epoll_scheduler::wait_for_signal_for(std::unique_lock<std::mutex>&, long) const :1082 0 0.0% boost::corosio::detail::epoll_scheduler::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :1094 0 87.5% boost::corosio::detail::epoll_scheduler::work_cleanup::~work_cleanup() :1112 0 92.3% boost::corosio::detail::epoll_scheduler::task_cleanup::~task_cleanup() :1136 0 83.3% boost::corosio::detail::epoll_scheduler::update_timerfd() const :1157 0 88.9% boost::corosio::detail::epoll_scheduler::run_task(std::unique_lock<std::mutex>&, boost::corosio::detail::epoll::scheduler_context*) :1194 0 97.1% boost::corosio::detail::epoll_scheduler::do_one(std::unique_lock<std::mutex>&, long, boost::corosio::detail::epoll::scheduler_context*) :1270 0 83.3%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/native_scheduler.hpp>
21 #include <boost/corosio/detail/scheduler_op.hpp>
22
23 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28
29 #include <boost/corosio/detail/except.hpp>
30 #include <boost/corosio/detail/thread_local_ptr.hpp>
31
32 #include <atomic>
33 #include <chrono>
34 #include <condition_variable>
35 #include <cstddef>
36 #include <cstdint>
37 #include <limits>
38 #include <mutex>
39 #include <utility>
40
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <sys/epoll.h>
44 #include <sys/eventfd.h>
45 #include <sys/socket.h>
46 #include <sys/timerfd.h>
47 #include <unistd.h>
48
49 namespace boost::corosio::detail {
50
51 struct epoll_op;
52 struct descriptor_state;
53 namespace epoll {
54 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55 } // namespace epoll
56
57 /** Linux scheduler using epoll for I/O multiplexing.
58
59 This scheduler implements the scheduler interface using Linux epoll
60 for efficient I/O event notification. It uses a single reactor model
61 where one thread runs epoll_wait while other threads
62 wait on a condition variable for handler work. This design provides:
63
64 - Handler parallelism: N posted handlers can execute on N threads
65 - No thundering herd: condition_variable wakes exactly one thread
66 - IOCP parity: Behavior matches Windows I/O completion port semantics
67
68 When threads call run(), they first try to execute queued handlers.
69 If the queue is empty and no reactor is running, one thread becomes
70 the reactor and runs epoll_wait. Other threads wait on a condition
71 variable until handlers are available.
72
73 @par Thread Safety
74 All public member functions are thread-safe.
75 */
class BOOST_COROSIO_DECL epoll_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    // Service key: shared with the generic scheduler interface so
    // execution_context service lookup resolves to this implementation.
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Ensure EPOLLOUT is registered for a descriptor.

        No-op if the write interest is already registered. Called lazily
        before the first write or connect operation that needs async
        notification. Uses EPOLL_CTL_MOD to add EPOLLOUT to the
        existing registration.

        @param fd The file descriptor.
        @param desc The descriptor_state that owns the fd.
    */
    void ensure_write_events(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Called by thread_context_guard destructor when a thread exits run().
        Transfers pending work to the global queue under mutex protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    // Scope guard run around handler execution; its destructor (defined
    // out of line, not shown in this view) performs work accounting
    // when a handler completes.
    struct work_cleanup
    {
        epoll_scheduler* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~work_cleanup();
    };

    // Scope guard run around a reactor (epoll_wait) pass; destructor
    // defined out of line, not shown in this view.
    struct task_cleanup
    {
        epoll_scheduler const* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~task_cleanup();
    };

    // Execute one unit of work (see the "Event Loop Structure" notes
    // below). Returns nonzero when a handler ran; run()/poll() loop on
    // this return value to count executed handlers.
    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        epoll::scheduler_context* ctx);
    // Run one reactor pass as the thread that owns epoll_wait.
    void
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
    // Wake one waiter, or fall back to interrupting the reactor when no
    // waiter exists (see "Wake Coordination" notes below). Unlocks.
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    // Poke the eventfd so a blocked epoll_wait returns promptly.
    void interrupt_reactor() const;
    // Re-arm the timerfd for the nearest pending timer expiry.
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if lock still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast-path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // for interrupting reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;              // guards completed_ops_, stopped_, state_
    mutable std::condition_variable cond_;  // waiters blocked for handler work
    mutable op_queue completed_ops_;        // global queue of ready handlers
    mutable std::atomic<long> outstanding_work_; // pending ops; run() exits at 0
    bool stopped_;                          // set by stop(), cleared by restart()

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        // No-ops: the sentinel is recognized by address, never executed.
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
335
336 //--------------------------------------------------------------------------
337 //
338 // Implementation
339 //
340 //--------------------------------------------------------------------------
341
342 /*
343 epoll Scheduler - Single Reactor Model
344 ======================================
345
346 This scheduler uses a thread coordination strategy to provide handler
347 parallelism and avoid the thundering herd problem.
348 Instead of all threads blocking on epoll_wait(), one thread becomes the
349 "reactor" while others wait on a condition variable for handler work.
350
351 Thread Model
352 ------------
353 - ONE thread runs epoll_wait() at a time (the reactor thread)
354 - OTHER threads wait on cond_ (condition variable) for handlers
355 - When work is posted, exactly one waiting thread wakes via notify_one()
356 - This matches Windows IOCP semantics where N posted items wake N threads
357
358 Event Loop Structure (do_one)
359 -----------------------------
360 1. Lock mutex, try to pop handler from queue
361 2. If got handler: execute it (unlocked), return
362 3. If queue empty and no reactor running: become reactor
363 - Run epoll_wait (unlocked), queue I/O completions, loop back
364 4. If queue empty and reactor running: wait on condvar for work
365
366 The task_running_ flag ensures only one thread owns epoll_wait().
367 After the reactor queues I/O completions, it loops back to try getting
368 a handler, giving priority to handler execution over more I/O polling.
369
370 Signaling State (state_)
371 ------------------------
372 The state_ variable encodes two pieces of information:
373 - Bit 0: signaled flag (1 = signaled, persists until cleared)
374 - Upper bits: waiter count (each waiter adds 2 before blocking)
375
376 This allows efficient coordination:
377 - Signalers only call notify when waiters exist (state_ > 1)
378 - Waiters check if already signaled before blocking (fast-path)
379
380 Wake Coordination (wake_one_thread_and_unlock)
381 ----------------------------------------------
382 When posting work:
383 - If waiters exist (state_ > 1): signal and notify_one()
384 - Else if reactor running: interrupt via eventfd write
385 - Else: no-op (thread will find work when it checks queue)
386
387 This avoids waking threads unnecessarily. With cascading wakes,
388 each handler execution wakes at most one additional thread if
389 more work exists in the queue.
390
391 Work Counting
392 -------------
393 outstanding_work_ tracks pending operations. When it hits zero, run()
394 returns. Each operation increments on start, decrements on completion.
395
396 Timer Integration
397 -----------------
398 Timers are handled by timer_service. The reactor adjusts epoll_wait
399 timeout to wake for the nearest timer expiry. When a new timer is
400 scheduled earlier than current, timer_service calls interrupt_reactor()
401 to re-evaluate the timeout.
402 */
403
404 namespace epoll {
405
406 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
407 {
408 epoll_scheduler const* key;
409 scheduler_context* next;
410 op_queue private_queue;
411 long private_outstanding_work;
412 int inline_budget;
413 int inline_budget_max;
414 bool unassisted;
415
416 217x scheduler_context(epoll_scheduler const* k, scheduler_context* n)
417 217x : key(k)
418 217x , next(n)
419 217x , private_outstanding_work(0)
420 217x , inline_budget(0)
421 217x , inline_budget_max(2)
422 217x , unassisted(false)
423 {
424 217x }
425 };
426
// Thread-local stack of scheduler contexts. A linked stack (via the
// `next` pointers) rather than a single pointer lets one thread nest
// run() calls on different scheduler instances; find_context() walks it.
inline thread_local_ptr<scheduler_context> context_stack;
428
// RAII frame registering the current thread as running a scheduler:
// pushes a scheduler_context onto the thread-local stack on entry and
// pops it on exit, draining any leftover private work first.
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        // Hand unexecuted private work back to the global queue so it
        // is not lost when this thread leaves run().
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};
447
448 inline scheduler_context*
449 411440x find_context(epoll_scheduler const* self) noexcept
450 {
451 411440x for (auto* c = context_stack.get(); c != nullptr; c = c->next)
452 409705x if (c->key == self)
453 409705x return c;
454 1735x return nullptr;
455 }
456
457 } // namespace epoll
458
459 inline void
460 59286x epoll_scheduler::reset_inline_budget() const noexcept
461 {
462 59286x if (auto* ctx = epoll::find_context(this))
463 {
464 // Cap when no other thread absorbed queued work. A moderate
465 // cap (4) amortizes scheduling for small buffers while avoiding
466 // bursty I/O that fills socket buffers and stalls large transfers.
467 59286x if (ctx->unassisted)
468 {
469 59286x ctx->inline_budget_max = 4;
470 59286x ctx->inline_budget = 4;
471 59286x return;
472 }
473 // Ramp up when previous cycle fully consumed budget.
474 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
475 if (ctx->inline_budget == 0)
476 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
477 else if (ctx->inline_budget < ctx->inline_budget_max)
478 ctx->inline_budget_max = 2;
479 ctx->inline_budget = ctx->inline_budget_max;
480 }
481 }
482
483 inline bool
484 254803x epoll_scheduler::try_consume_inline_budget() const noexcept
485 {
486 254803x if (auto* ctx = epoll::find_context(this))
487 {
488 254803x if (ctx->inline_budget > 0)
489 {
490 203913x --ctx->inline_budget;
491 203913x return true;
492 }
493 }
494 50890x return false;
495 }
496
// Posted completion handler for a ready descriptor: claims the epoll
// events published by the reactor and dispatches them to the pending
// read/write/connect operations, completing at most one inline.
inline void
descriptor_state::operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    // Claim all events accumulated since we were enqueued. Zero means
    // they were consumed elsewhere; offset the work count and bail.
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    // On EPOLLERR, fetch the pending socket error. Fall back to errno
    // if getsockopt itself fails, and to EIO when SO_ERROR reports 0.
    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                // EAGAIN/EWOULDBLOCK: not actually readable; clear the
                // error and leave the op armed for the next edge.
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // No pending op: remember readiness for a future read.
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();

                // Spurious wakeup: keep the connect op pending.
                if (cn->errn == EAGAIN || cn->errn == EWOULDBLOCK)
                {
                    cn->errn = 0;
                }
                else
                {
                    connect_op = nullptr;
                    local_ops.push(cn);
                }
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                // Spurious wakeup: keep the write op pending.
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            // Only latch write readiness when no op consumed the event.
            if (!had_write_op)
                write_ready = true;
        }
        // A socket error fails any ops still pending after dispatch.
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}
627
// Constructor: acquires epoll, eventfd and timerfd handles. The fds
// are plain ints (not RAII handles), so each failure path manually
// closes everything acquired so far before throwing.
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Register the wakeup eventfd edge-triggered; data.ptr == nullptr
    // marks these events (distinct from descriptor_state pointers).
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // data.ptr == &timer_fd_ distinguishes timerfd wakeups from both
    // the eventfd (nullptr) and socket descriptor_state pointers.
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // When the earliest timer changes, mark the timerfd stale and poke
    // the reactor — but only if one is currently blocked in epoll_wait.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            auto* self = static_cast<epoll_scheduler*>(p);
            self->timerfd_stale_.store(true, std::memory_order_release);
            if (self->task_running_.load(std::memory_order_acquire))
                self->interrupt_reactor();
        }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
701
702 488x inline epoll_scheduler::~epoll_scheduler()
703 {
704 244x if (timer_fd_ >= 0)
705 244x ::close(timer_fd_);
706 244x if (event_fd_ >= 0)
707 244x ::close(event_fd_);
708 244x if (epoll_fd_ >= 0)
709 244x ::close(epoll_fd_);
710 488x }
711
// Service shutdown: destroy (never run) all queued handlers, wake every
// waiting thread, and interrupt a reactor blocked in epoll_wait.
inline void
epoll_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);

        // The task sentinel is a member object, not heap-allocated, so
        // it is skipped rather than destroyed. The lock is released
        // around destroy() because handler teardown may run arbitrary
        // user code.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        // Wake all threads blocked waiting for handler work.
        signal_all(lock);
    }

    // Kick a reactor blocked in epoll_wait so it can observe shutdown.
    if (event_fd_ >= 0)
        interrupt_reactor();
}
733
// Schedule a coroutine for resumption on a scheduler thread.
inline void
epoll_scheduler::post(std::coroutine_handle<> h) const
{
    // Heap wrapper owning the coroutine handle until it is either
    // resumed (operator()) or torn down at shutdown (destroy()).
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle out and free the wrapper first, so the
            // wrapper's lifetime doesn't depend on what resume() does.
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            auto h = h_;
            delete this;
            h.destroy();
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
778
779 inline void
780 55467x epoll_scheduler::post(scheduler_op* h) const
781 {
782 // Fast path: same thread posts to private queue
783 // Only count locally; work_cleanup batches to global counter
784 55467x if (auto* ctx = epoll::find_context(this))
785 {
786 55437x ++ctx->private_outstanding_work;
787 55437x ctx->private_queue.push(h);
788 55437x return;
789 }
790
791 // Slow path: cross-thread post requires mutex
792 30x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
793
794 30x std::unique_lock lock(mutex_);
795 30x completed_ops_.push(h);
796 30x wake_one_thread_and_unlock(lock);
797 30x }
798
799 inline bool
800 736x epoll_scheduler::running_in_this_thread() const noexcept
801 {
802 736x for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
803 450x if (c->key == this)
804 450x return true;
805 286x return false;
806 }
807
808 inline void
809 222x epoll_scheduler::stop()
810 {
811 222x std::unique_lock lock(mutex_);
812 222x if (!stopped_)
813 {
814 191x stopped_ = true;
815 191x signal_all(lock);
816 191x interrupt_reactor();
817 }
818 222x }
819
820 inline bool
821 18x epoll_scheduler::stopped() const noexcept
822 {
823 18x std::unique_lock lock(mutex_);
824 36x return stopped_;
825 18x }
826
827 inline void
828 53x epoll_scheduler::restart()
829 {
830 53x std::unique_lock lock(mutex_);
831 53x stopped_ = false;
832 53x }
833
834 inline std::size_t
835 209x epoll_scheduler::run()
836 {
837 418x if (outstanding_work_.load(std::memory_order_acquire) == 0)
838 {
839 26x stop();
840 26x return 0;
841 }
842
843 183x epoll::thread_context_guard ctx(this);
844 183x std::unique_lock lock(mutex_);
845
846 183x std::size_t n = 0;
847 for (;;)
848 {
849 105679x if (!do_one(lock, -1, &ctx.frame_))
850 183x break;
851 105496x if (n != (std::numeric_limits<std::size_t>::max)())
852 105496x ++n;
853 105496x if (!lock.owns_lock())
854 49881x lock.lock();
855 }
856 183x return n;
857 183x }
858
859 inline std::size_t
860 2x epoll_scheduler::run_one()
861 {
862 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
863 {
864 stop();
865 return 0;
866 }
867
868 2x epoll::thread_context_guard ctx(this);
869 2x std::unique_lock lock(mutex_);
870 2x return do_one(lock, -1, &ctx.frame_);
871 2x }
872
873 inline std::size_t
874 34x epoll_scheduler::wait_one(long usec)
875 {
876 68x if (outstanding_work_.load(std::memory_order_acquire) == 0)
877 {
878 7x stop();
879 7x return 0;
880 }
881
882 27x epoll::thread_context_guard ctx(this);
883 27x std::unique_lock lock(mutex_);
884 27x return do_one(lock, usec, &ctx.frame_);
885 27x }
886
887 inline std::size_t
888 4x epoll_scheduler::poll()
889 {
890 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
891 {
892 1x stop();
893 1x return 0;
894 }
895
896 3x epoll::thread_context_guard ctx(this);
897 3x std::unique_lock lock(mutex_);
898
899 3x std::size_t n = 0;
900 for (;;)
901 {
902 7x if (!do_one(lock, 0, &ctx.frame_))
903 3x break;
904 4x if (n != (std::numeric_limits<std::size_t>::max)())
905 4x ++n;
906 4x if (!lock.owns_lock())
907 4x lock.lock();
908 }
909 3x return n;
910 3x }
911
912 inline std::size_t
913 4x epoll_scheduler::poll_one()
914 {
915 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
916 {
917 2x stop();
918 2x return 0;
919 }
920
921 2x epoll::thread_context_guard ctx(this);
922 2x std::unique_lock lock(mutex_);
923 2x return do_one(lock, 0, &ctx.frame_);
924 2x }
925
926 inline void
927 8278x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
928 {
929 // Only register read interest upfront. EPOLLOUT is deferred until
930 // the first write/connect op to avoid a spurious write-ready event
931 // on freshly opened (always-writable) sockets.
932 8278x epoll_event ev{};
933 8278x ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP;
934 8278x ev.data.ptr = desc;
935
936 8278x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
937 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
938
939 8278x desc->registered_events = ev.events;
940 8278x desc->fd = fd;
941 8278x desc->scheduler_ = this;
942
943 8278x std::lock_guard lock(desc->mutex);
944 8278x desc->read_ready = false;
945 8278x desc->write_ready = false;
946 8278x }
947
// Lazily add EPOLLOUT interest for `fd` on the first write/connect op
// (register_descriptor registers read interest only). Idempotent.
inline void
epoll_scheduler::ensure_write_events(int fd, descriptor_state* desc) const
{
    std::lock_guard lock(desc->mutex);
    if (desc->registered_events & EPOLLOUT)
        return;   // already write-registered

    epoll_event ev{};
    ev.events = desc->registered_events | EPOLLOUT;
    ev.data.ptr = desc;
    // Best-effort: on epoll_ctl failure registered_events is left
    // unchanged, so a later call will retry the modification.
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == 0)
        desc->registered_events = ev.events;
}
961
// Remove `fd` from the epoll set. Best-effort: the return value is
// ignored (e.g. the fd may already have been closed, which removes the
// registration on its own).
inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
967
// Note that an operation has started; keeps run() from returning while
// work is pending. Each call is balanced by one work_finished().
inline void
epoll_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
973
// Note that an operation has completed. When the last outstanding
// operation finishes (counter drops to zero), stop the scheduler so
// run() calls return.
inline void
epoll_scheduler::work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
980
981 inline void
982 35681x epoll_scheduler::compensating_work_started() const noexcept
983 {
984 35681x auto* ctx = epoll::find_context(this);
985 35681x if (ctx)
986 35681x ++ctx->private_outstanding_work;
987 35681x }
988
// Move ops from a thread's private queue into the global queue so another
// thread can run them, signalling one waiter when `count` ops were added.
inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
    // NOTE(review): unlike post_deferred_completions(), this never falls
    // back to interrupt_reactor() when no condvar waiter exists — confirm
    // the reactor's next wakeup is guaranteed to pick these ops up.
}
998
// Queue completed ops for execution. Their work was already counted
// (outstanding_work_ incremented at post time), so only queueing and
// wakeup are needed here.
inline void
epoll_scheduler::post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = epoll::find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}
1017
// Wake the reactor out of epoll_wait() by writing to the eventfd.
inline void
epoll_scheduler::interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(
        expected, true, std::memory_order_release,
        std::memory_order_relaxed))
    {
        // The reactor clears eventfd_armed_ after draining the eventfd
        // (see run_task), re-arming this edge trigger.
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
1031
// Set the signalled bit and wake every thread blocked in
// wait_for_signal(). The unnamed lock parameter documents that the
// scheduler mutex must be held by the caller.
inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}
1038
// Set the signalled bit and, if at least one thread is waiting, unlock
// and wake exactly one of them. Returns true if a waiter was signalled
// (lock released); false if there were no waiters (lock still held).
// state_ encoding: bit 0 = signalled, each waiter adds 2 while blocked.
inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        // Unlock before notifying so the woken thread does not immediately
        // block on the mutex we still hold.
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
1052
1053 inline bool
1054 134277x epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1055 {
1056 134277x state_ |= 1;
1057 134277x bool have_waiters = state_ > 1;
1058 134277x lock.unlock();
1059 134277x if (have_waiters)
1060 cond_.notify_one();
1061 134277x return have_waiters;
1062 }
1063
// Clear the signalled bit (bit 0 of state_). Caller holds the mutex.
inline void
epoll_scheduler::clear_signal() const
{
    state_ &= ~std::size_t(1);
}
1069
// Block until the signalled bit is set. Each waiter adds 2 to state_
// while blocked so signallers can tell whether anyone needs a notify;
// the loop also absorbs spurious wakeups.
inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
1080
// Timed variant of wait_for_signal(): performs at most one wait, so it
// may return with the signal still unset (timeout or spurious wakeup);
// callers re-check their own predicate afterwards.
inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;   // count ourselves as a waiter
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
1092
// Wake exactly one consumer of the op queue: prefer a thread blocked on
// the condvar; failing that, interrupt the reactor so it re-examines the
// queue. Always leaves `lock` unlocked on return.
inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    // No condvar waiters. If the reactor is inside run_task() and not yet
    // flagged for interruption, kick it via the eventfd.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
1111
// Runs after a handler executes (see do_one): settles the thread-private
// work count against the global counter and republishes privately queued
// ops. Runs even if the handler throws.
inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // Running the handler consumed one unit of work; the handler may
        // have produced more via the private counter. Net delta is
        // (produced - 1): add the surplus, or call work_finished() for
        // the deficit. produced == 1 is exactly balanced — no update.
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        // Move ops the handler queued privately into the global queue
        // (requires the scheduler mutex, which do_one released).
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just account for the consumed work unit.
        scheduler->work_finished();
    }
}
1135
// Runs when run_task() exits (normally or via exception): publishes work
// and ops accumulated on the thread-private context while the reactor ran.
inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    // Fold privately counted work into the global counter.
    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Move privately queued ops to the global queue. The scheduler mutex
    // may or may not be held here depending on how run_task() unwound.
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}
1155
1156 inline void
1157 8587x epoll_scheduler::update_timerfd() const
1158 {
1159 8587x auto nearest = timer_svc_->nearest_expiry();
1160
1161 8587x itimerspec ts{};
1162 8587x int flags = 0;
1163
1164 8587x if (nearest == timer_service::time_point::max())
1165 {
1166 // No timers - disarm by setting to 0 (relative)
1167 }
1168 else
1169 {
1170 8542x auto now = std::chrono::steady_clock::now();
1171 8542x if (nearest <= now)
1172 {
1173 // Use 1ns instead of 0 - zero disarms the timerfd
1174 237x ts.it_value.tv_nsec = 1;
1175 }
1176 else
1177 {
1178 8305x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1179 8305x nearest - now)
1180 8305x .count();
1181 8305x ts.it_value.tv_sec = nsec / 1000000000;
1182 8305x ts.it_value.tv_nsec = nsec % 1000000000;
1183 // Ensure non-zero to avoid disarming if duration rounds to 0
1184 8305x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1185 ts.it_value.tv_nsec = 1;
1186 }
1187 }
1188
1189 8587x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1190 detail::throw_system_error(make_err(errno), "timerfd_settime");
1191 8587x }
1192
// One reactor iteration: poll epoll (blocking unless interruption was
// requested), translate ready events into queued descriptor ops, and
// service the eventfd/timerfd specials. Called from do_one(); returns
// with `lock` held.
inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    // If an interrupt was requested, poll (0) instead of blocking (-1).
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Publishes thread-private work/ops on exit, even via exception.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign: fall through with nfds < 0 (loop below is skipped).
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr marks the wakeup eventfd.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            // Drained: re-arm so the next interrupt_reactor() writes again.
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ marks the timerfd: drain it and run
        // expired timers after the event loop.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1268
// Execute at most one operation from the queue. Returns 1 if a handler
// ran, 0 when stopped, when there is no outstanding work, or when a
// non-blocking call (timeout_us == 0) finds nothing ready. `lock` must
// be held on entry and may be released on return after running a handler.
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Don't let the reactor block if handlers are queued or the
            // caller asked for a poll.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Hand queued handlers to another thread before entering epoll.
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more work remains, try to wake a peer thread to help;
            // record whether one was actually available.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Settles work counters and republishes private ops even if
            // the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty: wait for a producer's signal, then retry.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1341
1342 } // namespace boost::corosio::detail
1343
1344 #endif // BOOST_COROSIO_HAS_EPOLL
1345
1346 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1347