Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/bin/blacklight_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use blacklight::{
heartbeat_manager::{RoundStartedEvent, Verdict},
BlacklightClient, ContractConfig,
},
retry::RetryConfig,
types::Htx,
verification::HtxVerifier,
};
Expand Down Expand Up @@ -101,11 +102,15 @@ async fn process_htx_assignment(
Ok(htx) => match htx {
Htx::Nillion(htx) => {
info!(htx_id = ?htx_id, "Detected nilCC HTX");
verifier.verify_nillion_htx(&htx).await
verifier
.verify_nillion_htx_with_retry(&htx, RetryConfig::default())
.await
}
Htx::Phala(htx) => {
info!(htx_id = ?htx_id, "Detected Phala HTX");
verifier.verify_phala_htx(&htx).await
verifier
.verify_phala_htx_with_retry(&htx, RetryConfig::default())
.await
}
},
Err(e) => {
Expand Down
14 changes: 14 additions & 0 deletions src/config/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,17 @@ const fn eth_to_wei(eth: f64) -> U256 {
/// Minimum ETH balance required to continue operating
/// Node will initiate shutdown if balance falls below this threshold
pub const MIN_ETH_BALANCE: U256 = eth_to_wei(0.00001);

// =============================================================================
// Retry Configuration
// =============================================================================

/// Default delay between retry attempts in seconds
pub const DEFAULT_RETRY_DELAY_SECS: u64 = 30;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can use `Duration` directly, so you don't need the `_SECS` suffix or the conversion to `Duration` at each use site later on.


/// Default delay between retry attempts for read operations, in seconds.
/// Shorter than the delay for write operations, since reads are fast and idempotent.
pub const DEFAULT_READ_RETRY_DELAY_SECS: u64 = 5;

/// Default maximum number of retry attempts
pub const DEFAULT_MAX_RETRY_ATTEMPTS: u32 = 3;
17 changes: 15 additions & 2 deletions src/contract_client/blacklight_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::contract_client::{
ContractConfig, HeartbeatManagerClient, NilTokenClient, StakingOperatorsClient,
};
use crate::retry::{retry, RetryConfig};

use alloy::{
network::{Ethereum, EthereumWallet, NetworkWallet},
Expand Down Expand Up @@ -68,12 +69,24 @@ impl BlacklightClient {
/// Get the balance of the wallet
pub async fn get_balance(&self) -> anyhow::Result<U256> {
let address = self.signer_address();
Ok(self.provider.get_balance(address).await?)
retry(RetryConfig::for_reads(), "getBalance", || async {
self.provider
.get_balance(address)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
})
.await
}

/// Get the balance of a specific address
pub async fn get_balance_of(&self, address: Address) -> anyhow::Result<U256> {
Ok(self.provider.get_balance(address).await?)
retry(RetryConfig::for_reads(), "getBalance", || async {
self.provider
.get_balance(address)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
})
.await
}

/// Send ETH to an address
Expand Down
30 changes: 30 additions & 0 deletions src/contract_client/common/tx_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::retry::{retry, IntoAnyhow, RetryConfig};

#[derive(Clone)]
pub(crate) struct TransactionSubmitter<S> {
tx_lock: Arc<Mutex<()>>,
Expand Down Expand Up @@ -69,6 +71,34 @@ impl<S: SolInterface + Debug + Clone> TransactionSubmitter<S> {
Ok(tx_hash)
}

/// Invoke a contract method with automatic retry on failure.
///
/// This wraps `invoke()` with retry logic, using the provided configuration.
/// All errors from `invoke()` are considered retryable.
///
/// # Arguments
/// * `method` - Method name for logging
/// * `config` - Retry configuration
/// * `call_fn` - Function that produces the CallBuilder (called on each retry)
pub(crate) async fn invoke_with_retry<P, D, F>(
&self,
method: &str,
config: RetryConfig,
call_fn: F,
) -> Result<B256>
where
P: Provider + Clone,
D: alloy::contract::CallDecoder + Clone,
F: Fn() -> CallBuilder<P, D>,
{
retry(config, method, || async {
let call = call_fn();
self.invoke(method, call).await
})
.await
.into_anyhow()
}

pub(crate) fn with_gas_limit(&self, limit: u64) -> Self {
let mut this = self.clone();
this.gas_limit = Some(limit);
Expand Down
175 changes: 118 additions & 57 deletions src/contract_client/heartbeat_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
common::tx_submitter::TransactionSubmitter,
heartbeat_manager::HearbeatManager::HearbeatManagerInstance,
},
retry::{retry, RetryConfig},
types::Htx,
};
use alloy::{
Expand All @@ -20,7 +21,7 @@ use crate::contract_client::common::errors::decode_any_error;
use crate::contract_client::common::event_helper::{
listen_events, listen_events_filtered, BlockRange,
};
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{anyhow, bail, Result};

sol! {
interface ISlashingPolicy {
Expand Down Expand Up @@ -137,7 +138,13 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {

/// Get the current block number
pub async fn get_block_number(&self) -> Result<u64> {
Ok(self.provider.get_block_number().await?)
retry(RetryConfig::for_reads(), "getBlockNumber", || async {
self.provider
.get_block_number()
.await
.map_err(|e| anyhow!("{e}"))
})
.await
}

// ------------------------------------------------------------------------
Expand All @@ -146,12 +153,26 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {

/// Get the total number of active nodes
pub async fn node_count(&self) -> Result<U256> {
Ok(self.contract.nodeCount().call().await?)
retry(RetryConfig::for_reads(), "nodeCount", || async {
self.contract
.nodeCount()
.call()
.await
.map_err(|e| anyhow!("{e}"))
})
.await
}

/// Get the list of all active node addresses
pub async fn get_nodes(&self) -> Result<Vec<Address>> {
Ok(self.contract.getNodes().call().await?)
retry(RetryConfig::for_reads(), "getNodes", || async {
self.contract
.getNodes()
.call()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine as is, but I think there could be a much less verbose way of doing the same thing, so you don't need closures everywhere: create an extension trait. E.g. something like (uncompiled code):

#[async_trait]
pub trait CallBuilderExt<T> {
    async fn call_with_retries(self, config: RetryConfig) -> Result<T, SomeError>;
}

impl <...> CallBuilderExt<...> for CallBuilder<P, D> {
    async fn call_with_retries(self, config: RetryConfig) -> Result<T, SomeError> {
        .... do the actual retries
    }
}


// usage

self.contract
      .getNodes()
      .call_with_retries(RetryConfig::for_reads())
      .await?

We don't have to switch to it (and I'm not sure if you wouldn't hit some annoyance around async here) but just to throw it out there.

.await
.map_err(|e| anyhow!("{e}"))
})
.await
}

// ------------------------------------------------------------------------
Expand All @@ -160,15 +181,20 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {

/// Submit an HTX for verification
pub async fn submit_htx(&self, htx: &Htx) -> Result<B256> {
let snapshot_id = self.contract.provider().get_block_number().await?;
let snapshot_id = snapshot_id.saturating_sub(1);
// Pre-compute raw_htx outside retry (deterministic, won't fail on retry)
let raw_htx = alloy::primitives::Bytes::try_from(htx)?;
let call = self.contract.submitHeartbeat(raw_htx, snapshot_id);
let gas_with_buffer = Self::overestimate_gas(&call).await?;
self.submitter
.with_gas_limit(gas_with_buffer)
.invoke("submitHeartbeat", call)
.await

retry(RetryConfig::default(), "submitHeartbeat", || async {
let snapshot_id = self.contract.provider().get_block_number().await?;
let snapshot_id = snapshot_id.saturating_sub(1);
let call = self.contract.submitHeartbeat(raw_htx.clone(), snapshot_id);
let gas_with_buffer = Self::overestimate_gas(&call).await?;
self.submitter
.with_gas_limit(gas_with_buffer)
.invoke("submitHeartbeat", call)
.await
})
.await
}

/// Respond to an HTX assignment (called by assigned node)
Expand All @@ -178,21 +204,27 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {
verdict: Verdict,
submitter_address: Address,
) -> Result<B256> {
// Pre-compute proofs outside retry (deterministic)
let proofs =
Self::compute_merkle_proof(*self.contract.address(), &event, submitter_address)?;
let verdict = match verdict {
let verdict_u8 = match verdict {
Verdict::Success => 1,
Verdict::Failure => 2,
Verdict::Inconclusive => 3,
};
let call = self
.contract
.submitVerdict(event.heartbeatKey, verdict, proofs);
let gas_with_buffer = Self::overestimate_gas(&call).await?;
self.submitter
.with_gas_limit(gas_with_buffer)
.invoke("submitVerdict", call)
.await
let heartbeat_key = event.heartbeatKey;

retry(RetryConfig::default(), "submitVerdict", || async {
let call = self
.contract
.submitVerdict(heartbeat_key, verdict_u8, proofs.clone());
let gas_with_buffer = Self::overestimate_gas(&call).await?;
self.submitter
.with_gas_limit(gas_with_buffer)
.invoke("submitVerdict", call)
.await
})
.await
}

/// Check if a specific node has responded to an HTX
Expand All @@ -201,12 +233,15 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {
workload_key: B256,
node: Address,
) -> Result<Option<Verdict>> {
let vote = self
.contract
.getVotePacked(workload_key, 0, node)
.call()
.await?;
match u8::try_from(vote).context("invalid vote")? {
let vote = retry(RetryConfig::for_reads(), "getVotePacked", || async {
self.contract
.getVotePacked(workload_key, 0, node)
.call()
.await
.map_err(|e| anyhow!("{e}"))
})
.await?;
match u8::try_from(vote).map_err(|_| anyhow!("invalid vote"))? {
0 => Ok(None),
1 => Ok(Some(Verdict::Success)),
2 => Ok(Some(Verdict::Failure)),
Expand Down Expand Up @@ -316,17 +351,27 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {
&self,
range: BlockRange,
) -> Result<Vec<HeartbeatEnqueuedEvent>> {
let mut filter = self
.contract
.event_filter::<HeartbeatEnqueuedEvent>()
.from_block(range.from_block);

if let Some(to_block) = range.to_block {
filter = filter.to_block(to_block);
}
retry(
RetryConfig::for_reads(),
"getHtxSubmittedEvents",
|| async {
let mut filter = self
.contract
.event_filter::<HeartbeatEnqueuedEvent>()
.from_block(range.from_block);

if let Some(to_block) = range.to_block {
filter = filter.to_block(to_block);
}

let events = filter.query().await?;
Ok(events.into_iter().map(|(event, _log)| event).collect())
filter
.query()
.await
.map(|events| events.into_iter().map(|(event, _log)| event).collect())
.map_err(|e| anyhow!("{e}"))
},
)
.await
}

/// Get HTX assigned events from recent history (default: last 1000 blocks)
Expand Down Expand Up @@ -356,17 +401,23 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {
&self,
range: BlockRange,
) -> Result<Vec<RoundStartedEvent>> {
let mut filter = self
.contract
.event_filter::<RoundStartedEvent>()
.from_block(range.from_block);

if let Some(to_block) = range.to_block {
filter = filter.to_block(to_block);
}
retry(RetryConfig::for_reads(), "getHtxAssignedEvents", || async {
let mut filter = self
.contract
.event_filter::<RoundStartedEvent>()
.from_block(range.from_block);

if let Some(to_block) = range.to_block {
filter = filter.to_block(to_block);
}

let events = filter.query().await?;
Ok(events.into_iter().map(|(event, _log)| event).collect())
filter
.query()
.await
.map(|events| events.into_iter().map(|(event, _log)| event).collect())
.map_err(|e| anyhow!("{e}"))
})
.await
}

/// Get HTX responded events from recent history (default: last 1000 blocks)
Expand Down Expand Up @@ -396,17 +447,27 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> {
&self,
range: BlockRange,
) -> Result<Vec<OperatorVotedEvent>> {
let mut filter = self
.contract
.event_filter::<OperatorVotedEvent>()
.from_block(range.from_block);

if let Some(to_block) = range.to_block {
filter = filter.to_block(to_block);
}
retry(
RetryConfig::for_reads(),
"getHtxRespondedEvents",
|| async {
let mut filter = self
.contract
.event_filter::<OperatorVotedEvent>()
.from_block(range.from_block);

if let Some(to_block) = range.to_block {
filter = filter.to_block(to_block);
}

let events = filter.query().await?;
Ok(events.into_iter().map(|(event, _log)| event).collect())
filter
.query()
.await
.map(|events| events.into_iter().map(|(event, _log)| event).collect())
.map_err(|e| anyhow!("{e}"))
},
)
.await
}

fn compute_leaf(
Expand Down
Loading