# feat: dial again to disconnected peers #65
Changes from all commits: 32dc1ca, 8af9634, ec50c50, ce23083, fa10f47, c0e0d37
```diff
@@ -41,6 +41,12 @@ pub use metrics::populate_name_registry;
 const MAX_FETCH_RETRIES: u32 = 5;
 const INITIAL_BACKOFF_MS: u64 = 10;
 const BACKOFF_MULTIPLIER: u64 = 4;
+const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
+
+enum RetryMessage {
+    BlockFetch(H256),
+    PeerRedial(PeerId),
+}

 pub(crate) struct PendingRequest {
     pub(crate) attempts: u32,
```
**General Comments**

The code snippet provided is limited, focusing primarily on peer disconnection handling in a P2P context. The changes suggest minor formatting adjustments, with no direct functional impact visible in this snippet.

**Code Correctness and Bugs**

**Security**

**Performance**

**Rust Best Practices**

**Memory Safety and Error Handling**

**Code Readability and Maintainability**

**Consensus-Layer Considerations**

This snippet does not reveal specific consensus-layer logic, which limits the ability to critique areas like fork choice or state transition functions directly. However, ensure across the codebase that:

**Conclusion**

While this snippet looks good in terms of style and doesn't break functionality, please review the codebase for the points mentioned above, especially from the security and consensus-layer perspectives. Given the limited scope presented, the full PR review context is necessary.

**Final Note**

If the changes are isolated to formatting or minor style adjustments without functional changes, consider this review an acceptance, contingent on broader systemic checks across the lines of code implicated in the PR.
```diff
@@ -122,6 +128,7 @@ pub async fn start_p2p(
         })
         .build();
     let local_peer_id = *swarm.local_peer_id();
+    let mut bootnode_addrs = HashMap::new();
     for bootnode in bootnodes {
         let peer_id = PeerId::from_public_key(&bootnode.public_key);
         if peer_id == local_peer_id {
```
```diff
@@ -133,6 +140,7 @@ pub async fn start_p2p(
             .with(Protocol::QuicV1)
             .with_p2p(peer_id)
             .expect("failed to add peer ID to multiaddr");
+        bootnode_addrs.insert(peer_id, addr.clone());
         swarm.dial(addr).unwrap();
     }
     let addr = Multiaddr::empty()
```
```diff
@@ -159,7 +167,7 @@ pub async fn start_p2p(
         "/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"
     ));

-    info!("P2P node started on {listening_socket}");
+    info!(socket=%listening_socket, "P2P node started");

     let (retry_tx, retry_rx) = mpsc::unbounded_channel();
```
```diff
@@ -173,6 +181,7 @@ pub async fn start_p2p(
         connected_peers: HashSet::new(),
         pending_requests: HashMap::new(),
         request_id_map: HashMap::new(),
+        bootnode_addrs,
         retry_tx,
         retry_rx,
     };
```
```diff
@@ -197,8 +206,11 @@ pub(crate) struct P2PServer {
     pub(crate) connected_peers: HashSet<PeerId>,
     pub(crate) pending_requests: HashMap<ethlambda_types::primitives::H256, PendingRequest>,
     pub(crate) request_id_map: HashMap<OutboundRequestId, ethlambda_types::primitives::H256>,
-    retry_tx: mpsc::UnboundedSender<ethlambda_types::primitives::H256>,
-    retry_rx: mpsc::UnboundedReceiver<ethlambda_types::primitives::H256>,
+    /// Bootnode addresses for redialing when disconnected
+    bootnode_addrs: HashMap<PeerId, Multiaddr>,
+    /// Channel for scheduling retries (block fetches and peer redials)
+    pub(crate) retry_tx: mpsc::UnboundedSender<RetryMessage>,
+    retry_rx: mpsc::UnboundedReceiver<RetryMessage>,
 }

 /// Event loop for the P2P crate.
```
```diff
@@ -220,8 +232,11 @@ async fn event_loop(mut server: P2PServer) {
             };
             handle_swarm_event(&mut server, event).await;
         }
-        Some(root) = server.retry_rx.recv() => {
-            handle_retry(&mut server, root).await;
+        Some(msg) = server.retry_rx.recv() => {
+            match msg {
+                RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await,
+                RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await,
+            }
         }
     }
 }
```
```diff
@@ -302,13 +317,20 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
                 server.connected_peers.remove(&peer_id);
                 let peer_count = server.connected_peers.len();
                 metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);

                 info!(
                     %peer_id,
                     %direction,
                     %reason,
                     peer_count,
                     "Peer disconnected"
                 );
+
+                // Schedule redial if this is a bootnode
+                if server.bootnode_addrs.contains_key(&peer_id) {
+                    schedule_peer_redial(server.retry_tx.clone(), peer_id);
+                    info!(%peer_id, "Scheduled bootnode redial in {}s", PEER_REDIAL_INTERVAL_SECS);
+                }
             } else {
                 info!(%peer_id, %direction, %reason, "Peer connection closed but other connections remain");
             }
```
```diff
@@ -321,6 +343,15 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
             };
             metrics::notify_peer_connected(&peer_id, "outbound", result);
             warn!(?peer_id, %error, "Outgoing connection error");
+
+            // Schedule redial if this was a bootnode
+            if let Some(pid) = peer_id
+                && server.bootnode_addrs.contains_key(&pid)
+                && !server.connected_peers.contains(&pid)
+            {
+                schedule_peer_redial(server.retry_tx.clone(), pid);
+                info!(%pid, "Scheduled bootnode redial after connection error");
+            }
         }
         SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
             metrics::notify_peer_connected(&peer_id, "inbound", "error");
```
```diff
@@ -369,6 +400,33 @@ async fn handle_retry(server: &mut P2PServer, root: H256) {
     }
 }

+async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) {
+    // Skip if already reconnected
+    if server.connected_peers.contains(&peer_id) {
+        trace!(%peer_id, "Bootnode reconnected during redial delay, skipping");
+        return;
+    }
+
+    if let Some(addr) = server.bootnode_addrs.get(&peer_id) {
+        info!(%peer_id, "Redialing disconnected bootnode");
+        // NOTE: this dial does some checks and adds a pending outbound connection attempt.
+        // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event.
+        if let Err(e) = server.swarm.dial(addr.clone()) {
+            warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
+            // Schedule another redial attempt
+            schedule_peer_redial(server.retry_tx.clone(), peer_id);
+        }
+    }
+}
+
+/// Schedules a peer redial after the configured delay interval.
+pub(crate) fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender<RetryMessage>, peer_id: PeerId) {
+    tokio::spawn(async move {
+        tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
+        let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id));
+    });
+}
+
 pub struct Bootnode {
     ip: IpAddr,
     quic_port: u16,
```
**Code Correctness and Potential Bugs**

**Security Vulnerabilities**

**Performance Implications**

**Rust Best Practices and Idiomatic Patterns**

**Memory Safety and Proper Error Handling**

**Code Readability and Maintainability**

**Conclusion**

The PR introduces functionality that could enhance p2p bootnode management, but it falls short in a few key areas, including correctness, security, and maintainability. Address the noted concerns to make the codebase more robust and efficient.
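One concrete way to address the robustness concern would be to reuse the capped exponential backoff this same file already applies to block fetches (`MAX_FETCH_RETRIES`, `INITIAL_BACKOFF_MS`, `BACKOFF_MULTIPLIER`) for redials as well. A minimal sketch of that alternative follows; the `attempt` counter, the `MAX_REDIAL_RETRIES` cap, and the `u64` stand-in for `PeerId` are illustrative assumptions, not part of this PR:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

// Constants mirroring the block-fetch retry policy in lib.rs.
const INITIAL_BACKOFF_MS: u64 = 10;
const BACKOFF_MULTIPLIER: u64 = 4;
const MAX_REDIAL_RETRIES: u32 = 5; // hypothetical cap, not in the PR

type PeerId = u64; // stand-in so the sketch has no libp2p dependency

enum RetryMessage {
    PeerRedial(PeerId, u32), // peer plus attempt count (assumed extension)
}

/// Schedules a redial with capped exponential backoff instead of the
/// fixed 12-second interval used by the PR.
fn schedule_peer_redial_with_backoff(
    retry_tx: mpsc::UnboundedSender<RetryMessage>,
    peer_id: PeerId,
    attempt: u32,
) {
    if attempt >= MAX_REDIAL_RETRIES {
        return; // give up instead of redialing forever
    }
    // 10ms, 40ms, 160ms, ... the same progression as the block-fetch path.
    let backoff = Duration::from_millis(INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(attempt));
    tokio::spawn(async move {
        tokio::time::sleep(backoff).await;
        let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id, attempt + 1));
    });
}
```

Whether bootnode redials should be capped at all is a real design question: a node that gives up on its bootnodes may never rejoin the network, which is presumably why the PR redials indefinitely at a fixed interval.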
```diff
@@ -11,7 +11,10 @@ use super::{
     BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload,
     ResponseResult, Status,
 };
-use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest};
+use crate::{
+    BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
+    RetryMessage,
+};

 pub async fn handle_req_resp_message(
     server: &mut P2PServer,
```
```diff
@@ -226,6 +229,6 @@ async fn handle_fetch_failure(
     let retry_tx = server.retry_tx.clone();
     tokio::spawn(async move {
         tokio::time::sleep(backoff).await;
-        let _ = retry_tx.send(root);
+        let _ = retry_tx.send(RetryMessage::BlockFetch(root));
     });
 }
```
**Code Review Comments**

**1. Code Correctness and Potential Bugs**

**2. Security Vulnerabilities**

**3. Performance Implications**

**4. Rust Best Practices and Idiomatic Patterns**

**5. Memory Safety and Proper Error Handling**

**6. Code Readability and Maintainability**

**Consensus-Layer Considerations**

Overall, the updated code segment indicates progress towards better granularity in message handling, but it requires careful validation of its integration with existing processing mechanisms, especially the handling and receiving of `RetryMessage` values.
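As a sanity check on that integration, the dispatch can be reproduced in isolation. The sketch below is a minimal stand-alone program, not the crate's code: `H256` and `PeerId` are replaced with plain type aliases so it compiles without `ethlambda_types` or libp2p.

```rust
use tokio::sync::mpsc;

// Stand-ins for ethlambda_types::primitives::H256 and libp2p's PeerId,
// so the sketch compiles without the crate's dependencies.
type H256 = [u8; 32];
type PeerId = u64;

enum RetryMessage {
    BlockFetch(H256),
    PeerRedial(PeerId),
}

#[tokio::main]
async fn main() {
    let (retry_tx, mut retry_rx) = mpsc::unbounded_channel();

    // Both retry kinds share one sender, as in the PR: block-fetch backoff
    // tasks send BlockFetch, disconnect handlers send PeerRedial.
    let _ = retry_tx.send(RetryMessage::BlockFetch([0u8; 32]));
    let _ = retry_tx.send(RetryMessage::PeerRedial(42));
    drop(retry_tx); // close the channel so the loop below terminates

    // The event loop matches once per message, mirroring event_loop().
    while let Some(msg) = retry_rx.recv().await {
        match msg {
            RetryMessage::BlockFetch(root) => println!("retry block fetch {:02x?}", &root[..4]),
            RetryMessage::PeerRedial(peer) => println!("redial peer {peer}"),
        }
    }
}
```

The integration point the review raises is that every existing `retry_tx.send(root)` call site must be migrated to `RetryMessage::BlockFetch(root)`, which the second hunk above does.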
**Code Correctness and Potential Bugs**

The `dial` operation is non-blocking, and a failed attempt will trigger an `OutgoingConnectionError`. Ensure that the `OutgoingConnectionError` event is properly handled downstream to prevent any missed reconnection attempts. Without this, peers might stay disconnected longer than desirable.

**Security Vulnerabilities**

**Performance Implications**

**Rust Best Practices and Idiomatic Patterns**

`if let Err(e) =` is idiomatic for handling possible errors. Make sure that the `warn!` logging is not suppressed or truncated in production, as this error information might be crucial for debugging.

**Memory Safety and Proper Error Handling**

`warn!` is good for logging non-fatal errors. Consider augmenting it with telemetry, if available, to provide access to historical error patterns.

**Code Readability and Maintainability**

Ensure `OutgoingConnectionError` is properly handled.

**Consensus Layer Considerations**

Overall, the change in this code snippet looks reasonable with respect to the networking operation being performed. However, as network operations can be integral to consensus stability, ensuring robustness and security remains critical. Address the handling of `OutgoingConnectionError` and incorporate secure communication practices if not already present in your codebase.
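Putting the reviewer's first point together with the NOTE in `handle_peer_redial`: a dial can fail on two paths, and both lead back to `schedule_peer_redial`. The sketch below stubs out `Swarm::dial` with a plain function to show the cycle; `dial_stub`, the `u64` peer ID, and the error type are illustrative stand-ins, not libp2p API.

```rust
use std::time::Duration;
use tokio::sync::mpsc;

type PeerId = u64; // stand-in for libp2p's PeerId

enum RetryMessage {
    PeerRedial(PeerId),
}

const PEER_REDIAL_INTERVAL_SECS: u64 = 12;

/// Same shape as the PR's helper: sleep, then push a redial message.
fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender<RetryMessage>, peer_id: PeerId) {
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
        let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id));
    });
}

/// Stand-in for `Swarm::dial`: returns `Err` only for problems detectable
/// up front; `Ok` merely means the attempt is pending and may still fail
/// later, surfacing as an `OutgoingConnectionError` swarm event.
fn dial_stub(peer_id: PeerId) -> Result<(), String> {
    if peer_id == 0 { Err("invalid peer".into()) } else { Ok(()) }
}

fn handle_peer_redial(retry_tx: &mpsc::UnboundedSender<RetryMessage>, peer_id: PeerId) {
    match dial_stub(peer_id) {
        // Path 1: synchronous failure -> reschedule immediately, as the PR does.
        Err(e) => {
            eprintln!("dial failed ({e}), rescheduling");
            schedule_peer_redial(retry_tx.clone(), peer_id);
        }
        // Path 2: pending. Success arrives as ConnectionEstablished; an async
        // failure arrives as OutgoingConnectionError, whose handler in the PR
        // also calls schedule_peer_redial, closing the retry loop.
        Ok(()) => {}
    }
}
```

Provided the `OutgoingConnectionError` arm added in the PR stays in place, both failure paths therefore converge on another `schedule_peer_redial` call until the peer reconnects.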