Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ console_bs_SOURCES = \
console/executor_options.cpp \
console/executor_runner.cpp \
console/executor_scans.cpp \
console/executor_signals.cpp \
console/executor_store.cpp \
console/executor_test_reader.cpp \
console/executor_test_writer.cpp \
Expand Down
1 change: 1 addition & 0 deletions builds/cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ if (with-console)
"../../console/executor_options.cpp"
"../../console/executor_runner.cpp"
"../../console/executor_scans.cpp"
"../../console/executor_signals.cpp"
"../../console/executor_store.cpp"
"../../console/executor_test_reader.cpp"
"../../console/executor_test_writer.cpp"
Expand Down
1 change: 1 addition & 0 deletions builds/msvc/vs2022/bs/bs.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
<ClCompile Include="..\..\..\..\console\executor_options.cpp" />
<ClCompile Include="..\..\..\..\console\executor_runner.cpp" />
<ClCompile Include="..\..\..\..\console\executor_scans.cpp" />
<ClCompile Include="..\..\..\..\console\executor_signals.cpp" />
<ClCompile Include="..\..\..\..\console\executor_store.cpp" />
<ClCompile Include="..\..\..\..\console\executor_test_reader.cpp" />
<ClCompile Include="..\..\..\..\console\executor_test_writer.cpp" />
Expand Down
3 changes: 3 additions & 0 deletions builds/msvc/vs2022/bs/bs.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
<ClCompile Include="..\..\..\..\console\executor_scans.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\console\executor_signals.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\console\executor_store.cpp">
<Filter>src</Filter>
</ClCompile>
Expand Down
125 changes: 26 additions & 99 deletions console/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@

#include <atomic>
#include <chrono>
#include <csignal>
#include <future>
#include <thread>
#include <optional>

