From fe0449461629c0f0815b44a185688bdaf6009a43 Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Tue, 5 May 2026 22:12:28 +0200
Subject: [PATCH 1/2] Reset LSPS5 `persistence_in_flight` counter on persist
 errors
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`LSPS5ServiceHandler::persist` incremented `persistence_in_flight` at the
top as a single-runner gate, but only decremented it on the success path:
each interior `?` on a `kv_store` future propagated the error out of the
function while leaving the counter at >= 1.

After one transient I/O failure (disk full, brief unavailability of a
remote `KVStore`, EPERM, etc.) every subsequent `persist()` call hit the
`fetch_add > 0` short-circuit and silently returned `Ok(false)`. The
in-memory `needs_persist` flags then kept accumulating without ever
reaching disk, so webhook state, removals, and notification cooldowns,
including the spec-mandated webhook retention/pruning state, were lost on
the next process restart, without any error surfaced to the operator. The
counter is monotonic, so recovery required a process restart.

Adopt the LSPS1 / LSPS2 pattern: split the body into an inner
`do_persist` and an outer `persist` that unconditionally clears the
counter via `store(0)` after the call returns, regardless of outcome. A
failed write still propagates `Err`, but the next `persist()` attempt now
actually retries the write instead of no-op'ing.
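
In sketch form (the error type written out, the `debug_assert` and the
body of `do_persist` elided; see the diff below for the real change):

    async fn persist(&self) -> Result<bool, lightning::io::Error> {
        if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
            // A runner is already active; it will go around again for us.
            return Ok(false);
        }
        let res = self.do_persist().await;
        // Clear the gate unconditionally, on success *and* on error, so the
        // next persist() call retries the write instead of short-circuiting.
        self.persistence_in_flight.store(0, Ordering::Release);
        res
    }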

Co-Authored-By: HAL 9000
---
 lightning-liquidity/src/lsps5/service.rs      |  13 +-
 .../tests/lsps5_integration_tests.rs          | 170 ++++++++++++++++++
 2 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs
index 4678d38dc9a..4a57ff6048e 100644
--- a/lightning-liquidity/src/lsps5/service.rs
+++ b/lightning-liquidity/src/lsps5/service.rs
@@ -245,14 +245,21 @@ where
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
-		let mut did_persist = false;
-
 		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
 			// If we're not the first event processor to get here, just return early, the increment
 			// we just did will be treated as "go around again" at the end.
-			return Ok(did_persist);
+			return Ok(false);
 		}
 
+		let res = self.do_persist().await;
+		debug_assert!(res.is_err() || self.persistence_in_flight.load(Ordering::Acquire) == 0);
+		self.persistence_in_flight.store(0, Ordering::Release);
+		res
+	}
+
+	async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
+		let mut did_persist = false;
+
 		loop {
 			let mut need_remove = Vec::new();
 			let mut need_persist = Vec::new();
 
diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs
index 2b32b4dcbc6..deed6b2f8b8 100644
--- a/lightning-liquidity/tests/lsps5_integration_tests.rs
+++ b/lightning-liquidity/tests/lsps5_integration_tests.rs
@@ -1633,3 +1633,173 @@ fn lsps5_service_handler_persistence_across_restarts() {
 		}
 	}
 }
+
+struct FailableKVStore {
+	inner: TestStore,
+	fail_lsps5: std::sync::atomic::AtomicBool,
+}
+
+impl FailableKVStore {
+	fn new() -> Self {
+		Self { inner: TestStore::new(false), fail_lsps5: std::sync::atomic::AtomicBool::new(false) }
+	}
+
+	fn set_fail_lsps5(&self, fail: bool) {
+		self.fail_lsps5.store(fail, std::sync::atomic::Ordering::SeqCst);
+	}
+}
+
+impl lightning::util::persist::KVStoreSync for FailableKVStore {
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> lightning::io::Result<Vec<u8>> {
+		<TestStore as lightning::util::persist::KVStoreSync>::read(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+			key,
+		)
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> lightning::io::Result<()> {
+		if secondary_namespace == "lsps5_service"
+			&& self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst)
+		{
+			return Err(lightning::io::Error::new(
+				lightning::io::ErrorKind::Other,
+				"intentional failure for lsps5 namespace",
+			));
+		}
+		<TestStore as lightning::util::persist::KVStoreSync>::write(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+			key,
+			buf,
+		)
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> lightning::io::Result<()> {
+		if secondary_namespace == "lsps5_service"
+			&& self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst)
+		{
+			return Err(lightning::io::Error::new(
+				lightning::io::ErrorKind::Other,
+				"intentional failure for lsps5 namespace",
+			));
+		}
+		<TestStore as lightning::util::persist::KVStoreSync>::remove(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+			key,
+			lazy,
+		)
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> lightning::io::Result<Vec<String>> {
+		<TestStore as lightning::util::persist::KVStoreSync>::list(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+		)
+	}
+}
+
+#[test]
+fn lsps5_service_persist_resets_in_flight_counter_on_io_error() {
+	use lightning::ln::peer_handler::CustomMessageHandler;
+
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+	let service_kv_store = Arc::new(FailableKVStore::new());
+	let client_kv_store = Arc::new(TestStore::new(false));
+
+	let service_config = LiquidityServiceConfig {
+		lsps1_service_config: None,
+		lsps2_service_config: None,
+		lsps5_service_config: Some(LSPS5ServiceConfig::default()),
+		advertise_service: true,
+	};
+	let client_config = LiquidityClientConfig {
+		lsps1_client_config: None,
+		lsps2_client_config: None,
+		lsps5_client_config: Some(LSPS5ClientConfig::default()),
+	};
+	let time_provider: Arc<dyn TimeProvider> = Arc::new(DefaultTimeProvider);
+
+	let service_lm = LiquidityManagerSync::new_with_custom_time_provider(
+		nodes[0].keys_manager,
+		nodes[0].keys_manager,
+		nodes[0].node,
+		Arc::clone(&service_kv_store),
+		nodes[0].tx_broadcaster,
+		Some(service_config),
+		None,
+		Arc::clone(&time_provider),
+	)
+	.unwrap();
+
+	let client_lm = LiquidityManagerSync::new_with_custom_time_provider(
+		nodes[1].keys_manager,
+		nodes[1].keys_manager,
+		nodes[1].node,
+		client_kv_store,
+		nodes[1].tx_broadcaster,
+		None,
+		Some(client_config),
+		Arc::clone(&time_provider),
+	)
+	.unwrap();
+
+	let service_node_id = nodes[0].node.get_our_node_id();
+	let client_node_id = nodes[1].node.get_our_node_id();
+
+	create_chan_between_nodes(&nodes[0], &nodes[1]);
+
+	let client_handler = client_lm.lsps5_client_handler().unwrap();
+	client_handler
+		.set_webhook(service_node_id, "App".to_string(), "https://example.org/hook".to_string())
+		.unwrap();
+
+	let req_msgs = client_lm.get_and_clear_pending_msg();
+	assert_eq!(req_msgs.len(), 1);
+	let (_, request) = req_msgs.into_iter().next().unwrap();
+	service_lm.handle_custom_message(request, client_node_id).unwrap();
+
+	// Consume the SendWebhookNotification event so the pending events queue is drained.
+	let _ = service_lm.next_event();
+	let _ = service_lm.get_and_clear_pending_msg();
+
+	// The initial persist should succeed and clear all `needs_persist` flags.
+	service_lm.persist().expect("initial persist should succeed");
+
+	// Now arrange for lsps5 writes to fail and dirty lsps5 state without dirtying
+	// pending_events (which lives in a different namespace).
+	service_kv_store.set_fail_lsps5(true);
+	service_lm.peer_disconnected(client_node_id);
+
+	// The first persist attempt should error out due to the failing kv_store.
+	let res1 = service_lm.persist();
+	assert!(res1.is_err(), "persist should fail when lsps5 kv_store write fails");
+
+	// The second persist attempt must still attempt the write (and fail again). With
+	// the bug, the LSPS5 service handler's `persistence_in_flight` counter is left
+	// above zero on error, so this would return Ok(false) immediately, silently
+	// skipping the write and breaking persistence until the process restarts.
+	let res2 = service_lm.persist();
+	assert!(
+		res2.is_err(),
+		"after a failed persist, subsequent persist calls must still attempt to persist; got {:?}",
+		res2,
+	);
+}

From a23feecb60b9083568497898cb5e8d7d65424786 Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Wed, 6 May 2026 11:16:27 +0200
Subject: [PATCH 2/2] fixup!
 Reset LSPS5 `persistence_in_flight` counter on persist errors
---
 lightning-liquidity/src/lsps5/service.rs | 136 ++++++++++++-----------
 1 file changed, 71 insertions(+), 65 deletions(-)

diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs
index 4a57ff6048e..55d96e186d1 100644
--- a/lightning-liquidity/src/lsps5/service.rs
+++ b/lightning-liquidity/src/lsps5/service.rs
@@ -251,85 +251,91 @@ where
 			return Ok(false);
 		}
 
-		let res = self.do_persist().await;
-		debug_assert!(res.is_err() || self.persistence_in_flight.load(Ordering::Acquire) == 0);
-		self.persistence_in_flight.store(0, Ordering::Release);
-		res
-	}
-
-	async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
 		let mut did_persist = false;
 
 		loop {
-			let mut need_remove = Vec::new();
-			let mut need_persist = Vec::new();
+			match self.do_persist().await {
+				Ok(pass_did_persist) => did_persist |= pass_did_persist,
+				Err(e) => {
+					self.persistence_in_flight.store(0, Ordering::Release);
+					return Err(e);
+				},
+			}
 
-			self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
-			{
-				let outer_state_lock = self.per_peer_state.read().unwrap();
-
-				for (client_id, peer_state) in outer_state_lock.iter() {
-					let is_prunable = peer_state.is_prunable();
-					let has_open_channel = self.client_has_open_channel(client_id);
-					if is_prunable && !has_open_channel {
-						need_remove.push(*client_id);
-					} else if peer_state.needs_persist {
-						need_persist.push(*client_id);
-					}
-				}
+			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
+				// If another thread incremented the state while we were running we should go
+				// around again, but only once.
+				self.persistence_in_flight.store(1, Ordering::Release);
+				continue;
 			}
+			break;
+		}
 
-			for client_id in need_persist.into_iter() {
-				debug_assert!(!need_remove.contains(&client_id));
-				self.persist_peer_state(client_id).await?;
-				did_persist = true;
+		Ok(did_persist)
+	}
+
+	async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
+		let mut did_persist = false;
+		let mut need_remove = Vec::new();
+		let mut need_persist = Vec::new();
+
+		self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
+		{
+			let outer_state_lock = self.per_peer_state.read().unwrap();
+
+			for (client_id, peer_state) in outer_state_lock.iter() {
+				let is_prunable = peer_state.is_prunable();
+				let has_open_channel = self.client_has_open_channel(client_id);
+				if is_prunable && !has_open_channel {
+					need_remove.push(*client_id);
+				} else if peer_state.needs_persist {
+					need_persist.push(*client_id);
+				}
 			}
+		}
 
-			for client_id in need_remove {
-				let mut future_opt = None;
-				{
-					// We need to take the `per_peer_state` write lock to remove an entry, but also
-					// have to hold it until after the `remove` call returns (but not through
-					// future completion) to ensure that writes for the peer's state are
-					// well-ordered with other `persist_peer_state` calls even across the removal
-					// itself.
-					let mut per_peer_state = self.per_peer_state.write().unwrap();
-					if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
-						let state = entry.get_mut();
-						if state.is_prunable() && !self.client_has_open_channel(&client_id) {
-							entry.remove();
-							let key = client_id.to_string();
-							future_opt = Some(self.kv_store.remove(
-								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-								LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
-								&key,
-								true,
-							));
-						} else {
-							// If the peer was re-added, force a re-persist of the current state.
-							state.needs_persist = true;
+		for client_id in need_persist.into_iter() {
+			debug_assert!(!need_remove.contains(&client_id));
+			self.persist_peer_state(client_id).await?;
+			did_persist = true;
+		}
+
+		for client_id in need_remove {
+			let mut future_opt = None;
+			{
+				// We need to take the `per_peer_state` write lock to remove an entry, but also
+				// have to hold it until after the `remove` call returns (but not through
+				// future completion) to ensure that writes for the peer's state are
+				// well-ordered with other `persist_peer_state` calls even across the removal
+				// itself.
+				let mut per_peer_state = self.per_peer_state.write().unwrap();
+				if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
+					let state = entry.get_mut();
+					if state.is_prunable() && !self.client_has_open_channel(&client_id) {
+						entry.remove();
+						let key = client_id.to_string();
+						future_opt = Some(self.kv_store.remove(
+							LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+							LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+							&key,
+							true,
+						));
 					} else {
-						// This should never happen, we can only have one `persist` call
-						// in-progress at once and map entries are only removed by it.
-						debug_assert!(false);
+						// If the peer was re-added, force a re-persist of the current state.
+						state.needs_persist = true;
 					}
-				}
-				if let Some(future) = future_opt {
-					future.await?;
-					did_persist = true;
 				} else {
-					self.persist_peer_state(client_id).await?;
+					// This should never happen, we can only have one `persist` call
+					// in-progress at once and map entries are only removed by it.
+					debug_assert!(false);
 				}
 			}
-
-			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
-				// If another thread incremented the state while we were running we should go
-				// around again, but only once.
-				self.persistence_in_flight.store(1, Ordering::Release);
-				continue;
+			if let Some(future) = future_opt {
+				future.await?;
+				did_persist = true;
+			} else {
+				self.persist_peer_state(client_id).await?;
 			}
-			break;
 		}
 
 		Ok(did_persist)
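
-- 
The single-runner / go-around protocol both patches rely on can be
exercised in isolation. Below is a minimal model, not the handler itself:
it is synchronous, a closure stands in for `do_persist`, and apart from
the `persistence_in_flight` field name everything is invented for
illustration:

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Gate {
        persistence_in_flight: AtomicUsize,
    }

    impl Gate {
        fn persist(&self, mut pass: impl FnMut() -> Result<(), ()>) -> Result<bool, ()> {
            if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
                // Another runner is active; our increment tells it to go
                // around again on our behalf.
                return Ok(false);
            }
            loop {
                if let Err(e) = pass() {
                    // Reset unconditionally so the next caller retries instead
                    // of hitting the short-circuit above forever.
                    self.persistence_in_flight.store(0, Ordering::Release);
                    return Err(e);
                }
                if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
                    // Someone raced us while we ran; collapse their increments
                    // into exactly one more pass.
                    self.persistence_in_flight.store(1, Ordering::Release);
                    continue;
                }
                break;
            }
            Ok(true)
        }
    }

    fn main() {
        let gate = Gate { persistence_in_flight: AtomicUsize::new(0) };
        // A failing pass propagates the error but leaves the gate reusable...
        assert!(gate.persist(|| Err(())).is_err());
        // ...so the next call runs the pass again instead of returning Ok(false).
        assert_eq!(gate.persist(|| Ok(())), Ok(true));
    }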