-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add retry logic #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ use crate::{ | |
| common::tx_submitter::TransactionSubmitter, | ||
| heartbeat_manager::HearbeatManager::HearbeatManagerInstance, | ||
| }, | ||
| retry::{retry, RetryConfig}, | ||
| types::Htx, | ||
| }; | ||
| use alloy::{ | ||
|
|
@@ -20,7 +21,7 @@ use crate::contract_client::common::errors::decode_any_error; | |
| use crate::contract_client::common::event_helper::{ | ||
| listen_events, listen_events_filtered, BlockRange, | ||
| }; | ||
| use anyhow::{anyhow, bail, Context, Result}; | ||
| use anyhow::{anyhow, bail, Result}; | ||
|
|
||
| sol! { | ||
| interface ISlashingPolicy { | ||
|
|
@@ -137,7 +138,13 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
|
|
||
| /// Get the current block number | ||
| pub async fn get_block_number(&self) -> Result<u64> { | ||
| Ok(self.provider.get_block_number().await?) | ||
| retry(RetryConfig::for_reads(), "getBlockNumber", || async { | ||
| self.provider | ||
| .get_block_number() | ||
| .await | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }) | ||
| .await | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------ | ||
|
|
@@ -146,12 +153,26 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
|
|
||
| /// Get the total number of active nodes | ||
| pub async fn node_count(&self) -> Result<U256> { | ||
| Ok(self.contract.nodeCount().call().await?) | ||
| retry(RetryConfig::for_reads(), "nodeCount", || async { | ||
| self.contract | ||
| .nodeCount() | ||
| .call() | ||
| .await | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }) | ||
| .await | ||
| } | ||
|
|
||
| /// Get the list of all active node addresses | ||
| pub async fn get_nodes(&self) -> Result<Vec<Address>> { | ||
| Ok(self.contract.getNodes().call().await?) | ||
| retry(RetryConfig::for_reads(), "getNodes", || async { | ||
| self.contract | ||
| .getNodes() | ||
| .call() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is fine as is but I think there could be a much less verbose way of doing the same so you don't need closures everywhere: create an extension trait. e.g. something like (uncompiled code) #[async_trait]
pub trait CallBuilderExt<T> {
async fn call_with_retries(self, config: RetryConfig) -> Result<T, SomeError>;
}
impl <...> CallBuilderExt<...> for CallBuilder<P, D> {
async fn call_with_retries(self, config: RetryConfig) -> Result<T, SomeError> {
.... do the actual retries
}
}
// usage
self.contract
.getNodes()
.call_with_retries(RetryConfig::for_reads())
.await?We don't have to switch to it (and I'm not sure if you wouldn't hit some annoyance around async here) but just to throw it out there. |
||
| .await | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }) | ||
| .await | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------ | ||
|
|
@@ -160,15 +181,20 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
|
|
||
| /// Submit an HTX for verification | ||
| pub async fn submit_htx(&self, htx: &Htx) -> Result<B256> { | ||
| let snapshot_id = self.contract.provider().get_block_number().await?; | ||
| let snapshot_id = snapshot_id.saturating_sub(1); | ||
| // Pre-compute raw_htx outside retry (deterministic, won't fail on retry) | ||
| let raw_htx = alloy::primitives::Bytes::try_from(htx)?; | ||
| let call = self.contract.submitHeartbeat(raw_htx, snapshot_id); | ||
| let gas_with_buffer = Self::overestimate_gas(&call).await?; | ||
| self.submitter | ||
| .with_gas_limit(gas_with_buffer) | ||
| .invoke("submitHeartbeat", call) | ||
| .await | ||
|
|
||
| retry(RetryConfig::default(), "submitHeartbeat", || async { | ||
| let snapshot_id = self.contract.provider().get_block_number().await?; | ||
| let snapshot_id = snapshot_id.saturating_sub(1); | ||
| let call = self.contract.submitHeartbeat(raw_htx.clone(), snapshot_id); | ||
| let gas_with_buffer = Self::overestimate_gas(&call).await?; | ||
| self.submitter | ||
| .with_gas_limit(gas_with_buffer) | ||
| .invoke("submitHeartbeat", call) | ||
| .await | ||
| }) | ||
| .await | ||
| } | ||
|
|
||
| /// Respond to an HTX assignment (called by assigned node) | ||
|
|
@@ -178,21 +204,27 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
| verdict: Verdict, | ||
| submitter_address: Address, | ||
| ) -> Result<B256> { | ||
| // Pre-compute proofs outside retry (deterministic) | ||
| let proofs = | ||
| Self::compute_merkle_proof(*self.contract.address(), &event, submitter_address)?; | ||
| let verdict = match verdict { | ||
| let verdict_u8 = match verdict { | ||
| Verdict::Success => 1, | ||
| Verdict::Failure => 2, | ||
| Verdict::Inconclusive => 3, | ||
| }; | ||
| let call = self | ||
| .contract | ||
| .submitVerdict(event.heartbeatKey, verdict, proofs); | ||
| let gas_with_buffer = Self::overestimate_gas(&call).await?; | ||
| self.submitter | ||
| .with_gas_limit(gas_with_buffer) | ||
| .invoke("submitVerdict", call) | ||
| .await | ||
| let heartbeat_key = event.heartbeatKey; | ||
|
|
||
| retry(RetryConfig::default(), "submitVerdict", || async { | ||
| let call = self | ||
| .contract | ||
| .submitVerdict(heartbeat_key, verdict_u8, proofs.clone()); | ||
| let gas_with_buffer = Self::overestimate_gas(&call).await?; | ||
| self.submitter | ||
| .with_gas_limit(gas_with_buffer) | ||
| .invoke("submitVerdict", call) | ||
| .await | ||
| }) | ||
| .await | ||
| } | ||
|
|
||
| /// Check if a specific node has responded to an HTX | ||
|
|
@@ -201,12 +233,15 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
| workload_key: B256, | ||
| node: Address, | ||
| ) -> Result<Option<Verdict>> { | ||
| let vote = self | ||
| .contract | ||
| .getVotePacked(workload_key, 0, node) | ||
| .call() | ||
| .await?; | ||
| match u8::try_from(vote).context("invalid vote")? { | ||
| let vote = retry(RetryConfig::for_reads(), "getVotePacked", || async { | ||
| self.contract | ||
| .getVotePacked(workload_key, 0, node) | ||
| .call() | ||
| .await | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }) | ||
| .await?; | ||
| match u8::try_from(vote).map_err(|_| anyhow!("invalid vote"))? { | ||
| 0 => Ok(None), | ||
| 1 => Ok(Some(Verdict::Success)), | ||
| 2 => Ok(Some(Verdict::Failure)), | ||
|
|
@@ -316,17 +351,27 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
| &self, | ||
| range: BlockRange, | ||
| ) -> Result<Vec<HeartbeatEnqueuedEvent>> { | ||
| let mut filter = self | ||
| .contract | ||
| .event_filter::<HeartbeatEnqueuedEvent>() | ||
| .from_block(range.from_block); | ||
|
|
||
| if let Some(to_block) = range.to_block { | ||
| filter = filter.to_block(to_block); | ||
| } | ||
| retry( | ||
| RetryConfig::for_reads(), | ||
| "getHtxSubmittedEvents", | ||
| || async { | ||
| let mut filter = self | ||
| .contract | ||
| .event_filter::<HeartbeatEnqueuedEvent>() | ||
| .from_block(range.from_block); | ||
|
|
||
| if let Some(to_block) = range.to_block { | ||
| filter = filter.to_block(to_block); | ||
| } | ||
|
|
||
| let events = filter.query().await?; | ||
| Ok(events.into_iter().map(|(event, _log)| event).collect()) | ||
| filter | ||
| .query() | ||
| .await | ||
| .map(|events| events.into_iter().map(|(event, _log)| event).collect()) | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }, | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| /// Get HTX assigned events from recent history (default: last 1000 blocks) | ||
|
|
@@ -356,17 +401,23 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
| &self, | ||
| range: BlockRange, | ||
| ) -> Result<Vec<RoundStartedEvent>> { | ||
| let mut filter = self | ||
| .contract | ||
| .event_filter::<RoundStartedEvent>() | ||
| .from_block(range.from_block); | ||
|
|
||
| if let Some(to_block) = range.to_block { | ||
| filter = filter.to_block(to_block); | ||
| } | ||
| retry(RetryConfig::for_reads(), "getHtxAssignedEvents", || async { | ||
| let mut filter = self | ||
| .contract | ||
| .event_filter::<RoundStartedEvent>() | ||
| .from_block(range.from_block); | ||
|
|
||
| if let Some(to_block) = range.to_block { | ||
| filter = filter.to_block(to_block); | ||
| } | ||
|
|
||
| let events = filter.query().await?; | ||
| Ok(events.into_iter().map(|(event, _log)| event).collect()) | ||
| filter | ||
| .query() | ||
| .await | ||
| .map(|events| events.into_iter().map(|(event, _log)| event).collect()) | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }) | ||
| .await | ||
| } | ||
|
|
||
| /// Get HTX responded events from recent history (default: last 1000 blocks) | ||
|
|
@@ -396,17 +447,27 @@ impl<P: Provider + Clone> HeartbeatManagerClient<P> { | |
| &self, | ||
| range: BlockRange, | ||
| ) -> Result<Vec<OperatorVotedEvent>> { | ||
| let mut filter = self | ||
| .contract | ||
| .event_filter::<OperatorVotedEvent>() | ||
| .from_block(range.from_block); | ||
|
|
||
| if let Some(to_block) = range.to_block { | ||
| filter = filter.to_block(to_block); | ||
| } | ||
| retry( | ||
| RetryConfig::for_reads(), | ||
| "getHtxRespondedEvents", | ||
| || async { | ||
| let mut filter = self | ||
| .contract | ||
| .event_filter::<OperatorVotedEvent>() | ||
| .from_block(range.from_block); | ||
|
|
||
| if let Some(to_block) = range.to_block { | ||
| filter = filter.to_block(to_block); | ||
| } | ||
|
|
||
| let events = filter.query().await?; | ||
| Ok(events.into_iter().map(|(event, _log)| event).collect()) | ||
| filter | ||
| .query() | ||
| .await | ||
| .map(|events| events.into_iter().map(|(event, _log)| event).collect()) | ||
| .map_err(|e| anyhow!("{e}")) | ||
| }, | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| fn compute_leaf( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you can use
Durationdirectly so you don't need the_SECSand the conversion toDurationon use later on