Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions include/boost/corosio/detail/timer_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <mutex>
#include <optional>
#include <stop_token>
#include <utility>
#include <vector>

namespace boost::corosio::detail {
Expand Down Expand Up @@ -198,8 +199,7 @@ struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
completion_op() noexcept : scheduler_op(&do_complete) {}

void operator()() override;
// No-op — lifetime owned by waiter_node, not the scheduler queue
void destroy() override {}
void destroy() override;
};

// Per-waiter stop_token cancellation
Expand Down Expand Up @@ -328,15 +328,22 @@ timer_service::shutdown()
{
timer_service_invalidate_cache();

// Cancel waiting timers still in the heap
// Cancel waiting timers still in the heap.
// Each waiter called work_started() in implementation::wait().
// On IOCP the scheduler shutdown loop exits when outstanding_work_
// reaches zero, so we must call work_finished() here to balance it.
// On other backends this is harmless (their drain loops exit when
// the queue is empty, not based on outstanding_work_).
for (auto& entry : heap_)
{
auto* impl = entry.timer_;
while (auto* w = impl->waiters_.pop_front())
{
w->stop_cb_.reset();
w->h_ = {};
auto h = std::exchange(w->h_, {});
sched_->work_finished();
if (h)
h.destroy();
delete w;
}
impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
Expand Down Expand Up @@ -722,10 +729,12 @@ waiter_node::canceller::operator()() const

inline void
waiter_node::completion_op::do_complete(
void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
[[maybe_unused]] void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
{
if (!owner)
return;
// owner is always non-null here. The destroy path (owner == nullptr)
// is unreachable because completion_op overrides destroy() directly,
// bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
BOOST_COROSIO_ASSERT(owner);
static_cast<completion_op*>(base)->operator()();
}

Expand All @@ -748,6 +757,30 @@ waiter_node::completion_op::operator()()
sched.work_finished();
}

inline void
waiter_node::completion_op::destroy()
{
// Called during scheduler shutdown drain when this completion_op is
// in the scheduler's ready queue (posted by cancel_timer() or
// process_expired()). Balances the work_started() from
// implementation::wait(). The scheduler drain loop separately
// balances the work_started() from post(). On IOCP both decrements
// are required for outstanding_work_ to reach zero; on other
// backends this is harmless.
//
// This override also prevents scheduler_op::destroy() from calling
// do_complete(nullptr, ...). See also: timer_service::shutdown()
// which drains waiters still in the timer heap (the other path).
//
// NOTE: statement order below is deliberate — everything owned by
// the waiter_node is read (stop_cb_, h_, svc_) before `delete w`,
// and the scheduler reference is saved so work_finished() can be
// called after the node is gone.
auto* w = waiter_;
w->stop_cb_.reset();
// Take ownership of the coroutine handle; clears w->h_ so nothing
// else can resume/destroy it.
auto h = std::exchange(w->h_, {});
auto& sched = w->svc_->get_scheduler();
delete w;
sched.work_finished();
// Destroy the coroutine frame last, after all bookkeeping, so any
// work done during frame destruction sees consistent scheduler state.
if (h)
h.destroy();
}

inline std::coroutine_handle<>
timer_service::implementation::wait(
std::coroutine_handle<> h,
Expand Down
7 changes: 2 additions & 5 deletions include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ class BOOST_COROSIO_DECL epoll_scheduler final
mutable op_queue completed_ops_;
mutable std::atomic<long> outstanding_work_;
bool stopped_;
bool shutdown_;

// True while a thread is blocked in epoll_wait. Used by
// wake_one_thread_and_unlock and work_finished to know when
Expand Down Expand Up @@ -612,7 +611,6 @@ inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
, timer_fd_(-1)
, outstanding_work_(0)
, stopped_(false)
, shutdown_(false)
, task_running_{false}
, task_interrupted_(false)
, state_(0)
Expand Down Expand Up @@ -696,7 +694,6 @@ epoll_scheduler::shutdown()
{
{
std::unique_lock lock(mutex_);
shutdown_ = true;

while (auto* h = completed_ops_.pop())
{
Expand All @@ -710,8 +707,6 @@ epoll_scheduler::shutdown()
signal_all(lock);
}

outstanding_work_.store(0, std::memory_order_release);

if (event_fd_ >= 0)
interrupt_reactor();
}
Expand All @@ -736,7 +731,9 @@ epoll_scheduler::post(std::coroutine_handle<> h) const

void destroy() override
{
// Copy the handle first: h_ lives inside this object, so reading
// it after `delete this` would be a use-after-free. Destroy the
// coroutine frame only after the handler node is freed.
auto h = h_;
delete this;
h.destroy();
}
};

Expand Down
87 changes: 57 additions & 30 deletions include/boost/corosio/native/detail/iocp/win_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ class BOOST_COROSIO_DECL win_scheduler final
void* iocp_;
mutable long outstanding_work_;
mutable long stopped_;
long shutdown_;
long stop_event_posted_;
mutable long dispatch_required_;

Expand Down Expand Up @@ -162,7 +161,6 @@ inline win_scheduler::win_scheduler(
: iocp_(nullptr)
, outstanding_work_(0)
, stopped_(0)
, shutdown_(0)
, stop_event_posted_(0)
, dispatch_required_(0)
{
Expand Down Expand Up @@ -194,51 +192,59 @@ inline win_scheduler::~win_scheduler()
inline void
win_scheduler::shutdown()
{
::InterlockedExchange(&shutdown_, 1);

if (timers_)
timers_->stop();

for (;;)
// Drain timer heap before the work-counting loop. The timer_service
// was registered after this scheduler (nested make_service from our
// constructor), so execution_context::shutdown() calls us first.
// Asio avoids this by owning timer queues directly inside the
// scheduler; we bridge the gap by shutting down the timer service
// early. The subsequent call from execution_context is a no-op.
if (timer_svc_)
timer_svc_->shutdown();

while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
{
op_queue ops;
{
std::lock_guard<win_mutex> lock(dispatch_mutex_);
ops.splice(completed_ops_);
}

bool drained_any = false;

while (auto* h = ops.pop())
if (!ops.empty())
{
h->destroy();
drained_any = true;
}

DWORD bytes;
ULONG_PTR key;
LPOVERLAPPED overlapped;
::GetQueuedCompletionStatus(iocp_, &bytes, &key, &overlapped, 0);
if (overlapped)
{
if (key == key_posted)
while (auto* h = ops.pop())
{
auto* op = reinterpret_cast<scheduler_op*>(overlapped);
op->destroy();
::InterlockedDecrement(&outstanding_work_);
h->destroy();
}
else
}
else
{
DWORD bytes;
ULONG_PTR key;
LPOVERLAPPED overlapped;
::GetQueuedCompletionStatus(
iocp_, &bytes, &key, &overlapped,
iocp::max_gqcs_timeout);
if (overlapped)
{
auto* op = overlapped_to_op(overlapped);
op->destroy();
::InterlockedDecrement(&outstanding_work_);
if (key == key_posted)
{
auto* op =
reinterpret_cast<scheduler_op*>(overlapped);
op->destroy();
}
else
{
auto* op = overlapped_to_op(overlapped);
op->destroy();
}
}
drained_any = true;
}

if (!drained_any)
break;
}

::InterlockedExchange(&outstanding_work_, 0);
}

inline void
Expand All @@ -254,7 +260,28 @@ win_scheduler::post(std::coroutine_handle<> h) const
auto* self = static_cast<post_handler*>(base);
if (!owner)
{
// Shutdown path: destroy the coroutine frame synchronously.
//
// Bounded destruction invariant: the chain triggered by
// coro.destroy() is at most two levels deep:
// 1. task frame destroyed → ~io_awaitable_promise_base()
// destroys stored continuation (if != noop_coroutine)
// 2. continuation (trampoline) destroyed → final_suspend
// returns suspend_never, no further continuation
//
// If a future refactor adds deeper continuation chains,
// this would reintroduce re-entrant stack overflow risk.
#ifndef NDEBUG
static thread_local int destroy_depth = 0;
++destroy_depth;
BOOST_COROSIO_ASSERT(destroy_depth <= 2);
#endif
auto coro = self->h_;
delete self;
coro.destroy();
#ifndef NDEBUG
--destroy_depth;
#endif
return;
}
auto coro = self->h_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ class BOOST_COROSIO_DECL kqueue_scheduler final
mutable op_queue completed_ops_;
mutable std::atomic<std::int64_t> outstanding_work_{0};
std::atomic<bool> stopped_{false};
bool shutdown_ = false;

// True while a thread is blocked in kevent(). Used by
// wake_one_thread_and_unlock and work_finished to know when
Expand Down Expand Up @@ -709,7 +708,6 @@ inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
: kq_fd_(-1)
, outstanding_work_(0)
, stopped_(false)
, shutdown_(false)
, task_running_(false)
, task_interrupted_(false)
, state_(0)
Expand Down Expand Up @@ -764,7 +762,6 @@ kqueue_scheduler::shutdown()
{
{
std::unique_lock lock(mutex_);
shutdown_ = true;

while (auto* h = completed_ops_.pop())
{
Expand All @@ -778,8 +775,6 @@ kqueue_scheduler::shutdown()
signal_all(lock);
}

outstanding_work_.store(0, std::memory_order_release);

if (kq_fd_ >= 0)
interrupt_reactor();
}
Expand Down Expand Up @@ -808,7 +803,9 @@ kqueue_scheduler::post(std::coroutine_handle<> h) const

void destroy() override
{
// Copy the handle first: h_ lives inside this object, so reading
// it after `delete this` would be a use-after-free. Destroy the
// coroutine frame only after the handler node is freed.
auto h = h_;
delete this;
h.destroy();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ class BOOST_COROSIO_DECL select_scheduler final
mutable op_queue completed_ops_;
mutable std::atomic<long> outstanding_work_;
std::atomic<bool> stopped_;
bool shutdown_;

// Per-fd state for tracking registered operations
struct fd_state
Expand Down Expand Up @@ -259,7 +258,6 @@ inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
: pipe_fds_{-1, -1}
, outstanding_work_(0)
, stopped_(false)
, shutdown_(false)
, max_fd_(-1)
, reactor_running_(false)
, reactor_interrupted_(false)
Expand Down Expand Up @@ -325,7 +323,6 @@ select_scheduler::shutdown()
{
{
std::unique_lock lock(mutex_);
shutdown_ = true;

while (auto* h = completed_ops_.pop())
{
Expand All @@ -337,8 +334,6 @@ select_scheduler::shutdown()
}
}

outstanding_work_.store(0, std::memory_order_release);

if (pipe_fds_[1] >= 0)
interrupt_reactor();

Expand All @@ -365,7 +360,9 @@ select_scheduler::post(std::coroutine_handle<> h) const

void destroy() override
{
// Copy the handle first: h_ lives inside this object, so reading
// it after `delete this` would be a use-after-free. Destroy the
// coroutine frame only after the handler node is freed.
auto h = h_;
delete this;
h.destroy();
}
};

Expand Down
Loading
Loading