1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/native/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/make_err.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
27  
#include <boost/corosio/detail/except.hpp>
27  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/capy/buffers.hpp>
28  
#include <boost/capy/buffers.hpp>
29  

29  

30  
#include <coroutine>
30  
#include <coroutine>
31  
#include <mutex>
31  
#include <mutex>
32  
#include <unordered_map>
32  
#include <unordered_map>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <netinet/in.h>
36  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
37  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
39  
#include <sys/socket.h>
40  
#include <unistd.h>
40  
#include <unistd.h>
41  

41  

42  
/*
42  
/*
43  
    epoll Socket Implementation
43  
    epoll Socket Implementation
44  
    ===========================
44  
    ===========================
45  

45  

46  
    Each I/O operation follows the same pattern:
46  
    Each I/O operation follows the same pattern:
47  
      1. Try the syscall immediately (non-blocking socket)
47  
      1. Try the syscall immediately (non-blocking socket)
48  
      2. If it succeeds or fails with a real error, post to completion queue
48  
      2. If it succeeds or fails with a real error, post to completion queue
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50  

50  

51  
    This "try first" approach avoids unnecessary epoll round-trips for
51  
    This "try first" approach avoids unnecessary epoll round-trips for
52  
    operations that can complete immediately (common for small reads/writes
52  
    operations that can complete immediately (common for small reads/writes
53  
    on fast local connections).
53  
    on fast local connections).
54  

54  

55  
    One-Shot Registration
55  
    One-Shot Registration
56  
    ---------------------
56  
    ---------------------
57  
    We use one-shot epoll registration: each operation registers, waits for
57  
    We use one-shot epoll registration: each operation registers, waits for
58  
    one event, then unregisters. This simplifies the state machine since we
58  
    one event, then unregisters. This simplifies the state machine since we
59  
    don't need to track whether an fd is currently registered or handle
59  
    don't need to track whether an fd is currently registered or handle
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61  
    simplicity is worth it.
61  
    simplicity is worth it.
62  

62  

63  
    Cancellation
63  
    Cancellation
64  
    ------------
64  
    ------------
65  
    See op.hpp for the completion/cancellation race handling via the
65  
    See op.hpp for the completion/cancellation race handling via the
66  
    `registered` atomic. cancel() must complete pending operations (post
66  
    `registered` atomic. cancel() must complete pending operations (post
67  
    them with cancelled flag) so coroutines waiting on them can resume.
67  
    them with cancelled flag) so coroutines waiting on them can resume.
68  
    close_socket() calls cancel() first to ensure this.
68  
    close_socket() calls cancel() first to ensure this.
69  

69  

70  
    Impl Lifetime with shared_ptr
70  
    Impl Lifetime with shared_ptr
71  
    -----------------------------
71  
    -----------------------------
72  
    Socket impls use enable_shared_from_this. The service owns impls via
72  
    Socket impls use enable_shared_from_this. The service owns impls via
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74  
    removal. When a user calls close(), we call cancel() which posts pending
74  
    removal. When a user calls close(), we call cancel() which posts pending
75  
    ops to the scheduler.
75  
    ops to the scheduler.
76  

76  

77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
81  
    to be destroyed if no other references exist.
81  
    to be destroyed if no other references exist.
82  

82  

83  
    Service Ownership
83  
    Service Ownership
84  
    -----------------
84  
    -----------------
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
86  
    shared_ptr from the map, but the impl may survive if ops still hold
86  
    shared_ptr from the map, but the impl may survive if ops still hold
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
88  
    in-flight ops will complete and release their refs.
88  
    in-flight ops will complete and release their refs.
89  
*/
89  
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

93  
/** State for epoll socket service. */
93  
/** State for epoll socket service. */
94  
class epoll_socket_state
94  
class epoll_socket_state
95  
{
95  
{
96  
public:
96  
public:
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98  
    {
98  
    {
99  
    }
99  
    }
100  

100  

101  
    epoll_scheduler& sched_;
101  
    epoll_scheduler& sched_;
102  
    std::mutex mutex_;
102  
    std::mutex mutex_;
103  
    intrusive_list<epoll_socket> socket_list_;
103  
    intrusive_list<epoll_socket> socket_list_;
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105  
        socket_ptrs_;
105  
        socket_ptrs_;
106  
};
106  
};
107  

107  

108  
/** epoll socket service implementation.
108  
/** epoll socket service implementation.
109  

109  

110  
    Inherits from socket_service to enable runtime polymorphism.
110  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
111  
    Uses key_type = socket_service for service lookup.
112  
*/
112  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    /// Construct the service for the given execution context.
    explicit epoll_socket_service(capy::execution_context& ctx);

    ~epoll_socket_service() override;

    // The service is not copyable.
    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Service shutdown hook. Per the file-level notes, this closes all
    /// sockets and clears the ownership map.
    void shutdown() override;

    // io_object / socket_service overrides (defined out of line).
    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// Return the scheduler stored in the service state.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }

    /// Hand an op to the service for completion processing
    /// (defined out of line).
    void post(epoll_op* op);

    // Outstanding-work bookkeeping hooks (defined out of line).
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    /// Heap-allocated service state (scheduler reference, socket containers).
    std::unique_ptr<epoll_socket_state> state_;
};
144  

144  

145  
//--------------------------------------------------------------------------
145  
//--------------------------------------------------------------------------
146  
//
146  
//
147  
// Implementation
147  
// Implementation
148  
//
148  
//
149  
//--------------------------------------------------------------------------
149  
//--------------------------------------------------------------------------
150  

150  

151  
// Register an op with the reactor, handling cached edge events.
151  
// Register an op with the reactor, handling cached edge events.
152  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
152  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
153  
inline void
153  
inline void
154  
epoll_socket::register_op(
154  
epoll_socket::register_op(
155  
    epoll_op& op,
155  
    epoll_op& op,
156  
    epoll_op*& desc_slot,
156  
    epoll_op*& desc_slot,
157  
    bool& ready_flag,
157  
    bool& ready_flag,
158  
    bool& cancel_flag) noexcept
158  
    bool& cancel_flag) noexcept
