diff --git a/blacklight-node/Cargo.toml b/blacklight-node/Cargo.toml index 8ebc9b0..c2303cd 100644 --- a/blacklight-node/Cargo.toml +++ b/blacklight-node/Cargo.toml @@ -17,6 +17,7 @@ serde_json = "1.0" sha2 = "0.10" term-table = "1.4" tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "signal"] } +tokio-util = "0.7" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/blacklight-node/src/main.rs b/blacklight-node/src/main.rs index d1cded5..0565078 100644 --- a/blacklight-node/src/main.rs +++ b/blacklight-node/src/main.rs @@ -1,470 +1,29 @@ -use alloy::primitives::Address; -use alloy::primitives::utils::{format_ether, format_units}; use anyhow::Result; -use args::{CliArgs, NodeConfig, validate_node_requirements}; -use blacklight_contract_clients::{ - BlacklightClient, ContractConfig, - heartbeat_manager::{RoundStartedEvent, Verdict}, - htx::Htx, -}; +use args::{CliArgs, NodeConfig}; use clap::Parser; -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Duration; -use tokio::sync::Notify; -use tracing::{debug, error, info, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; -use verification::HtxVerifier; -use crate::args::MIN_ETH_BALANCE; - -/// Initial reconnection delay in seconds -const INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1); -/// Maximum reconnection delay in seconds -const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(60); +use crate::verification::HtxVerifier; mod args; +mod shutdown; +mod supervisor; mod verification; -mod version; mod wallet; -use version::{VERSION, validate_node_version}; - -// ============================================================================ -// Signal Handling -// ============================================================================ - -/// Setup shutdown signal handler (Ctrl+C / SIGTERM) -async fn setup_shutdown_handler(shutdown_notify: Arc) { - #[cfg(unix)] - { - use tokio::signal::unix::{SignalKind, signal}; - - let mut sigterm = - signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler"); - let mut sigint = - signal(SignalKind::interrupt()).expect("Failed to register SIGINT handler"); - - tokio::select! 
{ - _ = sigterm.recv() => { - info!("Shutdown signal received (SIGTERM)"); - } - _ = sigint.recv() => { - info!("Shutdown signal received (SIGINT/Ctrl+C)"); - } - } - - shutdown_notify.notify_waiters(); - } - - #[cfg(not(unix))] - { - match tokio::signal::ctrl_c().await { - Ok(()) => { - info!("Shutdown signal received (Ctrl+C)"); - shutdown_notify.notify_waiters(); - } - Err(err) => { - error!(error = %err, "Failed to listen for shutdown signal"); - } - } - } -} - -// ============================================================================ -// Status Reporting -// ============================================================================ - -/// Print status information (ETH balance, staked balance, verified HTXs) -async fn print_status(client: &BlacklightClient, verified_count: u64) -> Result<()> { - let eth_balance = client.get_balance().await?; - let node_address = client.signer_address(); - let staked_balance = client.staking.stake_of(node_address).await?; - - info!( - "📊 STATUS | ETH: {} | STAKED: {} NIL | Verified HTXs: {}", - format_ether(eth_balance), - format_units(staked_balance, 6)?, - verified_count - ); - - Ok(()) -} - -// ============================================================================ -// HTX Processing -// ============================================================================ - -/// Process a single HTX assignment - verifies and submits result -async fn process_htx_assignment( - client: Arc, - event: RoundStartedEvent, - verifier: &HtxVerifier, - verified_counter: Arc, - shutdown_notify: Arc, - node_address: Address, -) -> Result<()> { - let htx_id = event.heartbeatKey; - // Debug: log the raw HTX bytes - let raw_bytes: &[u8] = &event.rawHTX; - tracing::debug!( - htx_id = ?htx_id, - raw_len = raw_bytes.len(), - raw_hex = %alloy::hex::encode(raw_bytes), - "Raw HTX bytes" - ); - // Parse the HTX data - tries JSON first (nilCC/Phala), then ABI decoding (ERC-8004) - let verification_result = match Htx::try_parse(&event.rawHTX) { - Ok(htx) => match htx { - Htx::Nillion(htx) => { - info!(htx_id = ?htx_id, "Detected nilCC HTX"); - verifier.verify_nillion_htx(&htx).await - } - Htx::Phala(htx) => { - info!(htx_id = ?htx_id, "Detected Phala HTX"); - verifier.verify_phala_htx(&htx).await - } - Htx::Erc8004(htx) => { - info!( - htx_id = ?htx_id, - agent_id = %htx.agent_id, - request_uri = %htx.request_uri, - "Detected ERC-8004 validation HTX" - ); - verifier.verify_erc8004_htx(&htx).await - } - }, - Err(e) => { - error!(htx_id = ?htx_id, error = %e, "Failed to parse HTX data"); - // If we parse invalid data, it could be a malicious node, so Failure and it doesn't get rewarded - client - .manager - .respond_htx(event, Verdict::Failure, node_address) - .await?; - info!(htx_id = ?htx_id, "✅ HTX verification submitted"); - return Ok(()); - } - }; - let verdict = match verification_result { - Ok(_) => Verdict::Success, - Err(ref e) => e.verdict(), - }; - - // Submit the verification result - match client - .manager - .respond_htx(event, verdict, node_address) - .await - { - Ok(tx_hash) => { - let count = verified_counter.fetch_add(1, Ordering::SeqCst) + 1; - - match (verdict, verification_result) { - (Verdict::Success, Ok(_)) => { - info!(tx_hash=?tx_hash, "✅ VALID HTX verification submitted"); - } - (Verdict::Failure, Err(e)) => { - info!(tx_hash=?tx_hash, error=?e, verdict="failure", "❌ INVALID HTX verification submitted"); - } - (Verdict::Inconclusive, Err(e)) => { - info!(tx_hash=?tx_hash, error=?e, verdict="inconclusive", "⚠️ INCONCLUSIVE HTX verification 
submitted"); - } - (_, _) => { - error!(tx_hash=?tx_hash, verdict=?verdict, "Unexpected verification state"); - } - } - - if let Err(e) = print_status(&client, count).await { - warn!(error = %e, "Failed to fetch status information"); - } - - // Validate node version against protocol requirement and initiate shutdown if incompatible - if let Err(e) = validate_node_version(&client).await { - error!(error = %e, "Node version validation failed. Initiating shutdown..."); - shutdown_notify.notify_waiters(); - return Err(anyhow::anyhow!( - "Node version is incompatible with protocol requirement" - )); - } - - // Check if balance is below minimum threshold - match client.get_balance().await { - Ok(balance) => { - if balance < MIN_ETH_BALANCE { - error!( - balance = %format_ether(balance), - min_required = %format_ether(MIN_ETH_BALANCE), - "⚠️ ETH balance below minimum threshold. Initiating shutdown..." - ); - shutdown_notify.notify_waiters(); - return Err(anyhow::anyhow!("Insufficient ETH balance")); - } - } - Err(e) => { - warn!(error = %e, "Failed to check balance after transaction"); - } - } - - Ok(()) - } - Err(e) => { - error!(htx_id = ?htx_id, error = %e, "Failed to respond to HTX"); - Err(e) - } - } -} - -/// Process backlog of historical assignments -async fn process_assignment_backlog( - client: Arc, - node_address: Address, - verifier: &HtxVerifier, - verified_counter: Arc, - shutdown_notify: Arc, -) -> Result<()> { - info!("Checking for pending assignments from before connection"); - - let assigned_events = client.manager.get_htx_assigned_events().await?; - let pending: Vec<_> = assigned_events - .into_iter() - .filter(|e| e.members.contains(&node_address)) - .collect(); - - if pending.is_empty() { - info!("No pending assignments found"); - return Ok(()); - } - - info!( - count = pending.len(), - "Found historical assignments, processing backlog" - ); - - for event in pending { - let htx_id = event.heartbeatKey; - - // Check if already responded - match client.manager.get_node_vote(htx_id, node_address).await { - Ok(Some(_)) => { - debug!(htx_id = ?htx_id, "Already responded HTX, skipping"); - } - Ok(None) => { - info!(htx_id = ?htx_id, "📥 HTX received (backlog)"); - let client_clone = client.clone(); - let verifier = verifier.clone(); - let counter = verified_counter.clone(); - let shutdown_clone = shutdown_notify.clone(); - tokio::spawn(async move { - if let Err(e) = process_htx_assignment( - client_clone, - event, - &verifier, - counter, - shutdown_clone, - node_address, - ) - .await - { - error!(htx_id = ?htx_id, error = %e, "Failed to process pending HTX"); - } - }); - } - Err(e) => { - error!(htx_id = ?htx_id, error = %e, "Failed to check assignment status"); - } - } - } - - info!("Backlog processing complete"); - Ok(()) -} - -// ============================================================================ -// Node Registration -// ============================================================================ - -/// Register node with the contract if not already registered -async fn register_node_if_needed(client: &BlacklightClient, node_address: Address) -> Result<()> { - info!(node_address = %node_address, "Checking node registration"); - - let is_registered = client.staking.is_active_operator(node_address).await?; - - if is_registered { - info!("Node already registered"); - return Ok(()); - } - - info!("Registering node with contract"); - let tx_hash = client.staking.register_operator("".to_string()).await?; - info!(tx_hash = ?tx_hash, "Node registered successfully"); - - Ok(()) 
-} - -// ============================================================================ -// Client Creation -// ============================================================================ - -/// Create a WebSocket client with exponential backoff retry logic -async fn create_client_with_retry( - config: &NodeConfig, - shutdown_notify: &Arc, -) -> Result { - let mut reconnect_delay = INITIAL_RECONNECT_DELAY; - let max_reconnect_delay = MAX_RECONNECT_DELAY; - - let contract_config = ContractConfig::new( - config.rpc_url.clone(), - config.manager_contract_address, - config.staking_contract_address, - config.token_contract_address, - ); - - loop { - let client_result = - BlacklightClient::new(contract_config.clone(), config.private_key.clone()).await; - - match client_result { - Ok(client) => { - let balance = client.get_balance().await?; - info!(balance = ?balance, "WebSocket connection established"); - return Ok(client); - } - Err(e) => { - error!(error = %e, reconnect_delay = ?reconnect_delay, "Failed to connect WebSocket. Retrying..."); - - // Sleep with ability to be interrupted by shutdown - tokio::select! { - _ = tokio::time::sleep(reconnect_delay) => { - reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); - } - _ = shutdown_notify.notified() => { - return Err(anyhow::anyhow!("Shutdown signal received during connection retry")); - } - } - } - } - } -} - -// ============================================================================ -// Event Listening -// ============================================================================ - -/// Listen for HTX assignment events and process them -async fn run_event_listener( - client: Arc, - node_address: Address, - shutdown_notify: Arc, - verifier: &HtxVerifier, - verified_counter: Arc, -) -> Result<()> { - let client_for_callback = client.clone(); - let counter_for_callback = verified_counter.clone(); - let shutdown_for_callback = shutdown_notify.clone(); - - let manager = Arc::new(client.manager.clone()); - let listen_future = manager.listen_htx_assigned_for_node(node_address, move |event| { - let client = client_for_callback.clone(); - let counter = counter_for_callback.clone(); - let shutdown_clone = shutdown_for_callback.clone(); - - async move { - let htx_id = event.heartbeatKey; - let node_addr = client.signer_address(); - let verifier = verifier.clone(); - tokio::spawn(async move { - // Check if already responded - match client.manager.get_node_vote(htx_id, node_addr).await { - Ok(Some(_)) => (), - Ok(None) => { - info!(htx_id = ?htx_id, "📥 HTX received"); - if let Err(e) = process_htx_assignment( - client, - event, - &verifier, - counter, - shutdown_clone, - node_address, - ) - .await - { - error!(htx_id = ?htx_id, error = %e, "Failed to process real-time HTX"); - } - } - Err(e) => { - error!(htx_id = ?htx_id, error = %e, "Failed to get assignment for HTX"); - } - } - }); - - Ok(()) - } - }); - - // Listen for either events or shutdown signal - tokio::select! { - result = listen_future => { - result?; - Ok(()) - }, - _ = shutdown_notify.notified() => { - info!("Shutdown signal received during event listening"); - Err(anyhow::anyhow!("Shutdown requested")) - } - } -} - -// ============================================================================ -// Shutdown -// ============================================================================ - -/// Deactivate node from contract on shutdown -async fn deactivate_node_on_shutdown( - config: &NodeConfig, - node_address: Option
, -) -> Result<()> { - info!("Initiating graceful shutdown"); - - let Some(addr) = node_address else { - warn!("Node was never registered, skipping deactivation"); - return Ok(()); - }; - - info!(node_address = %addr, "Deactivating node from contract"); - - let contract_config = ContractConfig::new( - config.rpc_url.clone(), - config.manager_contract_address, - config.staking_contract_address, - config.token_contract_address, - ); - - let client = BlacklightClient::new(contract_config, config.private_key.clone()).await?; - let tx_hash = client.staking.deactivate_operator().await?; - info!(tx_hash = ?tx_hash, "Node deactivated successfully"); - - Ok(()) -} - -// ============================================================================ -// Main -// ============================================================================ - #[tokio::main] async fn main() -> Result<()> { - // Initialize tracing with filters to reduce noise from dependencies + // Initialize tracing let filter = EnvFilter::builder() .with_default_directive(tracing::Level::ERROR.into()) .with_default_directive("attestation_verification=warn".parse()?) .with_default_directive("alloy_transport_ws=off".parse()?) .from_env_lossy() - // Silence noisy attestation verification modules .add_directive("nilcc_artifacts=warn".parse()?) - // Silence Alloy framework noise .add_directive("alloy=warn".parse()?) .add_directive("alloy_pubsub=error".parse()?) - // Keep blacklight logs at info level .add_directive("blacklight=info".parse()?) .add_directive("blacklight_node=info".parse()?); @@ -478,114 +37,19 @@ async fn main() -> Result<()> { let verifier = HtxVerifier::new(cli_args.artifact_cache.clone(), cli_args.cert_cache.clone())?; let config = NodeConfig::load(cli_args).await?; - // Create initial client to validate requirements - let contract_config = ContractConfig::new( - config.rpc_url.clone(), - config.manager_contract_address, - config.staking_contract_address, - config.token_contract_address, - ); - let validation_client = - BlacklightClient::new(contract_config, config.private_key.clone()).await?; - - // Validate node has sufficient ETH and staked NIL tokens - validate_node_requirements( - &validation_client, - &config.rpc_url, - config.was_wallet_created, - ) - .await?; - - // Validate node version against protocol requirement - validate_node_version(&validation_client).await?; - - info!(version = VERSION, "Node initialized"); - info!("Press Ctrl+C to gracefully shutdown and deactivate"); - - // Setup graceful shutdown handler - let shutdown_notify = Arc::new(Notify::new()); - let shutdown_notify_clone = shutdown_notify.clone(); + // Setup shutdown handler + let shutdown_token = CancellationToken::new(); + let shutdown_token_clone = shutdown_token.clone(); tokio::spawn(async move { - setup_shutdown_handler(shutdown_notify_clone).await; + shutdown::shutdown_signal(shutdown_token_clone).await; }); - // Counter for verified HTXs (for status reporting) - let verified_counter = Arc::new(AtomicU64::new(0)); - - // Main reconnection loop - let mut node_address: Option
= None; - let mut reconnect_delay = INITIAL_RECONNECT_DELAY; - let max_reconnect_delay = MAX_RECONNECT_DELAY; - - loop { - info!("Starting WebSocket event listener with auto-reconnection"); - - // Create client with retry logic - let client = match create_client_with_retry(&config, &shutdown_notify).await { - Ok(client) => client, - Err(_) => break, // Shutdown requested or unrecoverable error - }; - - let current_address = client.signer_address(); - node_address = Some(current_address); - - // Register node if needed - if let Err(e) = register_node_if_needed(&client, current_address).await { - error!(error = %e, reconnect_delay = ?reconnect_delay, "Failed to register node. Retrying..."); - - // Exit the loop - std::process::exit(1); - } - - let client_arc = Arc::new(client); - - // Process any backlog of assignments - if let Err(e) = process_assignment_backlog( - client_arc.clone(), - current_address, - &verifier, - verified_counter.clone(), - shutdown_notify.clone(), - ) - .await - { - error!(error = %e, "Failed to query historical assignments"); - } - - // Start listening for events - match run_event_listener( - client_arc, - current_address, - shutdown_notify.clone(), - &verifier, - verified_counter.clone(), - ) - .await - { - Ok(_) => { - warn!(reconnect_delay = ?reconnect_delay, "WebSocket listener exited normally. Reconnecting..."); - } - Err(e) if e.to_string().contains("Shutdown") => { - break; // Graceful shutdown - } - Err(e) => { - error!(error = %e, reconnect_delay = ?reconnect_delay, "WebSocket listener error. Reconnecting..."); - } - } - - // Sleep before reconnecting, with ability to be interrupted by shutdown - tokio::select! { - _ = tokio::time::sleep(reconnect_delay) => { - reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); - } - _ = shutdown_notify.notified() => { - break; // Shutdown requested - } - } - } + // Create and run supervisor (handles connection, validation, and event processing) + let supervisor = supervisor::Supervisor::new(&config, &verifier, shutdown_token).await?; + let client = supervisor.run().await?; - // Graceful shutdown - deactivate node from contract - if let Err(e) = deactivate_node_on_shutdown(&config, node_address).await { + // Graceful shutdown - deactivate node + if let Err(e) = shutdown::deactivate_node(&client).await { error!(error = %e, "Failed to deactivate node gracefully"); } diff --git a/blacklight-node/src/shutdown.rs b/blacklight-node/src/shutdown.rs new file mode 100644 index 0000000..6a211f7 --- /dev/null +++ b/blacklight-node/src/shutdown.rs @@ -0,0 +1,55 @@ +use anyhow::Result; +use blacklight_contract_clients::BlacklightClient; +use tokio_util::sync::CancellationToken; +use tracing::info; + +/// Setup shutdown signal handler (Ctrl+C / SIGTERM) +pub async fn shutdown_signal(shutdown_token: CancellationToken) { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler"); + let mut sigint = + signal(SignalKind::interrupt()).expect("Failed to register SIGINT handler"); + + tokio::select! 
{ + _ = sigterm.recv() => { + info!("Shutdown signal received (SIGTERM)"); + } + _ = sigint.recv() => { + info!("Shutdown signal received (SIGINT/Ctrl+C)"); + } + } + + shutdown_token.cancel(); + } + + #[cfg(not(unix))] + { + use tracing::error; + + match tokio::signal::ctrl_c().await { + Ok(()) => { + info!("Shutdown signal received (Ctrl+C)"); + shutdown_token.cancel(); + } + Err(err) => { + error!(error = %err, "Failed to listen for shutdown signal"); + } + } + } +} + +/// Deactivate node from contract on shutdown +pub async fn deactivate_node(client: &BlacklightClient) -> Result<()> { + let node_address = client.signer_address(); + info!("Initiating graceful shutdown"); + info!(node_address = %node_address, "Deactivating node from contract"); + + let tx_hash = client.staking.deactivate_operator().await?; + info!(node_address = %node_address, tx_hash = ?tx_hash, "Node deactivated successfully"); + + Ok(()) +} diff --git a/blacklight-node/src/supervisor/events.rs b/blacklight-node/src/supervisor/events.rs new file mode 100644 index 0000000..50b5658 --- /dev/null +++ b/blacklight-node/src/supervisor/events.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use blacklight_contract_clients::BlacklightClient; +use std::sync::Arc; +use tracing::info; + +use super::htx::{HtxEventSource, HtxProcessor}; +/// Listen for HTX assignment events and process them +pub async fn run_event_listener(client: BlacklightClient, processor: HtxProcessor) -> Result<()> { + let client_for_callback = client.clone(); + let processor_for_callback = processor.clone(); + + let manager = Arc::new(client.manager.clone()); + let node_address = processor.node_address(); + let listen_future = manager.listen_htx_assigned_for_node(node_address, move |event| { + let client = client_for_callback.clone(); + let processor = processor_for_callback.clone(); + async move { + let vote_address = client.signer_address(); + processor.spawn_processing(event, vote_address, HtxEventSource::Realtime, true); + + Ok(()) + } + }); + + // Listen for either events or shutdown signal + let shutdown_token = processor.shutdown_token(); + tokio::select! 
{ + result = listen_future => { + result?; + Ok(()) + }, + _ = shutdown_token.cancelled() => { + info!("Shutdown signal received during event listening"); + Err(anyhow::anyhow!("Shutdown requested")) + } + } +} diff --git a/blacklight-node/src/supervisor/htx.rs b/blacklight-node/src/supervisor/htx.rs new file mode 100644 index 0000000..402d20a --- /dev/null +++ b/blacklight-node/src/supervisor/htx.rs @@ -0,0 +1,251 @@ +use alloy::primitives::Address; +use anyhow::Result; +use blacklight_contract_clients::{ + BlacklightClient, + heartbeat_manager::{RoundStartedEvent, Verdict}, + htx::Htx, +}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +use crate::supervisor::status::{check_minimum_balance, print_status}; +use crate::supervisor::version::validate_node_version; +use crate::verification::HtxVerifier; + +#[derive(Clone)] +pub struct HtxProcessor { + client: BlacklightClient, + verifier: HtxVerifier, + verified_counter: Arc, + node_address: Address, + shutdown_token: CancellationToken, +} + +impl HtxProcessor { + pub fn new( + client: BlacklightClient, + verifier: HtxVerifier, + verified_counter: Arc, + node_address: Address, + shutdown_token: CancellationToken, + ) -> Self { + Self { + client, + verifier, + verified_counter, + node_address, + shutdown_token, + } + } + + pub fn node_address(&self) -> Address { + self.node_address + } + + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown_token.clone() + } + + pub fn spawn_processing( + &self, + event: RoundStartedEvent, + vote_address: Address, + source: HtxEventSource, + run_post_process: bool, + ) { + let processor = self.clone(); + tokio::spawn(async move { + let htx_id = event.heartbeatKey; + + let client = processor.client.clone(); + // Check if already responded + match client.manager.get_node_vote(htx_id, vote_address).await { + Ok(Some(_)) => { + if let Some(message) = source.already_responded_message() { + debug!(htx_id = ?htx_id, "{}", message); + } + } + Ok(None) => { + info!(htx_id = ?htx_id, "{}", source.received_message()); + match processor.process_htx_assignment(client, event).await { + Ok(Some(count)) => { + if run_post_process + && let Err(e) = processor.post_process_checks(count).await + { + warn!(error = %e, "Failed to post-process events"); + } + } + Ok(None) => {} + Err(e) => { + error!(htx_id = ?htx_id, error = %e, "{}", source.process_error_message()); + } + } + } + Err(e) => { + error!(htx_id = ?htx_id, error = %e, "{}", source.vote_error_message()); + } + } + }); + } + + /// Process a single HTX assignment - verifies and submits result + pub async fn process_htx_assignment( + &self, + client: BlacklightClient, + event: RoundStartedEvent, + ) -> Result> { + let htx_id = event.heartbeatKey; + // Parse the HTX data - tries JSON first (nilCC/Phala), then ABI decoding (ERC-8004) + let verification_result = match Htx::try_parse(&event.rawHTX) { + Ok(htx) => match htx { + Htx::Nillion(htx) => { + info!(htx_id = ?htx_id, "Detected nilCC HTX"); + self.verifier.verify_nillion_htx(&htx).await + } + Htx::Phala(htx) => { + info!(htx_id = ?htx_id, "Detected Phala HTX"); + self.verifier.verify_phala_htx(&htx).await + } + Htx::Erc8004(htx) => { + info!( + htx_id = ?htx_id, + agent_id = %htx.agent_id, + request_uri = %htx.request_uri, + "Detected ERC-8004 validation HTX" + ); + self.verifier.verify_erc8004_htx(&htx).await + } + }, + Err(e) => { + error!(htx_id = ?htx_id, error = %e, "Failed to parse HTX data"); 
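For orientation, a minimal sketch of how the listener and the `HtxProcessor` above fit together, assuming a connected `client: BlacklightClient`, a `verifier: HtxVerifier`, and an incoming `event: RoundStartedEvent` are in scope (in the change itself the counter and cancellation token are owned by the supervisor):

use std::sync::{Arc, atomic::AtomicU64};
use tokio_util::sync::CancellationToken;

let processor = HtxProcessor::new(
    client.clone(),
    verifier.clone(),
    Arc::new(AtomicU64::new(0)),
    client.signer_address(),
    CancellationToken::new(),
);
// Real-time events run the post-processing checks (status, balance, version);
// backlog events pass `false` and skip them.
processor.spawn_processing(event, client.signer_address(), HtxEventSource::Realtime, true);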
+ // If we parse invalid data, it could be a malicious node, so Failure and it doesn't get rewarded + client + .manager + .respond_htx(event, Verdict::Failure, self.node_address) + .await?; + info!(htx_id = ?htx_id, "✅ HTX verification submitted"); + return Ok(None); + } + }; + let verdict = match verification_result { + Ok(_) => Verdict::Success, + Err(ref e) => e.verdict(), + }; + + // Submit the verification result + match client + .manager + .respond_htx(event, verdict, self.node_address) + .await + { + Ok(tx_hash) => { + let count = self.verified_counter.fetch_add(1, Ordering::SeqCst) + 1; + + match (verdict, verification_result) { + (Verdict::Success, Ok(_)) => { + info!(tx_hash=?tx_hash, "✅ VALID HTX verification submitted"); + } + (Verdict::Failure, Err(e)) => { + info!(tx_hash=?tx_hash, error=?e, verdict="failure", "❌ INVALID HTX verification submitted"); + } + (Verdict::Inconclusive, Err(e)) => { + info!(tx_hash=?tx_hash, error=?e, verdict="inconclusive", "⚠️ INCONCLUSIVE HTX verification submitted"); + } + (_, _) => { + error!(tx_hash=?tx_hash, verdict=?verdict, "Unexpected verification state"); + } + } + + Ok(Some(count)) + } + Err(e) => { + error!(htx_id = ?htx_id, error = %e, "Failed to respond to HTX"); + Err(e) + } + } + } + + /// Process backlog of historical assignments + pub async fn process_assignment_backlog(&self, client: BlacklightClient) -> Result<()> { + info!("Checking for pending assignments from before connection"); + + let assigned_events = client.manager.get_htx_assigned_events().await?; + let pending: Vec<_> = assigned_events + .into_iter() + .filter(|e| e.members.contains(&self.node_address)) + .collect(); + + if pending.is_empty() { + info!("No pending assignments found"); + return Ok(()); + } + + info!( + count = pending.len(), + "Found historical assignments, processing backlog" + ); + + for event in pending { + self.spawn_processing(event, self.node_address, HtxEventSource::Backlog, false); + } + + info!("Backlog processing complete"); + Ok(()) + } + + pub async fn post_process_checks(&self, verified_count: u64) -> Result<()> { + let client = self.client.clone(); + let shutdown_token = self.shutdown_token.clone(); + if let Err(e) = print_status(&client, verified_count).await { + warn!(error = %e, "Failed to fetch status information"); + } + + if let Err(e) = check_minimum_balance(&client, &shutdown_token).await { + warn!(error = %e, "Failed to check minimum balance is above minimum threshold"); + } + + if let Err(e) = validate_node_version(&client).await { + warn!(error = %e, "Failed to validate node version against protocol requirement"); + } + + Ok(()) + } +} + +#[derive(Copy, Clone)] +pub enum HtxEventSource { + Realtime, + Backlog, +} + +impl HtxEventSource { + fn received_message(self) -> &'static str { + match self { + Self::Realtime => "📥 HTX received", + Self::Backlog => "📥 HTX received (backlog)", + } + } + + fn already_responded_message(self) -> Option<&'static str> { + match self { + Self::Realtime => None, + Self::Backlog => Some("Already responded HTX, skipping"), + } + } + + fn process_error_message(self) -> &'static str { + match self { + Self::Realtime => "Failed to process real-time HTX", + Self::Backlog => "Failed to process pending HTX", + } + } + + fn vote_error_message(self) -> &'static str { + match self { + Self::Realtime => "Failed to get assignment for HTX", + Self::Backlog => "Failed to check assignment status", + } + } +} diff --git a/blacklight-node/src/supervisor/mod.rs b/blacklight-node/src/supervisor/mod.rs new file mode 
100644 index 0000000..a8167e6 --- /dev/null +++ b/blacklight-node/src/supervisor/mod.rs @@ -0,0 +1,222 @@ +use alloy::primitives::Address; +use anyhow::{Result, bail}; +use blacklight_contract_clients::{BlacklightClient, ContractConfig}; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::time::Duration; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +use crate::args::{NodeConfig, validate_node_requirements}; +use crate::verification::HtxVerifier; + +use crate::supervisor::htx::HtxProcessor; +use crate::supervisor::version::validate_node_version; + +mod events; +mod htx; +mod status; +mod version; + +/// Initial reconnection delay +const INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1); +/// Maximum reconnection delay +const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(60); + +/// Node supervisor - manages WebSocket connection, reconnection, and event processing +pub struct Supervisor<'a> { + config: &'a NodeConfig, + verifier: &'a HtxVerifier, + shutdown_token: CancellationToken, + verified_counter: Arc, + node_address: Address, + reconnect_delay: Duration, + client: BlacklightClient, +} + +impl<'a> Supervisor<'a> { + /// Create a new supervisor, establishing the initial connection and validating requirements + pub async fn new( + config: &'a NodeConfig, + verifier: &'a HtxVerifier, + shutdown_token: CancellationToken, + ) -> Result { + let client = Self::create_client_with_retry(config, &shutdown_token).await?; + let node_address = client.signer_address(); + + // Validate node version against protocol requirement + validate_node_version(&client).await?; + + // Validate node has sufficient ETH and staked NIL tokens + validate_node_requirements(&client, &config.rpc_url, config.was_wallet_created).await?; + + info!(node_address = %node_address, "Node initialized"); + + Ok(Self { + config, + verifier, + shutdown_token, + verified_counter: Arc::new(AtomicU64::new(0)), + node_address, + reconnect_delay: INITIAL_RECONNECT_DELAY, + client, + }) + } + + /// Run the supervisor loop, returns the client for use in shutdown + pub async fn run(mut self) -> Result { + loop { + info!("Starting WebSocket event listener with auto-reconnection"); + info!("Press Ctrl+C to gracefully shutdown and deactivate"); + + // Use existing client or create a new one + let client = self.client.clone(); + + // Register node if needed + if let Err(e) = self.register_node_if_needed(&client).await { + error!(error = %e, "Failed to register node"); + std::process::exit(1); + } + + // Process any backlog of assignments + if let Err(e) = self.process_backlog(client.clone()).await { + error!(error = %e, "Failed to query historical assignments"); + } + + // Start listening for events + match self.listen_for_events(client).await { + Ok(_) => { + warn!("WebSocket listener exited normally. Reconnecting..."); + if self.reconnect_client().await? { + break; + } + } + Err(e) if e.to_string().contains("Shutdown") => { + break; + } + Err(e) => { + error!(error = %e, "WebSocket listener error. Reconnecting..."); + if self.reconnect_client().await? 
{ + break; + } + } + } + } + + Ok(self.client) + } + + /// Create a new WebSocket client + async fn create_client(config: &NodeConfig) -> Result { + let contract_config = ContractConfig::new( + config.rpc_url.clone(), + config.manager_contract_address, + config.staking_contract_address, + config.token_contract_address, + ); + BlacklightClient::new(contract_config, config.private_key.clone()).await + } + + /// Create a client with retry/backoff. Returns Shutdown error if cancelled. + async fn create_client_with_retry( + config: &NodeConfig, + shutdown_token: &CancellationToken, + ) -> Result { + let mut reconnect_delay = INITIAL_RECONNECT_DELAY; + loop { + match Self::create_client(config).await { + Ok(client) => return Ok(client), + Err(e) => { + error!(error = %e, "Failed to create client. Retrying..."); + let sleep = tokio::time::sleep(reconnect_delay); + tokio::select! { + _ = sleep => { + reconnect_delay = std::cmp::min( + reconnect_delay * 2, + MAX_RECONNECT_DELAY + ); + } + _ = shutdown_token.cancelled() => { + bail!("Shutdown requested during initial connect"); + } + } + } + } + } + } + + /// Register node with the contract if not already registered + async fn register_node_if_needed(&self, client: &BlacklightClient) -> Result<()> { + info!(node_address = %self.node_address, "Checking node registration"); + + let is_registered = client.staking.is_active_operator(self.node_address).await?; + + if is_registered { + info!("Node already registered"); + return Ok(()); + } + + info!("Registering node with contract"); + let tx_hash = client.staking.register_operator("".to_string()).await?; + info!(tx_hash = ?tx_hash, "Node registered successfully"); + + Ok(()) + } + + /// Process backlog of historical assignments + async fn process_backlog(&self, client: BlacklightClient) -> Result<()> { + self.build_htx_processor(client.clone()) + .process_assignment_backlog(client) + .await + } + + /// Listen for HTX assignment events + async fn listen_for_events(&self, client: BlacklightClient) -> Result<()> { + events::run_event_listener(client.clone(), self.build_htx_processor(client)).await + } + + fn build_htx_processor(&self, client: BlacklightClient) -> HtxProcessor { + HtxProcessor::new( + client, + self.verifier.clone(), + self.verified_counter.clone(), + self.node_address, + self.shutdown_token.clone(), + ) + } + + /// Reconnect the client with retry/backoff. Returns true if shutdown was requested. + async fn reconnect_client(&mut self) -> Result { + loop { + match Self::create_client(self.config).await { + Ok(client) => { + self.client = client; + self.reconnect_delay = INITIAL_RECONNECT_DELAY; + return Ok(false); + } + Err(e) => { + error!(error = %e, "Failed to create client. Retrying..."); + if self.wait_before_reconnect().await { + return Ok(true); + } + } + } + } + } + + /// Wait before reconnecting, returns true if shutdown was requested + async fn wait_before_reconnect(&mut self) -> bool { + tokio::select! 
{ + _ = tokio::time::sleep(self.reconnect_delay) => { + self.reconnect_delay = std::cmp::min( + self.reconnect_delay * 2, + MAX_RECONNECT_DELAY + ); + false + } + _ = self.shutdown_token.cancelled() => { + true + } + } + } +} diff --git a/blacklight-node/src/supervisor/status.rs b/blacklight-node/src/supervisor/status.rs new file mode 100644 index 0000000..5400e80 --- /dev/null +++ b/blacklight-node/src/supervisor/status.rs @@ -0,0 +1,49 @@ +use alloy::primitives::utils::{format_ether, format_units}; +use anyhow::{Result, bail}; +use blacklight_contract_clients::BlacklightClient; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +use crate::args::MIN_ETH_BALANCE; + +/// Print status information (ETH balance, staked balance, verified HTXs) +pub async fn print_status(client: &BlacklightClient, verified_count: u64) -> Result<()> { + let eth_balance = client.get_balance().await?; + let node_address = client.signer_address(); + let staked_balance = client.staking.stake_of(node_address).await?; + + info!( + "📊 STATUS | ETH: {} | STAKED: {} NIL | Verified HTXs since boot: {}", + format_ether(eth_balance), + format_units(staked_balance, 6)?, + verified_count + ); + + Ok(()) +} + +/// Print status and check balance after HTX processing +pub async fn check_minimum_balance( + client: &BlacklightClient, + shutdown_token: &CancellationToken, +) -> Result<()> { + // Check if balance is below minimum threshold + match client.get_balance().await { + Ok(balance) => { + if balance < MIN_ETH_BALANCE { + error!( + balance = %format_ether(balance), + min_required = %format_ether(MIN_ETH_BALANCE), + "⚠️ ETH balance below minimum threshold. Initiating shutdown..." + ); + shutdown_token.cancel(); + bail!("Insufficient ETH balance"); + } + } + Err(e) => { + warn!(error = %e, "Failed to check balance after transaction"); + } + } + + Ok(()) +} diff --git a/blacklight-node/src/version.rs b/blacklight-node/src/supervisor/version.rs similarity index 99% rename from blacklight-node/src/version.rs rename to blacklight-node/src/supervisor/version.rs index f82492c..4862a03 100644 --- a/blacklight-node/src/version.rs +++ b/blacklight-node/src/supervisor/version.rs @@ -1,4 +1,4 @@ -use anyhow::{Result, anyhow}; +use anyhow::{Result, anyhow, bail}; use semver::Version; use blacklight_contract_clients::BlacklightClient; @@ -121,12 +121,12 @@ pub async fn validate_node_version(client: &BlacklightClient) -> Result<()> { "Node version is incompatible with protocol requirement; upgrade required" ); - Err(anyhow!( + bail!( "Node version {} is incompatible with required {}. Upgrade with: {}", VERSION, required_version, upgrade_cmd - )) + ); } } } diff --git a/crates/blacklight-contract-clients/src/htx/abi/erc8004.rs b/crates/blacklight-contract-clients/src/htx/abi/erc8004.rs new file mode 100644 index 0000000..f1bedef --- /dev/null +++ b/crates/blacklight-contract-clients/src/htx/abi/erc8004.rs @@ -0,0 +1,63 @@ +//! ERC-8004 HTX - ABI-Encoded +//! +//! On-chain validation standard for agent validations. + +use alloy::primitives::{Address, B256, U256}; +use alloy::sol_types::{sol_data, SolType}; + +/// ERC-8004 Validation HTX data parsed from ABI-encoded bytes. +/// +/// This format follows the ERC-8004 standard for on-chain agent validations. +/// +/// **ABI Format**: `abi.encode(validatorAddress, agentId, requestURI, requestHash)` +#[derive(Debug, Clone)] +pub struct Erc8004Htx { + /// Address of the validator performing the validation. 
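As a reading aid for the `abi.encode(validatorAddress, agentId, requestURI, requestHash)` layout described above, the test vector decoded later in htx/mod.rs breaks down into 32-byte words roughly as follows (annotation only, not code):

// word 0: 00..00 5fc8d32690cc91d4c39d9d3abcbd16989f875707   validatorAddress, left-padded
// word 1: 00..00                                             agentId = 0
// word 2: 00..80                                             offset (128 bytes) to the requestURI tail
// word 3: a6719a2e..1602ffac                                 requestHash
// word 4: 00..22                                             requestURI length = 34 bytes
// word 5+: UTF-8 bytes of "https://api.nilai.nillion.network/", zero-padded to a word boundary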
+ pub validator_address: Address, + /// Unique identifier for the agent being validated. + pub agent_id: U256, + /// URI pointing to the validation request data. + pub request_uri: String, + /// Hash of the validation request for integrity verification. + pub request_hash: B256, +} + +impl Erc8004Htx { + /// Decode ABI-encoded ERC-8004 validation data from raw bytes. + /// + /// # Errors + /// + /// Returns `Erc8004DecodeError` if the data cannot be decoded according to + /// the ERC-8004 ABI specification. + pub fn try_decode(data: &[u8]) -> Result { + type Erc8004Tuple = ( + sol_data::Address, + sol_data::Uint<256>, + sol_data::String, + sol_data::FixedBytes<32>, + ); + + let (validator_address, agent_id, request_uri, request_hash) = + Erc8004Tuple::abi_decode_params(data) + .map_err(|e| Erc8004DecodeError(e.to_string()))?; + + Ok(Self { + validator_address, + agent_id, + request_uri, + request_hash, + }) + } +} + +/// Error type for ERC-8004 HTX decoding failures. +#[derive(Debug)] +pub struct Erc8004DecodeError(pub String); + +impl std::fmt::Display for Erc8004DecodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ERC-8004 decode error: {}", self.0) + } +} + +impl std::error::Error for Erc8004DecodeError {} diff --git a/crates/blacklight-contract-clients/src/htx/abi/mod.rs b/crates/blacklight-contract-clients/src/htx/abi/mod.rs new file mode 100644 index 0000000..d12c245 --- /dev/null +++ b/crates/blacklight-contract-clients/src/htx/abi/mod.rs @@ -0,0 +1,36 @@ +//! ABI-Encoded HTX Formats +//! +//! This module contains all ABI-encoded HTX types. + +pub mod erc8004; + +pub use erc8004::*; + +/// ABI-encoded HTX wrapper for all ABI-based formats. +#[derive(Debug, Clone)] +pub enum AbiHtx { + Erc8004(Erc8004Htx), +} + +impl AbiHtx { + /// Try to decode ABI-encoded HTX data, attempting all known formats. + /// + /// # Errors + /// + /// Returns `AbiDecodeError::UnknownFormat` if the data doesn't match any + /// supported ABI format. + pub fn try_decode(data: &[u8]) -> Result { + if let Ok(erc8004_htx) = Erc8004Htx::try_decode(data) { + return Ok(AbiHtx::Erc8004(erc8004_htx)); + } + + Err(AbiDecodeError::UnknownFormat) + } +} + +/// Error type for ABI HTX decoding failures. +#[derive(Debug, thiserror::Error)] +pub enum AbiDecodeError { + #[error("Unknown ABI format: not valid ERC-8004 or other known ABI encoding")] + UnknownFormat, +} diff --git a/crates/blacklight-contract-clients/src/htx/json/mod.rs b/crates/blacklight-contract-clients/src/htx/json/mod.rs new file mode 100644 index 0000000..d67b536 --- /dev/null +++ b/crates/blacklight-contract-clients/src/htx/json/mod.rs @@ -0,0 +1,24 @@ +//! JSON-Encoded HTX Formats +//! +//! This module contains all JSON-encoded HTX types from various providers. + +pub mod nillion; +pub mod phala; + +use serde::{Deserialize, Serialize}; + +pub use nillion::*; +pub use phala::*; + +/// JSON-serializable HTX wrapper for deserialization from JSON files. +/// +/// This enum encompasses all JSON-encoded HTX formats. Each variant corresponds +/// to a different confidential compute provider. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "provider", rename_all = "camelCase")] +pub enum JsonHtx { + /// Nillion confidential compute HTX. + Nillion(NillionHtx), + /// Phala confidential compute HTX. 
+ Phala(PhalaHtx), +} diff --git a/crates/blacklight-contract-clients/src/htx/json/nillion.rs b/crates/blacklight-contract-clients/src/htx/json/nillion.rs new file mode 100644 index 0000000..118e813 --- /dev/null +++ b/crates/blacklight-contract-clients/src/htx/json/nillion.rs @@ -0,0 +1,69 @@ +//! Nillion HTX - JSON-Encoded, Versioned +//! +//! TEE-based confidential compute workloads with hardware measurements. + +use serde::{Deserialize, Serialize}; +use serde_with::{hex::Hex, serde_as}; + +/// Nillion workload identifier with optional history tracking. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkloadId { + pub current: String, + pub previous: Option, +} + +/// Nillion confidential compute operator information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NilCcOperator { + pub id: u64, + pub name: String, +} + +/// Builder information for Nillion workloads. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Builder { + pub id: u64, + pub name: String, +} + +/// Measurement data for a Nillion workload including hardware requirements. +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkloadMeasurement { + pub url: String, + pub artifacts_version: String, + pub cpus: u64, + pub gpus: u64, + #[serde_as(as = "Hex")] + pub docker_compose_hash: [u8; 32], +} + +/// Builder measurement data for Nillion workloads. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BuilderMeasurement { + pub url: String, +} + +/// Nillion HTX Version 1 - Initial format. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NillionHtxV1 { + pub workload_id: WorkloadId, + pub operator: Option, + pub builder: Option, + pub workload_measurement: WorkloadMeasurement, + pub builder_measurement: BuilderMeasurement, +} + +/// Versioned Nillion HTX format. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "version", rename_all = "camelCase")] +pub enum NillionHtx { + /// Version 1: Initial Nillion HTX format. + V1(NillionHtxV1), +} + +impl From for NillionHtx { + fn from(htx: NillionHtxV1) -> Self { + NillionHtx::V1(htx) + } +} diff --git a/crates/blacklight-contract-clients/src/htx/json/phala.rs b/crates/blacklight-contract-clients/src/htx/json/phala.rs new file mode 100644 index 0000000..53866c7 --- /dev/null +++ b/crates/blacklight-contract-clients/src/htx/json/phala.rs @@ -0,0 +1,27 @@ +//! Phala HTX - JSON-Encoded, Versioned +//! +//! TEE-based confidential compute with attestation data. + +use serde::{Deserialize, Serialize}; + +/// Phala attestation data containing quote and event logs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PhalaAttestData { + pub quote: String, + pub event_log: String, +} + +/// Phala HTX Version 1 - Initial format. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PhalaHtxV1 { + pub app_compose: String, + pub attest_data: PhalaAttestData, +} + +/// Versioned Phala HTX format. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "version", rename_all = "camelCase")] +pub enum PhalaHtx { + /// Version 1: Initial Phala HTX format. 
+ V1(PhalaHtxV1), +} diff --git a/crates/blacklight-contract-clients/src/htx.rs b/crates/blacklight-contract-clients/src/htx/mod.rs similarity index 52% rename from crates/blacklight-contract-clients/src/htx.rs rename to crates/blacklight-contract-clients/src/htx/mod.rs index 2ff6624..8461952 100644 --- a/crates/blacklight-contract-clients/src/htx.rs +++ b/crates/blacklight-contract-clients/src/htx/mod.rs @@ -1,179 +1,71 @@ -use alloy::dyn_abi::{DynSolType, DynSolValue}; -use alloy::primitives::{Address, B256, Bytes, U256}; +//! # HTX Data Types +//! +//! This module provides a taxonomy of HTX formats organized by encoding type and version. +//! +//! ## Taxonomy +//! +//! ### JSON-Encoded HTXs ([`json`] module) +//! - **Nillion** ([`json::nillion`]): TEE-based confidential compute workloads +//! - V1: Initial version with workload measurements, operators, and builders +//! - **Phala** ([`json::phala`]): TEE-based confidential compute with attestation +//! - V1: Initial version with app composition and attestation data +//! +//! ### ABI-Encoded HTXs ([`abi`] module) +//! - **ERC-8004** ([`abi::erc8004`]): On-chain validation standard for agent validations +//! - Format: `abi.encode(validatorAddress, agentId, requestURI, requestHash)` +//! +//! ## Main Types +//! +//! - [`Htx`]: Unified enum for all HTX types to be used externally +//! - [`JsonHtx`]: JSON-serializable HTX formats (Nillion, Phala) +//! - [`AbiHtx`]: ABI-encoded HTX formats (ERC-8004) + +pub mod abi; +pub mod json; + +use alloy::primitives::Bytes; use alloy::sol_types::SolValue; -use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -use serde_with::{hex::Hex, serde_as}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WorkloadId { - pub current: String, - pub previous: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NilCcOperator { - pub id: u64, - pub name: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Builder { - pub id: u64, - pub name: String, -} - -#[serde_as] -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WorkloadMeasurement { - pub url: String, - pub artifacts_version: String, - pub cpus: u64, - pub gpus: u64, - #[serde_as(as = "Hex")] - pub docker_compose_hash: [u8; 32], -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BuilderMeasurement { - pub url: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NillionHtxV1 { - pub workload_id: WorkloadId, - pub operator: Option, - pub builder: Option, - pub workload_measurement: WorkloadMeasurement, - pub builder_measurement: BuilderMeasurement, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "version", rename_all = "camelCase")] -pub enum NillionHtx { - /// The first HTX format version. - V1(NillionHtxV1), -} - -impl From for NillionHtx { - fn from(htx: NillionHtxV1) -> Self { - NillionHtx::V1(htx) - } -} +pub use abi::*; +pub use json::*; -// Phala HTX types -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PhalaAttestData { - pub quote: String, - pub event_log: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PhalaHtxV1 { - pub app_compose: String, - pub attest_data: PhalaAttestData, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "version", rename_all = "camelCase")] -pub enum PhalaHtx { - /// The first HTX format version. - V1(PhalaHtxV1), -} - -/// ERC-8004 Validation HTX data parsed from ABI-encoded bytes. 
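Downstream, the unified `Htx` enum from the module docs above is consumed with a single auto-detecting parse followed by a match, along the lines of the node's handling (sketch, assuming `raw: &[u8]` holds the `rawHTX` bytes of an assignment and the surrounding function returns `anyhow::Result`):

use blacklight_contract_clients::htx::Htx;

match Htx::try_parse(raw)? {
    // JSON-encoded, versioned nilCC workload measurements
    Htx::Nillion(htx) => { /* verifier.verify_nillion_htx(&htx) */ }
    // JSON-encoded, versioned Phala attestation data
    Htx::Phala(htx) => { /* verifier.verify_phala_htx(&htx) */ }
    // ABI-encoded ERC-8004 agent validation request
    Htx::Erc8004(htx) => { /* verifier.verify_erc8004_htx(&htx) */ }
}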
-/// Format: `abi.encode(validatorAddress, agentId, requestURI, requestHash)` +/// Unified HTX type encompassing all supported formats. +/// +/// This enum provides a common interface for working with different HTX types. +/// The variants are organized by provider/standard for convenient pattern matching. +/// +/// The module structure organizes types by encoding (json/ and abi/ modules), +/// but the enum is flat for ease of use. #[derive(Debug, Clone)] -pub struct Erc8004Htx { - pub validator_address: Address, - pub agent_id: U256, - pub request_uri: String, - pub request_hash: B256, +pub enum Htx { + /// Nillion confidential compute HTX (JSON-encoded, versioned). + Nillion(json::NillionHtx), + /// Phala confidential compute HTX (JSON-encoded, versioned). + Phala(json::PhalaHtx), + /// ERC-8004 validation HTX (ABI-encoded). + Erc8004(abi::Erc8004Htx), } -impl Erc8004Htx { - /// Try to decode ABI-encoded ERC-8004 validation data. - pub fn try_decode(data: &[u8]) -> Result { - let tuple_type = DynSolType::Tuple(vec![ - DynSolType::Address, - DynSolType::Uint(256), - DynSolType::String, - DynSolType::FixedBytes(32), - ]); - - let decoded = tuple_type - .abi_decode_params(data) - .map_err(|e| Erc8004DecodeError(e.to_string()))?; - - let values = match decoded { - DynSolValue::Tuple(values) => values, - _ => return Err(Erc8004DecodeError("Expected tuple".to_string())), - }; - - if values.len() != 4 { - return Err(Erc8004DecodeError(format!( - "Expected 4 values, got {}", - values.len() - ))); - } - - let validator_address = match &values[0] { - DynSolValue::Address(addr) => *addr, - _ => return Err(Erc8004DecodeError("Expected address".to_string())), - }; - - let agent_id = match &values[1] { - DynSolValue::Uint(val, _) => *val, - _ => return Err(Erc8004DecodeError("Expected uint256".to_string())), - }; - - let request_uri = match &values[2] { - DynSolValue::String(s) => s.clone(), - _ => return Err(Erc8004DecodeError("Expected string".to_string())), - }; - - let request_hash = match &values[3] { - DynSolValue::FixedBytes(word, 32) => B256::from_slice(word.as_slice()), - _ => return Err(Erc8004DecodeError("Expected bytes32".to_string())), - }; - - Ok(Self { - validator_address, - agent_id, - request_uri, - request_hash, - }) +impl From for Htx { + fn from(htx: json::NillionHtx) -> Self { + Htx::Nillion(htx) } } -#[derive(Debug)] -pub struct Erc8004DecodeError(pub String); - -impl std::fmt::Display for Erc8004DecodeError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ERC-8004 decode error: {}", self.0) +impl From for Htx { + fn from(htx: json::PhalaHtx) -> Self { + Htx::Phala(htx) } } -impl std::error::Error for Erc8004DecodeError {} - -/// Unified HTX type that can represent nilCC, Phala, and ERC-8004 HTXs. -#[derive(Debug, Clone)] -pub enum Htx { - Nillion(NillionHtx), - Phala(PhalaHtx), - Erc8004(Erc8004Htx), -} - -/// JSON-serializable HTX types (Nillion and Phala only, not ERC-8004). -/// Used for loading HTXs from JSON files. 
-#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "provider", rename_all = "camelCase")] -pub enum JsonHtx { - Nillion(NillionHtx), - Phala(PhalaHtx), +impl From for Htx { + fn from(htx: abi::Erc8004Htx) -> Self { + Htx::Erc8004(htx) + } } +// Internal conversions for parsing impl From for Htx { fn from(htx: JsonHtx) -> Self { match htx { @@ -183,71 +75,93 @@ impl From for Htx { } } +impl From for Htx { + fn from(htx: AbiHtx) -> Self { + match htx { + AbiHtx::Erc8004(htx) => Htx::Erc8004(htx), + } + } +} + impl Htx { - /// Parse HTX from raw bytes, trying JSON first then ABI decoding. + /// Parse HTX from raw bytes using auto-detection. + /// + /// This method attempts to parse the data in the following order: + /// 1. JSON-encoded HTX (Nillion or Phala) + /// 2. ABI-encoded HTX (ERC-8004) + /// + /// # Errors + /// + /// Returns `HtxParseError::UnknownFormat` if the data doesn't match any + /// supported HTX format. pub fn try_parse(data: &[u8]) -> Result { if let Ok(json_htx) = serde_json::from_slice::(data) { return Ok(json_htx.into()); } - if let Ok(erc8004_htx) = Erc8004Htx::try_decode(data) { - return Ok(erc8004_htx.into()); + if let Ok(abi_htx) = AbiHtx::try_decode(data) { + return Ok(abi_htx.into()); } Err(HtxParseError::UnknownFormat) } } +/// Error type for HTX parsing failures. #[derive(Debug, thiserror::Error)] pub enum HtxParseError { - #[error("Unknown HTX format: not valid JSON or ABI-encoded ERC-8004")] + #[error("Unknown HTX format: not valid JSON or ABI-encoded")] UnknownFormat, } -impl From for Htx { - fn from(htx: NillionHtx) -> Self { - Htx::Nillion(htx) - } -} - -impl From for Htx { - fn from(htx: PhalaHtx) -> Self { - Htx::Phala(htx) - } -} - -impl From for Htx { - fn from(htx: Erc8004Htx) -> Self { - Htx::Erc8004(htx) - } -} +// ============================================================================ +// Serialization & Encoding +// ============================================================================ impl TryFrom<&Htx> for Bytes { type Error = anyhow::Error; + /// Convert an HTX to its wire format (bytes). + /// + /// - JSON-encoded HTXs (Nillion, Phala) are serialized as canonical JSON + /// - ABI-encoded HTXs (ERC-8004) are encoded according to their ABI specification fn try_from(htx: &Htx) -> Result { match htx { Htx::Nillion(htx) => json_htx_to_bytes(JsonHtx::Nillion(htx.clone())), Htx::Phala(htx) => json_htx_to_bytes(JsonHtx::Phala(htx.clone())), - Htx::Erc8004(htx) => { - let tuple = ( - htx.validator_address, - htx.agent_id, - htx.request_uri.clone(), - htx.request_hash, - ); - Ok(Bytes::from(tuple.abi_encode())) - } + Htx::Erc8004(htx) => abi_htx_to_bytes(&AbiHtx::Erc8004(htx.clone())), } } } +/// Serialize a JSON HTX to bytes with canonical JSON formatting. +/// +/// Canonical formatting ensures deterministic serialization by sorting +/// all object keys alphabetically. fn json_htx_to_bytes(htx: JsonHtx) -> Result { let json = canonicalize_json(&serde_json::to_value(htx)?); let json = serde_json::to_string(&json)?; Ok(Bytes::from(json.into_bytes())) } +/// Encode an ABI HTX to bytes according to its ABI specification. +fn abi_htx_to_bytes(htx: &AbiHtx) -> Result { + match htx { + AbiHtx::Erc8004(htx) => { + let tuple = ( + htx.validator_address, + htx.agent_id, + htx.request_uri.clone(), + htx.request_hash, + ); + Ok(Bytes::from(tuple.abi_encode())) + } + } +} + +/// Canonicalize JSON by recursively sorting all object keys. +/// +/// This ensures deterministic serialization regardless of insertion order. 
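To make the determinism concrete, a property-style sketch that a unit test inside this module could assert (assuming the canonical form sorts keys as documented and the default serde_json map ordering):

use serde_json::json;

let value = json!({"b": 1, "a": {"d": 2, "c": 3}});
let canonical = canonicalize_json(&value);
// Nested keys come out alphabetically sorted, so serialization is byte-for-byte stable.
assert_eq!(
    serde_json::to_string(&canonical).unwrap(),
    r#"{"a":{"c":3,"d":2},"b":1}"#
);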
fn canonicalize_json(value: &Value) -> Value { match value { Value::Object(map) => { @@ -264,12 +178,22 @@ fn canonicalize_json(value: &Value) -> Value { } } +// ============================================================================ +// Tests +// ============================================================================ + #[cfg(test)] mod tests { use super::*; + // ------------------------------------------------------------------------ + // JSON-Encoded HTX Tests + // ------------------------------------------------------------------------ + #[test] - fn test_htx_deterministic_serialization() { + fn test_nillion_deterministic_serialization() { + use json::*; + // Create an HTX let htx = NillionHtxV1 { workload_id: WorkloadId { @@ -326,7 +250,34 @@ mod tests { } #[test] - fn test_htx_phala_serialization() { + fn test_nillion_deserialization() { + let nilcc_json = r#"{ + "provider": "nillion", + "version": "v1", + "workload_id": { + "current": "1", + "previous": null + }, + "workload_measurement": { + "url": "https://example.com/measurement", + "artifacts_version": "1.0.0", + "cpus": 8, + "gpus": 0, + "docker_compose_hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + "builder_measurement": { + "url": "https://example.com/builder" + } + }"#; + + let htx: JsonHtx = serde_json::from_str(nilcc_json).unwrap(); + assert!(matches!(htx, JsonHtx::Nillion(_)), "not a nillion HTX"); + } + + #[test] + fn test_phala_serialization() { + use json::*; + let htx_phala = PhalaHtxV1 { app_compose: "test-compose-config".to_string(), attest_data: PhalaAttestData { @@ -346,7 +297,7 @@ mod tests { } #[test] - fn test_deserialize_phala() { + fn test_phala_deserialization() { let phala_json = r#"{ "provider": "phala", "version": "v1", @@ -364,9 +315,32 @@ mod tests { assert_eq!(htx.app_compose, "test-compose"); } + // ------------------------------------------------------------------------ + // ABI-Encoded HTX Tests + // ------------------------------------------------------------------------ + #[test] - fn test_deserialize_nillion() { - let nilcc_json = r#"{ + fn test_erc8004_decode() { + use alloy::primitives::Address; + + // Test data: abi.encode(0x5fc8d32690cc91d4c39d9d3abcbd16989f875707, 0, "https://api.nilai.nillion.network/", 0xa6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac) + let raw_hex = "0000000000000000000000005fc8d32690cc91d4c39d9d3abcbd16989f87570700000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080a6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac000000000000000000000000000000000000000000000000000000000000002268747470733a2f2f6170692e6e696c61692e6e696c6c696f6e2e6e6574776f726b2f000000000000000000000000000000000000000000000000000000000000"; + let data = alloy::hex::decode(raw_hex).unwrap(); + + let htx = Erc8004Htx::try_decode(&data).expect("should decode ERC-8004 HTX"); + assert_eq!( + htx.validator_address, + "0x5fc8d32690cc91d4c39d9d3abcbd16989f875707" + .parse::
() + .unwrap() + ); + assert_eq!(htx.agent_id, alloy::primitives::U256::ZERO); + assert_eq!(htx.request_uri, "https://api.nilai.nillion.network/"); + } + + #[test] + fn test_htx_parse_json() { + let json_data = r#"{ "provider": "nillion", "version": "v1", "workload_id": { @@ -385,24 +359,16 @@ mod tests { } }"#; - let htx: JsonHtx = serde_json::from_str(nilcc_json).unwrap(); - assert!(matches!(htx, JsonHtx::Nillion(_)), "not a nillion HTX"); + let htx = Htx::try_parse(json_data.as_bytes()).unwrap(); + assert!(matches!(htx, Htx::Nillion(_))); } #[test] - fn test_erc8004_decode() { - // Test data: abi.encode(0x5fc8d32690cc91d4c39d9d3abcbd16989f875707, 0, "https://api.nilai.nillion.network/", 0xa6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac) + fn test_htx_parse_abi() { let raw_hex = "0000000000000000000000005fc8d32690cc91d4c39d9d3abcbd16989f87570700000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080a6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac000000000000000000000000000000000000000000000000000000000000002268747470733a2f2f6170692e6e696c61692e6e696c6c696f6e2e6e6574776f726b2f000000000000000000000000000000000000000000000000000000000000"; let data = alloy::hex::decode(raw_hex).unwrap(); - let htx = Erc8004Htx::try_decode(&data).expect("should decode ERC-8004 HTX"); - assert_eq!( - htx.validator_address, - "0x5fc8d32690cc91d4c39d9d3abcbd16989f875707" - .parse::
() - .unwrap() - ); - assert_eq!(htx.agent_id, U256::ZERO); - assert_eq!(htx.request_uri, "https://api.nilai.nillion.network/"); + let htx = Htx::try_parse(&data).unwrap(); + assert!(matches!(htx, Htx::Erc8004(_))); } } diff --git a/crates/blacklight-contract-clients/src/lib.rs b/crates/blacklight-contract-clients/src/lib.rs index 4595134..4708189 100644 --- a/crates/blacklight-contract-clients/src/lib.rs +++ b/crates/blacklight-contract-clients/src/lib.rs @@ -135,7 +135,7 @@ impl ContractConfig { #[cfg(test)] mod tests { use super::*; - use crate::htx::{ + use crate::htx::json::{ Builder, BuilderMeasurement, NilCcOperator, NillionHtx, NillionHtxV1, WorkloadId, WorkloadMeasurement, }; diff --git a/crates/contract-clients-common/src/errors.rs b/crates/contract-clients-common/src/errors.rs index 6c4de35..653f363 100644 --- a/crates/contract-clients-common/src/errors.rs +++ b/crates/contract-clients-common/src/errors.rs @@ -535,10 +535,10 @@ mod tests { /// - Bytes 68+: UTF-8 string data (padded to 32 bytes) #[test] fn test_decode_error_string() { - // "NilAV: unknown HTX" encoded as Error(string) + // "blacklight: unknown HTX" encoded as Error(string) // Selector: 08c379a0 // Offset: 0000...0020 (32 bytes) - // Length: 0000...0012 (18 bytes = "NilAV: unknown HTX".len()) + // Length: 0000...0012 (18 bytes = "blacklight: unknown HTX".len()) // Data: 4e696c41563a20756e6b6e6f776e20485458 + padding let data = hex::decode( "08c379a0\ @@ -552,7 +552,7 @@ mod tests { match decoded { DecodedRevert::ErrorString(msg) => { - assert_eq!(msg, "NilAV: unknown HTX"); + assert_eq!(msg, "blacklight: unknown HTX"); } _ => panic!("Expected ErrorString, got {:?}", decoded), } @@ -605,7 +605,7 @@ mod tests { let decoded = try_extract_from_string(error_msg); assert!(decoded.is_some()); if let Some(DecodedRevert::ErrorString(msg)) = decoded { - assert!(msg.contains("NilAV")); + assert!(msg.contains("blacklight")); } // Test with raw hex selector embedded in string diff --git a/crates/contract-clients-common/src/provider_context.rs b/crates/contract-clients-common/src/provider_context.rs index 3c8f854..4384573 100644 --- a/crates/contract-clients-common/src/provider_context.rs +++ b/crates/contract-clients-common/src/provider_context.rs @@ -100,7 +100,7 @@ impl ProviderContext { let tx = TransactionRequest { to: Some(TxKind::Call(to)), value: Some(amount), - max_priority_fee_per_gas: Some(0), + max_priority_fee_per_gas: Some(1), ..Default::default() }; diff --git a/crates/contract-clients-common/src/tx_submitter.rs b/crates/contract-clients-common/src/tx_submitter.rs index 206e9ab..dbba675 100644 --- a/crates/contract-clients-common/src/tx_submitter.rs +++ b/crates/contract-clients-common/src/tx_submitter.rs @@ -3,7 +3,7 @@ use alloy::{ consensus::Transaction, contract::CallBuilder, primitives::B256, providers::Provider, rpc::types::TransactionReceipt, sol_types::SolInterface, }; -use anyhow::{Result, anyhow}; +use anyhow::{Result, bail}; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -34,7 +34,7 @@ impl TransactionSubmitter { // Pre-simulate to catch reverts with proper error messages if let Err(e) = call.call().await { let e = self.decode_error(e); - return Err(anyhow!("{method} reverted: {e}")); + bail!("{method} reverted: {e}"); } let (call, gas_limit) = match self.gas_buffer { @@ -58,10 +58,13 @@ impl TransactionSubmitter { // Acquire lock and send let _guard = self.tx_lock.lock().await; - let pending = call.send().await.map_err(|e| { - let e = self.decode_error(e); - 
anyhow!("{method} failed to send: {e}") - })?; + let pending = match call.send().await { + Ok(pending) => pending, + Err(e) => { + let e = self.decode_error(e); + bail!("{method} failed to send: {e}"); + } + }; // Wait for receipt let receipt = pending.get_receipt().await?; @@ -82,13 +85,13 @@ impl TransactionSubmitter { if let Some(gas_limit) = gas_limit { let used = receipt.gas_used; if used >= gas_limit { - return Err(anyhow!( + bail!( "{method} ran out of gas (used {used} of {gas_limit} limit). Tx: {tx_hash:?}" - )); + ); } } - return Err(anyhow!("{method} reverted on-chain. Tx hash: {tx_hash:?}")); + bail!("{method} reverted on-chain. Tx hash: {tx_hash:?}"); } Ok(tx_hash) diff --git a/crates/erc-8004-contract-clients/src/validation_registry.rs b/crates/erc-8004-contract-clients/src/validation_registry.rs index 4a34787..86eb653 100644 --- a/crates/erc-8004-contract-clients/src/validation_registry.rs +++ b/crates/erc-8004-contract-clients/src/validation_registry.rs @@ -19,11 +19,40 @@ sol! { bytes32 requestHash, uint64 snapshotId ) external; + + function validationResponse( + bytes32 requestHash, + uint8 response, + string calldata responseURI, + bytes32 responseHash, + string calldata tag + ) external; + + event ValidationRequest( + address indexed validatorAddress, + uint256 indexed agentId, + string requestURI, + bytes32 indexed requestHash + ); + + event ValidationResponse( + address indexed validatorAddress, + uint256 indexed agentId, + bytes32 indexed requestHash, + uint8 response, + string responseURI, + bytes32 responseHash, + string tag + ); } } use ValidationRegistryUpgradeable::ValidationRegistryUpgradeableInstance; +// Event type re-exports +pub type ValidationRequestEvent = ValidationRegistryUpgradeable::ValidationRequest; +pub type ValidationResponseEvent = ValidationRegistryUpgradeable::ValidationResponse; + /// Client for interacting with the ValidationRegistryUpgradeable contract. #[derive(Clone)] pub struct ValidationRegistryClient { @@ -83,4 +112,30 @@ impl ValidationRegistryClient

{ ); self.submitter.invoke("validationRequest", call).await } + + /// Submit a validation response. + /// + /// # Arguments + /// * `request_hash` - The request hash identifying the validation request + /// * `response` - Response value 0-100 (0=invalid, 100=valid) + /// * `response_uri` - Optional URI pointing to response details + /// * `response_hash` - Hash of the response data (can be zero) + /// * `tag` - Tag identifying the response source (e.g., "heartbeat") + pub async fn validation_response( + &self, + request_hash: B256, + response: u8, + response_uri: String, + response_hash: B256, + tag: String, + ) -> Result { + let call = self.contract.validationResponse( + request_hash, + response, + response_uri, + response_hash, + tag, + ); + self.submitter.invoke("validationResponse", call).await + } } diff --git a/docker-compose.yml b/docker-compose.yml index 43db1b3..5182681 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ services: # Anvil - Local Ethereum testnet anvil: - image: nilanvil:latest + image: ghcr.io/nillionnetwork/blacklight-contracts/anvil:sha-dfb9847 container_name: blacklight-anvil ports: - "8545:8545" @@ -32,9 +32,14 @@ services: - L2_RPC_URL=http://anvil:8545 - L1_RPC_URL=http://anvil:8545 - PRIVATE_KEY=0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6 + - L2_STAKING_OPERATORS_ADDRESS=0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512 - L2_HEARTBEAT_MANAGER_ADDRESS=0x5FC8d32690cc91D4c39d9d3abcBD16989F875707 - - L2_JAILING_POLICY_ADDRESS=0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6 - - L1_EMISSIONS_CONTROLLER_ADDRESS=0x0165878A594ca255338adfa4d48449f69242Eb8F + - L2_JAILING_POLICY_ADDRESS=0x0000000000000000000000000000000000000000 + - L1_EMISSIONS_CONTROLLER_ADDRESS=0x0000000000000000000000000000000000000000 + # ERC-8004 Keeper configuration + - ENABLE_ERC8004_KEEPER=true + - L2_VALIDATION_REGISTRY_ADDRESS=0x3Aa5ebB10DC797CAC828524e59A333d0A371443c + - RUST_LOG=info # NilCC Simulator - Submits HTXs to the contract simulator: @@ -97,7 +102,7 @@ services: - PRIVATE_KEY=0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a - IDENTITY_REGISTRY_CONTRACT_ADDRESS=0x959922bE3CAee4b8Cd9a407cc3ac1C251C2007B1 - VALIDATION_REGISTRY_CONTRACT_ADDRESS=0x3Aa5ebB10DC797CAC828524e59A333d0A371443c - - HEARTBEAT_MANAGER_ADDRESS=0x5FC8d32690cc91D4c39d9d3abcBD16989F875707 + - VALIDATOR_ADDRESS=0x90F79bf6EB2c4f870365E785982E1f101E93b906 - AGENT_URI=https://api.nilai.nillion.network/v1/health/ - RUST_LOG=info networks: diff --git a/keeper/Cargo.toml b/keeper/Cargo.toml index 4dd1f59..21ee092 100644 --- a/keeper/Cargo.toml +++ b/keeper/Cargo.toml @@ -17,3 +17,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } blacklight-contract-clients = { path = "../crates/blacklight-contract-clients" } contract-clients-common = { path = "../crates/contract-clients-common" } +erc-8004-contract-clients = { path = "../crates/erc-8004-contract-clients" } diff --git a/keeper/src/args.rs b/keeper/src/args.rs index 9aec607..5e65540 100644 --- a/keeper/src/args.rs +++ b/keeper/src/args.rs @@ -34,6 +34,14 @@ pub struct CliArgs { #[arg(long, env = "DISABLE_JAILING")] pub disable_jailing: bool, + /// L2 ValidationRegistry contract address for ERC-8004 validation responses + #[arg(long, env = "L2_VALIDATION_REGISTRY_ADDRESS")] + pub l2_validation_registry_address: Option
<Address>
, + + /// Enable ERC-8004 keeper functionality + #[arg(long, env = "ENABLE_ERC8004_KEEPER", default_value_t = false)] + pub enable_erc8004: bool, + /// L1 EmissionsController contract address #[arg(long, env = "L1_EMISSIONS_CONTROLLER_ADDRESS")] pub l1_emissions_controller_address: Address, @@ -82,6 +90,7 @@ pub struct KeeperConfig { pub l1_rpc_url: String, pub l2_heartbeat_manager_address: Address, pub l2_jailing_policy_address: Option
<Address>, + pub l2_validation_registry_address: Option<Address>
, pub l1_emissions_controller_address: Address, pub l2_staking_operators_address: Address, pub private_key: String, @@ -90,6 +99,7 @@ pub struct KeeperConfig { pub tick_interval: Duration, pub emissions_interval: Duration, pub disable_jailing: bool, + pub enable_erc8004: bool, pub otel: Option, } @@ -104,12 +114,18 @@ impl KeeperConfig { let l2_staking_operators_address = args.l2_staking_operators_address; let l2_jailing_policy_address = args.l2_jailing_policy_address; let disable_jailing = args.disable_jailing; + let enable_erc8004 = args.enable_erc8004; let private_key = args.private_key; let l2_jailing_policy_address = if disable_jailing { None } else { l2_jailing_policy_address }; + let l2_validation_registry_address = if enable_erc8004 { + args.l2_validation_registry_address + } else { + None + }; let l1_bridge_value = args.l1_bridge_value_wei; let lookback_blocks = args.lookback_blocks; let tick_interval = Duration::from_secs(args.tick_interval_secs); @@ -139,6 +155,7 @@ impl KeeperConfig { l1_rpc_url, l2_heartbeat_manager_address, l2_jailing_policy_address, + l2_validation_registry_address, l1_emissions_controller_address, l2_staking_operators_address, private_key, @@ -147,6 +164,7 @@ impl KeeperConfig { tick_interval, emissions_interval, disable_jailing, + enable_erc8004, otel, }) } diff --git a/keeper/src/clients.rs b/keeper/src/clients.rs index 2c4db89..b948832 100644 --- a/keeper/src/clients.rs +++ b/keeper/src/clients.rs @@ -1,11 +1,14 @@ use crate::contracts::{EmissionsController, Erc20, JailingPolicy, RewardPolicy}; use alloy::{ - network::{Ethereum, EthereumWallet, NetworkWallet}, primitives::{Address, U256}, - providers::{DynProvider, Provider, ProviderBuilder, WsConnect}, - signers::local::PrivateKeySigner, + providers::DynProvider, }; use blacklight_contract_clients::{HeartbeatManager, StakingOperators}; +use contract_clients_common::ProviderContext; +use erc_8004_contract_clients::ValidationRegistryClient; +use erc_8004_contract_clients::validation_registry::ValidationRegistryUpgradeable; +use std::sync::Arc; +use tokio::sync::Mutex; pub type HeartbeatManagerInstance = HeartbeatManager::HeartbeatManagerInstance; pub type StakingOperatorsInstance = StakingOperators::StakingOperatorsInstance; @@ -14,36 +17,16 @@ pub type EmissionsControllerInstance = EmissionsController::EmissionsControllerInstance; pub type RewardPolicyInstance = RewardPolicy::RewardPolicyInstance; pub type ERC20Instance = Erc20::Erc20Instance; - -async fn connect_ws( - rpc_url: &str, - private_key: &str, -) -> anyhow::Result<(DynProvider, EthereumWallet)> { - let ws_url = rpc_url - .replace("http://", "ws://") - .replace("https://", "wss://"); - let ws = WsConnect::new(ws_url).with_max_retries(u32::MAX); - let signer: PrivateKeySigner = private_key.parse::()?; - let wallet = EthereumWallet::from(signer); - - let provider: DynProvider = ProviderBuilder::new() - .wallet(wallet.clone()) - .with_simple_nonce_management() - .with_gas_estimation() - .connect_ws(ws) - .await? 
- .erased(); - - Ok((provider, wallet)) -} +pub type ValidationRegistryInstance = + ValidationRegistryUpgradeable::ValidationRegistryUpgradeableInstance; /// WebSocket-based client for L2 keeper duties (heartbeat rounds + jailing) pub struct L2KeeperClient { + ctx: ProviderContext, heartbeat_manager: HeartbeatManagerInstance, staking_operators: StakingOperatorsInstance, jailing_policy: Option, - provider: DynProvider, - wallet: EthereumWallet, + validation_registry: Option>, } impl L2KeeperClient { @@ -52,22 +35,28 @@ impl L2KeeperClient { heartbeat_manager_address: Address, staking_operators_address: Address, jailing_policy_address: Option
<Address>, + validation_registry_address: Option<Address>
, private_key: String, ) -> anyhow::Result { - let (provider, wallet) = connect_ws(&rpc_url, &private_key).await?; + let ctx = ProviderContext::with_ws_retries(&rpc_url, &private_key, Some(u32::MAX)).await?; + let provider = ctx.provider().clone(); + let tx_lock = ctx.tx_lock(); + let heartbeat_manager = HeartbeatManagerInstance::new(heartbeat_manager_address, provider.clone()); let staking_operators = StakingOperatorsInstance::new(staking_operators_address, provider.clone()); let jailing_policy = jailing_policy_address.map(|addr| JailingPolicyInstance::new(addr, provider.clone())); + let validation_registry = validation_registry_address + .map(|addr| ValidationRegistryClient::new(provider.clone(), addr, tx_lock)); Ok(Self { + ctx, heartbeat_manager, staking_operators, jailing_policy, - provider, - wallet, + validation_registry, }) } @@ -83,32 +72,40 @@ impl L2KeeperClient { self.jailing_policy.as_ref() } + pub fn validation_registry(&self) -> Option<&ValidationRegistryClient> { + self.validation_registry.as_ref() + } + pub fn reward_policy(&self, address: Address) -> RewardPolicyInstance { - RewardPolicyInstance::new(address, self.provider.clone()) + RewardPolicyInstance::new(address, self.ctx.provider().clone()) } pub fn erc20(&self, address: Address) -> ERC20Instance { - ERC20Instance::new(address, self.provider.clone()) + ERC20Instance::new(address, self.ctx.provider().clone()) } pub fn provider(&self) -> DynProvider { - self.provider.clone() + self.ctx.provider().clone() } pub fn signer_address(&self) -> Address { - >::default_signer_address(&self.wallet) + self.ctx.signer_address() + } + + /// Shared transaction lock for nonce coordination across all contract clients. + pub fn tx_lock(&self) -> Arc> { + self.ctx.tx_lock() } pub async fn get_balance(&self) -> anyhow::Result { - Ok(self.provider.get_balance(self.signer_address()).await?) + self.ctx.get_balance().await } } /// WebSocket-based client for L1 emissions minting/bridging pub struct L1EmissionsClient { + ctx: ProviderContext, emissions: EmissionsControllerInstance, - provider: DynProvider, - wallet: EthereumWallet, } impl L1EmissionsClient { @@ -117,13 +114,9 @@ impl L1EmissionsClient { emissions_address: Address, private_key: String, ) -> anyhow::Result { - let (provider, wallet) = connect_ws(&rpc_url, &private_key).await?; - let emissions = EmissionsControllerInstance::new(emissions_address, provider.clone()); - Ok(Self { - emissions, - provider, - wallet, - }) + let ctx = ProviderContext::new(&rpc_url, &private_key).await?; + let emissions = EmissionsControllerInstance::new(emissions_address, ctx.provider().clone()); + Ok(Self { ctx, emissions }) } pub fn emissions(&self) -> &EmissionsControllerInstance { @@ -131,14 +124,14 @@ impl L1EmissionsClient { } pub fn provider(&self) -> DynProvider { - self.provider.clone() + self.ctx.provider().clone() } pub fn signer_address(&self) -> Address { - >::default_signer_address(&self.wallet) + self.ctx.signer_address() } pub async fn get_balance(&self) -> anyhow::Result { - Ok(self.provider.get_balance(self.signer_address()).await?) 
+ self.ctx.get_balance().await } } diff --git a/keeper/src/erc8004/events.rs b/keeper/src/erc8004/events.rs new file mode 100644 index 0000000..93ae189 --- /dev/null +++ b/keeper/src/erc8004/events.rs @@ -0,0 +1,277 @@ +use crate::{erc8004::ValidationRequestInfo, l2::KeeperState, metrics}; +use alloy::{ + primitives::B256, + providers::Provider, + rpc::types::Log, + sol_types::{SolEvent, SolValue}, +}; +use anyhow::Context; +use erc_8004_contract_clients::validation_registry::{ + ValidationRegistryUpgradeable::ValidationRegistryUpgradeableInstance, ValidationRequestEvent, +}; +use futures_util::{Stream, StreamExt}; +use std::{pin::pin, sync::Arc}; +use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; + +use super::Erc8004State; + +pub type ValidationRegistryInstance

= ValidationRegistryUpgradeableInstance

; + +/// Event listener for ERC-8004 ValidationRequest events. +pub struct Erc8004EventListener { + registry: ValidationRegistryInstance

, +} + +impl Erc8004EventListener

{ + pub fn new(registry: ValidationRegistryInstance

) -> Self { + Self { registry } + } + + /// Process historical ValidationRequest events and populate state. + pub async fn process_historical_events( + &self, + from_block: u64, + to_block: u64, + state: &mut Erc8004State, + ) -> anyhow::Result<()> { + let events = self + .query_events::(from_block, to_block) + .await?; + + for (event, _log) in events { + let heartbeat_key = compute_heartbeat_key( + event.validatorAddress, + event.agentId, + &event.requestURI, + event.requestHash, + ); + let info = ValidationRequestInfo::new( + event.validatorAddress, + event.agentId, + event.requestURI, + event.requestHash, + ); + state.pending_validations.insert(heartbeat_key, info); + } + + info!( + from_block, + to_block, + pending_validations = state.pending_validations.len(), + "Loaded historical ERC-8004 validation requests" + ); + + Ok(()) + } + + /// Spawn background task to listen for new ValidationRequest events. + pub async fn spawn( + self, + from_block: u64, + state: Arc>, + ) -> anyhow::Result<()> { + let validation_request = self.subscribe::(from_block).await?; + tokio::spawn(Self::process_validation_requests(validation_request, state)); + Ok(()) + } + + async fn query_events( + &self, + from_block: u64, + to_block: u64, + ) -> anyhow::Result> { + let events = self + .registry + .event_filter::() + .from_block(from_block) + .to_block(to_block) + .query() + .await?; + Ok(events) + } + + async fn subscribe( + &self, + from_block: u64, + ) -> anyhow::Result + 'static> { + let event_name = E::SIGNATURE + .split_once('(') + .map(|(name, _)| name) + .unwrap_or(E::SIGNATURE); + let stream = self + .registry + .event_filter::() + .from_block(from_block) + .subscribe() + .await + .context("Failed to subscribe to ERC-8004 events")? + .into_stream() + .filter_map(async move |e| match e { + Ok((event, _)) => { + metrics::get().l2.events.inc_events_received(event_name); + Some(event) + } + Err(e) => { + error!("Failed to receive {} event: {e}", E::SIGNATURE); + None + } + }); + Ok(stream) + } + + async fn process_validation_requests( + events: impl Stream, + state: Arc>, + ) { + let mut events = pin!(events); + while let Some(event) = events.next().await { + let heartbeat_key = compute_heartbeat_key( + event.validatorAddress, + event.agentId, + &event.requestURI, + event.requestHash, + ); + + let info = ValidationRequestInfo::new( + event.validatorAddress, + event.agentId, + event.requestURI.clone(), + event.requestHash, + ); + + let mut guard = state.lock().await; + guard + .erc8004 + .pending_validations + .insert(heartbeat_key, info); + metrics::get() + .l2 + .erc8004 + .set_requests_tracked(guard.erc8004.pending_validations.len() as u64); + + info!( + heartbeat_key = ?heartbeat_key, + validator = ?event.validatorAddress, + agent_id = ?event.agentId, + request_hash = ?event.requestHash, + "ERC-8004 validation request tracked" + ); + } + } +} + +/// Compute the heartbeat key from validation request parameters. +/// +/// This matches the Solidity encoding: `keccak256(abi.encode(validatorAddress, agentId, requestURI, requestHash))` +pub fn compute_heartbeat_key( + validator_address: alloy::primitives::Address, + agent_id: alloy::primitives::U256, + request_uri: &str, + request_hash: B256, +) -> B256 { + let tuple = ( + validator_address, + agent_id, + request_uri.to_string(), + request_hash, + ); + let encoded = tuple.abi_encode(); + alloy::primitives::keccak256(&encoded) +} + +/// Update ERC-8004 state when a round finalizes. 
+/// +/// Called from the L2 event processor when RoundFinalized events are received. +pub fn on_round_finalized(state: &mut Erc8004State, heartbeat_key: B256, outcome: u8) { + if let Some(info) = state.pending_validations.get_mut(&heartbeat_key) { + let response = match outcome { + 0 => 50, // inconclusive + 1 => 100, // valid + 2 => 0, // invalid + other => { + warn!( + heartbeat_key = %heartbeat_key, + outcome = other, + "Unexpected HeartbeatManager outcome; defaulting ERC-8004 response to 0" + ); + 0 + } + }; + info.outcome = Some(response); + info!( + heartbeat_key = %heartbeat_key, + outcome, + response, + request_hash = %info.request_hash, + "ERC-8004 validation round finalized" + ); + } else { + debug!( + heartbeat_key = %heartbeat_key, + "RoundFinalized for unknown heartbeat_key (not an ERC-8004 validation)" + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{Address, U256}; + + #[test] + fn test_heartbeat_key_computation_matches_htx() { + // Test data from htx.rs test: + // abi.encode(0x5fc8d32690cc91d4c39d9d3abcbd16989f875707, 0, "https://api.nilai.nillion.network/", 0xa6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac) + let validator_address: Address = "0x5fc8d32690cc91d4c39d9d3abcbd16989f875707" + .parse() + .unwrap(); + let agent_id = U256::ZERO; + let request_uri = "https://api.nilai.nillion.network/"; + let request_hash: B256 = + "0xa6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac" + .parse() + .unwrap(); + + let heartbeat_key = + compute_heartbeat_key(validator_address, agent_id, request_uri, request_hash); + + // The heartbeat key should be the keccak256 of the ABI-encoded tuple + // This should match what the Solidity contract computes + assert!(!heartbeat_key.is_zero()); + + // Verify consistency - same inputs should produce same output + let heartbeat_key2 = + compute_heartbeat_key(validator_address, agent_id, request_uri, request_hash); + assert_eq!(heartbeat_key, heartbeat_key2); + } + + #[test] + fn test_heartbeat_key_different_for_different_inputs() { + let validator_address: Address = "0x5fc8d32690cc91d4c39d9d3abcbd16989f875707" + .parse() + .unwrap(); + let agent_id = U256::ZERO; + let request_uri = "https://api.nilai.nillion.network/"; + let request_hash: B256 = + "0xa6719a2ea05fac172c1b20e16beea2a9739b715499a3a9ad488e6ce81602ffac" + .parse() + .unwrap(); + + let key1 = compute_heartbeat_key(validator_address, agent_id, request_uri, request_hash); + + // Different agent_id should produce different key + let key2 = + compute_heartbeat_key(validator_address, U256::from(1), request_uri, request_hash); + assert_ne!(key1, key2); + + // Different request_uri should produce different key + let key3 = compute_heartbeat_key( + validator_address, + agent_id, + "https://different.uri/", + request_hash, + ); + assert_ne!(key1, key3); + } +} diff --git a/keeper/src/erc8004/mod.rs b/keeper/src/erc8004/mod.rs new file mode 100644 index 0000000..de84fdd --- /dev/null +++ b/keeper/src/erc8004/mod.rs @@ -0,0 +1,51 @@ +use alloy::primitives::{Address, B256, U256}; +use std::collections::HashMap; + +pub mod events; +pub mod responder; + +/// State tracking for ERC-8004 validations. +/// +/// Tracks validation requests by their heartbeat key and stores the outcome +/// when rounds finalize, enabling the keeper to submit validation responses. 
+#[derive(Default)] +pub struct Erc8004State { + /// Maps heartbeat_key -> ValidationRequestInfo + pub pending_validations: HashMap, +} + +/// Information about a pending validation request. +#[derive(Debug, Clone)] +#[allow(dead_code)] // Fields kept for debugging and potential future use +pub struct ValidationRequestInfo { + /// The validator address that submitted the request + pub validator_address: Address, + /// The agent ID being validated + pub agent_id: U256, + /// The request URI + pub request_uri: String, + /// The request hash (unique identifier for the validation) + pub request_hash: B256, + /// The ERC-8004 validation response value (0-100), mapped from HeartbeatManager outcome. + pub outcome: Option, + /// Whether the validation response has been submitted + pub response_submitted: bool, +} + +impl ValidationRequestInfo { + pub fn new( + validator_address: Address, + agent_id: U256, + request_uri: String, + request_hash: B256, + ) -> Self { + Self { + validator_address, + agent_id, + request_uri, + request_hash, + outcome: None, + response_submitted: false, + } + } +} diff --git a/keeper/src/erc8004/responder.rs b/keeper/src/erc8004/responder.rs new file mode 100644 index 0000000..03dbd56 --- /dev/null +++ b/keeper/src/erc8004/responder.rs @@ -0,0 +1,100 @@ +use crate::{l2::KeeperState, metrics}; +use alloy::hex; +use alloy::primitives::B256; +use alloy::providers::Provider; +use erc_8004_contract_clients::ValidationRegistryClient; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{debug, error, info}; + +/// Submits validation responses for finalized ERC-8004 validation rounds. +pub struct ValidationResponder { + registry: ValidationRegistryClient

, + state: Arc<Mutex<KeeperState>>, +} + +impl ValidationResponder

{ + pub fn new(registry: ValidationRegistryClient

, state: Arc>) -> Self { + Self { registry, state } + } + + /// Process pending validation responses. + /// + /// Called from the tick loop to submit validation responses for any + /// validations that have received an outcome but haven't been responded to yet. + pub async fn process_responses(&self) -> anyhow::Result<()> { + // Collect jobs to process outside the lock + let jobs: Vec<_> = { + let state = self.state.lock().await; + state + .erc8004 + .pending_validations + .iter() + .filter(|(_, info)| info.outcome.is_some() && !info.response_submitted) + .map(|(key, info)| (*key, info.request_hash, info.outcome.unwrap())) + .collect() + }; + + if !jobs.is_empty() { + info!( + ready_count = jobs.len(), + "ERC-8004 validations ready for response submission" + ); + } + + if jobs.is_empty() { + debug!( + "No ERC-8004 validations ready for response (waiting for outcomes or already submitted)" + ); + return Ok(()); + } + + for (heartbeat_key, request_hash, outcome) in jobs { + info!( + request_hash = %hex::encode(request_hash), + outcome, + "Submitting ERC-8004 validation response" + ); + match self.submit_response(request_hash, outcome).await { + Ok(tx_hash) => { + // Mark as submitted + let mut state = self.state.lock().await; + if let Some(info) = state.erc8004.pending_validations.get_mut(&heartbeat_key) { + info.response_submitted = true; + } + metrics::get().l2.erc8004.inc_responses_submitted(); + info!( + request_hash = ?request_hash, + outcome, + tx_hash = ?tx_hash, + "Submitted ERC-8004 validation response" + ); + } + Err(e) => { + error!( + request_hash = ?request_hash, + outcome, + "Failed to submit ERC-8004 validation response: {e}" + ); + } + } + } + + Ok(()) + } + + async fn submit_response(&self, request_hash: B256, outcome: u8) -> anyhow::Result { + let tx_hash = self + .registry + .validation_response( + request_hash, + outcome, + String::new(), // responseURI - empty + B256::ZERO, // responseHash - zero + "heartbeat".to_string(), // tag + ) + .await?; + + Ok(tx_hash) + } +} diff --git a/keeper/src/l2/escalator.rs b/keeper/src/l2/escalator.rs index d7dbb52..1f44fb9 100644 --- a/keeper/src/l2/escalator.rs +++ b/keeper/src/l2/escalator.rs @@ -3,7 +3,7 @@ use alloy::primitives::{B256, Bytes}; use blacklight_contract_clients::heartbeat_manager::HeartbeatManagerErrors; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use contract_clients_common::errors::decode_any_error; use contract_clients_common::tx_submitter::TransactionSubmitter; @@ -16,10 +16,11 @@ pub(crate) struct RoundEscalator { impl RoundEscalator { pub(crate) fn new(client: Arc, state: Arc>) -> Self { + let submitter = TransactionSubmitter::new(client.tx_lock()); Self { client, state, - submitter: TransactionSubmitter::new(Default::default()), + submitter, } } @@ -72,6 +73,14 @@ impl RoundEscalator { for (heartbeat_key, round, deadline, raw_htx) in candidates { if block_timestamp <= deadline { + debug!( + heartbeat_key = ?heartbeat_key, + round, + deadline, + block_timestamp, + remaining_secs = deadline - block_timestamp, + "Round not yet past deadline, skipping" + ); continue; } diff --git a/keeper/src/l2/events.rs b/keeper/src/l2/events.rs index 75e74ff..3a29f8c 100644 --- a/keeper/src/l2/events.rs +++ b/keeper/src/l2/events.rs @@ -1,5 +1,6 @@ use crate::{ clients::HeartbeatManagerInstance, + erc8004::{ValidationRequestInfo, events::on_round_finalized}, l2::{KeeperState, RoundKey}, metrics, }; @@ -11,6 +12,7 @@ use 
blacklight_contract_clients::{ HeartbeatEnqueuedEvent, RewardDistributionAbandonedEvent, RewardsDistributedEvent, RoundFinalizedEvent, RoundStartedEvent, SlashingCallbackFailedEvent, }, + htx::Erc8004Htx, }; use futures_util::{Stream, StreamExt}; use std::{pin::pin, sync::Arc}; @@ -73,6 +75,7 @@ impl EventListener { }; let entry = state.rounds.entry(key).or_default(); entry.outcome = Some(event.outcome); + on_round_finalized(&mut state.erc8004, event.heartbeatKey, event.outcome); } for (event, _log) in rewards_done { let key = RoundKey { @@ -220,6 +223,39 @@ impl EventListener { members = entry.members.len(), "Round started" ); + + // Detect ERC-8004 HTXs and track them with HeartbeatManager's heartbeat_key + if let Ok(erc8004_htx) = Erc8004Htx::try_decode(&event.rawHTX) { + // Only add if not already tracked (first round) + if event.round == 1 + && !guard + .erc8004 + .pending_validations + .contains_key(&event.heartbeatKey) + { + let info = ValidationRequestInfo::new( + erc8004_htx.validator_address, + erc8004_htx.agent_id, + erc8004_htx.request_uri.clone(), + erc8004_htx.request_hash, + ); + guard + .erc8004 + .pending_validations + .insert(event.heartbeatKey, info); + metrics::get() + .l2 + .erc8004 + .set_requests_tracked(guard.erc8004.pending_validations.len() as u64); + info!( + heartbeat_key = %event.heartbeatKey, + validator = %erc8004_htx.validator_address, + agent_id = %erc8004_htx.agent_id, + request_hash = %erc8004_htx.request_hash, + "ERC-8004 validation tracked from RoundStarted" + ); + } + } } } @@ -242,6 +278,9 @@ impl EventListener { outcome = event.outcome, "Round finalized" ); + + // Notify ERC-8004 state about the round finalization + on_round_finalized(&mut guard.erc8004, event.heartbeatKey, event.outcome); } } diff --git a/keeper/src/l2/mod.rs b/keeper/src/l2/mod.rs index 6e81ce7..24a46b3 100644 --- a/keeper/src/l2/mod.rs +++ b/keeper/src/l2/mod.rs @@ -1,3 +1,4 @@ +use crate::erc8004::Erc8004State; use alloy::primitives::{Address, B256, Bytes}; use std::collections::HashMap; @@ -29,4 +30,5 @@ struct RoundState { pub struct KeeperState { raw_htx_by_heartbeat: HashMap, rounds: HashMap, + pub erc8004: Erc8004State, } diff --git a/keeper/src/l2/rewards.rs b/keeper/src/l2/rewards.rs index 9bcee33..fb7b89d 100644 --- a/keeper/src/l2/rewards.rs +++ b/keeper/src/l2/rewards.rs @@ -46,11 +46,12 @@ pub(crate) struct RewardsDistributor { impl RewardsDistributor { pub(crate) fn new(client: Arc, state: Arc>) -> Self { + let submitter = TransactionSubmitter::new(client.tx_lock()).with_gas_buffer(); Self { client, state, rewards_context: Default::default(), - submitter: TransactionSubmitter::new(Default::default()).with_gas_buffer(), + submitter, } } diff --git a/keeper/src/l2/supervisor.rs b/keeper/src/l2/supervisor.rs index 1b6930a..32cd487 100644 --- a/keeper/src/l2/supervisor.rs +++ b/keeper/src/l2/supervisor.rs @@ -1,6 +1,7 @@ use crate::{ args::KeeperConfig, - clients::L2KeeperClient, + clients::{L2KeeperClient, ValidationRegistryInstance}, + erc8004::{events::Erc8004EventListener, responder::ValidationResponder}, l2::{ KeeperState, escalator::RoundEscalator, events::EventListener, jailing::Jailer, rewards::RewardsDistributor, @@ -11,7 +12,7 @@ use alloy::{eips::BlockId, providers::Provider}; use anyhow::Context; use std::sync::Arc; use tokio::{sync::Mutex, time::interval}; -use tracing::error; +use tracing::{debug, error, info}; pub struct L2Supervisor { client: Arc, @@ -19,22 +20,27 @@ pub struct L2Supervisor { jailer: Jailer, rewards_distributor: RewardsDistributor, 
round_escalator: RoundEscalator, + erc8004_enabled: bool, } impl L2Supervisor { pub async fn new( client: Arc, state: Arc>, + config: &KeeperConfig, ) -> anyhow::Result { let jailer = Jailer::new(client.clone(), state.clone()); let rewards_distributor = RewardsDistributor::new(client.clone(), state.clone()); let round_escalator = RoundEscalator::new(client.clone(), state.clone()); + let erc8004_enabled = + config.enable_erc8004 && config.l2_validation_registry_address.is_some(); Ok(Self { client, state, jailer, rewards_distributor, round_escalator, + erc8004_enabled, }) } @@ -47,6 +53,23 @@ impl L2Supervisor { .context("Failed to find latest block")?; let from_block = latest_block.saturating_sub(config.lookback_blocks); + // Process historic ERC-8004 requests first so round outcomes can be reconciled. + if self.erc8004_enabled + && let Some(registry) = self.client.validation_registry() + { + let raw_registry = + ValidationRegistryInstance::new(registry.address(), self.client.provider()); + let erc8004_listener = Erc8004EventListener::new(raw_registry); + erc8004_listener + .process_historical_events( + from_block, + latest_block, + &mut self.state.lock().await.erc8004, + ) + .await + .context("Failed to process historical ERC-8004 events")?; + } + // Process historic events from current block - lookback until now let event_listener = EventListener::new(self.client.heartbeat_manager().clone()); event_listener @@ -58,13 +81,42 @@ impl L2Supervisor { event_listener .spawn(latest_block.saturating_add(1), self.state.clone()) .await - .context("Failed tp spawn event listener")?; + .context("Failed to spawn event listener")?; + + // Spawn ERC-8004 event listener if enabled + if self.erc8004_enabled + && let Some(registry) = self.client.validation_registry() + { + let raw_registry = + ValidationRegistryInstance::new(registry.address(), self.client.provider()); + let erc8004_listener = Erc8004EventListener::new(raw_registry); + erc8004_listener + .spawn(latest_block.saturating_add(1), self.state.clone()) + .await + .context("Failed to spawn ERC-8004 event listener")?; + + info!("ERC-8004 event listener spawned"); + } tokio::spawn(self.run(config)); Ok(()) } async fn run(mut self, config: KeeperConfig) { + // Create ERC-8004 responder if enabled (uses shared tx_lock via ValidationRegistryClient) + let erc8004_responder = if self.erc8004_enabled { + self.client + .validation_registry() + .map(|client| ValidationResponder::new(client.clone(), self.state.clone())) + } else { + None + }; + info!( + erc8004_responder_created = erc8004_responder.is_some(), + tick_interval_ms = config.tick_interval.as_millis() as u64, + "L2 supervisor run loop starting" + ); + let mut ticker = interval(config.tick_interval); loop { ticker.tick().await; @@ -98,6 +150,14 @@ impl L2Supervisor { self.process_rounds(block_timestamp).await; + // Process ERC-8004 validation responses + if let Some(ref responder) = erc8004_responder { + debug!("Tick: processing ERC-8004 validation responses"); + if let Err(e) = responder.process_responses().await { + error!("Failed to process ERC-8004 validation responses: {e}"); + } + } + match self .client .provider() diff --git a/keeper/src/main.rs b/keeper/src/main.rs index c9fa6b1..7b24ab0 100644 --- a/keeper/src/main.rs +++ b/keeper/src/main.rs @@ -23,6 +23,7 @@ use crate::l2::L2Supervisor; mod args; mod clients; mod contracts; +mod erc8004; mod l1; mod l2; mod metrics; @@ -115,6 +116,16 @@ async fn main() -> Result<()> { ); } + if config.enable_erc8004 { + if let Some(addr) = 
config.l2_validation_registry_address { + info!(validation_registry = ?addr, "ERC-8004 keeper enabled"); + } else { + info!("ERC-8004 keeper enabled but no ValidationRegistry address configured"); + } + } else { + info!("ERC-8004 keeper disabled"); + } + info!("Keeper initialized"); let l2_client = Arc::new( @@ -123,6 +134,7 @@ async fn main() -> Result<()> { config.l2_heartbeat_manager_address, config.l2_staking_operators_address, config.l2_jailing_policy_address, + config.l2_validation_registry_address, config.private_key.clone(), ) .await?, @@ -164,7 +176,7 @@ async fn main() -> Result<()> { let state = Arc::new(Mutex::new(Default::default())); let l1 = EmissionsSupervisor::new(config.clone(), l2_client.clone()).await?; - let l2 = L2Supervisor::new(l2_client, state.clone()).await?; + let l2 = L2Supervisor::new(l2_client, state.clone(), &config).await?; l2.spawn(config).await?; l1.spawn(); diff --git a/keeper/src/metrics.rs b/keeper/src/metrics.rs index 2633dd4..b94b5dd 100644 --- a/keeper/src/metrics.rs +++ b/keeper/src/metrics.rs @@ -119,6 +119,7 @@ pub(crate) struct L2Metrics { pub(crate) rewards: L2RewardsMetrics, pub(crate) escalations: L2EscalationsMetrics, pub(crate) eth: L2EthMetrics, + pub(crate) erc8004: L2Erc8004Metrics, } impl L2Metrics { @@ -127,11 +128,13 @@ impl L2Metrics { let rewards = L2RewardsMetrics::new(meter); let escalations = L2EscalationsMetrics::new(meter); let eth = L2EthMetrics::new(meter); + let erc8004 = L2Erc8004Metrics::new(meter); Self { events, rewards, escalations, eth, + erc8004, } } } @@ -249,3 +252,33 @@ impl L2EthMetrics { self.funds.record(amount.into(), &[]); } } + +pub(crate) struct L2Erc8004Metrics { + requests_tracked: Gauge, + responses_submitted: Counter, +} + +impl L2Erc8004Metrics { + fn new(meter: &Meter) -> Self { + let requests_tracked = meter + .u64_gauge("blacklight.keeper.l2.erc8004.requests_tracked") + .with_description("Number of ERC-8004 validation requests currently tracked") + .build(); + let responses_submitted = meter + .u64_counter("blacklight.keeper.l2.erc8004.responses_submitted") + .with_description("Total ERC-8004 validation responses submitted") + .build(); + Self { + requests_tracked, + responses_submitted, + } + } + + pub(crate) fn set_requests_tracked(&self, count: u64) { + self.requests_tracked.record(count, &[]); + } + + pub(crate) fn inc_responses_submitted(&self) { + self.responses_submitted.add(1, &[]); + } +} diff --git a/simulator/src/common.rs b/simulator/src/common.rs index 547c391..33a47c3 100644 --- a/simulator/src/common.rs +++ b/simulator/src/common.rs @@ -8,10 +8,10 @@ use tracing::error; /// Default slot interval in milliseconds. #[cfg(debug_assertions)] -pub const DEFAULT_SLOT_MS: u64 = 3000; +pub const DEFAULT_SLOT_MS: u64 = 15000; #[cfg(not(debug_assertions))] -pub const DEFAULT_SLOT_MS: u64 = 5000; +pub const DEFAULT_SLOT_MS: u64 = 15000; pub const MAX_RETRIES: u32 = 3; pub const RETRY_DELAY_MS: u64 = 500; diff --git a/simulator/src/erc8004.rs b/simulator/src/erc8004.rs index f0f22a9..86ffb81 100644 --- a/simulator/src/erc8004.rs +++ b/simulator/src/erc8004.rs @@ -33,9 +33,10 @@ pub struct Erc8004Args { #[arg(long, env = "AGENT_URI")] pub agent_uri: Option, - /// HeartbeatManager contract address to submit validation requests to - #[arg(long, env = "HEARTBEAT_MANAGER_ADDRESS")] - pub heartbeat_manager_address: Option, + /// Validator address that will be authorized to submit validation responses. + /// This should be the address of the keeper or responder service. 
+ #[arg(long, env = "VALIDATOR_ADDRESS")] + pub validator_address: Option, } #[derive(Debug)] @@ -45,7 +46,8 @@ pub struct Erc8004Config { pub validation_registry_contract_address: Address, pub private_key: String, pub agent_uri: String, - pub heartbeat_manager_address: Address, + /// The validator address authorized to submit responses (typically the keeper) + pub validator_address: Address, pub slot_ms: u64, } @@ -82,14 +84,14 @@ impl Erc8004Config { .or_else(|| state_file.load_value("AGENT_URI")) .unwrap_or_else(|| "https://example.com/agent".to_string()); - let heartbeat_manager_address = args - .heartbeat_manager_address - .or_else(|| state_file.load_value("HEARTBEAT_MANAGER_ADDRESS")) - .unwrap_or_else(|| "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707".to_string()) + let validator_address = args + .validator_address + .or_else(|| state_file.load_value("VALIDATOR_ADDRESS")) + .unwrap_or_else(|| "0x90F79bf6EB2c4f870365E785982E1f101E93b906".to_string()) .parse::
<Address>
()?; info!( - "Loaded Erc8004Config: rpc_url={rpc_url}, identity_registry={identity_registry_contract_address}, validation_registry={validation_registry_contract_address}" + "Loaded Erc8004Config: rpc_url={rpc_url}, identity_registry={identity_registry_contract_address}, validation_registry={validation_registry_contract_address}, validator={validator_address}" ); Ok(Self { @@ -98,7 +100,7 @@ impl Erc8004Config { validation_registry_contract_address, private_key, agent_uri, - heartbeat_manager_address, + validator_address, slot_ms: DEFAULT_SLOT_MS, }) } @@ -177,7 +179,7 @@ impl Erc8004Simulator { info!( slot, agent_id = %agent_id, - heartbeat_manager = %config.heartbeat_manager_address, + validator = %config.validator_address, snapshot_id = snapshot_id, request_uri = %request_uri, "Submitting validation request" @@ -189,7 +191,7 @@ impl Erc8004Simulator { let tx_hash = client .validation_registry .validation_request( - config.heartbeat_manager_address, + config.validator_address, agent_id, request_uri, request_hash,