diff --git a/include/boost/corosio/detail/timer_service.hpp b/include/boost/corosio/detail/timer_service.hpp index e324bbfe..063f8101 100644 --- a/include/boost/corosio/detail/timer_service.hpp +++ b/include/boost/corosio/detail/timer_service.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include namespace boost::corosio::detail { @@ -198,8 +199,7 @@ struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node completion_op() noexcept : scheduler_op(&do_complete) {} void operator()() override; - // No-op — lifetime owned by waiter_node, not the scheduler queue - void destroy() override {} + void destroy() override; }; // Per-waiter stop_token cancellation @@ -328,15 +328,22 @@ timer_service::shutdown() { timer_service_invalidate_cache(); - // Cancel waiting timers still in the heap + // Cancel waiting timers still in the heap. + // Each waiter called work_started() in implementation::wait(). + // On IOCP the scheduler shutdown loop exits when outstanding_work_ + // reaches zero, so we must call work_finished() here to balance it. + // On other backends this is harmless (their drain loops exit when + // the queue is empty, not based on outstanding_work_). for (auto& entry : heap_) { auto* impl = entry.timer_; while (auto* w = impl->waiters_.pop_front()) { w->stop_cb_.reset(); - w->h_ = {}; + auto h = std::exchange(w->h_, {}); sched_->work_finished(); + if (h) + h.destroy(); delete w; } impl->heap_index_ = (std::numeric_limits::max)(); @@ -722,10 +729,12 @@ waiter_node::canceller::operator()() const inline void waiter_node::completion_op::do_complete( - void* owner, scheduler_op* base, std::uint32_t, std::uint32_t) + [[maybe_unused]] void* owner, scheduler_op* base, std::uint32_t, std::uint32_t) { - if (!owner) - return; + // owner is always non-null here. The destroy path (owner == nullptr) + // is unreachable because completion_op overrides destroy() directly, + // bypassing scheduler_op::destroy() which would call func_(nullptr, ...). + BOOST_COROSIO_ASSERT(owner); static_cast(base)->operator()(); } @@ -748,6 +757,30 @@ waiter_node::completion_op::operator()() sched.work_finished(); } +inline void +waiter_node::completion_op::destroy() +{ + // Called during scheduler shutdown drain when this completion_op is + // in the scheduler's ready queue (posted by cancel_timer() or + // process_expired()). Balances the work_started() from + // implementation::wait(). The scheduler drain loop separately + // balances the work_started() from post(). On IOCP both decrements + // are required for outstanding_work_ to reach zero; on other + // backends this is harmless. + // + // This override also prevents scheduler_op::destroy() from calling + // do_complete(nullptr, ...). See also: timer_service::shutdown() + // which drains waiters still in the timer heap (the other path). + auto* w = waiter_; + w->stop_cb_.reset(); + auto h = std::exchange(w->h_, {}); + auto& sched = w->svc_->get_scheduler(); + delete w; + sched.work_finished(); + if (h) + h.destroy(); +} + inline std::coroutine_handle<> timer_service::implementation::wait( std::coroutine_handle<> h, diff --git a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp index 784c3f98..8bca20bc 100644 --- a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp +++ b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp @@ -288,7 +288,6 @@ class BOOST_COROSIO_DECL epoll_scheduler final mutable op_queue completed_ops_; mutable std::atomic outstanding_work_; bool stopped_; - bool shutdown_; // True while a thread is blocked in epoll_wait. Used by // wake_one_thread_and_unlock and work_finished to know when @@ -612,7 +611,6 @@ inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) , timer_fd_(-1) , outstanding_work_(0) , stopped_(false) - , shutdown_(false) , task_running_{false} , task_interrupted_(false) , state_(0) @@ -696,7 +694,6 @@ epoll_scheduler::shutdown() { { std::unique_lock lock(mutex_); - shutdown_ = true; while (auto* h = completed_ops_.pop()) { @@ -710,8 +707,6 @@ epoll_scheduler::shutdown() signal_all(lock); } - outstanding_work_.store(0, std::memory_order_release); - if (event_fd_ >= 0) interrupt_reactor(); } @@ -736,7 +731,9 @@ epoll_scheduler::post(std::coroutine_handle<> h) const void destroy() override { + auto h = h_; delete this; + h.destroy(); } }; diff --git a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp index f1f22292..3849211e 100644 --- a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp +++ b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp @@ -100,7 +100,6 @@ class BOOST_COROSIO_DECL win_scheduler final void* iocp_; mutable long outstanding_work_; mutable long stopped_; - long shutdown_; long stop_event_posted_; mutable long dispatch_required_; @@ -162,7 +161,6 @@ inline win_scheduler::win_scheduler( : iocp_(nullptr) , outstanding_work_(0) , stopped_(0) - , shutdown_(0) , stop_event_posted_(0) , dispatch_required_(0) { @@ -194,12 +192,19 @@ inline win_scheduler::~win_scheduler() inline void win_scheduler::shutdown() { - ::InterlockedExchange(&shutdown_, 1); - if (timers_) timers_->stop(); - for (;;) + // Drain timer heap before the work-counting loop. The timer_service + // was registered after this scheduler (nested make_service from our + // constructor), so execution_context::shutdown() calls us first. + // Asio avoids this by owning timer queues directly inside the + // scheduler; we bridge the gap by shutting down the timer service + // early. The subsequent call from execution_context is a no-op. + if (timer_svc_) + timer_svc_->shutdown(); + + while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0) { op_queue ops; { @@ -207,38 +212,39 @@ win_scheduler::shutdown() ops.splice(completed_ops_); } - bool drained_any = false; - - while (auto* h = ops.pop()) + if (!ops.empty()) { - h->destroy(); - drained_any = true; - } - - DWORD bytes; - ULONG_PTR key; - LPOVERLAPPED overlapped; - ::GetQueuedCompletionStatus(iocp_, &bytes, &key, &overlapped, 0); - if (overlapped) - { - if (key == key_posted) + while (auto* h = ops.pop()) { - auto* op = reinterpret_cast(overlapped); - op->destroy(); + ::InterlockedDecrement(&outstanding_work_); + h->destroy(); } - else + } + else + { + DWORD bytes; + ULONG_PTR key; + LPOVERLAPPED overlapped; + ::GetQueuedCompletionStatus( + iocp_, &bytes, &key, &overlapped, + iocp::max_gqcs_timeout); + if (overlapped) { - auto* op = overlapped_to_op(overlapped); - op->destroy(); + ::InterlockedDecrement(&outstanding_work_); + if (key == key_posted) + { + auto* op = + reinterpret_cast(overlapped); + op->destroy(); + } + else + { + auto* op = overlapped_to_op(overlapped); + op->destroy(); + } } - drained_any = true; } - - if (!drained_any) - break; } - - ::InterlockedExchange(&outstanding_work_, 0); } inline void @@ -254,7 +260,28 @@ win_scheduler::post(std::coroutine_handle<> h) const auto* self = static_cast(base); if (!owner) { + // Shutdown path: destroy the coroutine frame synchronously. + // + // Bounded destruction invariant: the chain triggered by + // coro.destroy() is at most two levels deep: + // 1. task frame destroyed → ~io_awaitable_promise_base() + // destroys stored continuation (if != noop_coroutine) + // 2. continuation (trampoline) destroyed → final_suspend + // returns suspend_never, no further continuation + // + // If a future refactor adds deeper continuation chains, + // this would reintroduce re-entrant stack overflow risk. +#ifndef NDEBUG + static thread_local int destroy_depth = 0; + ++destroy_depth; + BOOST_COROSIO_ASSERT(destroy_depth <= 2); +#endif + auto coro = self->h_; delete self; + coro.destroy(); +#ifndef NDEBUG + --destroy_depth; +#endif return; } auto coro = self->h_; diff --git a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp index f1e99bd0..3408b4dd 100644 --- a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp +++ b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp @@ -363,7 +363,6 @@ class BOOST_COROSIO_DECL kqueue_scheduler final mutable op_queue completed_ops_; mutable std::atomic outstanding_work_{0}; std::atomic stopped_{false}; - bool shutdown_ = false; // True while a thread is blocked in kevent(). Used by // wake_one_thread_and_unlock and work_finished to know when @@ -709,7 +708,6 @@ inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int) : kq_fd_(-1) , outstanding_work_(0) , stopped_(false) - , shutdown_(false) , task_running_(false) , task_interrupted_(false) , state_(0) @@ -764,7 +762,6 @@ kqueue_scheduler::shutdown() { { std::unique_lock lock(mutex_); - shutdown_ = true; while (auto* h = completed_ops_.pop()) { @@ -778,8 +775,6 @@ kqueue_scheduler::shutdown() signal_all(lock); } - outstanding_work_.store(0, std::memory_order_release); - if (kq_fd_ >= 0) interrupt_reactor(); } @@ -808,7 +803,9 @@ kqueue_scheduler::post(std::coroutine_handle<> h) const void destroy() override { + auto h = h_; delete this; + h.destroy(); } }; diff --git a/include/boost/corosio/native/detail/select/select_scheduler.hpp b/include/boost/corosio/native/detail/select/select_scheduler.hpp index 52d3bd69..823c7bfc 100644 --- a/include/boost/corosio/native/detail/select/select_scheduler.hpp +++ b/include/boost/corosio/native/detail/select/select_scheduler.hpp @@ -156,7 +156,6 @@ class BOOST_COROSIO_DECL select_scheduler final mutable op_queue completed_ops_; mutable std::atomic outstanding_work_; std::atomic stopped_; - bool shutdown_; // Per-fd state for tracking registered operations struct fd_state @@ -259,7 +258,6 @@ inline select_scheduler::select_scheduler(capy::execution_context& ctx, int) : pipe_fds_{-1, -1} , outstanding_work_(0) , stopped_(false) - , shutdown_(false) , max_fd_(-1) , reactor_running_(false) , reactor_interrupted_(false) @@ -325,7 +323,6 @@ select_scheduler::shutdown() { { std::unique_lock lock(mutex_); - shutdown_ = true; while (auto* h = completed_ops_.pop()) { @@ -337,8 +334,6 @@ select_scheduler::shutdown() } } - outstanding_work_.store(0, std::memory_order_release); - if (pipe_fds_[1] >= 0) interrupt_reactor(); @@ -365,7 +360,9 @@ select_scheduler::post(std::coroutine_handle<> h) const void destroy() override { + auto h = h_; delete this; + h.destroy(); } }; diff --git a/test/unit/io_context.cpp b/test/unit/io_context.cpp index 7b75ede8..0de60ef0 100644 --- a/test/unit/io_context.cpp +++ b/test/unit/io_context.cpp @@ -22,6 +22,7 @@ #include #include +#include "context.hpp" #include "test_suite.hpp" namespace boost::corosio { @@ -124,6 +125,45 @@ make_atomic_coro(std::atomic& counter) return c; } +// Coroutine whose promise destructor increments a counter. +// Both initial_suspend and final_suspend return suspend_always so the +// frame is only freed by an explicit .destroy() call. +struct destroy_counter_coro +{ + struct promise_type + { + int* counter_ = nullptr; + + destroy_counter_coro get_return_object() + { + return {std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_always final_suspend() noexcept { return {}; } + void return_void() {} + void unhandled_exception() { std::terminate(); } + + ~promise_type() + { + if (counter_) + ++(*counter_); + } + }; + + std::coroutine_handle h; + + operator std::coroutine_handle<>() const { return h; } +}; + +inline destroy_counter_coro +make_destroy_coro(int& counter) +{ + auto c = []() -> destroy_counter_coro { co_return; }(); + c.h.promise().counter_ = &counter; + return c; +} + // Coroutine that checks running_in_this_thread when resumed struct check_coro { @@ -513,6 +553,24 @@ struct io_context_test BOOST_TEST(finished); } + void testShutdownDestroysPostedCoroutineFrames() + { + int destroyed = 0; + + { + io_context ioc; + auto ex = ioc.get_executor(); + + ex.post(make_destroy_coro(destroyed)); + ex.post(make_destroy_coro(destroyed)); + ex.post(make_destroy_coro(destroyed)); + + // io_context destructor triggers shutdown + } + + BOOST_TEST_EQ(destroyed, 3); + } + void run() { testConstruction(); @@ -529,9 +587,38 @@ struct io_context_test testMultithreaded(); testMultithreadedStress(); testWhenAllSetEvent(); + testShutdownDestroysPostedCoroutineFrames(); } }; TEST_SUITE(io_context_test, "boost.corosio.io_context"); +// Backend-parameterized tests for shutdown paths that differ per backend +template +struct io_context_shutdown_test +{ + void testShutdownDestroysPostedCoroutineFrames() + { + int destroyed = 0; + + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + ex.post(make_destroy_coro(destroyed)); + ex.post(make_destroy_coro(destroyed)); + ex.post(make_destroy_coro(destroyed)); + } + + BOOST_TEST_EQ(destroyed, 3); + } + + void run() + { + testShutdownDestroysPostedCoroutineFrames(); + } +}; + +COROSIO_BACKEND_TESTS(io_context_shutdown_test, "boost.corosio.io_context_shutdown") + } // namespace boost::corosio diff --git a/test/unit/native/iocp/iocp_shutdown.cpp b/test/unit/native/iocp/iocp_shutdown.cpp new file mode 100644 index 00000000..3b0255db --- /dev/null +++ b/test/unit/native/iocp/iocp_shutdown.cpp @@ -0,0 +1,157 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#if BOOST_COROSIO_HAS_IOCP + +#include +#include +#include +#include + +#include "test_suite.hpp" + +namespace boost::corosio { + +// Test helper: exposes IOCP handle for direct posting. +struct iocp_test_context : native_io_context +{ + void* iocp_handle() + { + return static_cast(sched_)->native_handle(); + } +}; + +// Overlapped op that increments a counter when destroyed during shutdown. +struct test_overlapped_op : detail::overlapped_op +{ + int* destroyed_; + + static void do_complete( + void* owner, + detail::scheduler_op* base, + std::uint32_t, + std::uint32_t) + { + auto* self = static_cast(base); + if (!owner) + { + ++(*self->destroyed_); + delete self; + return; + } + delete self; + } + + explicit test_overlapped_op(int& destroyed) + : detail::overlapped_op(&do_complete) + , destroyed_(&destroyed) + { + } +}; + +struct iocp_shutdown_test +{ + // Shutdown drains I/O completions (key_io) from the IOCP. + // Covers the else branch of `if (key == key_posted)` in shutdown(). + void testShutdownDrainsIOCompletion() + { + int destroyed = 0; + + { + iocp_test_context ctx; + auto ex = ctx.get_executor(); + void* ioc = ctx.iocp_handle(); + + auto* op = new test_overlapped_op(destroyed); + + ex.on_work_started(); + + BOOL ok = ::PostQueuedCompletionStatus( + ioc, 0, detail::key_io, static_cast(op)); + BOOST_TEST(ok != 0); + } + + BOOST_TEST_EQ(destroyed, 1); + } + + // Shutdown drains key_result_stored completions from the IOCP. + void testShutdownDrainsResultStoredCompletion() + { + int destroyed = 0; + + { + iocp_test_context ctx; + auto ex = ctx.get_executor(); + void* ioc = ctx.iocp_handle(); + + auto* op = new test_overlapped_op(destroyed); + op->ready_ = 1; + op->dwError = 0; + op->bytes_transferred = 42; + + ex.on_work_started(); + + BOOL ok = ::PostQueuedCompletionStatus( + ioc, + 0, + detail::key_result_stored, + static_cast(op)); + BOOST_TEST(ok != 0); + } + + BOOST_TEST_EQ(destroyed, 1); + } + + // Shutdown drains multiple I/O completions with different keys. + void testShutdownDrainsMixedCompletions() + { + int io_destroyed = 0; + int stored_destroyed = 0; + + { + iocp_test_context ctx; + auto ex = ctx.get_executor(); + void* ioc = ctx.iocp_handle(); + + auto* io_op = new test_overlapped_op(io_destroyed); + ex.on_work_started(); + ::PostQueuedCompletionStatus( + ioc, 0, detail::key_io, static_cast(io_op)); + + auto* stored_op = new test_overlapped_op(stored_destroyed); + stored_op->ready_ = 1; + stored_op->dwError = 0; + stored_op->bytes_transferred = 0; + ex.on_work_started(); + ::PostQueuedCompletionStatus( + ioc, + 0, + detail::key_result_stored, + static_cast(stored_op)); + } + + BOOST_TEST_EQ(io_destroyed, 1); + BOOST_TEST_EQ(stored_destroyed, 1); + } + + void run() + { + testShutdownDrainsIOCompletion(); + testShutdownDrainsResultStoredCompletion(); + testShutdownDrainsMixedCompletions(); + } +}; + +TEST_SUITE(iocp_shutdown_test, "boost.corosio.native.iocp_shutdown"); + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_HAS_IOCP diff --git a/test/unit/timer.cpp b/test/unit/timer.cpp index 9a686539..0e6bc9c7 100644 --- a/test/unit/timer.cpp +++ b/test/unit/timer.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include "context.hpp" @@ -345,7 +346,7 @@ struct timer_test std::error_code result_ec; t.expires_after(std::chrono::seconds(60)); - delay_timer.expires_after(std::chrono::milliseconds(10)); + delay_timer.expires_after(std::chrono::milliseconds(50)); auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done_out) -> capy::task<> { @@ -361,7 +362,7 @@ struct timer_test }; capy::run_async(ioc.get_executor())(delay_task(delay_timer, t)); - ioc.run_for(std::chrono::milliseconds(100)); + ioc.run_for(std::chrono::milliseconds(500)); BOOST_TEST(completed); BOOST_TEST(result_ec == capy::cond::canceled); } @@ -918,6 +919,119 @@ struct timer_test BOOST_TEST(!captured_ec); } + // Shutdown cleanup + + void testShutdownDestroysTimerWaiters() + { + bool started = false; + bool destroyed = false; + + { + io_context ioc(Backend); + timer t(ioc); + t.expires_after(std::chrono::seconds(3600)); + + auto task = [](timer& t_ref, bool& started_flag, + bool& destroyed_flag) -> capy::task<> { + struct guard + { + bool& flag_; + ~guard() { flag_ = true; } + }; + guard g{destroyed_flag}; + started_flag = true; + auto [ec] = co_await t_ref.wait(); + (void)ec; + }; + + capy::run_async(ioc.get_executor())(task(t, started, destroyed)); + ioc.poll(); + + BOOST_TEST(started); + // io_context destructor triggers shutdown + } + + BOOST_TEST(destroyed); + } + + void testShutdownDrainsHeapWaiters() + { + // Exercises timer_service::shutdown()'s waiter drain loop. + // Normally the timer destructs before io_context, cancelling + // waiters via cancel_timer(). Here we use placement-new so the + // timer outlives io_context — its destructor is skipped, leaving + // waiters in the heap for shutdown() to drain. + int destroyed = 0; + + { + io_context ioc(Backend); + + alignas(timer) unsigned char buf[sizeof(timer)]; + auto* t = new (buf) timer(ioc); + t->expires_after(std::chrono::seconds(3600)); + + auto task = [](timer& t_ref, int& counter) -> capy::task<> { + struct guard + { + int& c_; + ~guard() { ++c_; } + }; + guard g{counter}; + auto [ec] = co_await t_ref.wait(); + (void)ec; + }; + + capy::run_async(ioc.get_executor())(task(*t, destroyed)); + ioc.poll(); + + // io_context destructs. Timer t is still alive in buf. + // timer_service::shutdown() finds the waiter in the heap + // and drains it, destroying the coroutine frame. + // Timer destructor is intentionally skipped (placement-new). + } + + BOOST_TEST_EQ(destroyed, 1); + } + + void testAbruptStopWithPendingTimerOps() + { + bool waiter_started = false; + + { + io_context ioc(Backend); + timer t1(ioc); + timer t2(ioc); + timer t3(ioc); + + t1.expires_after(std::chrono::hours(1)); + t2.expires_after(std::chrono::hours(1)); + t3.expires_after(std::chrono::hours(1)); + + auto waiter = [](timer& t, bool& started) -> capy::task<> { + started = true; + auto [ec] = co_await t.wait(); + (void)ec; + }; + + auto stopper = [](io_context& ctx) -> capy::task<> { + ctx.stop(); + co_return; + }; + + capy::run_async(ioc.get_executor())(waiter(t1, waiter_started)); + capy::run_async(ioc.get_executor())(waiter(t2, waiter_started)); + capy::run_async(ioc.get_executor())(waiter(t3, waiter_started)); + capy::run_async(ioc.get_executor())(stopper(ioc)); + + ioc.run(); + + BOOST_TEST(waiter_started); + // io_context destructs with 3 pending timer waiters + } + // Shutdown completes without hanging + BOOST_TEST_PASS(); + } + // Edge cases void testLongDuration() @@ -1032,6 +1146,11 @@ struct timer_test testIoResultCanceled(); testIoResultStructuredBinding(); + // Shutdown cleanup + testShutdownDestroysTimerWaiters(); + testShutdownDrainsHeapWaiters(); + testAbruptStopWithPendingTimerOps(); + // Edge cases testLongDuration(); testNegativeDuration();