159  
{
159  
{
160  
    svc_.work_started();
160  
    svc_.work_started();
161  

161  

162  
    std::lock_guard lock(desc_state_.mutex);
162  
    std::lock_guard lock(desc_state_.mutex);
163  
    bool io_done = false;
163  
    bool io_done = false;
164  
    if (ready_flag)
164  
    if (ready_flag)
165  
    {
165  
    {
166  
        ready_flag = false;
166  
        ready_flag = false;
167  
        op.perform_io();
167  
        op.perform_io();
168  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
168  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
169  
        if (!io_done)
169  
        if (!io_done)
170  
            op.errn = 0;
170  
            op.errn = 0;
171  
    }
171  
    }
172  

172  

173  
    if (cancel_flag)
173  
    if (cancel_flag)
174  
    {
174  
    {
175  
        cancel_flag = false;
175  
        cancel_flag = false;
176  
        op.cancelled.store(true, std::memory_order_relaxed);
176  
        op.cancelled.store(true, std::memory_order_relaxed);
177  
    }
177  
    }
178  

178  

179  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
179  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
180  
    {
180  
    {
181  
        svc_.post(&op);
181  
        svc_.post(&op);
182  
        svc_.work_finished();
182  
        svc_.work_finished();
183  
    }
183  
    }
184  
    else
184  
    else
185  
    {
185  
    {
186  
        desc_slot = &op;
186  
        desc_slot = &op;
187  
    }
187  
    }
188  
}
188  
}
189  

189  

190  
inline void
190  
inline void
191  
epoll_op::canceller::operator()() const noexcept
191  
epoll_op::canceller::operator()() const noexcept
192  
{
192  
{
193  
    op->cancel();
193  
    op->cancel();
194  
}
194  
}
195  

195  

196  
inline void
196  
inline void
197  
epoll_connect_op::cancel() noexcept
197  
epoll_connect_op::cancel() noexcept
198  
{
198  
{
199  
    if (socket_impl_)
199  
    if (socket_impl_)
200  
        socket_impl_->cancel_single_op(*this);
200  
        socket_impl_->cancel_single_op(*this);
201  
    else
201  
    else
202  
        request_cancel();
202  
        request_cancel();
203  
}
203  
}
204  

204  

205  
inline void
205  
inline void
206  
epoll_read_op::cancel() noexcept
206  
epoll_read_op::cancel() noexcept
207  
{
207  
{
208  
    if (socket_impl_)
208  
    if (socket_impl_)
209  
        socket_impl_->cancel_single_op(*this);
209  
        socket_impl_->cancel_single_op(*this);
210  
    else
210  
    else
211  
        request_cancel();
211  
        request_cancel();
212  
}
212  
}
213  

213  

214  
inline void
214  
inline void
215  
epoll_write_op::cancel() noexcept
215  
epoll_write_op::cancel() noexcept
216  
{
216  
{
217  
    if (socket_impl_)
217  
    if (socket_impl_)
218  
        socket_impl_->cancel_single_op(*this);
218  
        socket_impl_->cancel_single_op(*this);
219  
    else
219  
    else
220  
        request_cancel();
220  
        request_cancel();
221  
}
221  
}
222  

222  

223  
inline void
223  
inline void
224  
epoll_op::operator()()
224  
epoll_op::operator()()
225  
{
225  
{
226  
    stop_cb.reset();
226  
    stop_cb.reset();
227  

227  

228  
    socket_impl_->svc_.scheduler().reset_inline_budget();
228  
    socket_impl_->svc_.scheduler().reset_inline_budget();
229  

229  

230  
    if (cancelled.load(std::memory_order_acquire))
230  
    if (cancelled.load(std::memory_order_acquire))
231  
        *ec_out = capy::error::canceled;
231  
        *ec_out = capy::error::canceled;
232  
    else if (errn != 0)
232  
    else if (errn != 0)
233  
        *ec_out = make_err(errn);
233  
        *ec_out = make_err(errn);
234  
    else if (is_read_operation() && bytes_transferred == 0)
234  
    else if (is_read_operation() && bytes_transferred == 0)
235  
        *ec_out = capy::error::eof;
235  
        *ec_out = capy::error::eof;
236  
    else
236  
    else
237  
        *ec_out = {};
237  
        *ec_out = {};
238  

238  

239  
    *bytes_out = bytes_transferred;
239  
    *bytes_out = bytes_transferred;
240  

240  

241  
    // Move to stack before resuming coroutine. The coroutine might close
241  
    // Move to stack before resuming coroutine. The coroutine might close
242  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
242  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
243  
    // last ref and we destroyed it while still in operator(), we'd have
243  
    // last ref and we destroyed it while still in operator(), we'd have
244  
    // use-after-free. Moving to local ensures destruction happens at
244  
    // use-after-free. Moving to local ensures destruction happens at
245  
    // function exit, after all member accesses are complete.
245  
    // function exit, after all member accesses are complete.
246  
    capy::executor_ref saved_ex(ex);
246  
    capy::executor_ref saved_ex(ex);
247  
    std::coroutine_handle<> saved_h(h);
247  
    std::coroutine_handle<> saved_h(h);
248  
    auto prevent_premature_destruction = std::move(impl_ptr);
248  
    auto prevent_premature_destruction = std::move(impl_ptr);
249  
    dispatch_coro(saved_ex, saved_h).resume();
249  
    dispatch_coro(saved_ex, saved_h).resume();
250  
}
250  
}
251  

251  

