1  
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
20  
#include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25  

25  

26  
#include <coroutine>
26  
#include <coroutine>
27  

27  

28  
#include <errno.h>
28  
#include <errno.h>
29  
#include <netinet/in.h>
29  
#include <netinet/in.h>
30  
#include <netinet/tcp.h>
30  
#include <netinet/tcp.h>
31  
#include <sys/epoll.h>
31  
#include <sys/epoll.h>
32  
#include <sys/socket.h>
32  
#include <sys/socket.h>
33  
#include <unistd.h>
33  
#include <unistd.h>
34  

34  

35  
/*
    epoll Socket Implementation
    ===========================

    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait

    This "try first" approach avoids unnecessary epoll round-trips for
    operations that can complete immediately (common for small reads/writes
    on fast local connections).

    One-Shot Registration
    ---------------------
    We use one-shot epoll registration: each operation registers, waits for
    one event, then unregisters. This simplifies the state machine since we
    don't need to track whether an fd is currently registered or handle
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
    simplicity is worth it.

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with cancelled flag) so coroutines waiting on them can resume.
    close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    epoll_tcp_service owns all socket impls. destroy_impl() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
83  

83  

84  
namespace boost::corosio::detail {
84  
namespace boost::corosio::detail {
85  

85  

86  
/** epoll TCP service implementation.
86  
/** epoll TCP service implementation.
87  

87  

88  
    Inherits from tcp_service to enable runtime polymorphism.
88  
    Inherits from tcp_service to enable runtime polymorphism.
89  
    Uses key_type = tcp_service for service lookup.
89  
    Uses key_type = tcp_service for service lookup.
90  
*/
90  
*/
91  
class BOOST_COROSIO_DECL epoll_tcp_service final
91  
class BOOST_COROSIO_DECL epoll_tcp_service final
92  
    : public reactor_socket_service<
92  
    : public reactor_socket_service<
93  
          epoll_tcp_service,
93  
          epoll_tcp_service,
94  
          tcp_service,
94  
          tcp_service,
95  
          epoll_scheduler,
95  
          epoll_scheduler,
96  
          epoll_tcp_socket>
96  
          epoll_tcp_socket>
97  
{
97  
{
98  
public:
98  
public:
99  
    explicit epoll_tcp_service(capy::execution_context& ctx)
99  
    explicit epoll_tcp_service(capy::execution_context& ctx)
100  
        : reactor_socket_service(ctx)
100  
        : reactor_socket_service(ctx)
101  
    {
101  
    {
102  
    }
102  
    }
103  

103  

104  
    std::error_code open_socket(
104  
    std::error_code open_socket(
105  
        tcp_socket::implementation& impl,
105  
        tcp_socket::implementation& impl,
106  
        int family,
106  
        int family,
107  
        int type,
107  
        int type,
108 -

 
109 -
    std::error_code
 
110 -
    bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
 
111  
        int protocol) override;
108  
        int protocol) override;
112  
};
109  
};
113  

110  

114  
inline void
111  
inline void
115  
epoll_connect_op::cancel() noexcept
112  
epoll_connect_op::cancel() noexcept
116  
{
113  
{
117  
    if (socket_impl_)
114  
    if (socket_impl_)
118  
        socket_impl_->cancel_single_op(*this);
115  
        socket_impl_->cancel_single_op(*this);
119  
    else
116  
    else
120  
        request_cancel();
117  
        request_cancel();
121  
}
118  
}
122  

119  

123  
inline void
120  
inline void
124  
epoll_read_op::cancel() noexcept
121  
epoll_read_op::cancel() noexcept
125  
{
122  
{
126  
    if (socket_impl_)
123  
    if (socket_impl_)
127  
        socket_impl_->cancel_single_op(*this);
124  
        socket_impl_->cancel_single_op(*this);
128  
    else
125  
    else
129  
        request_cancel();
126  
        request_cancel();
130  
}
127  
}
131  

128  

132  
inline void
129  
inline void
133  
epoll_write_op::cancel() noexcept
130  
epoll_write_op::cancel() noexcept
134  
{
131  
{
135  
    if (socket_impl_)
132  
    if (socket_impl_)
136  
        socket_impl_->cancel_single_op(*this);
133  
        socket_impl_->cancel_single_op(*this);
137  
    else
134  
    else
138  
        request_cancel();
135  
        request_cancel();
139  
}
136  
}
140  

137  

141  
inline void
138  
inline void
142  
epoll_op::operator()()
139  
epoll_op::operator()()
143  
{
140  
{
144  
    complete_io_op(*this);
141  
    complete_io_op(*this);
145  
}
142  
}
146  

143  

147  
inline void
144  
inline void
148  
epoll_connect_op::operator()()
145  
epoll_connect_op::operator()()
149  
{
146  
{
150  
    complete_connect_op(*this);
147  
    complete_connect_op(*this);
151  
}
148  
}
152  

149  

153  
/// Construct a socket impl bound to svc; the service reference is
/// forwarded to the reactor_stream_socket base.
inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
    : reactor_stream_socket(svc)
{
}
157  