namespace libbitcoin {
namespace server {
Expand All @@ -32,12 +31,24 @@ using boost::format;
using namespace system;
using namespace std::placeholders;

// Construction.
// ----------------------------------------------------------------------------

// static initializers.
std::promise<bool> executor::stopped_{};
std::promise<bool> executor::stopping_{};
std::atomic<bool> executor::initialized_{};
std::atomic<int> executor::signal_{ unsignalled };
std::unique_ptr<std::thread> executor::stop_poller_{};
std::optional<std::thread> executor::poller_thread_{};

// class factory (singleton)
executor& executor::factory(parser& metadata, std::istream& input,
std::ostream& output, std::ostream& error)
{
static executor instance(metadata, input, output, error);
return instance;
}

// private constructor (singleton)
executor::executor(parser& metadata, std::istream& input, std::ostream& output,
std::ostream&)
: metadata_(metadata),
Expand All @@ -60,103 +71,35 @@ executor::executor(parser& metadata, std::istream& input, std::ostream& output,
}
{
initialize_stop();

#if defined(HAVE_MSC)
create_hidden_window();
#endif
}

//
executor::~executor()
{
#if defined(HAVE_MSC)
destroy_hidden_window();
#endif

uninitialize_stop();
}

// Stop signal.
// ----------------------------------------------------------------------------
// static

#if defined(HAVE_MSC)
BOOL WINAPI executor::control_handler(DWORD signal)
{
switch (signal)
{
// Keyboard events. These prevent exit altogether when TRUE returned.
// handle_stop(signal) therefore shuts down gracefully/completely.
case CTRL_C_EVENT:
case CTRL_BREAK_EVENT:

// A signal that the system sends to all processes attached to a
// console when the user closes the console (by clicking Close on the
// console window's window menu). Returning TRUE here does not
// materially delay exit, so aside from capture this is a noop.
case CTRL_CLOSE_EVENT:
executor::handle_stop(possible_narrow_sign_cast<int>(signal));
return TRUE;

////// Only services receive this (*any* user is logging off).
////case CTRL_LOGOFF_EVENT:
////// Only services receive this (all users already logged off).
////case CTRL_SHUTDOWN_EVENT:
default:
return FALSE;
}
}
#endif

void executor::initialize_stop()
{
BC_ASSERT(!initialized_);
initialized_ = true;

poll_for_stopping();

#if defined(HAVE_MSC)
::SetConsoleCtrlHandler(&executor::control_handler, TRUE);
#else
// struct keywork avoids name conflict with posix function sigaction.
struct sigaction action{};

// Restart interrupted system calls.
action.sa_flags = SA_RESTART;

// sa_handler is actually a macro :o
action.sa_handler = handle_stop;

// Set masking.
sigemptyset(&action.sa_mask);

// Block during handling to prevent reentrancy.
sigaddset(&action.sa_mask, SIGINT);
sigaddset(&action.sa_mask, SIGTERM);
sigaddset(&action.sa_mask, SIGHUP);
sigaddset(&action.sa_mask, SIGUSR2);

sigaction(SIGINT, &action, nullptr);
sigaction(SIGTERM, &action, nullptr);
sigaction(SIGHUP, &action, nullptr);
sigaction(SIGUSR2, &action, nullptr);

#if defined(HAVE_LINUX)
sigaction(SIGPWR, &action, nullptr);
#endif
#endif
create_hidden_window();
set_signal_handlers();
}

void executor::uninitialize_stop()
{
BC_ASSERT(initialized_);
initialized_ = false;

stop();
if (stop_poller_ && stop_poller_->joinable())
if (poller_thread_.has_value() && poller_thread_.value().joinable())
{
stop_poller_->join();
stop_poller_.reset();
poller_thread_.value().join();
poller_thread_.reset();
}

destroy_hidden_window();
}

// Handle the stop signal and invoke stop method (requries signal safe code).
Expand Down Expand Up @@ -192,13 +135,13 @@ bool executor::canceled()
// Spinning must be used in signal handler, cannot wait on a promise.
void executor::poll_for_stopping()
{
stop_poller_ = std::make_unique<std::thread>([]()
poller_thread_.emplace(std::thread([]()
{
while (!canceled())
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::sleep_for(std::chrono::milliseconds(100));

stopping_.set_value(true);
});
}));
}

// Blocks until stopping is signalled by poller.
Expand All @@ -207,22 +150,6 @@ void executor::wait_for_stopping()
stopping_.get_future().wait();
}

// Suspend verbose logging and log the stop signal.
void executor::log_stopping()
{
const auto signal = signal_.load();
if (signal == signal_none)
return;

// A high level of consolve logging can obscure and delay stop.
toggle_.at(network::levels::protocol) = false;
toggle_.at(network::levels::verbose) = false;
toggle_.at(network::levels::proxy) = false;

logger(format(BS_NODE_INTERRUPTED) % signal);
logger(BS_NETWORK_STOPPING);
}

// Event handlers.
// ----------------------------------------------------------------------------

Expand Down
83 changes: 44 additions & 39 deletions console/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <atomic>
#include <future>
#include <optional>
#include <thread>
#include <unordered_map>

Expand All @@ -34,47 +35,26 @@ namespace server {
// This class is just an ad-hoc user interface wrapper on the node.
class executor
{
public:
DELETE_COPY(executor);

// Construct.
private:
// Singleton, private constructor.
executor(parser& metadata, std::istream&, std::ostream& output,
std::ostream& error);

// Clean up.
~executor();
public:
DELETE_COPY_MOVE(executor);

// Get the singleton instance.
static executor& factory(parser& metadata, std::istream& input,
std::ostream& output, std::ostream& error);

// Called from main.
bool dispatch();

private:
static constexpr int unsignalled{ -1 };
static constexpr int signal_none{ -2 };

// Executor (static).
static void initialize_stop();
static void uninitialize_stop();
static void poll_for_stopping();
static void wait_for_stopping();
static void handle_stop(int code);
static void stop(int signal=signal_none);
static bool canceled();

#if defined(HAVE_MSC)
static BOOL WINAPI control_handler(DWORD signal);
static LRESULT CALLBACK window_proc(HWND handle, UINT message,
WPARAM wparam, LPARAM lparam);

void create_hidden_window();
void destroy_hidden_window();

HWND window_{};
std::thread thread_{};
std::promise<bool> ready_{};
#endif
// Release shutdown monitor.
~executor();

private:
// Executor.
void log_stopping();
void handle_started(const system::code& ec);
void handle_subscribed(const system::code& ec, size_t key);
void handle_running(const system::code& ec);
Expand Down Expand Up @@ -148,6 +128,7 @@ class executor
void subscribe_events(std::ostream& sink);
void logger(const boost::format& message) const;
void logger(const std::string& message) const;
void log_stopping();

// Runner.
void stopper(const std::string& message);
Expand All @@ -171,13 +152,6 @@ class executor
// Runtime events.
static const std::unordered_map<uint8_t, std::string> fired_;

// Shutdown.
static std::atomic<int> signal_;
static std::atomic<bool> initialized_;
static std::promise<bool> stopping_;
static std::unique_ptr<std::thread> stop_poller_;
std::promise<bool> log_suspended_{};

parser& metadata_;
server_node::ptr node_{};
server_node::store store_;
Expand All @@ -189,6 +163,37 @@ class executor
network::logger log_{};
network::capture capture_{ input_, close_ };
std_array<std::atomic_bool, add1(network::levels::verbose)> toggle_;

// Shutdown.
// ------------------------------------------------------------------------

static constexpr int unsignalled{ -1 };
static constexpr int signal_none{ -2 };

static std::atomic<int> signal_;
static std::promise<bool> stopped_;
static std::promise<bool> stopping_;
static std::optional<std::thread> poller_thread_;

static void initialize_stop();
static void uninitialize_stop();
static void poll_for_stopping();
static void wait_for_stopping();
static void set_signal_handlers();
static void create_hidden_window();
static void destroy_hidden_window();
static void stop(int signal=signal_none);
static void handle_stop(int code);
static bool canceled();

#if defined(HAVE_MSC)
static HWND window_handle_;
static std::promise<bool> window_ready_;
static std::optional<std::thread> window_thread_;
static BOOL WINAPI control_handler(DWORD signal);
static LRESULT CALLBACK window_proc(HWND handle, UINT message,
WPARAM wparam, LPARAM lparam);
#endif
};

} // namespace server
Expand Down
18 changes: 17 additions & 1 deletion console/executor_logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void executor::subscribe_log(std::ostream& sink)
// only after all logging work is completed. After the above
// message is queued, no more are accepted, and the executor
// may initiate its own (and thereby this log's) destruction.
log_suspended_.set_value(true);
stopped_.set_value(true);
return false;
}
else
Expand All @@ -105,6 +105,22 @@ void executor::subscribe_log(std::ostream& sink)
);
}

// Suspend verbose logging and log the stop signal.
void executor::log_stopping()
{
const auto signal = signal_.load();
if (signal == signal_none)
return;

// A high level of consolve logging can obscure and delay stop.
toggle_.at(network::levels::protocol) = false;
toggle_.at(network::levels::verbose) = false;
toggle_.at(network::levels::proxy) = false;

logger(boost::format(BS_NODE_INTERRUPTED) % signal);
logger(BS_NETWORK_STOPPING);
}

void executor::logger(const std::string& message) const
{
if (log_.stopped())
Expand Down
2 changes: 1 addition & 1 deletion console/executor_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void executor::stopper(const std::string& message)
log_.stop(message,levels::application);

// Suspend process termination until final message is buffered.
log_suspended_.get_future().wait();
stopped_.get_future().wait();
}

void executor::subscribe_connect()
Expand Down
Loading
Loading