252  
inline void
252  
inline void
253  
epoll_connect_op::operator()()
253  
epoll_connect_op::operator()()
254  
{
254  
{
255  
    stop_cb.reset();
255  
    stop_cb.reset();
256  

256  

257  
    socket_impl_->svc_.scheduler().reset_inline_budget();
257  
    socket_impl_->svc_.scheduler().reset_inline_budget();
258  

258  

259  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
259  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
260  

260  

261  
    // Cache endpoints on successful connect
261  
    // Cache endpoints on successful connect
262  
    if (success && socket_impl_)
262  
    if (success && socket_impl_)
263  
    {
263  
    {
264  
        endpoint local_ep;
264  
        endpoint local_ep;
265  
        sockaddr_storage local_storage{};
265  
        sockaddr_storage local_storage{};
266  
        socklen_t local_len = sizeof(local_storage);
266  
        socklen_t local_len = sizeof(local_storage);
267  
        if (::getsockname(
267  
        if (::getsockname(
268  
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
268  
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
269  
            0)
269  
            0)
270  
            local_ep = from_sockaddr(local_storage);
270  
            local_ep = from_sockaddr(local_storage);
271  
        static_cast<epoll_socket*>(socket_impl_)
271  
        static_cast<epoll_socket*>(socket_impl_)
272  
            ->set_endpoints(local_ep, target_endpoint);
272  
            ->set_endpoints(local_ep, target_endpoint);
273  
    }
273  
    }
274  

274  

275  
    if (cancelled.load(std::memory_order_acquire))
275  
    if (cancelled.load(std::memory_order_acquire))
276  
        *ec_out = capy::error::canceled;
276  
        *ec_out = capy::error::canceled;
277  
    else if (errn != 0)
277  
    else if (errn != 0)
278  
        *ec_out = make_err(errn);
278  
        *ec_out = make_err(errn);
279  
    else
279  
    else
280  
        *ec_out = {};
280  
        *ec_out = {};
281  

281  

282  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
282  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
283  
    capy::executor_ref saved_ex(ex);
283  
    capy::executor_ref saved_ex(ex);
284  
    std::coroutine_handle<> saved_h(h);
284  
    std::coroutine_handle<> saved_h(h);
285  
    auto prevent_premature_destruction = std::move(impl_ptr);
285  
    auto prevent_premature_destruction = std::move(impl_ptr);
286  
    dispatch_coro(saved_ex, saved_h).resume();
286  
    dispatch_coro(saved_ex, saved_h).resume();
287  
}
287  
}
288  

288  

289  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
289  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
290  
    : svc_(svc)
290  
    : svc_(svc)
291  
{
291  
{
292  
}
292  
}
293  

293  

294  
// Defaulted out of line; no explicit cleanup is performed here.
inline epoll_socket::~epoll_socket() = default;
295  

295  

296  
inline std::coroutine_handle<>
296  
inline std::coroutine_handle<>
297  
epoll_socket::connect(
297  
epoll_socket::connect(
298  
    std::coroutine_handle<> h,
298  
    std::coroutine_handle<> h,
299  
    capy::executor_ref ex,
299  
    capy::executor_ref ex,
300  
    endpoint ep,
300  
    endpoint ep,
301  
    std::stop_token token,
301  
    std::stop_token token,
302  
    std::error_code* ec)
302  
    std::error_code* ec)
303  
{
303  
{
304  
    auto& op = conn_;
304  
    auto& op = conn_;
305  

305  

306  
    sockaddr_storage storage{};
306  
    sockaddr_storage storage{};
307  
    socklen_t addrlen =
307  
    socklen_t addrlen =
308  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
308  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
309  
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
309  
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
310  

310  

311  
    if (result == 0)
311  
    if (result == 0)
312  
    {
312  
    {
313  
        sockaddr_storage local_storage{};
313  
        sockaddr_storage local_storage{};
314  
        socklen_t local_len = sizeof(local_storage);
314  
        socklen_t local_len = sizeof(local_storage);
315  
        if (::getsockname(
315  
        if (::getsockname(
316  
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
316  
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
317  
            0)
317  
            0)
318  
            local_endpoint_ = detail::from_sockaddr(local_storage);
318  
            local_endpoint_ = detail::from_sockaddr(local_storage);
319  
        remote_endpoint_ = ep;
319  
        remote_endpoint_ = ep;
320  
    }
320  
    }
321  

321  

322  
    if (result == 0 || errno != EINPROGRESS)
322  
    if (result == 0 || errno != EINPROGRESS)
323  
    {
323  
    {
324  
        int err = (result < 0) ? errno : 0;
324  
        int err = (result < 0) ? errno : 0;
325  
        if (svc_.scheduler().try_consume_inline_budget())
325  
        if (svc_.scheduler().try_consume_inline_budget())
326  
        {
326  
        {
327  
            *ec = err ? make_err(err) : std::error_code{};
327  
            *ec = err ? make_err(err) : std::error_code{};
328  
            return dispatch_coro(ex, h);
328  
            return dispatch_coro(ex, h);
329  
        }
329  
        }
330  
        op.reset();
330  
        op.reset();
331  
        op.h               = h;
331  
        op.h               = h;
332  
        op.ex              = ex;
332  
        op.ex              = ex;
333  
        op.ec_out          = ec;
333  
        op.ec_out          = ec;
334  
        op.fd              = fd_;
334  
        op.fd              = fd_;
335  
        op.target_endpoint = ep;
335  
        op.target_endpoint = ep;
336  
        op.start(token, this);
336  
        op.start(token, this);
337  
        op.impl_ptr = shared_from_this();
337  
        op.impl_ptr = shared_from_this();
338  
        op.complete(err, 0);
338  
        op.complete(err, 0);
339  
        svc_.post(&op);
339  
        svc_.post(&op);
340  
        return std::noop_coroutine();
340  
        return std::noop_coroutine();
341  
    }
341  
    }
342  

342  

343  
    // EINPROGRESS — register with reactor
343  
    // EINPROGRESS — register with reactor
 
344 +
    svc_.scheduler().ensure_write_events(fd_, &desc_state_);
 
345 +

344  
    op.reset();
346  
    op.reset();
345  
    op.h               = h;
347  
    op.h               = h;
346  
    op.ex              = ex;
348  
    op.ex              = ex;
347  
    op.ec_out          = ec;
349  
    op.ec_out          = ec;
348  
    op.fd              = fd_;
350  
    op.fd              = fd_;
349  
    op.target_endpoint = ep;
351  
    op.target_endpoint = ep;
350  
    op.start(token, this);
352  
    op.start(token, this);
351  
    op.impl_ptr = shared_from_this();
353  
    op.impl_ptr = shared_from_this();
352  

354  

353  
    register_op(
355  
    register_op(
354  
        op, desc_state_.connect_op, desc_state_.write_ready,
356  
        op, desc_state_.connect_op, desc_state_.write_ready,
355  
        desc_state_.connect_cancel_pending);
357  
        desc_state_.connect_cancel_pending);
356  
    return std::noop_coroutine();
358  
    return std::noop_coroutine();
357  
}
359  
}
358  

360  

359  
inline std::coroutine_handle<>
361  
inline std::coroutine_handle<>
360  
epoll_socket::read_some(
362  
epoll_socket::read_some(
361  
    std::coroutine_handle<> h,
363  
    std::coroutine_handle<> h,
362  
    capy::executor_ref ex,
364  
    capy::executor_ref ex,
363  
    buffer_param param,
365  
    buffer_param param,
364  
    std::stop_token token,
366  
    std::stop_token token,
365  
    std::error_code* ec,
367  
    std::error_code* ec,
366  
    std::size_t* bytes_out)
368  
    std::size_t* bytes_out)
367  
{
369  
{
368  
    auto& op = rd_;
370  
    auto& op = rd_;
369  
    op.reset();
371  
    op.reset();
370  

372  

371  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
373  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
372  
    op.iovec_count =
374  
    op.iovec_count =
373  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
375  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
374  

376  

375  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
377  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
376  
    {
378  
    {
377  
        op.empty_buffer_read = true;
379  
        op.empty_buffer_read = true;
378  
        op.h                 = h;
380  
        op.h                 = h;
379  
        op.ex                = ex;
381  
        op.ex                = ex;
380  
        op.ec_out            = ec;
382  
        op.ec_out            = ec;
381  
        op.bytes_out         = bytes_out;
383  
        op.bytes_out         = bytes_out;
382  
        op.start(token, this);
384  
        op.start(token, this);
383  
        op.impl_ptr = shared_from_this();
385  
        op.impl_ptr = shared_from_this();
384  
        op.complete(0, 0);
386  
        op.complete(0, 0);
385  
        svc_.post(&op);
387  
        svc_.post(&op);
386  
        return std::noop_coroutine();
388  
        return std::noop_coroutine();
387  
    }
389  
    }
388  

390  

389  
    for (int i = 0; i < op.iovec_count; ++i)
391  
    for (int i = 0; i < op.iovec_count; ++i)
390  
    {
392  
    {
391  
        op.iovecs[i].iov_base = bufs[i].data();
393  
        op.iovecs[i].iov_base = bufs[i].data();
392  
        op.iovecs[i].iov_len  = bufs[i].size();
394  
        op.iovecs[i].iov_len  = bufs[i].size();
393  
    }
395  
    }
394  

396  

395  
    // Speculative read
397  
    // Speculative read
396  
    ssize_t n;
398  
    ssize_t n;
397  
    do
399  
    do
398  
    {
400  
    {
399  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
401  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
400  
    }
402  
    }
401  
    while (n < 0 && errno == EINTR);
403  
    while (n < 0 && errno == EINTR);
402  

404  

403  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
405  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
404  
    {
406  
    {
405  
        int err    = (n < 0) ? errno : 0;
407  
        int err    = (n < 0) ? errno : 0;
406  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
408  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
407  

409  

408  
        if (svc_.scheduler().try_consume_inline_budget())
410  
        if (svc_.scheduler().try_consume_inline_budget())
409  
        {
411  
        {
410  
            if (err)
412  
            if (err)
411  
                *ec = make_err(err);
413  
                *ec = make_err(err);
412  
            else if (n == 0)
414  
            else if (n == 0)
413  
                *ec = capy::error::eof;
415  
                *ec = capy::error::eof;
414  
            else
416  
            else
415  
                *ec = {};
417  
                *ec = {};
416  
            *bytes_out = bytes;
418  
            *bytes_out = bytes;
417  
            return dispatch_coro(ex, h);
419  
            return dispatch_coro(ex, h);
418  
        }
420  
        }
419  
        op.h         = h;
421  
        op.h         = h;
420  
        op.ex        = ex;
422  
        op.ex        = ex;
421  
        op.ec_out    = ec;
423  
        op.ec_out    = ec;
422  
        op.bytes_out = bytes_out;
424  
        op.bytes_out = bytes_out;
423  
        op.start(token, this);
425  
        op.start(token, this);
424  
        op.impl_ptr = shared_from_this();
426  
        op.impl_ptr = shared_from_this();
425  
        op.complete(err, bytes);
427  
        op.complete(err, bytes);
426  
        svc_.post(&op);
428  
        svc_.post(&op);
427  
        return std::noop_coroutine();
429  
        return std::noop_coroutine();
428  
    }
430  
    }
429  

431  

430  
    // EAGAIN — register with reactor
432  
    // EAGAIN — register with reactor
431  
    op.h         = h;
433  
    op.h         = h;
432  
    op.ex        = ex;
434  
    op.ex        = ex;
433  
    op.ec_out    = ec;
435  
    op.ec_out    = ec;
434  
    op.bytes_out = bytes_out;
436  
    op.bytes_out = bytes_out;
435  
    op.fd        = fd_;
437  
    op.fd        = fd_;
436  
    op.start(token, this);
438  
    op.start(token, this);
437  
    op.impl_ptr = shared_from_this();
439  
    op.impl_ptr = shared_from_this();
438  

440  

439  
    register_op(
441  
    register_op(
440  
        op, desc_state_.read_op, desc_state_.read_ready,
442  
        op, desc_state_.read_op, desc_state_.read_ready,
441  
        desc_state_.read_cancel_pending);
443  
        desc_state_.read_cancel_pending);
442  
    return std::noop_coroutine();
444  
    return std::noop_coroutine();
443  
}
445  
}
444  

446  

445  
inline std::coroutine_handle<>
447  
inline std::coroutine_handle<>
446  
epoll_socket::write_some(
448  
epoll_socket::write_some(
447  
    std::coroutine_handle<> h,
449  
    std::coroutine_handle<> h,
448  
    capy::executor_ref ex,
450  
    capy::executor_ref ex,
449  
    buffer_param param,
451  
    buffer_param param,
450  
    std::stop_token token,
452  
    std::stop_token token,
451  
    std::error_code* ec,
453  
    std::error_code* ec,
452  
    std::size_t* bytes_out)
454  
    std::size_t* bytes_out)
453  
{
455  
{
454  
    auto& op = wr_;
456  
    auto& op = wr_;
455  
    op.reset();
457  
    op.reset();
456  

458  

457  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
459  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
458  
    op.iovec_count =
460  
    op.iovec_count =
459  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
461  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
460  

462  

461  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
463  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
462  
    {
464  
    {
463  
        op.h         = h;
465  
        op.h         = h;
464  
        op.ex        = ex;
466  
        op.ex        = ex;
465  
        op.ec_out    = ec;
467  
        op.ec_out    = ec;
466  
        op.bytes_out = bytes_out;
468  
        op.bytes_out = bytes_out;
467  
        op.start(token, this);
469  
        op.start(token, this);
468  
        op.impl_ptr = shared_from_this();
470  
        op.impl_ptr = shared_from_this();
469  
        op.complete(0, 0);
471  
        op.complete(0, 0);
470  
        svc_.post(&op);
472  
        svc_.post(&op);
471  
        return std::noop_coroutine();
473  
        return std::noop_coroutine();
472  
    }
474  
    }
473  

475  

474  
    for (int i = 0; i < op.iovec_count; ++i)
476  
    for (int i = 0; i < op.iovec_count; ++i)
475  
    {
477  
    {
476  
        op.iovecs[i].iov_base = bufs[i].data();
478  
        op.iovecs[i].iov_base = bufs[i].data();
477  
        op.iovecs[i].iov_len  = bufs[i].size();
479  
        op.iovecs[i].iov_len  = bufs[i].size();
478  
    }
480  
    }
479  

481  

480  
    // Speculative write
482  
    // Speculative write
481  
    msghdr msg{};
483  
    msghdr msg{};
482  
    msg.msg_iov    = op.iovecs;
484  
    msg.msg_iov    = op.iovecs;
483  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
485  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
484  

486  

485  
    ssize_t n;
487  
    ssize_t n;
486  
    do
488  
    do
487  
    {
489  
    {
488  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
490  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
489  
    }
491  
    }
490  
    while (n < 0 && errno == EINTR);
492  
    while (n < 0 && errno == EINTR);
491  

493  

492  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
494  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
493  
    {
495  
    {
494  
        int err    = (n < 0) ? errno : 0;
496  
        int err    = (n < 0) ? errno : 0;
495  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
497  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
496  

498  

497  
        if (svc_.scheduler().try_consume_inline_budget())
499  
        if (svc_.scheduler().try_consume_inline_budget())
498  
        {
500  
        {
499  
            *ec        = err ? make_err(err) : std::error_code{};
501  
            *ec        = err ? make_err(err) : std::error_code{};
500  
            *bytes_out = bytes;
502  
            *bytes_out = bytes;
501  
            return dispatch_coro(ex, h);
503  
            return dispatch_coro(ex, h);
502  
        }
504  
        }
503  
        op.h         = h;
505  
        op.h         = h;
504  
        op.ex        = ex;
506  
        op.ex        = ex;
505  
        op.ec_out    = ec;
507  
        op.ec_out    = ec;
506  
        op.bytes_out = bytes_out;
508  
        op.bytes_out = bytes_out;
507  
        op.start(token, this);
509  
        op.start(token, this);
508  
        op.impl_ptr = shared_from_this();
510  
        op.impl_ptr = shared_from_this();
509  
        op.complete(err, bytes);
511  
        op.complete(err, bytes);
510  
        svc_.post(&op);
512  
        svc_.post(&op);
511  
        return std::noop_coroutine();
513  
        return std::noop_coroutine();
512  
    }
514  
    }
513  

515  

514  
    // EAGAIN — register with reactor
516  
    // EAGAIN — register with reactor
 
517 +
    svc_.scheduler().ensure_write_events(fd_, &desc_state_);
 
518 +

515  
    op.h         = h;
519  
    op.h         = h;
516  
    op.ex        = ex;
520  
    op.ex        = ex;
517  
    op.ec_out    = ec;
521  
    op.ec_out    = ec;
518  
    op.bytes_out = bytes_out;
522  
    op.bytes_out = bytes_out;
519  
    op.fd        = fd_;
523  
    op.fd        = fd_;
520  
    op.start(token, this);
524  
    op.start(token, this);
521  
    op.impl_ptr = shared_from_this();
525  
    op.impl_ptr = shared_from_this();
522  

526  

523  
    register_op(
527  
    register_op(
524  
        op, desc_state_.write_op, desc_state_.write_ready,
528  
        op, desc_state_.write_op, desc_state_.write_ready,
525  
        desc_state_.write_cancel_pending);
529  
        desc_state_.write_cancel_pending);
526  
    return std::noop_coroutine();
530  
    return std::noop_coroutine();
527  
}
531  
}
528  

532  

529  
inline std::error_code
533  
inline std::error_code
530  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
534  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
531  
{
535  
{
532  
    int how;
536  
    int how;
533  
    switch (what)
537  
    switch (what)
534  
    {
538  
    {
535  
    case tcp_socket::shutdown_receive:
539  
    case tcp_socket::shutdown_receive:
536  
        how = SHUT_RD;
540  
        how = SHUT_RD;
537  
        break;
541  
        break;
538  
    case tcp_socket::shutdown_send:
542  
    case tcp_socket::shutdown_send:
539  
        how = SHUT_WR;
543  
        how = SHUT_WR;
540  
        break;
544  
        break;
541  
    case tcp_socket::shutdown_both:
545  
    case tcp_socket::shutdown_both:
542  
        how = SHUT_RDWR;
546  
        how = SHUT_RDWR;
543  
        break;
547  
        break;
544  
    default:
548  
    default:
545  
        return make_err(EINVAL);
549  
        return make_err(EINVAL);
546  
    }
550  
    }
547  
    if (::shutdown(fd_, how) != 0)
551  
    if (::shutdown(fd_, how) != 0)
548  
        return make_err(errno);
552  
        return make_err(errno);
549  
    return {};
553  
    return {};
550  
}
554  
}
551  

555  

552  
inline std::error_code
556  
inline std::error_code
553  
epoll_socket::set_option(
557  
epoll_socket::set_option(
554  
    int level, int optname, void const* data, std::size_t size) noexcept
558  
    int level, int optname, void const* data, std::size_t size) noexcept
555  
{
559  
{
556  
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
560  
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
557  
        0)
561  
        0)
558  
        return make_err(errno);
562  
        return make_err(errno);
559  
    return {};
563  
    return {};
560  
}
564  
}
561  

565  

562  
inline std::error_code
566  
inline std::error_code
563  
epoll_socket::get_option(
567  
epoll_socket::get_option(
564  
    int level, int optname, void* data, std::size_t* size) const noexcept
568  
    int level, int optname, void* data, std::size_t* size) const noexcept
565  
{
569  
{
566  
    socklen_t len = static_cast<socklen_t>(*size);
570  
    socklen_t len = static_cast<socklen_t>(*size);
567  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
571  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
568  
        return make_err(errno);
572  
        return make_err(errno);
569  
    *size = static_cast<std::size_t>(len);
573  
    *size = static_cast<std::size_t>(len);
570  
    return {};
574  
    return {};
571  
}
575  
}
572  

576  

573  
inline void
577  
inline void
574  
epoll_socket::cancel() noexcept
578  
epoll_socket::cancel() noexcept
575  
{
579  
{
576  
    auto self = weak_from_this().lock();
580  
    auto self = weak_from_this().lock();
577  
    if (!self)
581  
    if (!self)
578  
        return;
582  
        return;
579  

583  

580  
    conn_.request_cancel();
584  
    conn_.request_cancel();
581  
    rd_.request_cancel();
585  
    rd_.request_cancel();
582  
    wr_.request_cancel();
586  
    wr_.request_cancel();
583  

587  

584  
    epoll_op* conn_claimed = nullptr;
588  
    epoll_op* conn_claimed = nullptr;
585  
    epoll_op* rd_claimed   = nullptr;
589  
    epoll_op* rd_claimed   = nullptr;
586  
    epoll_op* wr_claimed   = nullptr;
590  
    epoll_op* wr_claimed   = nullptr;
587  
    {
591  
    {
588  
        std::lock_guard lock(desc_state_.mutex);
592  
        std::lock_guard lock(desc_state_.mutex);
589  
        if (desc_state_.connect_op == &conn_)
593  
        if (desc_state_.connect_op == &conn_)
590  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
594  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
591  
        else
595  
        else
592  
            desc_state_.connect_cancel_pending = true;
596  
            desc_state_.connect_cancel_pending = true;
593  
        if (desc_state_.read_op == &rd_)
597  
        if (desc_state_.read_op == &rd_)
594  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
598  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
595  
        else
599  
        else
596  
            desc_state_.read_cancel_pending = true;
600  
            desc_state_.read_cancel_pending = true;
597  
        if (desc_state_.write_op == &wr_)
601  
        if (desc_state_.write_op == &wr_)
598  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
602  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
599  
        else
603  
        else
600  
            desc_state_.write_cancel_pending = true;
604  
            desc_state_.write_cancel_pending = true;
601  
    }
605  
    }
602  

606  

603  
    if (conn_claimed)
607  
    if (conn_claimed)
604  
    {
608  
    {
605  
        conn_.impl_ptr = self;
609  
        conn_.impl_ptr = self;
606  
        svc_.post(&conn_);
610  
        svc_.post(&conn_);
607  
        svc_.work_finished();
611  
        svc_.work_finished();
608  
    }
612  
    }
609  
    if (rd_claimed)
613  
    if (rd_claimed)
610  
    {
614  
    {
611  
        rd_.impl_ptr = self;
615  
        rd_.impl_ptr = self;
612  
        svc_.post(&rd_);
616  
        svc_.post(&rd_);
613  
        svc_.work_finished();
617  
        svc_.work_finished();
614  
    }
618  
    }
615  
    if (wr_claimed)
619  
    if (wr_claimed)
616  
    {
620  
    {
617  
        wr_.impl_ptr = self;
621  
        wr_.impl_ptr = self;
618  
        svc_.post(&wr_);
622  
        svc_.post(&wr_);
619  
        svc_.work_finished();
623  
        svc_.work_finished();
620  
    }
624  
    }
621  
}
625  
}
622  

626  

623  
inline void
627  
inline void
624  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
628  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
625  
{
629  
{
626  
    auto self = weak_from_this().lock();
630  
    auto self = weak_from_this().lock();
627  
    if (!self)
631  
    if (!self)
628  
        return;
632  
        return;
629  

633  

630  
    op.request_cancel();
634  
    op.request_cancel();
631  

635  

632  
    epoll_op** desc_op_ptr = nullptr;
636  
    epoll_op** desc_op_ptr = nullptr;
633  
    if (&op == &conn_)
637  
    if (&op == &conn_)
634  
        desc_op_ptr = &desc_state_.connect_op;
638  
        desc_op_ptr = &desc_state_.connect_op;
635  
    else if (&op == &rd_)
639  
    else if (&op == &rd_)
636  
        desc_op_ptr = &desc_state_.read_op;
640  
        desc_op_ptr = &desc_state_.read_op;
637  
    else if (&op == &wr_)
641  
    else if (&op == &wr_)
638  
        desc_op_ptr = &desc_state_.write_op;
642  
        desc_op_ptr = &desc_state_.write_op;
639  

643  

640  
    if (desc_op_ptr)
644  
    if (desc_op_ptr)
641  
    {
645  
    {
642  
        epoll_op* claimed = nullptr;
646  
        epoll_op* claimed = nullptr;
643  
        {
647  
        {
644  
            std::lock_guard lock(desc_state_.mutex);
648  
            std::lock_guard lock(desc_state_.mutex);
645  
            if (*desc_op_ptr == &op)
649  
            if (*desc_op_ptr == &op)
646  
                claimed = std::exchange(*desc_op_ptr, nullptr);
650  
                claimed = std::exchange(*desc_op_ptr, nullptr);
647  
            else if (&op == &conn_)
651  
            else if (&op == &conn_)
648  
                desc_state_.connect_cancel_pending = true;
652  
                desc_state_.connect_cancel_pending = true;
649  
            else if (&op == &rd_)
653  
            else if (&op == &rd_)
650  
                desc_state_.read_cancel_pending = true;
654  
                desc_state_.read_cancel_pending = true;
651  
            else if (&op == &wr_)
655  
            else if (&op == &wr_)
652  
                desc_state_.write_cancel_pending = true;
656  
                desc_state_.write_cancel_pending = true;
653  
        }
657  
        }
654  
        if (claimed)
658  
        if (claimed)
655  
        {
659  
        {
656  
            op.impl_ptr = self;
660  
            op.impl_ptr = self;
657  
            svc_.post(&op);
661  
            svc_.post(&op);
658  
            svc_.work_finished();
662  
            svc_.work_finished();
659  
        }
663  
        }
660  
    }
664  
    }
661  
}
665  
}
662  

666  

663  
inline void
667  
inline void
664  
epoll_socket::close_socket() noexcept
668  
epoll_socket::close_socket() noexcept
665  
{
669  
{
666  
    auto self = weak_from_this().lock();
670  
    auto self = weak_from_this().lock();
667  
    if (self)
671  
    if (self)
668  
    {
672  
    {
669  
        conn_.request_cancel();
673  
        conn_.request_cancel();
670  
        rd_.request_cancel();
674  
        rd_.request_cancel();
671  
        wr_.request_cancel();
675  
        wr_.request_cancel();
672  

676  

673  
        epoll_op* conn_claimed = nullptr;
677  
        epoll_op* conn_claimed = nullptr;
674  
        epoll_op* rd_claimed   = nullptr;
678  
        epoll_op* rd_claimed   = nullptr;
675  
        epoll_op* wr_claimed   = nullptr;
679  
        epoll_op* wr_claimed   = nullptr;
676  
        {
680  
        {
677  
            std::lock_guard lock(desc_state_.mutex);
681  
            std::lock_guard lock(desc_state_.mutex);
678  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
682  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
679  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
683  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
680  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
684  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
681  
            desc_state_.read_ready             = false;
685  
            desc_state_.read_ready             = false;
682  
            desc_state_.write_ready            = false;
686  
            desc_state_.write_ready            = false;
683  
            desc_state_.read_cancel_pending    = false;
687  
            desc_state_.read_cancel_pending    = false;
684  
            desc_state_.write_cancel_pending   = false;
688  
            desc_state_.write_cancel_pending   = false;
685  
            desc_state_.connect_cancel_pending = false;
689  
            desc_state_.connect_cancel_pending = false;
686  
        }
690  
        }
687  

691  

688  
        if (conn_claimed)
692  
        if (conn_claimed)
689  
        {
693  
        {
690  
            conn_.impl_ptr = self;
694  
            conn_.impl_ptr = self;
691  
            svc_.post(&conn_);
695  
            svc_.post(&conn_);
692  
            svc_.work_finished();
696  
            svc_.work_finished();
693  
        }
697  
        }
694  
        if (rd_claimed)
698  
        if (rd_claimed)
695  
        {
699  
        {
696  
            rd_.impl_ptr = self;
700  
            rd_.impl_ptr = self;
697  
            svc_.post(&rd_);
701  
            svc_.post(&rd_);
698  
            svc_.work_finished();
702  
            svc_.work_finished();
699  
        }
703  
        }
700  
        if (wr_claimed)
704  
        if (wr_claimed)
701  
        {
705  
        {
702  
            wr_.impl_ptr = self;
706  
            wr_.impl_ptr = self;
703  
            svc_.post(&wr_);
707  
            svc_.post(&wr_);
704  
            svc_.work_finished();
708  
            svc_.work_finished();
705  
        }
709  
        }
706  

710  

707  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
711  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708  
            desc_state_.impl_ref_ = self;
712  
            desc_state_.impl_ref_ = self;
709  
    }
713  
    }
710  

714  

711  
    if (fd_ >= 0)
715  
    if (fd_ >= 0)
712  
    {
716  
    {
713  
        if (desc_state_.registered_events != 0)
717  
        if (desc_state_.registered_events != 0)
714  
            svc_.scheduler().deregister_descriptor(fd_);
718  
            svc_.scheduler().deregister_descriptor(fd_);
715  
        ::close(fd_);
719  
        ::close(fd_);
716  
        fd_ = -1;
720  
        fd_ = -1;
717  
    }
721  
    }
718  

722  

719  
    desc_state_.fd                = -1;
723  
    desc_state_.fd                = -1;
720  
    desc_state_.registered_events = 0;
724  
    desc_state_.registered_events = 0;
721  

725  

722  
    local_endpoint_  = endpoint{};
726  
    local_endpoint_  = endpoint{};
723  
    remote_endpoint_ = endpoint{};
727  
    remote_endpoint_ = endpoint{};
724  
}
728  
}
725  

729  

726  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
730  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
727  
    : state_(
731  
    : state_(
728  
          std::make_unique<epoll_socket_state>(
732  
          std::make_unique<epoll_socket_state>(
729  
              ctx.use_service<epoll_scheduler>()))
733  
              ctx.use_service<epoll_scheduler>()))
730  
{
734  
{
731  
}
735  
}
732  

736  

733  
inline epoll_socket_service::~epoll_socket_service() {}
737  
inline epoll_socket_service::~epoll_socket_service() {}
734  

738  

735  
inline void
739  
inline void
736  
epoll_socket_service::shutdown()
740  
epoll_socket_service::shutdown()
737  
{
741  
{
738  
    std::lock_guard lock(state_->mutex_);
742  
    std::lock_guard lock(state_->mutex_);
739  

743  

740  
    while (auto* impl = state_->socket_list_.pop_front())
744  
    while (auto* impl = state_->socket_list_.pop_front())
741  
        impl->close_socket();
745  
        impl->close_socket();
742  

746  

743  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
747  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
744  
    // drains completed_ops_, calling destroy() on each queued op. If we
748  
    // drains completed_ops_, calling destroy() on each queued op. If we
745  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
749  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
746  
    // last ref to an impl whose embedded descriptor_state is still linked
750  
    // last ref to an impl whose embedded descriptor_state is still linked
747  
    // in the queue — use-after-free on the next pop(). Letting ~state_
751  
    // in the queue — use-after-free on the next pop(). Letting ~state_
748  
    // release the ptrs (during service destruction, after scheduler
752  
    // release the ptrs (during service destruction, after scheduler
749  
    // shutdown) keeps every impl alive until all ops have been drained.
753  
    // shutdown) keeps every impl alive until all ops have been drained.
750  
}
754  
}
751  

755  

752  
inline io_object::implementation*
756  
inline io_object::implementation*
753  
epoll_socket_service::construct()
757  
epoll_socket_service::construct()
754  
{
758  
{
755  
    auto impl = std::make_shared<epoll_socket>(*this);
759  
    auto impl = std::make_shared<epoll_socket>(*this);
756  
    auto* raw = impl.get();
760  
    auto* raw = impl.get();
757  

761  

758  
    {
762  
    {
759  
        std::lock_guard lock(state_->mutex_);
763  
        std::lock_guard lock(state_->mutex_);
760  
        state_->socket_list_.push_back(raw);
764  
        state_->socket_list_.push_back(raw);
761  
        state_->socket_ptrs_.emplace(raw, std::move(impl));
765  
        state_->socket_ptrs_.emplace(raw, std::move(impl));
762  
    }
766  
    }
763  

767  

764  
    return raw;
768  
    return raw;
765  
}
769  
}
766  

770  

767  
inline void
771  
inline void
768  
epoll_socket_service::destroy(io_object::implementation* impl)
772  
epoll_socket_service::destroy(io_object::implementation* impl)
769  
{
773  
{
770  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
774  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
771  
    epoll_impl->close_socket();
775  
    epoll_impl->close_socket();
772  
    std::lock_guard lock(state_->mutex_);
776  
    std::lock_guard lock(state_->mutex_);
773  
    state_->socket_list_.remove(epoll_impl);
777  
    state_->socket_list_.remove(epoll_impl);
774  
    state_->socket_ptrs_.erase(epoll_impl);
778  
    state_->socket_ptrs_.erase(epoll_impl);
775  
}
779  
}
776  

780  

777  
inline std::error_code
781  
inline std::error_code
778  
epoll_socket_service::open_socket(
782  
epoll_socket_service::open_socket(
779  
    tcp_socket::implementation& impl, int family, int type, int protocol)
783  
    tcp_socket::implementation& impl, int family, int type, int protocol)
780  
{
784  
{
781  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
785  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
782  
    epoll_impl->close_socket();
786  
    epoll_impl->close_socket();
783  

787  

784  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
788  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
785  
    if (fd < 0)
789  
    if (fd < 0)
786  
        return make_err(errno);
790  
        return make_err(errno);
787  

791  

788  
    if (family == AF_INET6)
792  
    if (family == AF_INET6)
789  
    {
793  
    {
790  
        int one = 1;
794  
        int one = 1;
791  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
795  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
792  
    }
796  
    }
793  

797  

794  
    epoll_impl->fd_ = fd;
798  
    epoll_impl->fd_ = fd;
795  

799  

796  
    // Register fd with epoll (edge-triggered mode)
800  
    // Register fd with epoll (edge-triggered mode)
797  
    epoll_impl->desc_state_.fd = fd;
801  
    epoll_impl->desc_state_.fd = fd;
798  
    {
802  
    {
799  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
803  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
800  
        epoll_impl->desc_state_.read_op    = nullptr;
804  
        epoll_impl->desc_state_.read_op    = nullptr;
801  
        epoll_impl->desc_state_.write_op   = nullptr;
805  
        epoll_impl->desc_state_.write_op   = nullptr;
802  
        epoll_impl->desc_state_.connect_op = nullptr;
806  
        epoll_impl->desc_state_.connect_op = nullptr;
803  
    }
807  
    }
804  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
808  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
805  

809  

806  
    return {};
810  
    return {};
807  
}
811  
}
808  

812  

809  
inline void
813  
inline void
810  
epoll_socket_service::close(io_object::handle& h)
814  
epoll_socket_service::close(io_object::handle& h)
811  
{
815  
{
812  
    static_cast<epoll_socket*>(h.get())->close_socket();
816  
    static_cast<epoll_socket*>(h.get())->close_socket();
813  
}
817  
}
814  

818  

815  
inline void
819  
inline void
816  
epoll_socket_service::post(epoll_op* op)
820  
epoll_socket_service::post(epoll_op* op)
817  
{
821  
{
818  
    state_->sched_.post(op);
822  
    state_->sched_.post(op);
819  
}
823  
}
820  

824  

821  
inline void
825  
inline void
822  
epoll_socket_service::work_started() noexcept
826  
epoll_socket_service::work_started() noexcept
823  
{
827  
{
824  
    state_->sched_.work_started();
828  
    state_->sched_.work_started();
825  
}
829  
}
826  

830  

827  
inline void
831  
inline void
828  
epoll_socket_service::work_finished() noexcept
832  
epoll_socket_service::work_finished() noexcept
829  
{
833  
{
830  
    state_->sched_.work_finished();
834  
    state_->sched_.work_finished();
831  
}
835  
}
832  

836  

833  
} // namespace boost::corosio::detail
837  
} // namespace boost::corosio::detail
834  

838  

835  
#endif // BOOST_COROSIO_HAS_EPOLL
839  
#endif // BOOST_COROSIO_HAS_EPOLL
836  

840  

837  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
841  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP