From 5d9791613101bbb89eab9b90548bcebf410ea9f1 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Sun, 28 Dec 2025 17:42:29 -0600 Subject: [PATCH 1/3] Make inserting into TxMetadataStore async --- orange-sdk/src/event.rs | 2 +- orange-sdk/src/lib.rs | 42 ++--- orange-sdk/src/rebalancer.rs | 56 ++++--- orange-sdk/src/store.rs | 178 ++++++++++----------- orange-sdk/src/trusted_wallet/cashu/mod.rs | 5 +- orange-sdk/src/trusted_wallet/dummy.rs | 1 + orange-sdk/src/trusted_wallet/spark/mod.rs | 2 +- 7 files changed, 144 insertions(+), 142 deletions(-) diff --git a/orange-sdk/src/event.rs b/orange-sdk/src/event.rs index a815fd1..d7c740d 100644 --- a/orange-sdk/src/event.rs +++ b/orange-sdk/src/event.rs @@ -339,7 +339,7 @@ impl LdkEventHandler { let preimage = payment_preimage.unwrap(); // safe let payment_id = PaymentId::SelfCustodial(payment_id.unwrap().0); // safe - if self.tx_metadata.set_preimage(payment_id, preimage.0).is_err() { + if self.tx_metadata.set_preimage(payment_id, preimage.0).await.is_err() { log_error!(self.logger, "Failed to set preimage for payment {payment_id:?}"); } diff --git a/orange-sdk/src/lib.rs b/orange-sdk/src/lib.rs index be69f95..3b4bcca 100644 --- a/orange-sdk/src/lib.rs +++ b/orange-sdk/src/lib.rs @@ -1095,15 +1095,18 @@ impl Wallet { let res = self.inner.trusted.pay(method, instructions.amount).await; match res { Ok(id) => { - self.inner.tx_metadata.insert( - PaymentId::Trusted(id), - TxMetadata { - ty: TxType::Payment { ty: ty() }, - time: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - }, - ); + self.inner + .tx_metadata + .insert( + PaymentId::Trusted(id), + TxMetadata { + ty: TxType::Payment { ty: ty() }, + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(), + }, + ) + .await; return Ok(PaymentId::Trusted(id)); }, Err(e) => { @@ -1136,15 +1139,18 @@ impl Wallet { // Note that the Payment Id can be repeated if we make a payment, // it fails, then we attempt to pay the same (BOLT 11) invoice // again. - self.inner.tx_metadata.upsert( - PaymentId::SelfCustodial(id.0), - TxMetadata { - ty: TxType::Payment { ty: typ }, - time: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - }, - ); + self.inner + .tx_metadata + .upsert( + PaymentId::SelfCustodial(id.0), + TxMetadata { + ty: TxType::Payment { ty: typ }, + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(), + }, + ) + .await; let inner_ref = Arc::clone(&self.inner); self.inner.runtime.spawn_cancellable_background_task(async move { inner_ref.rebalancer.do_rebalance_if_needed().await; diff --git a/orange-sdk/src/rebalancer.rs b/orange-sdk/src/rebalancer.rs index 84771d5..00920a6 100644 --- a/orange-sdk/src/rebalancer.rs +++ b/orange-sdk/src/rebalancer.rs @@ -99,15 +99,17 @@ impl RebalanceTrigger for OrangeTrigger { payment.id.as_hex() ); new_txn.push((payment.amount, &payment.id)); - self.tx_metadata.insert( - payment_id, - TxMetadata { - ty: TxType::Payment { ty: PaymentType::IncomingLightning {} }, - time: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - }, - ); + self.tx_metadata + .insert( + payment_id, + TxMetadata { + ty: TxType::Payment { ty: PaymentType::IncomingLightning {} }, + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(), + }, + ) + .await; } } @@ -215,19 +217,21 @@ impl RebalanceTrigger for OrangeTrigger { // make sure we have a metadata entry for the triggering transaction let trigger = PaymentId::SelfCustodial(txid.to_byte_array()); if self.tx_metadata.read().get(&trigger).is_none() { - self.tx_metadata.insert( - trigger, - TxMetadata { - ty: TxType::Payment { - ty: PaymentType::IncomingOnChain { - txid: Some(txid), + self.tx_metadata + .insert( + trigger, + TxMetadata { + ty: TxType::Payment { + ty: PaymentType::IncomingOnChain { + txid: Some(txid), + }, }, + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(), }, - time: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - }, - ); + ) + .await; } Some(TriggerParams { @@ -295,7 +299,8 @@ impl graduated_rebalancer::EventHandler for OrangeRebalanceEventHandler { time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(), }; self.tx_metadata - .insert(PaymentId::Trusted(trusted_rebalance_payment_id), metadata); + .insert(PaymentId::Trusted(trusted_rebalance_payment_id), metadata) + .await; if let Err(e) = self .event_queue .add_event(Event::RebalanceInitiated { @@ -318,6 +323,7 @@ impl graduated_rebalancer::EventHandler for OrangeRebalanceEventHandler { let triggering_transaction_id = PaymentId::Trusted(trigger_id); self.tx_metadata .set_tx_caused_rebalance(&triggering_transaction_id) + .await .expect("Failed to write metadata for rebalance transaction"); let metadata = TxMetadata { ty: TxType::TrustedToLightning { @@ -327,8 +333,8 @@ impl graduated_rebalancer::EventHandler for OrangeRebalanceEventHandler { }, time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(), }; - self.tx_metadata.upsert(PaymentId::Trusted(rebalance_id), metadata); - self.tx_metadata.insert(PaymentId::SelfCustodial(lightning_id), metadata); + self.tx_metadata.upsert(PaymentId::Trusted(rebalance_id), metadata).await; + self.tx_metadata.insert(PaymentId::SelfCustodial(lightning_id), metadata).await; let event_queue = Arc::clone(&self.event_queue); let logger = Arc::clone(&self.logger); @@ -357,13 +363,15 @@ impl graduated_rebalancer::EventHandler for OrangeRebalanceEventHandler { let trigger_id = PaymentId::SelfCustodial(triggering_txid.to_byte_array()); self.tx_metadata .set_tx_caused_rebalance(&trigger_id) + .await .expect("Failed to write metadata for onchain rebalance transaction"); let metadata = TxMetadata { ty: TxType::OnchainToLightning { channel_txid: chan_txid, triggering_txid }, time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(), }; self.tx_metadata - .insert(PaymentId::SelfCustodial(chan_txid.to_byte_array()), metadata); + .insert(PaymentId::SelfCustodial(chan_txid.to_byte_array()), metadata) + .await; }, } }) diff --git a/orange-sdk/src/store.rs b/orange-sdk/src/store.rs index 2908e25..4f1034c 100644 --- a/orange-sdk/src/store.rs +++ b/orange-sdk/src/store.rs @@ -322,124 +322,108 @@ impl TxMetadataStore { self.tx_metadata.read().unwrap() } - fn do_set(&self, key: PaymentId, value: TxMetadata) -> bool { - let mut tx_metadata = self.tx_metadata.write().unwrap(); + async fn do_set(&self, key: PaymentId, value: TxMetadata) -> bool { let key_str = key.to_string(); let ser = value.encode(); - let old = tx_metadata.insert(key, value); - KVStoreSync::write( - self.store.as_ref(), - STORE_PRIMARY_KEY, - STORE_SECONDARY_KEY, - &key_str, - ser, - ) - .expect("We do not allow writes to fail"); + let old = { + let mut tx_metadata = self.tx_metadata.write().unwrap(); + tx_metadata.insert(key, value) + }; + KVStore::write(self.store.as_ref(), STORE_PRIMARY_KEY, STORE_SECONDARY_KEY, &key_str, ser) + .await + .expect("We do not allow writes to fail"); old.is_some() } - pub fn upsert(&self, key: PaymentId, value: TxMetadata) { - self.do_set(key, value); + pub async fn upsert(&self, key: PaymentId, value: TxMetadata) { + self.do_set(key, value).await; } - pub fn insert(&self, key: PaymentId, value: TxMetadata) { - let had_old = self.do_set(key, value); + pub async fn insert(&self, key: PaymentId, value: TxMetadata) { + let had_old = self.do_set(key, value).await; debug_assert!(!had_old); } - pub fn set_tx_caused_rebalance(&self, payment_id: &PaymentId) -> Result<(), ()> { - let mut tx_metadata = self.tx_metadata.write().unwrap(); - if let Some(metadata) = tx_metadata.get_mut(payment_id) { - if let TxType::Payment { ty } = &mut metadata.ty { - metadata.ty = TxType::PaymentTriggeringTransferLightning { ty: *ty }; - let key_str = payment_id.to_string(); - let ser = metadata.encode(); - KVStoreSync::write( - self.store.as_ref(), - STORE_PRIMARY_KEY, - STORE_SECONDARY_KEY, - &key_str, - ser, - ) - .expect("We do not allow writes to fail"); - Ok(()) + pub async fn set_tx_caused_rebalance(&self, payment_id: &PaymentId) -> Result<(), ()> { + let (key_str, ser) = { + let mut tx_metadata = self.tx_metadata.write().unwrap(); + if let Some(metadata) = tx_metadata.get_mut(payment_id) { + if let TxType::Payment { ty } = &mut metadata.ty { + metadata.ty = TxType::PaymentTriggeringTransferLightning { ty: *ty }; + let key_str = payment_id.to_string(); + let ser = metadata.encode(); + (key_str, ser) + } else { + eprintln!("payment_id {payment_id} is not a payment, cannot set rebalance"); + return Err(()); + } } else { - eprintln!("payment_id {payment_id} is not a payment, cannot set rebalance"); - Err(()) + eprintln!("doesn't exist in metadata store: {payment_id}"); + return Err(()); } - } else { - eprintln!("doesn't exist in metadata store: {payment_id}"); - Err(()) - } + }; + KVStore::write(self.store.as_ref(), STORE_PRIMARY_KEY, STORE_SECONDARY_KEY, &key_str, ser) + .await + .expect("We do not allow writes to fail"); + Ok(()) } /// Sets the preimage for an outgoing lightning payment. If the payment already has a preimage, /// this is a no-op and returns Ok(()). If the payment_id does not exist in the store, or if the payment /// is not an outgoing lightning payment, returns Err(()). - pub fn set_preimage(&self, payment_id: PaymentId, preimage: [u8; 32]) -> Result<(), ()> { - let mut tx_metadata = self.tx_metadata.write().unwrap(); - if let Some(metadata) = tx_metadata.get_mut(&payment_id) { - match metadata.ty { - TxType::Payment { ty } => match ty { - PaymentType::OutgoingLightningBolt12 { payment_preimage } => { - if payment_preimage.is_some() { - Ok(()) - } else { - metadata.ty = TxType::Payment { - ty: PaymentType::OutgoingLightningBolt12 { - payment_preimage: Some(PaymentPreimage(preimage)), - }, - }; - - KVStoreSync::write( - self.store.as_ref(), - STORE_PRIMARY_KEY, - STORE_SECONDARY_KEY, - &payment_id.to_string(), - metadata.encode(), - ) - .expect("We do not allow writes to fail"); - Ok(()) - } - }, - PaymentType::OutgoingLightningBolt11 { payment_preimage } => { - if payment_preimage.is_some() { - Ok(()) - } else { - metadata.ty = TxType::Payment { - ty: PaymentType::OutgoingLightningBolt11 { - payment_preimage: Some(PaymentPreimage(preimage)), - }, - }; - - KVStoreSync::write( - self.store.as_ref(), - STORE_PRIMARY_KEY, - STORE_SECONDARY_KEY, - &payment_id.to_string(), - metadata.encode(), - ) - .expect("We do not allow writes to fail"); - Ok(()) - } + pub async fn set_preimage(&self, payment_id: PaymentId, preimage: [u8; 32]) -> Result<(), ()> { + let (key_str, ser) = { + let mut tx_metadata = self.tx_metadata.write().unwrap(); + if let Some(metadata) = tx_metadata.get_mut(&payment_id) { + match metadata.ty { + TxType::Payment { ty } => match ty { + PaymentType::OutgoingLightningBolt12 { payment_preimage } => { + if payment_preimage.is_some() { + return Ok(()); + } else { + metadata.ty = TxType::Payment { + ty: PaymentType::OutgoingLightningBolt12 { + payment_preimage: Some(PaymentPreimage(preimage)), + }, + }; + (payment_id.to_string(), metadata.encode()) + } + }, + PaymentType::OutgoingLightningBolt11 { payment_preimage } => { + if payment_preimage.is_some() { + return Ok(()); + } else { + metadata.ty = TxType::Payment { + ty: PaymentType::OutgoingLightningBolt11 { + payment_preimage: Some(PaymentPreimage(preimage)), + }, + }; + (payment_id.to_string(), metadata.encode()) + } + }, + _ => { + eprintln!( + "payment_id {payment_id} is not an outgoing lightning payment, cannot set preimage" + ); + return Err(()); + }, }, _ => { - eprintln!( - "payment_id {payment_id} is not an outgoing lightning payment, cannot set preimage" - ); - Err(()) + // if we're trying to set a preimage on a non-payment, just continue + // this should only happen when we finish a rebalance payment + return Ok(()); }, - }, - _ => { - // if we're trying to set a preimage on a non-payment, just continue - // this should only happen when we finish a rebalance payment - Ok(()) - }, + } + } else { + eprintln!("doesn't exist in metadata store: {payment_id}"); + return Err(()); } - } else { - eprintln!("doesn't exist in metadata store: {payment_id}"); - Err(()) - } + }; + + KVStore::write(self.store.as_ref(), STORE_PRIMARY_KEY, STORE_SECONDARY_KEY, &key_str, ser) + .await + .expect("We do not allow writes to fail"); + Ok(()) } } diff --git a/orange-sdk/src/trusted_wallet/cashu/mod.rs b/orange-sdk/src/trusted_wallet/cashu/mod.rs index b1053c0..d001c95 100644 --- a/orange-sdk/src/trusted_wallet/cashu/mod.rs +++ b/orange-sdk/src/trusted_wallet/cashu/mod.rs @@ -368,7 +368,10 @@ impl TrustedWalletInterface for Cashu { let payment_preimage = preimage.unwrap_or(PaymentPreimage([0u8; 32])); - if tx_metadata.set_preimage(payment_id, payment_preimage.0).is_err() + if tx_metadata + .set_preimage(payment_id, payment_preimage.0) + .await + .is_err() { log_error!( logger, diff --git a/orange-sdk/src/trusted_wallet/dummy.rs b/orange-sdk/src/trusted_wallet/dummy.rs index 0dcd527..2c8c241 100644 --- a/orange-sdk/src/trusted_wallet/dummy.rs +++ b/orange-sdk/src/trusted_wallet/dummy.rs @@ -118,6 +118,7 @@ impl DummyTrustedWallet { if !is_rebalance { if tx_metadata .set_preimage(payment_id, payment_preimage.unwrap().0) + .await .is_err() { println!("Failed to set preimage for payment {payment_id:?}"); diff --git a/orange-sdk/src/trusted_wallet/spark/mod.rs b/orange-sdk/src/trusted_wallet/spark/mod.rs index 2c7f0b1..23c35f2 100644 --- a/orange-sdk/src/trusted_wallet/spark/mod.rs +++ b/orange-sdk/src/trusted_wallet/spark/mod.rs @@ -400,7 +400,7 @@ impl SparkEventHandler { TrustedError::Other(format!("Invalid payment_hash hex: {e:?}")) })?; - if self.tx_metadata.set_preimage(payment_id, preimage).is_err() { + if self.tx_metadata.set_preimage(payment_id, preimage).await.is_err() { log_error!( self.logger, "Failed to set preimage for payment {payment_id:?}" From 7d59d74ba2b9d2f8c87cfb503db1654b555017cf Mon Sep 17 00:00:00 2001 From: benthecarman Date: Sun, 28 Dec 2025 17:55:39 -0600 Subject: [PATCH 2/3] Make rebalance_enabled storage async --- examples/cli/src/main.rs | 2 +- orange-sdk/src/event.rs | 2 +- orange-sdk/src/ffi/orange/wallet.rs | 12 ++++++------ orange-sdk/src/lib.rs | 12 ++++++------ orange-sdk/src/rebalancer.rs | 4 ++-- orange-sdk/src/store.rs | 11 ++++++----- orange-sdk/tests/integration_tests.rs | 26 +++++++++++++------------- 7 files changed, 35 insertions(+), 34 deletions(-) diff --git a/examples/cli/src/main.rs b/examples/cli/src/main.rs index 0f71498..138716b 100644 --- a/examples/cli/src/main.rs +++ b/examples/cli/src/main.rs @@ -474,7 +474,7 @@ async fn execute_command(command: Commands, state: &mut WalletState) -> Result<( ); println!( "Rebalance Enabled: {}", - if wallet.get_rebalance_enabled() { + if wallet.get_rebalance_enabled().await { "Yes".bright_green() } else { "No".bright_red() diff --git a/orange-sdk/src/event.rs b/orange-sdk/src/event.rs index d7c740d..87d179d 100644 --- a/orange-sdk/src/event.rs +++ b/orange-sdk/src/event.rs @@ -442,7 +442,7 @@ impl LdkEventHandler { } => { // We experienced a channel close, we disable rebalancing so we don't automatically // try to reopen the channel. - store::set_rebalance_enabled(self.event_queue.kv_store.as_ref(), false); + store::set_rebalance_enabled(self.event_queue.kv_store.as_ref(), false).await; if let Err(e) = self .event_queue diff --git a/orange-sdk/src/ffi/orange/wallet.rs b/orange-sdk/src/ffi/orange/wallet.rs index fe17635..3ad3970 100644 --- a/orange-sdk/src/ffi/orange/wallet.rs +++ b/orange-sdk/src/ffi/orange/wallet.rs @@ -118,13 +118,13 @@ impl Wallet { } /// Sets whether the wallet should automatically rebalance from trusted/onchain to lightning. - pub fn set_rebalance_enabled(&self, value: bool) { - self.inner.set_rebalance_enabled(value) + pub async fn set_rebalance_enabled(&self, value: bool) { + self.inner.set_rebalance_enabled(value).await } /// Whether the wallet should automatically rebalance from trusted/onchain to lightning. - pub fn get_rebalance_enabled(&self) -> bool { - self.inner.get_rebalance_enabled() + pub async fn get_rebalance_enabled(&self) -> bool { + self.inner.get_rebalance_enabled().await } pub async fn list_transactions( @@ -191,8 +191,8 @@ impl Wallet { } /// List our current channels - pub fn close_channels(&self) -> Result<(), WalletError> { - self.inner.close_channels()?; + pub async fn close_channels(&self) -> Result<(), WalletError> { + self.inner.close_channels().await?; Ok(()) } diff --git a/orange-sdk/src/lib.rs b/orange-sdk/src/lib.rs index 3b4bcca..f6a53b4 100644 --- a/orange-sdk/src/lib.rs +++ b/orange-sdk/src/lib.rs @@ -634,13 +634,13 @@ impl Wallet { } /// Sets whether the wallet should automatically rebalance from trusted/onchain to lightning. - pub fn set_rebalance_enabled(&self, value: bool) { - store::set_rebalance_enabled(self.inner.store.as_ref(), value) + pub async fn set_rebalance_enabled(&self, value: bool) { + store::set_rebalance_enabled(self.inner.store.as_ref(), value).await } /// Whether the wallet should automatically rebalance from trusted/onchain to lightning. - pub fn get_rebalance_enabled(&self) -> bool { - store::get_rebalance_enabled(self.inner.store.as_ref()) + pub async fn get_rebalance_enabled(&self) -> bool { + store::get_rebalance_enabled(self.inner.store.as_ref()).await } /// Returns the lightning wallet's node id. @@ -1268,10 +1268,10 @@ impl Wallet { /// Initiates closing all channels in the lightning wallet. The channel will not be closed /// until a [`Event::ChannelClosed`] event is emitted. /// This will disable rebalancing before closing channels, so that we don't try to reopen them. - pub fn close_channels(&self) -> Result<(), WalletError> { + pub async fn close_channels(&self) -> Result<(), WalletError> { // we are explicitly disabling rebalancing here, so that we don't try to // reopen channels after closing them. - self.set_rebalance_enabled(false); + self.set_rebalance_enabled(false).await; self.inner.ln_wallet.close_channels()?; diff --git a/orange-sdk/src/rebalancer.rs b/orange-sdk/src/rebalancer.rs index 00920a6..335717c 100644 --- a/orange-sdk/src/rebalancer.rs +++ b/orange-sdk/src/rebalancer.rs @@ -62,7 +62,7 @@ impl OrangeTrigger { impl RebalanceTrigger for OrangeTrigger { fn needs_trusted_rebalance(&self) -> impl Future> + Send { async move { - let rebalance_enabled = store::get_rebalance_enabled(self.store.as_ref()); + let rebalance_enabled = store::get_rebalance_enabled(self.store.as_ref()).await; if !rebalance_enabled { return None; } @@ -143,7 +143,7 @@ impl RebalanceTrigger for OrangeTrigger { fn needs_onchain_rebalance(&self) -> impl Future> + Send { async move { - let rebalance_enabled = store::get_rebalance_enabled(self.store.as_ref()); + let rebalance_enabled = store::get_rebalance_enabled(self.store.as_ref()).await; if !rebalance_enabled { return None; } diff --git a/orange-sdk/src/store.rs b/orange-sdk/src/store.rs index 4f1034c..52d059d 100644 --- a/orange-sdk/src/store.rs +++ b/orange-sdk/src/store.rs @@ -429,14 +429,14 @@ impl TxMetadataStore { const REBALANCE_ENABLED_KEY: &str = "rebalance_enabled"; -pub(crate) fn get_rebalance_enabled(store: &DynStore) -> bool { - match KVStoreSync::read(store, STORE_PRIMARY_KEY, "", REBALANCE_ENABLED_KEY) { +pub(crate) async fn get_rebalance_enabled(store: &DynStore) -> bool { + match KVStore::read(store, STORE_PRIMARY_KEY, "", REBALANCE_ENABLED_KEY).await { Ok(bytes) => Readable::read(&mut &bytes[..]).expect("Invalid data in rebalance_enabled"), Err(e) if e.kind() == io::ErrorKind::NotFound => { // if rebalance_enabled is not found, default to true // and write it to the store so we don't have to do this again let rebalance_enabled = true; - set_rebalance_enabled(store, rebalance_enabled); + set_rebalance_enabled(store, rebalance_enabled).await; rebalance_enabled }, Err(e) => { @@ -445,9 +445,10 @@ pub(crate) fn get_rebalance_enabled(store: &DynStore) -> bool { } } -pub(crate) fn set_rebalance_enabled(store: &DynStore, enabled: bool) { +pub(crate) async fn set_rebalance_enabled(store: &DynStore, enabled: bool) { let bytes = enabled.encode(); - KVStoreSync::write(store, STORE_PRIMARY_KEY, "", REBALANCE_ENABLED_KEY, bytes) + KVStore::write(store, STORE_PRIMARY_KEY, "", REBALANCE_ENABLED_KEY, bytes) + .await .expect("Failed to write rebalance_enabled"); } diff --git a/orange-sdk/tests/integration_tests.rs b/orange-sdk/tests/integration_tests.rs index 79dd8ad..88e5b0d 100644 --- a/orange-sdk/tests/integration_tests.rs +++ b/orange-sdk/tests/integration_tests.rs @@ -801,7 +801,7 @@ async fn test_pay_onchain_from_self_custody() { let electrsd = Arc::clone(¶ms.electrsd); // disable rebalancing so we have on-chain funds - wallet.set_rebalance_enabled(false); + wallet.set_rebalance_enabled(false).await; let starting_bal = wallet.get_balance().await.unwrap(); assert_eq!(starting_bal.available_balance(), Amount::ZERO); @@ -1030,7 +1030,7 @@ async fn test_force_close_handling() { assert_eq!(starting_bal.available_balance(), Amount::ZERO); assert_eq!(starting_bal.pending_balance, Amount::ZERO); - let rebalancing = wallet.get_rebalance_enabled(); + let rebalancing = wallet.get_rebalance_enabled().await; assert!(rebalancing); // get a channel so we can make a payment @@ -1060,7 +1060,7 @@ async fn test_force_close_handling() { } // rebalancing should be disabled after a force close - let rebalancing = wallet.get_rebalance_enabled(); + let rebalancing = wallet.get_rebalance_enabled().await; assert!(!rebalancing); }) .await; @@ -1079,7 +1079,7 @@ async fn test_close_all_channels() { assert_eq!(starting_bal.available_balance(), Amount::ZERO); assert_eq!(starting_bal.pending_balance, Amount::ZERO); - let rebalancing = wallet.get_rebalance_enabled(); + let rebalancing = wallet.get_rebalance_enabled().await; assert!(rebalancing); // get a channel so we can make a payment @@ -1089,7 +1089,7 @@ async fn test_close_all_channels() { generate_blocks(&bitcoind, &electrsd, 6).await; // init closing all channels - wallet.close_channels().unwrap(); + wallet.close_channels().await.unwrap(); // wait for the channels to be closed let event = wait_next_event(&wallet).await; @@ -1101,7 +1101,7 @@ async fn test_close_all_channels() { } // rebalancing should be disabled after closing all channels - let rebalancing = wallet.get_rebalance_enabled(); + let rebalancing = wallet.get_rebalance_enabled().await; assert!(!rebalancing); }) .await; @@ -1115,7 +1115,7 @@ async fn test_threshold_boundary_trusted_balance_limit() { // we're not testing rebalancing here, so disable it to keep things simple // on slow CI this can cause tests to fail if rebalancing kicks in - wallet.set_rebalance_enabled(false); + wallet.set_rebalance_enabled(false).await; let tunables = wallet.get_tunables(); @@ -1508,7 +1508,7 @@ async fn test_payment_network_mismatch() { let electrsd = Arc::clone(¶ms.electrsd); // disable rebalancing so we have on-chain funds - wallet.set_rebalance_enabled(false); + wallet.set_rebalance_enabled(false).await; // fund wallet with on-chain let recv_amount = Amount::from_sats(1_000_000).unwrap(); @@ -2045,14 +2045,14 @@ async fn test_wallet_configuration_validation() { // without creating new wallets // Test 2: Verify rebalancing can be toggled - let initial_rebalance_state = wallet.get_rebalance_enabled(); + let initial_rebalance_state = wallet.get_rebalance_enabled().await; assert!(initial_rebalance_state, "Rebalancing should be enabled by default"); - wallet.set_rebalance_enabled(false); - assert!(!wallet.get_rebalance_enabled(), "Should be able to disable rebalancing"); + wallet.set_rebalance_enabled(false).await; + assert!(!wallet.get_rebalance_enabled().await, "Should be able to disable rebalancing"); - wallet.set_rebalance_enabled(true); - assert!(wallet.get_rebalance_enabled(), "Should be able to re-enable rebalancing"); + wallet.set_rebalance_enabled(true).await; + assert!(wallet.get_rebalance_enabled().await, "Should be able to re-enable rebalancing"); // Test 3: Verify tunables are consistent and reasonable let tunables = wallet.get_tunables(); From 64d4e4e15c7158c83bc55e4ded51ee751d10b716 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Sun, 28 Dec 2025 17:58:11 -0600 Subject: [PATCH 3/3] Make splice storage async --- orange-sdk/src/lib.rs | 2 +- orange-sdk/src/lightning_wallet.rs | 3 ++- orange-sdk/src/store.rs | 18 ++++++++++-------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/orange-sdk/src/lib.rs b/orange-sdk/src/lib.rs index f6a53b4..9984055 100644 --- a/orange-sdk/src/lib.rs +++ b/orange-sdk/src/lib.rs @@ -664,7 +664,7 @@ impl Wallet { let mut lightning_payments = self.inner.ln_wallet.list_payments(); lightning_payments.sort_by_key(|l| l.latest_update_timestamp); - let splice_outs = store::read_splice_outs(self.inner.store.as_ref()); + let splice_outs = store::read_splice_outs(self.inner.store.as_ref()).await; let mut res = Vec::with_capacity( trusted_payments.len() + lightning_payments.len() + splice_outs.len(), diff --git a/orange-sdk/src/lightning_wallet.rs b/orange-sdk/src/lightning_wallet.rs index b09e07c..4ccdef2 100644 --- a/orange-sdk/src/lightning_wallet.rs +++ b/orange-sdk/src/lightning_wallet.rs @@ -366,7 +366,8 @@ impl LightningWallet { store::write_splice_out( self.inner.store.as_ref(), &details, - ); + ) + .await; return Ok(id); } }, diff --git a/orange-sdk/src/store.rs b/orange-sdk/src/store.rs index 52d059d..f6eb15a 100644 --- a/orange-sdk/src/store.rs +++ b/orange-sdk/src/store.rs @@ -19,7 +19,7 @@ use ldk_node::bitcoin::hex::{DisplayHex, FromHex}; use ldk_node::lightning::io; use ldk_node::lightning::ln::msgs::DecodeError; use ldk_node::lightning::types::payment::PaymentPreimage; -use ldk_node::lightning::util::persist::{KVStore, KVStoreSync}; +use ldk_node::lightning::util::persist::KVStore; use ldk_node::lightning::util::ser::{Readable, Writeable, Writer}; use ldk_node::lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use ldk_node::payment::PaymentDetails; @@ -452,25 +452,27 @@ pub(crate) async fn set_rebalance_enabled(store: &DynStore, enabled: bool) { .expect("Failed to write rebalance_enabled"); } -pub(crate) fn write_splice_out(store: &DynStore, details: &PaymentDetails) { - KVStoreSync::write( +pub(crate) async fn write_splice_out(store: &DynStore, details: &PaymentDetails) { + KVStore::write( store, STORE_PRIMARY_KEY, SPLICE_OUT_SECONDARY_KEY, &details.id.0.to_lower_hex_string(), details.encode(), ) + .await .expect("Failed to write splice out txid"); } -pub(crate) fn read_splice_outs(store: &DynStore) -> Vec { - let keys = KVStoreSync::list(store, STORE_PRIMARY_KEY, SPLICE_OUT_SECONDARY_KEY) +pub(crate) async fn read_splice_outs(store: &DynStore) -> Vec { + let keys = KVStore::list(store, STORE_PRIMARY_KEY, SPLICE_OUT_SECONDARY_KEY) + .await .expect("We do not allow reads to fail"); let mut splice_outs = Vec::with_capacity(keys.len()); for key in keys { - let data_bytes = - KVStoreSync::read(store, STORE_PRIMARY_KEY, SPLICE_OUT_SECONDARY_KEY, &key) - .expect("We do not allow reads to fail"); + let data_bytes = KVStore::read(store, STORE_PRIMARY_KEY, SPLICE_OUT_SECONDARY_KEY, &key) + .await + .expect("We do not allow reads to fail"); let data = Readable::read(&mut &data_bytes[..]).expect("Invalid data in splice out storage"); splice_outs.push(data);