TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/native_scheduler.hpp>
21 : #include <boost/corosio/detail/scheduler_op.hpp>
22 :
23 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24 : #include <boost/corosio/detail/timer_service.hpp>
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 :
29 : #include <boost/corosio/detail/except.hpp>
30 : #include <boost/corosio/detail/thread_local_ptr.hpp>
31 :
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <utility>
40 :
41 : #include <errno.h>
42 : #include <fcntl.h>
43 : #include <sys/epoll.h>
44 : #include <sys/eventfd.h>
45 : #include <sys/socket.h>
46 : #include <sys/timerfd.h>
47 : #include <unistd.h>
48 :
49 : namespace boost::corosio::detail {
50 :
51 : struct epoll_op;
52 : struct descriptor_state;
53 : namespace epoll {
54 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55 : } // namespace epoll
56 :
57 : /** Linux scheduler using epoll for I/O multiplexing.
58 :
59 : This scheduler implements the scheduler interface using Linux epoll
60 : for efficient I/O event notification. It uses a single reactor model
61 : where one thread runs epoll_wait while other threads
62 : wait on a condition variable for handler work. This design provides:
63 :
64 : - Handler parallelism: N posted handlers can execute on N threads
65 : - No thundering herd: condition_variable wakes exactly one thread
66 : - IOCP parity: Behavior matches Windows I/O completion port semantics
67 :
68 : When threads call run(), they first try to execute queued handlers.
69 : If the queue is empty and no reactor is running, one thread becomes
70 : the reactor and runs epoll_wait. Other threads wait on a condition
71 : variable until handlers are available.
72 :
73 : @par Thread Safety
74 : All public member functions are thread-safe.
75 : */
class BOOST_COROSIO_DECL epoll_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Ensure EPOLLOUT is registered for a descriptor.

        No-op if the write interest is already registered. Called lazily
        before the first write or connect operation that needs async
        notification. Uses EPOLL_CTL_MOD to add EPOLLOUT to the
        existing registration.

        @param fd The file descriptor.
        @param desc The descriptor_state that owns the fd.
    */
    void ensure_write_events(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Called by thread_context_guard destructor when a thread exits run().
        Transfers pending work to the global queue under mutex protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    // RAII frame run after a handler executes inside do_one().
    // Destructor is defined out of line (not visible in this header
    // section); presumably it batches private work counts into
    // outstanding_work_ — confirm against the implementation file.
    struct work_cleanup
    {
        epoll_scheduler* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~work_cleanup();
    };

    // RAII frame run after a reactor (epoll_wait) pass; destructor
    // defined out of line like work_cleanup above.
    struct task_cleanup
    {
        epoll_scheduler const* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~task_cleanup();
    };

    // Core dispatch step: execute one handler or become the reactor.
    // See the "Event Loop Structure" notes in the file-level comment
    // following this class.
    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        epoll::scheduler_context* ctx);
    // Run one reactor pass (epoll_wait) on behalf of the calling thread.
    void
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
    // Wake exactly one thread: condvar signal if waiters exist, else
    // eventfd interrupt if the reactor is blocked. Always unlocks.
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    // Write to event_fd_ so a blocked epoll_wait returns.
    void interrupt_reactor() const;
    // Re-program timer_fd_ from the earliest pending timer.
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if lock still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast-path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;

    int epoll_fd_;   // epoll instance; owned, closed in destructor
    int event_fd_;   // for interrupting reactor
    int timer_fd_;   // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;                // guards completed_ops_, stopped_, state_
    mutable std::condition_variable cond_;    // paired with mutex_ for waiter wakeup
    mutable op_queue completed_ops_;          // global ready-to-run queue
    mutable std::atomic<long> outstanding_work_; // pending ops; run() exits at zero
    bool stopped_;                            // set by stop(); guarded by mutex_

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        // Intentionally empty: the sentinel is recognized by address
        // (&task_op_), never executed as real work.
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
335 :
336 : //--------------------------------------------------------------------------
337 : //
338 : // Implementation
339 : //
340 : //--------------------------------------------------------------------------
341 :
342 : /*
343 : epoll Scheduler - Single Reactor Model
344 : ======================================
345 :
346 : This scheduler uses a thread coordination strategy to provide handler
347 : parallelism and avoid the thundering herd problem.
348 : Instead of all threads blocking on epoll_wait(), one thread becomes the
349 : "reactor" while others wait on a condition variable for handler work.
350 :
351 : Thread Model
352 : ------------
353 : - ONE thread runs epoll_wait() at a time (the reactor thread)
354 : - OTHER threads wait on cond_ (condition variable) for handlers
355 : - When work is posted, exactly one waiting thread wakes via notify_one()
356 : - This matches Windows IOCP semantics where N posted items wake N threads
357 :
358 : Event Loop Structure (do_one)
359 : -----------------------------
360 : 1. Lock mutex, try to pop handler from queue
361 : 2. If got handler: execute it (unlocked), return
362 : 3. If queue empty and no reactor running: become reactor
363 : - Run epoll_wait (unlocked), queue I/O completions, loop back
364 : 4. If queue empty and reactor running: wait on condvar for work
365 :
366 : The task_running_ flag ensures only one thread owns epoll_wait().
367 : After the reactor queues I/O completions, it loops back to try getting
368 : a handler, giving priority to handler execution over more I/O polling.
369 :
370 : Signaling State (state_)
371 : ------------------------
372 : The state_ variable encodes two pieces of information:
373 : - Bit 0: signaled flag (1 = signaled, persists until cleared)
374 : - Upper bits: waiter count (each waiter adds 2 before blocking)
375 :
376 : This allows efficient coordination:
377 : - Signalers only call notify when waiters exist (state_ > 1)
378 : - Waiters check if already signaled before blocking (fast-path)
379 :
380 : Wake Coordination (wake_one_thread_and_unlock)
381 : ----------------------------------------------
382 : When posting work:
383 : - If waiters exist (state_ > 1): signal and notify_one()
384 : - Else if reactor running: interrupt via eventfd write
385 : - Else: no-op (thread will find work when it checks queue)
386 :
387 : This avoids waking threads unnecessarily. With cascading wakes,
388 : each handler execution wakes at most one additional thread if
389 : more work exists in the queue.
390 :
391 : Work Counting
392 : -------------
393 : outstanding_work_ tracks pending operations. When it hits zero, run()
394 : returns. Each operation increments on start, decrements on completion.
395 :
396 : Timer Integration
397 : -----------------
398 : Timers are handled by timer_service. The reactor adjusts epoll_wait
399 : timeout to wake for the nearest timer expiry. When a new timer is
400 : scheduled earlier than current, timer_service calls interrupt_reactor()
401 : to re-evaluate the timeout.
402 : */
403 :
404 : namespace epoll {
405 :
406 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
407 : {
408 : epoll_scheduler const* key;
409 : scheduler_context* next;
410 : op_queue private_queue;
411 : long private_outstanding_work;
412 : int inline_budget;
413 : int inline_budget_max;
414 : bool unassisted;
415 :
416 HIT 217 : scheduler_context(epoll_scheduler const* k, scheduler_context* n)
417 217 : : key(k)
418 217 : , next(n)
419 217 : , private_outstanding_work(0)
420 217 : , inline_budget(0)
421 217 : , inline_budget_max(2)
422 217 : , unassisted(false)
423 : {
424 217 : }
425 : };
426 :
427 : inline thread_local_ptr<scheduler_context> context_stack;
428 :
/** RAII guard that pushes a scheduler_context frame for this thread.

    Constructed on entry to run()/run_one()/poll()/poll_one()/wait_one();
    the destructor drains any leftover private work back to the global
    queue, then pops the frame.
*/
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        // Link this frame on top of the thread-local stack.
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        // Hand un-executed private work to the global queue so other
        // threads can run it after this thread leaves the scheduler.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        // Pop the frame, restoring the previous (possibly nested) one.
        context_stack.set(frame_.next);
    }
};
447 :
448 : inline scheduler_context*
449 411440 : find_context(epoll_scheduler const* self) noexcept
450 : {
451 411440 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
452 409705 : if (c->key == self)
453 409705 : return c;
454 1735 : return nullptr;
455 : }
456 :
457 : } // namespace epoll
458 :
459 : inline void
460 59286 : epoll_scheduler::reset_inline_budget() const noexcept
461 : {
462 59286 : if (auto* ctx = epoll::find_context(this))
463 : {
464 : // Cap when no other thread absorbed queued work. A moderate
465 : // cap (4) amortizes scheduling for small buffers while avoiding
466 : // bursty I/O that fills socket buffers and stalls large transfers.
467 59286 : if (ctx->unassisted)
468 : {
469 59286 : ctx->inline_budget_max = 4;
470 59286 : ctx->inline_budget = 4;
471 59286 : return;
472 : }
473 : // Ramp up when previous cycle fully consumed budget.
474 : // Reset on partial consumption (EAGAIN hit or peer got scheduled).
475 MIS 0 : if (ctx->inline_budget == 0)
476 0 : ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
477 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
478 0 : ctx->inline_budget_max = 2;
479 0 : ctx->inline_budget = ctx->inline_budget_max;
480 : }
481 : }
482 :
483 : inline bool
484 HIT 254803 : epoll_scheduler::try_consume_inline_budget() const noexcept
485 : {
486 254803 : if (auto* ctx = epoll::find_context(this))
487 : {
488 254803 : if (ctx->inline_budget > 0)
489 : {
490 203913 : --ctx->inline_budget;
491 203913 : return true;
492 : }
493 : }
494 50890 : return false;
495 : }
496 :
/** Dispatch ready epoll events for this descriptor.

    Runs as a queued scheduler_op. Consumes the atomically published
    event mask, performs speculative I/O for pending read/write/connect
    ops under the descriptor mutex, then executes the first completed
    handler inline and defers the rest to the scheduler.
*/
inline void
descriptor_state::operator()()
{
    // Allow this descriptor to be re-enqueued by the reactor.
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    // Acquire pairs with the reactor's release publish of ready events.
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        // Nothing to do: offset the work item the scheduler will
        // account for this invocation.
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    // On EPOLLERR, fetch the socket error so completions see the
    // real errno rather than a generic failure.
    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO; // error event with no SO_ERROR: report I/O error
    }

    {
        // Mutex guards read_op/write_op/connect_op and the ready flags.
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    // Spurious readiness: clear the transient errno and
                    // leave the op installed for the next edge.
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // No pending op: remember readiness for a future read.
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();

                if (cn->errn == EAGAIN || cn->errn == EWOULDBLOCK)
                {
                    // Not yet complete: keep waiting for the next edge.
                    cn->errn = 0;
                }
                else
                {
                    connect_op = nullptr;
                    local_ops.push(cn);
                }
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            // Only latch write readiness when no op consumed the edge.
            if (!had_write_op)
                write_ready = true;
        }
        if (err)
        {
            // Flush any ops still pending (e.g. EPOLLERR without the
            // corresponding EPOLLIN/EPOLLOUT bit) with the error.
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // All I/O hit EAGAIN: no handler runs, so offset the
        // scheduler's per-invocation work accounting.
        scheduler_->compensating_work_started();
    }
}
627 :
/** Construct the scheduler.

    Acquires three kernel objects in order — epoll instance, eventfd,
    timerfd — registering the latter two with epoll. On any failure,
    previously acquired fds are closed before throwing, so a failed
    construction leaks nothing.
*/
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        // Capture errno before close() can clobber it.
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Interrupt eventfd: edge-triggered, identified by a null data.ptr.
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // Timerfd: identified by data.ptr == &timer_fd_ (sentinel address,
    // presumably checked by the reactor loop — not visible here).
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // When the earliest timer changes, mark timerfd stale and, if the
    // reactor is blocked in epoll_wait, interrupt it so it re-arms the
    // timerfd before blocking again.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            auto* self = static_cast<epoll_scheduler*>(p);
            self->timerfd_stale_.store(true, std::memory_order_release);
            if (self->task_running_.load(std::memory_order_acquire))
                self->interrupt_reactor();
        }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
701 :
702 488 : inline epoll_scheduler::~epoll_scheduler()
703 : {
704 244 : if (timer_fd_ >= 0)
705 244 : ::close(timer_fd_);
706 244 : if (event_fd_ >= 0)
707 244 : ::close(event_fd_);
708 244 : if (epoll_fd_ >= 0)
709 244 : ::close(epoll_fd_);
710 488 : }
711 :
inline void
epoll_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);

        // Destroy (not run) every abandoned handler. The lock is
        // released around destroy() so handler teardown executes
        // unlocked; the task sentinel is skipped, never destroyed.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        // Wake every waiter so blocked run() threads can exit.
        signal_all(lock);
    }

    // Unblock a reactor stuck in epoll_wait, if any.
    if (event_fd_ >= 0)
        interrupt_reactor();
}
733 :
inline void
epoll_scheduler::post(std::coroutine_handle<> h) const
{
    // Self-deleting adapter that resumes (or destroys) the coroutine.
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle before `delete this`; resuming may
            // recursively touch scheduler state.
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            // Shutdown path: destroy the coroutine without resuming.
            auto h = h_;
            delete this;
            h.destroy();
        }
    };

    // unique_ptr guards against a leak until ownership transfers to a queue.
    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
778 :
779 : inline void
780 55467 : epoll_scheduler::post(scheduler_op* h) const
781 : {
782 : // Fast path: same thread posts to private queue
783 : // Only count locally; work_cleanup batches to global counter
784 55467 : if (auto* ctx = epoll::find_context(this))
785 : {
786 55437 : ++ctx->private_outstanding_work;
787 55437 : ctx->private_queue.push(h);
788 55437 : return;
789 : }
790 :
791 : // Slow path: cross-thread post requires mutex
792 30 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
793 :
794 30 : std::unique_lock lock(mutex_);
795 30 : completed_ops_.push(h);
796 30 : wake_one_thread_and_unlock(lock);
797 30 : }
798 :
799 : inline bool
800 736 : epoll_scheduler::running_in_this_thread() const noexcept
801 : {
802 736 : for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
803 450 : if (c->key == this)
804 450 : return true;
805 286 : return false;
806 : }
807 :
808 : inline void
809 222 : epoll_scheduler::stop()
810 : {
811 222 : std::unique_lock lock(mutex_);
812 222 : if (!stopped_)
813 : {
814 191 : stopped_ = true;
815 191 : signal_all(lock);
816 191 : interrupt_reactor();
817 : }
818 222 : }
819 :
820 : inline bool
821 18 : epoll_scheduler::stopped() const noexcept
822 : {
823 18 : std::unique_lock lock(mutex_);
824 36 : return stopped_;
825 18 : }
826 :
827 : inline void
828 53 : epoll_scheduler::restart()
829 : {
830 53 : std::unique_lock lock(mutex_);
831 53 : stopped_ = false;
832 53 : }
833 :
834 : inline std::size_t
835 209 : epoll_scheduler::run()
836 : {
837 418 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
838 : {
839 26 : stop();
840 26 : return 0;
841 : }
842 :
843 183 : epoll::thread_context_guard ctx(this);
844 183 : std::unique_lock lock(mutex_);
845 :
846 183 : std::size_t n = 0;
847 : for (;;)
848 : {
849 105679 : if (!do_one(lock, -1, &ctx.frame_))
850 183 : break;
851 105496 : if (n != (std::numeric_limits<std::size_t>::max)())
852 105496 : ++n;
853 105496 : if (!lock.owns_lock())
854 49881 : lock.lock();
855 : }
856 183 : return n;
857 183 : }
858 :
859 : inline std::size_t
860 2 : epoll_scheduler::run_one()
861 : {
862 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
863 : {
864 MIS 0 : stop();
865 0 : return 0;
866 : }
867 :
868 HIT 2 : epoll::thread_context_guard ctx(this);
869 2 : std::unique_lock lock(mutex_);
870 2 : return do_one(lock, -1, &ctx.frame_);
871 2 : }
872 :
873 : inline std::size_t
874 34 : epoll_scheduler::wait_one(long usec)
875 : {
876 68 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
877 : {
878 7 : stop();
879 7 : return 0;
880 : }
881 :
882 27 : epoll::thread_context_guard ctx(this);
883 27 : std::unique_lock lock(mutex_);
884 27 : return do_one(lock, usec, &ctx.frame_);
885 27 : }
886 :
887 : inline std::size_t
888 4 : epoll_scheduler::poll()
889 : {
890 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
891 : {
892 1 : stop();
893 1 : return 0;
894 : }
895 :
896 3 : epoll::thread_context_guard ctx(this);
897 3 : std::unique_lock lock(mutex_);
898 :
899 3 : std::size_t n = 0;
900 : for (;;)
901 : {
902 7 : if (!do_one(lock, 0, &ctx.frame_))
903 3 : break;
904 4 : if (n != (std::numeric_limits<std::size_t>::max)())
905 4 : ++n;
906 4 : if (!lock.owns_lock())
907 4 : lock.lock();
908 : }
909 3 : return n;
910 3 : }
911 :
912 : inline std::size_t
913 4 : epoll_scheduler::poll_one()
914 : {
915 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
916 : {
917 2 : stop();
918 2 : return 0;
919 : }
920 :
921 2 : epoll::thread_context_guard ctx(this);
922 2 : std::unique_lock lock(mutex_);
923 2 : return do_one(lock, 0, &ctx.frame_);
924 2 : }
925 :
926 : inline void
927 8278 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
928 : {
929 : // Only register read interest upfront. EPOLLOUT is deferred until
930 : // the first write/connect op to avoid a spurious write-ready event
931 : // on freshly opened (always-writable) sockets.
932 8278 : epoll_event ev{};
933 8278 : ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP;
934 8278 : ev.data.ptr = desc;
935 :
936 8278 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
937 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
938 :
939 HIT 8278 : desc->registered_events = ev.events;
940 8278 : desc->fd = fd;
941 8278 : desc->scheduler_ = this;
942 :
943 8278 : std::lock_guard lock(desc->mutex);
944 8278 : desc->read_ready = false;
945 8278 : desc->write_ready = false;
946 8278 : }
947 :
948 : inline void
949 4095 : epoll_scheduler::ensure_write_events(int fd, descriptor_state* desc) const
950 : {
951 4095 : std::lock_guard lock(desc->mutex);
952 4095 : if (desc->registered_events & EPOLLOUT)
953 MIS 0 : return;
954 :
955 HIT 4095 : epoll_event ev{};
956 4095 : ev.events = desc->registered_events | EPOLLOUT;
957 4095 : ev.data.ptr = desc;
958 4095 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == 0)
959 4095 : desc->registered_events = ev.events;
960 4095 : }
961 :
inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    // Best-effort removal: the return value is deliberately ignored
    // (presumably the fd may already be closed by this point — confirm
    // against the socket service's teardown order).
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
967 :
inline void
epoll_scheduler::work_started() noexcept
{
    // Relaxed is sufficient: the counter only gates run()'s exit and
    // is not used to publish other memory.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
973 :
inline void
epoll_scheduler::work_finished() noexcept
{
    // fetch_sub returning 1 means this was the last outstanding item:
    // stop so run() returns. acq_rel orders it against prior work.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
980 :
981 : inline void
982 35681 : epoll_scheduler::compensating_work_started() const noexcept
983 : {
984 35681 : auto* ctx = epoll::find_context(this);
985 35681 : if (ctx)
986 35681 : ++ctx->private_outstanding_work;
987 35681 : }
988 :
inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    // Only wake another thread when the drained work is positive;
    // signals (and unlocks) only if a waiter actually exists.
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}
998 :
999 : inline void
1000 HIT 8190 : epoll_scheduler::post_deferred_completions(op_queue& ops) const
1001 : {
1002 8190 : if (ops.empty())
1003 8190 : return;
1004 :
1005 : // Fast path: if on scheduler thread, use private queue
1006 MIS 0 : if (auto* ctx = epoll::find_context(this))
1007 : {
1008 0 : ctx->private_queue.splice(ops);
1009 0 : return;
1010 : }
1011 :
1012 : // Slow path: add to global queue and wake a thread
1013 0 : std::unique_lock lock(mutex_);
1014 0 : completed_ops_.splice(ops);
1015 0 : wake_one_thread_and_unlock(lock);
1016 0 : }
1017 :
1018 : inline void
1019 HIT 461 : epoll_scheduler::interrupt_reactor() const
1020 : {
1021 : // Only write if not already armed to avoid redundant writes
1022 461 : bool expected = false;
1023 461 : if (eventfd_armed_.compare_exchange_strong(
1024 : expected, true, std::memory_order_release,
1025 : std::memory_order_relaxed))
1026 : {
1027 316 : std::uint64_t val = 1;
1028 316 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
1029 : }
1030 461 : }
1031 :
inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    // Set the "signalled" flag (bit 0 of state_) and wake every waiter.
    // The unnamed lock parameter documents that the caller must hold the
    // scheduler mutex; it is not released here.
    state_ |= 1;
    cond_.notify_all();
}
1038 :
1039 : inline bool
1040 1735 : epoll_scheduler::maybe_unlock_and_signal_one(
1041 : std::unique_lock<std::mutex>& lock) const
1042 : {
1043 1735 : state_ |= 1;
1044 1735 : if (state_ > 1)
1045 : {
1046 MIS 0 : lock.unlock();
1047 0 : cond_.notify_one();
1048 0 : return true;
1049 : }
1050 HIT 1735 : return false;
1051 : }
1052 :
1053 : inline bool
1054 134277 : epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1055 : {
1056 134277 : state_ |= 1;
1057 134277 : bool have_waiters = state_ > 1;
1058 134277 : lock.unlock();
1059 134277 : if (have_waiters)
1060 MIS 0 : cond_.notify_one();
1061 HIT 134277 : return have_waiters;
1062 : }
1063 :
inline void
epoll_scheduler::clear_signal() const
{
    // Clear the "signalled" flag (bit 0) while leaving the waiter count
    // in the upper bits untouched. Caller must hold the scheduler mutex.
    state_ &= ~std::size_t(1);
}
1069 :
inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    // Block until the signal flag (bit 0 of state_) is set. Each waiter
    // adds 2 to state_ while waiting so signalling code can tell whether
    // anyone needs a notify. The loop re-checks the flag to absorb
    // spurious wakeups.
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
1080 :
inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    // Timed variant of wait_for_signal. Note this waits at most once
    // (an `if`, not a `while`): a spurious wakeup returns early, which
    // appears intentional since callers treat a timed wait as
    // best-effort and re-evaluate in their own loop — confirm if a
    // full-timeout guarantee is ever required.
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
1092 :
1093 : inline void
1094 HIT 1735 : epoll_scheduler::wake_one_thread_and_unlock(
1095 : std::unique_lock<std::mutex>& lock) const
1096 : {
1097 1735 : if (maybe_unlock_and_signal_one(lock))
1098 MIS 0 : return;
1099 :
1100 HIT 1735 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
1101 : {
1102 26 : task_interrupted_ = true;
1103 26 : lock.unlock();
1104 26 : interrupt_reactor();
1105 : }
1106 : else
1107 : {
1108 1709 : lock.unlock();
1109 : }
1110 : }
1111 :
1112 105531 : inline epoll_scheduler::work_cleanup::~work_cleanup()
1113 : {
1114 105531 : if (ctx)
1115 : {
1116 105531 : long produced = ctx->private_outstanding_work;
1117 105531 : if (produced > 1)
1118 8 : scheduler->outstanding_work_.fetch_add(
1119 : produced - 1, std::memory_order_relaxed);
1120 105523 : else if (produced < 1)
1121 14224 : scheduler->work_finished();
1122 105531 : ctx->private_outstanding_work = 0;
1123 :
1124 105531 : if (!ctx->private_queue.empty())
1125 : {
1126 55626 : lock->lock();
1127 55626 : scheduler->completed_ops_.splice(ctx->private_queue);
1128 : }
1129 : }
1130 : else
1131 : {
1132 MIS 0 : scheduler->work_finished();
1133 : }
1134 HIT 105531 : }
1135 :
1136 74332 : inline epoll_scheduler::task_cleanup::~task_cleanup()
1137 : {
1138 37166 : if (!ctx)
1139 MIS 0 : return;
1140 :
1141 HIT 37166 : if (ctx->private_outstanding_work > 0)
1142 : {
1143 4295 : scheduler->outstanding_work_.fetch_add(
1144 4295 : ctx->private_outstanding_work, std::memory_order_relaxed);
1145 4295 : ctx->private_outstanding_work = 0;
1146 : }
1147 :
1148 37166 : if (!ctx->private_queue.empty())
1149 : {
1150 4295 : if (!lock->owns_lock())
1151 MIS 0 : lock->lock();
1152 HIT 4295 : scheduler->completed_ops_.splice(ctx->private_queue);
1153 : }
1154 37166 : }
1155 :
1156 : inline void
1157 8587 : epoll_scheduler::update_timerfd() const
1158 : {
1159 8587 : auto nearest = timer_svc_->nearest_expiry();
1160 :
1161 8587 : itimerspec ts{};
1162 8587 : int flags = 0;
1163 :
1164 8587 : if (nearest == timer_service::time_point::max())
1165 : {
1166 : // No timers - disarm by setting to 0 (relative)
1167 : }
1168 : else
1169 : {
1170 8542 : auto now = std::chrono::steady_clock::now();
1171 8542 : if (nearest <= now)
1172 : {
1173 : // Use 1ns instead of 0 - zero disarms the timerfd
1174 237 : ts.it_value.tv_nsec = 1;
1175 : }
1176 : else
1177 : {
1178 8305 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1179 8305 : nearest - now)
1180 8305 : .count();
1181 8305 : ts.it_value.tv_sec = nsec / 1000000000;
1182 8305 : ts.it_value.tv_nsec = nsec % 1000000000;
1183 : // Ensure non-zero to avoid disarming if duration rounds to 0
1184 8305 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1185 MIS 0 : ts.it_value.tv_nsec = 1;
1186 : }
1187 : }
1188 :
1189 HIT 8587 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1190 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
1191 HIT 8587 : }
1192 :
/* Runs one reactor pass: waits on epoll, classifies ready events, and
   hands completed descriptors back to the scheduler queue.

   Preconditions: called from do_one() with `lock` possibly held; the
   mutex is released for the duration of the epoll wait and reacquired
   before splicing results. `task_cleanup` reconciles per-thread work
   counters and the private queue even on exception. */
inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    // Poll (timeout 0) when another thread asked the reactor to yield;
    // otherwise block indefinitely until an event arrives.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign: treat an interrupted wait like an empty batch.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr identifies the wakeup eventfd: drain it and
        // clear the armed flag so interrupt_reactor() writes again.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ identifies the timerfd: consume the
        // expiration count and defer timer processing until after the loop.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1268 :
/* Executes at most one completed operation, interleaving reactor runs.

   @param lock        Held scheduler mutex; released while running a
                      handler or the reactor, reacquired before return.
   @param timeout_us  <0 block indefinitely, 0 non-blocking poll,
                      >0 bounded wait in microseconds.
   @param ctx         Per-thread scheduler context for private queues.
   @return 1 if a handler was executed, 0 otherwise (stopped, no work,
           or timeout/poll exhausted).

   The queue contains a sentinel (`task_op_`) that rotates through it;
   popping the sentinel means it is this thread's turn to run the epoll
   reactor (a leader/follower style handoff). */
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // If handlers are pending, make the reactor poll rather than
            // block, and wake another thread to run those handlers.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more ops remain, try to recruit another thread; record
            // whether we got help so work accounting can compensate.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counters and reacquires the
            // lock (via *lock) even if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty but work is outstanding: wait to be signalled.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1341 :
1342 : } // namespace boost::corosio::detail
1343 :
1344 : #endif // BOOST_COROSIO_HAS_EPOLL
1345 :
1346 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|