Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/bitcoin/node/define.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <utility>
#include <variant>

/// Pulls in common /node headers (excluding settings/config/parser/full_node).
/// Pulls in common /node headers (excluding settings/config/full_node).
#include <bitcoin/node/chase.hpp>

/// Now we use the generic helper definitions above to define BCN_API
Expand Down
33 changes: 30 additions & 3 deletions include/bitcoin/node/protocols/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ namespace libbitcoin {
namespace node {

/// Abstract base for node protocols, thread safe.
/// This node::protocol is not derived from network::protocol, but given the
/// channel constructor parameter is derived from network::channel, the strand
/// is accessible despite lack of bind/post/parallel templates access. This
/// allows event subscription by derived protocols without the need to derive
/// from protocol_peer (which would prevent derivation from service protocols).
class BCN_API protocol
{
protected:
Expand All @@ -42,8 +47,7 @@ class BCN_API protocol
// reinterpret_pointer_cast because channel is abstract.
inline protocol(const auto& session,
const network::channel::ptr& channel) NOEXCEPT
: channel_(std::reinterpret_pointer_cast<node::channel>(channel)),
session_(session)
: channel_(channel), session_(session)
{
}

Expand All @@ -63,12 +67,35 @@ class BCN_API protocol
/// The candidate|confirmed chain is current.
virtual bool is_current(bool confirmed) const NOEXCEPT;

/// Events subscription.
/// -----------------------------------------------------------------------

/// Subscribe to chaser events (max one active per protocol).
virtual void subscribe_events(event_notifier&& handler) NOEXCEPT;

/// Override to handle subscription completion (stranded).
virtual void subscribed(const code& ec, object_key key) NOEXCEPT;

/// Unsubscribe from chaser events.
/// Subscribing protocol must invoke from overridden stopping().
virtual void unsubscribe_events() NOEXCEPT;

/// Get the subscription key (for notify_one).
virtual object_key events_key() const NOEXCEPT;

private:
void handle_subscribed(const code& ec, object_key key) NOEXCEPT;
void handle_subscribe(const code& ec, object_key key,
const event_completer& complete) NOEXCEPT;

// This channel requires stranded calls, base is thread safe.
const node::channel::ptr channel_;
const network::channel::ptr channel_;

// This is thread safe.
const node::session::ptr session_;

// This is protected by singular subscription.
object_key key_{};
};

} // namespace node
Expand Down
23 changes: 0 additions & 23 deletions include/bitcoin/node/protocols/protocol_peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,35 +102,12 @@ class BCN_API protocol_peer
virtual void notify_one(object_key key, const code& ec, chase event_,
event_value value) const NOEXCEPT;

/// Events subscription.
/// -----------------------------------------------------------------------

/// Subscribe to chaser events (max one active per protocol).
virtual void subscribe_events(event_notifier&& handler) NOEXCEPT;

/// Override to handle subscription completion (stranded).
virtual void subscribed(const code& ec, object_key key) NOEXCEPT;

/// Unsubscribe from chaser events.
/// Subscribing protocol must invoke from overridden stopping().
virtual void unsubscribe_events() NOEXCEPT;

/// Get the subscription key (for notify_one).
virtual object_key events_key() const NOEXCEPT;

private:
void handle_subscribed(const code& ec, object_key key) NOEXCEPT;
void handle_subscribe(const code& ec, object_key key,
const event_completer& complete) NOEXCEPT;

// This derived channel requires stranded calls, base is thread safe.
const node::channel_peer::ptr channel_;

// This is thread safe.
const node::session::ptr session_;

// This is protected by singular subscription.
object_key key_{};
};

} // namespace node
Expand Down
2 changes: 1 addition & 1 deletion src/chasers/chaser_confirm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ bool chaser_confirm::complete_block(const code& ec, const header_link& link,
}