154  

158  
/// Defaulted destructor, defined out of line in this header.
inline epoll_tcp_socket::~epoll_tcp_socket() = default;
159  

156  

160  
/** Start a connect to ep.

    Thin forwarder: every argument is passed unchanged to do_connect(),
    and its returned coroutine handle is returned to the caller.

    @param h Coroutine handle passed through to do_connect().
    @param ex Executor reference passed through to do_connect().
    @param ep Endpoint to connect to.
    @param token Stop token passed through to do_connect().
    @param ec Error-code pointer passed through to do_connect().
*/
inline std::coroutine_handle<>
epoll_tcp_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    return do_connect(h, ex, ep, token, ec);
}
170  

167  

171  
/** Start a read into the buffers described by param.

    Thin forwarder: every argument is passed unchanged to
    do_read_some(), and its returned coroutine handle is returned to
    the caller.

    @param h Coroutine handle passed through to do_read_some().
    @param ex Executor reference passed through to do_read_some().
    @param param Buffer description passed through to do_read_some().
    @param token Stop token passed through to do_read_some().
    @param ec Error-code pointer passed through to do_read_some().
    @param bytes_out Byte-count pointer passed through to do_read_some().
*/
inline std::coroutine_handle<>
epoll_tcp_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    return do_read_some(h, ex, param, token, ec, bytes_out);
}
182  

179  

183  
/** Start a write from the buffers described by param.

    Thin forwarder: every argument is passed unchanged to
    do_write_some(), and its returned coroutine handle is returned to
    the caller.

    @param h Coroutine handle passed through to do_write_some().
    @param ex Executor reference passed through to do_write_some().
    @param param Buffer description passed through to do_write_some().
    @param token Stop token passed through to do_write_some().
    @param ec Error-code pointer passed through to do_write_some().
    @param bytes_out Byte-count pointer passed through to do_write_some().
*/
inline std::coroutine_handle<>
epoll_tcp_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    return do_write_some(h, ex, param, token, ec, bytes_out);
}
194  

191  

195  
inline void
192  
inline void
196  
epoll_tcp_socket::cancel() noexcept
193  
epoll_tcp_socket::cancel() noexcept
197  
{
194  
{
198  
    do_cancel();
195  
    do_cancel();
199  
}
196  
}
200  

197  

201  
inline void
198  
inline void
202  
epoll_tcp_socket::close_socket() noexcept
199  
epoll_tcp_socket::close_socket() noexcept
203  
{
200  
{
204  
    do_close_socket();
201  
    do_close_socket();
205  
}
202  
}
206  

203  

207  
inline std::error_code
204  
inline std::error_code
208  
epoll_tcp_service::open_socket(
205  
epoll_tcp_service::open_socket(
209  
    tcp_socket::implementation& impl, int family, int type, int protocol)
206  
    tcp_socket::implementation& impl, int family, int type, int protocol)
210  
{
207  
{
211  
    auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
208  
    auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
212  
    epoll_impl->close_socket();
209  
    epoll_impl->close_socket();
213  

210  

214  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
211  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
215  
    if (fd < 0)
212  
    if (fd < 0)
216  
        return make_err(errno);
213  
        return make_err(errno);
217  

214  

218  
    if (family == AF_INET6)
215  
    if (family == AF_INET6)
219  
    {
216  
    {
220  
        int one = 1;
217  
        int one = 1;
221  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
218  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
222  
    }
219  
    }
223  

220  

224  
    epoll_impl->fd_ = fd;
221  
    epoll_impl->fd_ = fd;
225  

222  

226  
    // Register fd with epoll (edge-triggered mode)
223  
    // Register fd with epoll (edge-triggered mode)
227  
    epoll_impl->desc_state_.fd = fd;
224  
    epoll_impl->desc_state_.fd = fd;
228  
    {
225  
    {
229  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
226  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
230  
        epoll_impl->desc_state_.read_op    = nullptr;
227  
        epoll_impl->desc_state_.read_op    = nullptr;
231  
        epoll_impl->desc_state_.write_op   = nullptr;
228  
        epoll_impl->desc_state_.write_op   = nullptr;
232  
        epoll_impl->desc_state_.connect_op = nullptr;
229  
        epoll_impl->desc_state_.connect_op = nullptr;
233  
    }
230  
    }
234  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
231  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
235  

232  

236 -
}
 
237 -

 
238 -
inline std::error_code
 
239 -
epoll_tcp_service::bind_socket(
 
240 -
    tcp_socket::implementation& impl, endpoint ep)
 
241 -
{
 
242 -
    return static_cast<epoll_tcp_socket*>(&impl)->do_bind(ep);
 
243  
    return {};
233  
    return {};
244  
}
234  
}
245  

235  

246  
} // namespace boost::corosio::detail
236  
} // namespace boost::corosio::detail
247  

237  

248  
#endif // BOOST_COROSIO_HAS_EPOLL
238  
#endif // BOOST_COROSIO_HAS_EPOLL
249  

239  

250  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
240  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP