68 changes: 63 additions & 5 deletions crates/net/p2p/src/lib.rs
@@ -41,6 +41,12 @@ pub use metrics::populate_name_registry;
const MAX_FETCH_RETRIES: u32 = 5;
const INITIAL_BACKOFF_MS: u64 = 10;
const BACKOFF_MULTIPLIER: u64 = 4;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;

enum RetryMessage {
BlockFetch(H256),
PeerRedial(PeerId),


Code Correctness and Potential Bugs

  • Lines 2-7: The comment confirms that the dial operation is non-blocking and that a failed attempt will trigger an OutgoingConnectionError. Ensure that the OutgoingConnectionError event is properly handled downstream to prevent any missed reconnection attempts. Without this, peers might stay disconnected longer than desirable.

Security Vulnerabilities

  • General Note: Peer reconnections must be as secure as initial connections; a redial is a fresh dial and faces the same man-in-the-middle risks. Confirm that the transport used here (QUIC) provides encryption and peer authentication on reconnection just as it does on the first connection.

Performance Implications

  • Non-blocking operations, as noted, are essential for keeping the node responsive. Ensure that redial retries use exponential backoff (or at least a bounded, capped delay) so that persistent failures do not hammer the network; a sketch of such a scheduler follows.
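
    For illustration, a capped exponential-backoff variant of the redial scheduler could look like the sketch below. The attempt counter, the backoff constants, and the tuple-typed channel are hypothetical and not part of this diff; in the actual code the delay would feed into schedule_peer_redial, and RetryMessage::PeerRedial would need to carry the attempt count.

    use std::time::Duration;

    use libp2p::PeerId;
    use tokio::sync::mpsc;

    // Hypothetical backoff constants; the diff only defines PEER_REDIAL_INTERVAL_SECS.
    const REDIAL_BACKOFF_BASE_SECS: u64 = 12;
    const REDIAL_BACKOFF_MAX_SECS: u64 = 300;

    // Sketch of a redial scheduler whose delay doubles per attempt, capped so that
    // persistent outages do not produce unbounded waits. The channel carries a
    // (PeerId, attempt) tuple here purely to keep the example self-contained.
    fn schedule_peer_redial_with_backoff(
        retry_tx: mpsc::UnboundedSender<(PeerId, u32)>,
        peer_id: PeerId,
        attempt: u32,
    ) {
        let delay_secs = REDIAL_BACKOFF_BASE_SECS
            .saturating_mul(1u64 << attempt.min(5))
            .min(REDIAL_BACKOFF_MAX_SECS);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
            // The receiver decides whether to dial again or give up.
            let _ = retry_tx.send((peer_id, attempt + 1));
        });
    }

    With attempt starting at 0 the first delay matches the current PEER_REDIAL_INTERVAL_SECS, and subsequent delays grow toward the cap.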

Rust Best Practices and Idiomatic Patterns

  • Error Handling: The use of if let Err(e) = is idiomatic for handling possible errors. Make sure that the warn! logging is not suppressed or truncated in production, as this error information might be crucial for debugging.

Memory Safety and Proper Error Handling

  • Error Logging: Use of warn! is good for logging non-fatal errors. Consider augmenting with telemetry if available, to provide access to historical patterns of errors.

Code Readability and Maintainability

  • Documentation: The inline comment explaining the non-blocking nature of the dial attempt enhances readability and understanding of the asynchronous functionality. Consider expanding this with a reference to relevant documentation or other parts of the codebase where OutgoingConnectionError is properly handled.

Consensus Layer Considerations

  • General Note: This snippet pertains to network connections, rather than consensus-specific logic directly. Ensure elsewhere in the codebase that proper integrity checks are imposed on messages being exchanged at these peer connections to safeguard consensus security.

Overall, the change in this code snippet looks reasonable with respect to the networking operation being performed. However, as network operations can be integral to consensus operation stability, ensuring robustness and security remains critical. Address the handling of OutgoingConnectionError and incorporate secure communication practices if not already present in your codebase.


General Comments

The code snippet provided is limited, focusing primarily on peer disconnection handling in a P2P context. The changes suggest minor formatting adjustments, with no direct functional impact visible in this snippet.

Code Correctness and Bugs

  • No direct issues: The changes from this snippet don't introduce functional errors or bugs.

Security

  • Security checks missing: While not shown here, ensure that peer disconnection logic elsewhere in your code handles unexpected cases securely, such as preventing disallowed state transitions or peer impersonation.

Performance

  • Negligible performance impact: Formatting changes will not affect performance.

Rust Best Practices

  • Idiomatic Spacing: The removal of trailing whitespace aligns with Rust's style conventions, promoting clean, readable code.

Memory Safety and Error Handling

  • Error Handling: Ensure robust error handling when dealing with potentially unreliable network connections, though this snippet alone provides no context for such evaluation.

Code Readability and Maintainability

  • Code Clarity: The structured info! call is appropriate, but it should capture the context explaining why the disconnection occurred (direction, reason, remaining peer count), if that context is not already logged elsewhere.

Consensus-Layer Considerations

This snippet does not reveal specific consensus-layer logic, impairing the ability to critique areas like fork choice or state transition functions directly. However, ensure across the codebase that:

  • Fork Choice: Logic strictly follows LMD GHOST or 3SF-mini rules.
  • Attestation/Justification/Finalization: Ensure they adhere to Ethereum's expected protocols.

Conclusion

While this snippet looks fine stylistically and does not break functionality, please review the codebase against the points above, especially from the security and consensus-layer perspectives. Given the limited scope shown here, the full PR context is needed for a complete review.

Final Note

If the changes are limited to formatting or minor style adjustments with no functional impact, treat this review as acceptance, contingent on the broader systemic checks above being applied across the code touched by this PR.

}

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
@@ -122,6 +128,7 @@ pub async fn start_p2p(
})
.build();
let local_peer_id = *swarm.local_peer_id();
let mut bootnode_addrs = HashMap::new();
for bootnode in bootnodes {
let peer_id = PeerId::from_public_key(&bootnode.public_key);
if peer_id == local_peer_id {
@@ -133,6 +140,7 @@
.with(Protocol::QuicV1)
.with_p2p(peer_id)
.expect("failed to add peer ID to multiaddr");
bootnode_addrs.insert(peer_id, addr.clone());
swarm.dial(addr).unwrap();
}
let addr = Multiaddr::empty()
@@ -159,7 +167,7 @@
"/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"
));

info!("P2P node started on {listening_socket}");
info!(socket=%listening_socket, "P2P node started");

let (retry_tx, retry_rx) = mpsc::unbounded_channel();

@@ -173,6 +181,7 @@
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
bootnode_addrs,
retry_tx,
retry_rx,
};
@@ -197,8 +206,11 @@ pub(crate) struct P2PServer {
pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<ethlambda_types::primitives::H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, ethlambda_types::primitives::H256>,
retry_tx: mpsc::UnboundedSender<ethlambda_types::primitives::H256>,
retry_rx: mpsc::UnboundedReceiver<ethlambda_types::primitives::H256>,
/// Bootnode addresses for redialing when disconnected
bootnode_addrs: HashMap<PeerId, Multiaddr>,
/// Channel for scheduling retries (block fetches and peer redials)
pub(crate) retry_tx: mpsc::UnboundedSender<RetryMessage>,
retry_rx: mpsc::UnboundedReceiver<RetryMessage>,
}

/// Event loop for the P2P crate.
@@ -220,8 +232,11 @@ async fn event_loop(mut server: P2PServer) {
};
handle_swarm_event(&mut server, event).await;
}
Some(root) = server.retry_rx.recv() => {
handle_retry(&mut server, root).await;
Some(msg) = server.retry_rx.recv() => {
match msg {
RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await,
RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await,
}
}
}
}
@@ -302,13 +317,20 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
server.connected_peers.remove(&peer_id);
let peer_count = server.connected_peers.len();
metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);

info!(
%peer_id,
%direction,
%reason,
peer_count,
"Peer disconnected"
);

// Schedule redial if this is a bootnode
if server.bootnode_addrs.contains_key(&peer_id) {
schedule_peer_redial(server.retry_tx.clone(), peer_id);
info!(%peer_id, "Scheduled bootnode redial in {}s", PEER_REDIAL_INTERVAL_SECS);
}
} else {
info!(%peer_id, %direction, %reason, "Peer connection closed but other connections remain");
}
@@ -321,6 +343,15 @@
};
metrics::notify_peer_connected(&peer_id, "outbound", result);
warn!(?peer_id, %error, "Outgoing connection error");

// Schedule redial if this was a bootnode
if let Some(pid) = peer_id
&& server.bootnode_addrs.contains_key(&pid)
&& !server.connected_peers.contains(&pid)
{
schedule_peer_redial(server.retry_tx.clone(), pid);
info!(%pid, "Scheduled bootnode redial after connection error");
}
}
SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
metrics::notify_peer_connected(&peer_id, "inbound", "error");
@@ -369,6 +400,33 @@ async fn handle_retry(server: &mut P2PServer, root: H256) {
}
}

async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) {
// Skip if already reconnected
if server.connected_peers.contains(&peer_id) {
trace!(%peer_id, "Bootnode reconnected during redial delay, skipping");
return;
}

if let Some(addr) = server.bootnode_addrs.get(&peer_id) {
info!(%peer_id, "Redialing disconnected bootnode");
// NOTE: this dial does some checks and adds a pending outbound connection attempt.
// It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event.
if let Err(e) = server.swarm.dial(addr.clone()) {
warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
// Schedule another redial attempt
schedule_peer_redial(server.retry_tx.clone(), peer_id);
}
}
}

/// Schedules a peer redial after the configured delay interval.
pub(crate) fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender<RetryMessage>, peer_id: PeerId) {
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id));
});
}

pub struct Bootnode {
ip: IpAddr,
quic_port: u16,


Code Correctness and Potential Bugs

  • Line 306: When scheduling the redial, make sure the spawned future takes ownership of peer_id (move semantics, i.e. async move) so that concurrent execution cannot observe a stale or inconsistent capture.

Security Vulnerabilities

  • Ensure that bootnode redials do not introduce a denial-of-service vulnerability by indefinitely scheduling retries without adequate limits or delays, especially in a network where systemic connection issues could trigger repeated retries.

Performance Implications

  • Constant retries can lead to excessive resource usage. Ensure that there is a maximum retry count or exponential backoff mechanism, especially when attempting to redial bootnodes.
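
    One way to bound the retries, sketched below with hypothetical names (MAX_REDIAL_ATTEMPTS and RedialTracker are not in the diff), is to keep a per-peer attempt counter that is consulted before each redial and reset once the peer reconnects.

    use std::collections::HashMap;

    use libp2p::PeerId;

    // Hypothetical cap; the diff imposes no limit on bootnode redials.
    const MAX_REDIAL_ATTEMPTS: u32 = 10;

    // Sketch of per-peer redial bookkeeping that the P2P server could own.
    #[derive(Default)]
    struct RedialTracker {
        attempts: HashMap<PeerId, u32>,
    }

    impl RedialTracker {
        // Returns true if another redial should be scheduled for this peer.
        fn should_redial(&mut self, peer_id: PeerId) -> bool {
            let count = self.attempts.entry(peer_id).or_insert(0);
            *count += 1;
            *count <= MAX_REDIAL_ATTEMPTS
        }

        // Called once the peer reconnects, so a flapping bootnode is not
        // permanently abandoned after transient failures.
        fn reset(&mut self, peer_id: &PeerId) {
            self.attempts.remove(peer_id);
        }
    }

    handle_peer_redial would consult should_redial before dialing, and the ConnectionEstablished handler would call reset so the counter only grows across consecutive failures.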

Rust Best Practices and Idiomatic Patterns

  • Lines 300 and 314: It is more conventional to avoid deeply nested logic; consider refactoring the handle_swarm_event function to read more linearly, for example with early returns as sketched below.
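
    As an illustration of the flattening, the redial decision could be pulled into a small helper with early returns. The helper itself is hypothetical; only the field names mirror the diff.

    use std::collections::{HashMap, HashSet};

    use libp2p::{Multiaddr, PeerId};

    // Hypothetical helper: decide whether an OutgoingConnectionError should
    // trigger a bootnode redial, using early returns instead of nested conditions.
    fn bootnode_to_redial(
        peer_id: Option<PeerId>,
        bootnode_addrs: &HashMap<PeerId, Multiaddr>,
        connected_peers: &HashSet<PeerId>,
    ) -> Option<PeerId> {
        // No peer id on the error event: nothing to redial.
        let pid = peer_id?;
        // Only known bootnodes are redialed.
        if !bootnode_addrs.contains_key(&pid) {
            return None;
        }
        // Skip peers that reconnected in the meantime.
        if connected_peers.contains(&pid) {
            return None;
        }
        Some(pid)
    }

    The match arm would then shrink to a single if let Some(pid) = bootnode_to_redial(...) { schedule_peer_redial(server.retry_tx.clone(), pid); }.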

Memory Safety and Proper Error Handling

  • Line 388: Calls to server.swarm.dial() should handle failure explicitly; an ignored error can leave stale bookkeeping for the attempted connection and effectively leak resources. Make sure every failure path cleans up or reschedules as appropriate.

Code Readability and Maintainability

  • Lines 291-311 and 327-337: The repeated pattern of scheduling delayed tasks with tokio::spawn(async move {...}) should be extracted into a separate helper function to improve code clarity and maintainability. This would also reduce the likelihood of introducing bugs when modifications are made in the future.

Conclusion

The PR introduces functionality that should improve p2p bootnode management, but it falls short in a few key areas, including correctness, security, and maintainability. Address the noted concerns to make the codebase more robust and efficient.

7 changes: 5 additions & 2 deletions crates/net/p2p/src/req_resp/handlers.rs
@@ -11,7 +11,10 @@ use super::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload,
ResponseResult, Status,
};
use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest};
use crate::{
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
RetryMessage,
};

pub async fn handle_req_resp_message(
server: &mut P2PServer,
@@ -226,6 +229,6 @@ async fn handle_fetch_failure(
let retry_tx = server.retry_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(backoff).await;
let _ = retry_tx.send(root);
let _ = retry_tx.send(RetryMessage::BlockFetch(root));
});
}


Code Review Comments

1. Code Correctness and Potential Bugs

  • The change from retry_tx.send(root); to retry_tx.send(RetryMessage::BlockFetch(root)); changes the type of message sent over retry_tx. Ensure every component that receives and processes retry_tx handles the new type correctly; a type mismatch will fail at compile time, but a missing or incorrect match arm on the receiving side would be a silent logic bug. Verify the RetryMessage enum is appropriate and covered by tests.
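
    A minimal round-trip test, sketched here as a hypothetical addition (not part of the diff), would at least confirm the variant wrapping and channel plumbing; it assumes RetryMessage is in scope and tokio's test macro is available.

    #[cfg(test)]
    mod tests {
        use libp2p::PeerId;

        use super::RetryMessage;

        // Hypothetical test: a PeerRedial message survives the unbounded retry
        // channel unchanged.
        #[tokio::test]
        async fn peer_redial_round_trips_through_retry_channel() {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            let peer = PeerId::random();
            assert!(tx.send(RetryMessage::PeerRedial(peer)).is_ok());
            match rx.recv().await {
                Some(RetryMessage::PeerRedial(p)) => assert_eq!(p, peer),
                _ => panic!("expected a PeerRedial message"),
            }
        }
    }

    A similar test for the BlockFetch variant would cover the other half of the enum.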

2. Security Vulnerabilities

  • Message passing and async task management are crucial in consensus clients. Ensure that all message-passing channels (like retry_tx) are well-protected against untrusted inputs and that all async tasks are correctly awaited or managed to prevent dangling tasks.

3. Performance Implications

  • The use of tokio::spawn inside handle_fetch_failure is a standard way to handle tasks off the main execution loop without blocking. However, excessive spawning could lead to unbounded growth in spawned tasks depending on conditions triggering retries. Consider implementing task management or limiting max retries appropriately.

4. Rust Best Practices and Idiomatic Patterns

  • Consider more descriptive naming for RetryMessage::BlockFetch; if the enum type is not exclusive to block fetches, the naming should differentiate more complex operations the message may handle. This enhances self-documentation of the code.

5. Memory Safety and Proper Error Handling

  • Ensure every send operation on retry_tx handles errors explicitly, since retry_tx.send returns a Result. Instead of discarding it with let _ =, check the error and log or handle it as needed. No such check is needed for tokio::time::sleep, whose await returns () rather than a Result.
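
    For example, the discarded result could go through a small helper that logs the (rare) failure case. The helper name is hypothetical; only retry_tx and RetryMessage::BlockFetch come from the diff.

    use tokio::sync::mpsc;
    use tracing::warn;

    // Hypothetical helper: an unbounded send only fails when the receiving event
    // loop has shut down, which is still worth recording instead of silently
    // dropping the retry with `let _ =`.
    fn send_retry<T>(retry_tx: &mpsc::UnboundedSender<T>, msg: T) {
        if let Err(e) = retry_tx.send(msg) {
            warn!(error = %e, "Retry channel closed; dropping retry message");
        }
    }

    At the call site this would replace let _ = retry_tx.send(RetryMessage::BlockFetch(root));.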

6. Code Readability and Maintainability

  • Break overly long patterns into smaller, well-documented components if they start growing in complexity. This will improve readability and maintainability in the long run.

Consensus-Layer Considerations

  • The changes are part of broader consensus-client plumbing (especially where retries and backoffs affect block availability). Wrapping the root in RetryMessage::BlockFetch points to modular retry handling; ensure this modularity stays aligned with the consensus-layer logic such as fork choice, justification, and finalization.

Overall, the updated code adds useful granularity to message handling, but its integration with the existing processing paths, especially where retry_tx messages are received and dispatched, needs careful validation.