// CONFIRMABLE BLOCK
notify(error::success, chase::confirmable, height);
notify(error::success, chase::confirmable, link);
fire(events::block_confirmed, height);
LOGV("Block confirmable: " << height << (bypass ? " (bypass)" : ""));
return true;
Expand Down
6 changes: 3 additions & 3 deletions src/chasers/chaser_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ bool chaser_snapshot::handle_event(const code&, chase event_,
//// if (!enabled_confirm_ || ec)
//// break;
////
//// BC_ASSERT(std::holds_alternative<height_t>(value));
//// POST(do_confirm, std::get<height_t>(value));
//// BC_ASSERT(std::holds_alternative<header_t>(value));
//// POST(do_confirm, std::get<header_t>(value));
//// break;
////}
case chase::block:
Expand Down Expand Up @@ -207,7 +207,7 @@ void chaser_snapshot::do_snap(size_t height) NOEXCEPT
//// take_snapshot(height);
////}
////
////void chaser_snapshot::do_confirm(size_t height) NOEXCEPT
////void chaser_snapshot::do_confirm(header_t link) NOEXCEPT
////{
//// BC_ASSERT(stranded());
//// if (closed() || !update_confirm(height))
Expand Down
70 changes: 70 additions & 0 deletions src/protocols/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
namespace libbitcoin {
namespace node {

#define CLASS protocol

using namespace std::placeholders;

// Properties.
// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -62,5 +66,71 @@ bool protocol::is_current(bool confirmed) const NOEXCEPT
return session_->is_current(confirmed);
}

// Events subscription.
// ----------------------------------------------------------------------------

void protocol::subscribe_events(event_notifier&& handler) NOEXCEPT
{
// This is a shared instance multiply-derived from network::protocol.
const auto self = dynamic_cast<network::protocol&>(*this)
.shared_from_sibling<node::protocol, network::protocol>();

event_completer completer = std::bind(&protocol::handle_subscribed, self,
_1, _2);

session_->subscribe_events(std::move(handler),
std::bind(&protocol::handle_subscribe,
self, _1, _2, std::move(completer)));
}

// private
void protocol::handle_subscribe(const code& ec, object_key key,
const event_completer& complete) NOEXCEPT
{
// The key member is protected by one event subscription per protocol.
BC_ASSERT_MSG(is_zero(key_), "unsafe access");

// Protocol stop is thread safe.
if (ec)
{
channel_->stop(ec);
return;
}

key_ = key;
complete(ec, key_);
}

void protocol::handle_subscribed(const code& ec, object_key key) NOEXCEPT
{
// This is a shared instance multiply-derived from network::protocol.
const auto self = dynamic_cast<network::protocol&>(*this)
.shared_from_sibling<node::protocol, network::protocol>();

boost::asio::post(channel_->strand(),
std::bind(&protocol::subscribed, self, ec, key));
}

void protocol::subscribed(const code& ec, object_key) NOEXCEPT
{
BC_ASSERT(channel_->stranded());

// Unsubscriber race is ok.
if (channel_->stopped() || ec)
unsubscribe_events();
}

// As this has no completion handler resubscription is not allowed.
void protocol::unsubscribe_events() NOEXCEPT
{
session_->unsubscribe_events(key_);
key_ = {};
}

object_key protocol::events_key() const NOEXCEPT
{
return key_;
}

} // namespace node
} // namespace libbitcoin
59 changes: 1 addition & 58 deletions src/protocols/protocol_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
namespace libbitcoin {
namespace node {

#define CLASS protocol_peer

using namespace system;
using namespace network;
using namespace std::placeholders;

// Organizers.
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -62,7 +59,7 @@ void protocol_peer::performance(uint64_t speed,
network::result_handler&& handler) const NOEXCEPT
{
// Passed protocol->session->full_node->check_chaser.post->do_update.
session_->performance(key_, speed, std::move(handler));
session_->performance(events_key(), speed, std::move(handler));
}

code protocol_peer::fault(const code& ec) NOEXCEPT
Expand Down Expand Up @@ -103,59 +100,5 @@ void protocol_peer::notify_one(object_key key, const code& ec, chase event_,
session_->notify_one(key, ec, event_, value);
}

// Events subscription.
// ----------------------------------------------------------------------------

void protocol_peer::subscribe_events(event_notifier&& handler) NOEXCEPT
{
event_completer completer = BIND(handle_subscribed, _1, _2);
session_->subscribe_events(std::move(handler),
BIND(handle_subscribe, _1, _2, std::move(completer)));
}

// private
void protocol_peer::handle_subscribe(const code& ec, object_key key,
const event_completer& complete) NOEXCEPT
{
// The key member is protected by one event subscription per protocol.
BC_ASSERT_MSG(is_zero(key_), "unsafe access");

// Protocol stop is thread safe.
if (ec)
{
stop(ec);
return;
}

key_ = key;
complete(ec, key_);
}

void protocol_peer::handle_subscribed(const code& ec, object_key key) NOEXCEPT
{
POST(subscribed, ec, key);
}

void protocol_peer::subscribed(const code& ec, object_key) NOEXCEPT
{
BC_ASSERT(stranded());

// Unsubscriber race is ok.
if (stopped(ec))
unsubscribe_events();
}

// As this has no completion handler resubscription is not allowed.
void protocol_peer::unsubscribe_events() NOEXCEPT
{
session_->unsubscribe_events(key_);
key_ = {};
}

object_key protocol_peer::events_key() const NOEXCEPT
{
return key_;
}

} // namespace node
} // namespace libbitcoin
Loading