diff --git a/libs/gl-client/src/lib.rs b/libs/gl-client/src/lib.rs index e17c825c2..80508bed8 100644 --- a/libs/gl-client/src/lib.rs +++ b/libs/gl-client/src/lib.rs @@ -29,6 +29,7 @@ pub mod scheduler; pub mod signer; pub mod persist; +pub mod metrics; pub mod lnurl; diff --git a/libs/gl-client/src/metrics.rs b/libs/gl-client/src/metrics.rs new file mode 100644 index 000000000..453db8cfa --- /dev/null +++ b/libs/gl-client/src/metrics.rs @@ -0,0 +1,43 @@ +use prost::Message; + +const HSM_REQUEST_SIGNER_STATE_FIELD_NUMBER: u32 = 4; +const HSM_RESPONSE_SIGNER_STATE_FIELD_NUMBER: u32 = 5; + +fn protobuf_varint_len(mut value: usize) -> usize { + let mut len = 1; + while value >= 0x80 { + value >>= 7; + len += 1; + } + len +} + +fn signer_state_wire_bytes(entries: &[crate::pb::SignerStateEntry], field_number: u32) -> usize { + let field_key = ((field_number << 3) | 2) as usize; // wire type 2 = length-delimited + let field_key_len = protobuf_varint_len(field_key); + entries + .iter() + .map(|entry| { + let entry_len = entry.encoded_len(); + field_key_len + protobuf_varint_len(entry_len) + entry_len + }) + .sum() +} + +pub fn signer_state_request_wire_bytes(entries: &[crate::pb::SignerStateEntry]) -> usize { + signer_state_wire_bytes(entries, HSM_REQUEST_SIGNER_STATE_FIELD_NUMBER) +} + +pub fn signer_state_response_wire_bytes(entries: &[crate::pb::SignerStateEntry]) -> usize { + signer_state_wire_bytes(entries, HSM_RESPONSE_SIGNER_STATE_FIELD_NUMBER) +} + +pub fn savings_percent(full_wire_bytes: usize, diff_wire_bytes: usize) -> usize { + if full_wire_bytes == 0 { + return 0; + } + full_wire_bytes + .saturating_sub(diff_wire_bytes) + .saturating_mul(100) + / full_wire_bytes +} diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 169470bde..624fb75de 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -20,6 +20,7 @@ const NODE_STATE_PREFIX: &str = "nodestates"; const CHANNEL_PREFIX: &str = "channels"; const ALLOWLIST_PREFIX: &str = "allowlists"; const TRACKER_PREFIX: &str = "trackers"; +const TOMBSTONE_VERSION: u64 = u64::MAX; #[derive(Clone, Serialize, Deserialize)] pub struct State { @@ -35,17 +36,16 @@ impl State { ) -> Result<(), Error> { let node_key = format!("{NODE_PREFIX}/{key}"); let state_key = format!("{NODE_STATE_PREFIX}/{key}"); - assert!(!self.values.contains_key(&node_key), "inserting node twice"); - assert!( - !self.values.contains_key(&state_key), - "inserting node_state twice" - ); + self.ensure_not_tombstone(&node_key)?; + self.ensure_not_tombstone(&state_key)?; + let node_version = self.next_version(&node_key); + let state_version = self.next_version(&state_key); self.values - .insert(node_key, (0u64, serde_json::to_value(node_entry).unwrap())); + .insert(node_key, (node_version, serde_json::to_value(node_entry).unwrap())); self.values.insert( state_key, - (0u64, serde_json::to_value(node_state_entry).unwrap()), + (state_version, serde_json::to_value(node_state_entry).unwrap()), ); Ok(()) } @@ -60,6 +60,7 @@ impl State { serde_json::to_string(&node_state).unwrap() ); let key = format!("{NODE_STATE_PREFIX}/{key}"); + self.ensure_not_tombstone(&key)?; let v = self .values .get_mut(&key) @@ -71,9 +72,23 @@ impl State { fn delete_node(&mut self, key: &str) -> Result<(), Error> { let node_key = format!("{NODE_PREFIX}/{key}"); let state_key = format!("{NODE_STATE_PREFIX}/{key}"); + let allowlist_key = format!("{ALLOWLIST_PREFIX}/{key}"); + let tracker_key = format!("{TRACKER_PREFIX}/{key}"); + let channel_prefix = format!("{CHANNEL_PREFIX}/{key}"); + let channel_keys: Vec = self + .values + .keys() + .filter(|k| k.starts_with(&channel_prefix)) + .cloned() + .collect(); - self.values.remove(&node_key); - self.values.remove(&state_key); + self.put_tombstone(&node_key); + self.put_tombstone(&state_key); + self.put_tombstone(&allowlist_key); + self.put_tombstone(&tracker_key); + for channel_key in channel_keys { + self.put_tombstone(&channel_key); + } Ok(()) } @@ -83,14 +98,16 @@ impl State { channel_entry: vls_persist::model::ChannelEntry, ) -> Result<(), Error> { let key = format!("{CHANNEL_PREFIX}/{key}"); - assert!(!self.values.contains_key(&key)); + self.ensure_not_tombstone(&key)?; + let version = self.next_version(&key); self.values - .insert(key, (0u64, serde_json::to_value(channel_entry).unwrap())); + .insert(key, (version, serde_json::to_value(channel_entry).unwrap())); Ok(()) } fn delete_channel(&mut self, key: &str) { - self.values.remove(key); + let live_key = format!("{CHANNEL_PREFIX}/{key}"); + self.put_tombstone(&live_key); } fn update_channel( @@ -100,6 +117,7 @@ impl State { ) -> Result<(), Error> { trace!("Updating channel {key}"); let key = format!("{CHANNEL_PREFIX}/{key}"); + self.ensure_not_tombstone(&key)?; let v = self .values .get_mut(&key) @@ -113,7 +131,13 @@ impl State { key: &str, ) -> Result { let key = format!("{CHANNEL_PREFIX}/{key}"); - let value = self.values.get(&key).unwrap(); + if self.is_tombstone(&key) { + return Err(Error::Internal(format!("channel {} has been deleted", key))); + } + let value = self + .values + .get(&key) + .ok_or_else(|| Error::Internal(format!("missing channel state for key {}", key)))?; let entry: vls_persist::model::ChannelEntry = serde_json::from_value(value.1.clone()).unwrap(); Ok(entry.into()) @@ -135,6 +159,7 @@ impl State { .values .iter() .filter(|(k, _)| k.starts_with(&prefix)) + .filter(|(k, _)| !self.is_tombstone(k)) .map(|(k, v)| { let key = k.split('/').last().unwrap(); let key = vls_persist::model::NodeChannelId(hex::decode(&key).unwrap()); @@ -153,12 +178,13 @@ impl State { ) -> Result<(), Error> { let key = hex::encode(node_id.serialize()); let key = format!("{TRACKER_PREFIX}/{key}"); - assert!(!self.values.contains_key(&key)); + self.ensure_not_tombstone(&key)?; + let version = self.next_version(&key); let tracker: vls_persist::model::ChainTrackerEntry = tracker.into(); self.values - .insert(key, (0u64, serde_json::to_value(tracker).unwrap())); + .insert(key, (version, serde_json::to_value(tracker).unwrap())); Ok(()) } @@ -166,6 +192,33 @@ impl State { self.values.clear(); Ok(()) } + + fn put_tombstone(&mut self, live_key: &str) { + self.values + .insert(live_key.to_owned(), (TOMBSTONE_VERSION, serde_json::Value::Null)); + } + + fn next_version(&self, live_key: &str) -> u64 { + self.values + .get(live_key) + .map(|v| v.0.saturating_add(1)) + .unwrap_or(0) + } + + fn is_tombstone(&self, live_key: &str) -> bool { + self.values + .get(live_key) + .map(|v| v.0 == TOMBSTONE_VERSION) + .unwrap_or(false) + } + + fn ensure_not_tombstone(&self, key: &str) -> Result<(), Error> { + if self.is_tombstone(key) { + return Err(Error::Internal(format!("key {} has been deleted", key))); + } + Ok(()) + } + } #[derive(Debug)] @@ -175,6 +228,18 @@ pub struct StateChange { new: (u64, serde_json::Value), } +#[derive(Debug, Default)] +pub struct MergeResult { + pub changes: Vec<(String, Option, u64)>, + pub conflict_count: usize, +} + +impl MergeResult { + pub fn has_conflicts(&self) -> bool { + self.conflict_count > 0 + } +} + use core::fmt::Display; impl Display for StateChange { @@ -205,25 +270,54 @@ impl State { } } - /// Take another `State` and attempt to update ourselves with any - /// entry that is newer than ours. This may fail if the other - /// state includes states that are older than our own. - pub fn merge(&mut self, other: &State) -> anyhow::Result, u64)>> { - let mut res = Vec::new(); - for (key, (newver, newval)) in other.values.iter() { - let local = self.values.get_mut(key); + pub fn len(&self) -> usize { + self.values.len() + } - match local { + /// Merge incoming state and track potential version conflicts. + /// + /// A conflict means the incoming state is stale or incompatible with local + /// tombstone knowledge. Callers may use this signal to trigger a full sync. + pub fn merge(&mut self, other: &State) -> anyhow::Result { + let mut res = MergeResult::default(); + for (key, (newver, newval)) in other.values.iter() { + let incoming_is_tombstone = *newver == TOMBSTONE_VERSION; + match self.values.get_mut(key) { None => { trace!("Insert new key {}: version={}", key, newver); - res.push((key.to_owned(), None, *newver)); - self.values.insert(key.clone(), (*newver, newval.clone())); + res.changes.push((key.to_owned(), None, *newver)); + let value = if incoming_is_tombstone { + serde_json::Value::Null + } else { + newval.clone() + }; + self.values.insert(key.clone(), (*newver, value)); } Some(v) => { + if incoming_is_tombstone { + if v.0 == TOMBSTONE_VERSION { + continue; + } + trace!("Tombstoning key {}: version={} => version={}", key, v.0, newver); + res.changes.push((key.to_owned(), Some(v.0), *newver)); + *v = (*newver, serde_json::Value::Null); + continue; + } + + if v.0 == TOMBSTONE_VERSION { + trace!( + "Ignoring live key {} version={} because local key is tombstoned", + key, newver + ); + res.conflict_count += 1; + continue; + } + if v.0 == *newver { continue; } else if v.0 > *newver { warn!("Ignoring outdated state version newver={}, we have oldver={}: newval={:?} vs oldval={:?}", newver, v.0, serde_json::to_string(newval), serde_json::to_string(&v.1)); + res.conflict_count += 1; continue; } else { trace!( @@ -232,7 +326,7 @@ impl State { v.0, *newver ); - res.push((key.to_owned(), Some(v.0), *newver)); + res.changes.push((key.to_owned(), Some(v.0), *newver)); *v = (*newver, newval.clone()); } } @@ -257,6 +351,80 @@ impl State { }) .collect()) } + + /// Return a `State` containing only entries that are newer in `other` than in `self`. + /// This is useful for sending compact state diffs. + pub fn diff_state(&self, other: &State) -> State { + let mut values = BTreeMap::new(); + for (key, (newver, newval)) in other.values.iter() { + match self.values.get(key) { + None => { + values.insert(key.clone(), (*newver, newval.clone())); + } + Some((oldver, _)) if oldver < newver => { + values.insert(key.clone(), (*newver, newval.clone())); + } + _ => {} + } + } + State { values } + } + + pub fn sketch(&self) -> StateSketch { + StateSketch::from_state(self) + } + + // Return a copy of the state with tombstoned entries omitted. + pub fn omit_tombstones(&self) -> State { + let values = self + .values + .iter() + .filter(|(_, (version, _))| *version != TOMBSTONE_VERSION) + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + State { values } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StateSketch { + versions: BTreeMap, +} + +impl StateSketch { + pub fn new() -> Self { + Self::default() + } + + pub fn from_state(state: &State) -> Self { + let mut sketch = Self::new(); + sketch.apply_state(state); + sketch + } + + /// Apply versions from `state` without clearing existing entries. + pub fn apply_state(&mut self, state: &State) { + for (key, (ver, _)) in state.values.iter() { + self.versions.insert(key.clone(), *ver); + } + } + + /// Build a `State` containing entries newer than those recorded in the sketch. + pub fn diff_state(&self, state: &State) -> State { + let mut values = BTreeMap::new(); + for (key, (newver, newval)) in state.values.iter() { + match self.versions.get(key) { + None => { + values.insert(key.clone(), (*newver, newval.clone())); + } + Some(oldver) if oldver < newver => { + values.insert(key.clone(), (*newver, newval.clone())); + } + _ => {} + } + } + State { values } + } } impl Into> for State { @@ -416,6 +584,7 @@ impl Persist for MemoryPersister { let key = format!("{TRACKER_PREFIX}/{key}"); let mut state = self.state.lock().unwrap(); + state.ensure_not_tombstone(&key)?; let v = state.values.get_mut(&key).unwrap(); let tracker: vls_persist::model::ChainTrackerEntry = tracker.into(); *v = (v.0 + 1, serde_json::to_value(tracker).unwrap()); @@ -437,8 +606,19 @@ impl Persist for MemoryPersister { let key = format!("{TRACKER_PREFIX}/{key}"); let state = self.state.lock().unwrap(); + if state.is_tombstone(&key) { + return Err(Error::Internal(format!("tracker {} has been deleted", key))); + } let v: vls_persist::model::ChainTrackerEntry = - serde_json::from_value(state.values.get(&key).unwrap().1.clone()).unwrap(); + serde_json::from_value( + state + .values + .get(&key) + .ok_or_else(|| Error::Internal(format!("missing tracker state {}", key)))? + .1 + .clone(), + ) + .unwrap(); Ok(v.into_tracker(node_id, validator_factory)) } @@ -459,14 +639,16 @@ impl Persist for MemoryPersister { let key = format!("{ALLOWLIST_PREFIX}/{key}"); let mut state = self.state.lock().unwrap(); + state.ensure_not_tombstone(&key)?; match state.values.get_mut(&key) { Some(v) => { *v = (v.0 + 1, serde_json::to_value(allowlist).unwrap()); } None => { + let version = state.next_version(&key); state .values - .insert(key, (0u64, serde_json::to_value(allowlist).unwrap())); + .insert(key, (version, serde_json::to_value(allowlist).unwrap())); } } Ok(()) @@ -476,7 +658,10 @@ impl Persist for MemoryPersister { let state = self.state.lock().unwrap(); let key = hex::encode(node_id.serialize()); let key = format!("{ALLOWLIST_PREFIX}/{key}"); - + if state.is_tombstone(&key) { + return Ok(Vec::new()); + } + // If allowlist doesn't exist (e.g., node created before VLS 0.14), default to empty let allowlist: Vec = match state.values.get(&key) { Some(value) => serde_json::from_value(value.1.clone()).unwrap_or_default(), @@ -495,9 +680,9 @@ impl Persist for MemoryPersister { let node_ids: Vec<&str> = state .values .keys() - .map(|k| k.split('/')) - .filter(|k| k.clone().next().unwrap() == NODE_PREFIX) - .map(|k| k.clone().last().unwrap()) + .filter(|k| k.starts_with(&format!("{NODE_PREFIX}/"))) + .filter(|k| !state.is_tombstone(k)) + .filter_map(|k| k.split('/').last()) .collect(); let mut res = Vec::new(); @@ -506,22 +691,34 @@ impl Persist for MemoryPersister { let state_key = format!("{NODE_STATE_PREFIX}/{node_id}"); let allowlist_key = format!("{ALLOWLIST_PREFIX}/{node_id}"); - let node: vls_persist::model::NodeEntry = - serde_json::from_value(state.values.get(&node_key).unwrap().1.clone()).unwrap(); - let state_e: vls_persist::model::NodeStateEntry = - serde_json::from_value(state.values.get(&state_key).unwrap().1.clone()).unwrap(); + if state.is_tombstone(&node_key) || state.is_tombstone(&state_key) { + continue; + } + + let node: vls_persist::model::NodeEntry = match state.values.get(&node_key) { + Some(value) => serde_json::from_value(value.1.clone()).unwrap(), + None => continue, + }; + let state_e: vls_persist::model::NodeStateEntry = match state.values.get(&state_key) { + Some(value) => serde_json::from_value(value.1.clone()).unwrap(), + None => continue, + }; // Load allowlist, defaulting to empty if not found (for nodes created before VLS 0.14) - let allowlist_strings: Vec = match state.values.get(&allowlist_key) { - Some(value) => serde_json::from_value(value.1.clone()).unwrap_or_default(), - None => Vec::new(), + let allowlist_strings: Vec = if state.is_tombstone(&allowlist_key) { + Vec::new() + } else { + match state.values.get(&allowlist_key) { + Some(value) => serde_json::from_value(value.1.clone()).unwrap_or_default(), + None => Vec::new(), + } }; // Parse allowlist strings into Allowable objects use lightning_signer::node::Allowable; let network = lightning_signer::bitcoin::Network::from_str(&node.network) .map_err(|e| Error::Internal(format!("Invalid network: {}", e)))?; - + let allowlist: Vec = allowlist_strings .into_iter() .filter_map(|s| { @@ -572,3 +769,236 @@ impl Persist for MemoryPersister { [0u8; 16] } } + +#[cfg(test)] +mod tests { + use crate::persist::TOMBSTONE_VERSION; + + use super::{ + State, StateSketch, ALLOWLIST_PREFIX, CHANNEL_PREFIX, NODE_PREFIX, NODE_STATE_PREFIX, + TRACKER_PREFIX, + }; + use serde_json::json; + use std::collections::BTreeMap; + + fn mk_state(entries: Vec<(&str, u64, serde_json::Value)>) -> State { + let mut values = BTreeMap::new(); + for (key, version, value) in entries { + values.insert(key.to_string(), (version, value)); + } + State { values } + } + + fn assert_entry(state: &State, key: &str, expected_version: u64, expected_value: serde_json::Value) { + let (actual_version, actual_value) = state.values.get(key).unwrap_or_else(|| { + panic!("expected state to include key: {key}") + }); + assert_eq!(*actual_version, expected_version); + assert_eq!(actual_value, &expected_value); + } + + fn assert_entry_absent(state: &State, key: &str) { + assert!(state.values.get(key).is_none(), "expected state to omit key {key}"); + } + + fn assert_tombstone(state: &State, key: &str) { + assert_entry(state, key, TOMBSTONE_VERSION, serde_json::Value::Null); + } + + #[test] + fn omit_tombstones_omits_tombstoned_entries() { + let state = mk_state(vec![ + ("k1", 1, json!({"v": 1})), + ("k2", TOMBSTONE_VERSION, serde_json::Value::Null), + ]); + + let filtered = state.omit_tombstones(); + + assert_eq!(filtered.values.len(), 1); + assert_entry(&filtered, "k1", 1, json!({"v": 1})); + assert_entry_absent(&filtered, "k2"); + } + + #[test] + fn diff_state_only_includes_new_or_newer_entries() { + let old = mk_state(vec![ + ("k1", 1, json!({"v": 1})), + ("k2", 2, json!({"v": 2})), + ("k3", 3, json!({"v": 3})), + ]); + let new = mk_state(vec![ + // unchanged version, changed value should still be ignored + ("k1", 1, json!({"v": 999})), + // newer version should be included + ("k2", 3, json!({"v": 22})), + // older version should be ignored + ("k3", 2, json!({"v": 33})), + // brand new key should be included + ("k4", 0, json!({"v": 4})), + ]); + + let diff = old.diff_state(&new); + + assert_eq!(diff.values.len(), 2); + assert_entry(&diff, "k2", 3, json!({"v": 22})); + assert_entry(&diff, "k4", 0, json!({"v": 4})); + assert_entry_absent(&diff, "k1"); + assert_entry_absent(&diff, "k3"); + } + + #[test] + fn sate_diff_with_empty_old_state_includes_all_entries() { + let old = State::new(); + let new = mk_state(vec![ + ("k1", 1, json!({"v": 1})), + ("k2", 2, json!({"v": 2})), + ]); + + let diff = old.diff_state(&new); + + assert_eq!(diff.values.len(), 2); + assert_entry(&diff, "k1", 1, json!({"v": 1})); + assert_entry(&diff, "k2", 2, json!({"v": 2})); + } + + #[test] + fn sketch_diff_matches_state_diff() { + let old = mk_state(vec![ + ("a", 5, json!(1)), + ("b", 2, json!(2)), + ("c", 7, json!(3)), + ]); + let new = mk_state(vec![ + ("a", 5, json!(10)), + ("b", 3, json!(20)), + ("c", 6, json!(30)), + ("d", 1, json!(40)), + ]); + + let state_diff = old.diff_state(&new); + let sketch_diff = old.sketch().diff_state(&new); + + assert_eq!(state_diff.values, sketch_diff.values); + } + + #[test] + fn sketch_diff_with_empty_sketch_includes_all_entries() { + let state = mk_state(vec![ + ("a", 1, json!(1)), + ("b", 2, json!(2)), + ]); + let sketch = StateSketch::new(); + + let diff = sketch.diff_state(&state); + + assert_eq!(diff.values.len(), 2); + assert_entry(&diff, "a", 1, json!(1)); + assert_entry(&diff, "b", 2, json!(2)); + } + + #[test] + fn sketch_apply_follow_version_updates() { + let base = mk_state(vec![("a", 1, json!(1)), ("b", 2, json!(2))]); + let mut sketch = StateSketch::new(); + sketch.apply_state(&base); + + let next = mk_state(vec![("a", 2, json!(10)), ("b", 2, json!(20)), ("c", 0, json!(30))]); + let first_diff = sketch.diff_state(&next); + assert_eq!(first_diff.values.len(), 2); + assert_entry(&first_diff, "a", 2, json!(10)); + assert_entry(&first_diff, "c", 0, json!(30)); + assert_entry_absent(&first_diff, "b"); + + sketch.apply_state(&first_diff); + let second_diff = sketch.diff_state(&next); + assert!(second_diff.values.is_empty()); + } + + #[test] + fn merge_tombstone_deletes_older_live_entry() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let mut state = mk_state(vec![(live_key.as_str(), 2, json!({"v": 1}))]); + let incoming = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); + + let res = state.merge(&incoming).unwrap(); + + assert_tombstone(&state, &live_key); + assert_eq!(res.conflict_count, 0); + } + + #[test] + fn merge_ignores_live_entry_if_key_is_tombstone() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let mut state = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); + let incoming = mk_state(vec![(live_key.as_str(), 4, json!({"v": 1}))]); + + let res = state.merge(&incoming).unwrap(); + + assert_tombstone(&state, &live_key); + assert_eq!(res.conflict_count, 1); + } + + #[test] + fn merge_ignores_newer_live_entry_if_key_is_tombstone() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let mut state = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); + let incoming = mk_state(vec![(live_key.as_str(), 6, json!({"v": 2}))]); + + let res = state.merge(&incoming).unwrap(); + + assert_tombstone(&state, &live_key); + assert_eq!(res.conflict_count, 1); + } + + #[test] + fn safe_merge_reports_conflict_for_outdated_live_version() { + let mut state = mk_state(vec![("k1", 5, json!({"v": 5}))]); + let incoming = mk_state(vec![("k1", 4, json!({"v": 4}))]); + + let res = state.merge(&incoming).unwrap(); + + assert_eq!(res.conflict_count, 1); + assert_entry(&state, "k1", 5, json!({"v": 5})); + } + + #[test] + fn delete_channel_marks_channel_with_tombstone_version() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let mut state = mk_state(vec![(live_key.as_str(), 7, json!({"v": 1}))]); + + state.delete_channel("abc"); + + assert_tombstone(&state, &live_key); + } + + #[test] + fn delete_node_creates_tombstones_for_node_related_keys() { + let node_id = "deadbeef"; + let node_key = format!("{NODE_PREFIX}/{node_id}"); + let node_state_key = format!("{NODE_STATE_PREFIX}/{node_id}"); + let allowlist_key = format!("{ALLOWLIST_PREFIX}/{node_id}"); + let tracker_key = format!("{TRACKER_PREFIX}/{node_id}"); + let channel_key = format!("{CHANNEL_PREFIX}/{node_id}cafebabe"); + + let mut state = mk_state(vec![ + (node_key.as_str(), 1, json!({"n": 1})), + (node_state_key.as_str(), 2, json!({"s": 1})), + (allowlist_key.as_str(), 3, json!(["127.0.0.1"])), + (tracker_key.as_str(), 4, json!({"t": 1})), + (channel_key.as_str(), 5, json!({"c": 1})), + ]); + + state.delete_node(node_id).unwrap(); + + for (live_key, old_version) in vec![ + (node_key, 1u64), + (node_state_key, 2u64), + (allowlist_key, 3u64), + (tracker_key, 4u64), + (channel_key, 5u64), + ] { + assert_tombstone(&state, &live_key); + assert!(old_version < u64::MAX); + } + } +} diff --git a/libs/gl-client/src/signer/mod.rs b/libs/gl-client/src/signer/mod.rs index daa858e01..d5522f667 100644 --- a/libs/gl-client/src/signer/mod.rs +++ b/libs/gl-client/src/signer/mod.rs @@ -10,6 +10,9 @@ use crate::pb::PendingRequest; use crate::pb::{node_client::NodeClient, Empty, HsmRequest, HsmRequestContext, HsmResponse}; use crate::runes; use crate::signer::resolve::Resolver; +use crate::metrics::{ + signer_state_response_wire_bytes, savings_percent, +}; use crate::tls::TlsConfig; use crate::{node, node::Client}; use anyhow::{anyhow, Result}; @@ -487,16 +490,27 @@ impl Signer { debug!("Processing request {:?}", req); let diff: crate::persist::State = req.signer_state.clone().into(); - let prestate = { + let (prestate_sketch, prestate_log) = { debug!("Updating local signer state with state from node"); let mut state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock: {:?}", e)) })?; - state.merge(&diff).map_err(|e| { + let merge_res = state.merge(&diff).map_err(|e| { Error::Other(anyhow!("Failed to merge signer state: {:?}", e)) })?; + if merge_res.has_conflicts() { + debug!( + "State merge ignored stale versions (count={})", + merge_res.conflict_count + ); + } trace!("Processing request {}", hex::encode(&req.raw)); - state.clone() + let log_state = serde_json::to_string(&*state).map_err(|e| { + Error::Other(anyhow!("Failed to serialize signer state for logging: {:?}", e)) + })?; + + + (state.sketch(), log_state) }; // The first two bytes represent the message type. Check that @@ -527,10 +541,7 @@ impl Signer { let msg = vls_protocol::msgs::from_vec(req.raw.clone()).map_err(|e| Error::Protocol(e))?; log::debug!("Handling message {:?}", msg); - log::trace!( - "Signer state {}", - serde_json::to_string(&prestate).unwrap_or_else(|_| "".to_string()) - ); + log::trace!("Signer state {}", prestate_log); if let Err(e) = self.authenticate_request(&msg, &ctxrequests) { report::Reporter::report(crate::pb::scheduler::SignerRejection { @@ -628,7 +639,22 @@ impl Signer { let state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock for serialization: {:?}", e)) })?; - state.clone().into() + let full_wire_bytes = { + let full_entries: Vec = state.clone().into(); + signer_state_response_wire_bytes(&full_entries) + }; + let diff_state = prestate_sketch.diff_state(&state); + let diff_entries: Vec = diff_state.into(); + let diff_wire_bytes = signer_state_response_wire_bytes(&diff_entries); + let saved_percent = savings_percent(full_wire_bytes, diff_wire_bytes); + trace!( + "Signer state diff entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", + diff_entries.len(), + diff_wire_bytes, + full_wire_bytes, + saved_percent + ); + diff_entries }; Ok(HsmResponse { raw: response.as_vec(), diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 2fa131714..a9593e662 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -7,7 +7,10 @@ use anyhow::{Context, Error, Result}; use base64::{engine::general_purpose, Engine as _}; use bytes::BufMut; use cln_rpc::Notification; -use gl_client::persist::State; +use gl_client::persist::{State, StateSketch}; +use gl_client::metrics::{ + signer_state_request_wire_bytes, savings_percent, +}; use governor::{ clock::MonotonicClock, state::direct::NotKeyed, state::InMemoryState, Quota, RateLimiter, }; @@ -328,6 +331,7 @@ impl Node for PluginNodeServer { tokio::spawn(async move { trace!("hsmd hsm_id={} request processor started", hsm_id); + let mut last_sent_sketch = StateSketch::new(); { // We start by immediately injecting a @@ -339,9 +343,12 @@ impl Node for PluginNodeServer { // presumably time-critical messages, do not have to carry // the large state with them. - let state = signer_state.lock().await.clone(); - let state: Vec = state.into(); - let state: Vec = state + let state_snapshot = signer_state.lock().await.clone(); + let state_entries: Vec = state_snapshot + .omit_tombstones() + .into(); + let state_wire_bytes = signer_state_request_wire_bytes(&state_entries); + let state_entries: Vec = state_entries .into_iter() .map(|s| pb::SignerStateEntry { key: s.key, @@ -349,14 +356,19 @@ impl Node for PluginNodeServer { value: s.value, }) .collect(); - + trace!( + "Signer state heartbeat to hsm_id={} entries={}, wire_bytes={}", + hsm_id, + state_entries.len(), + state_wire_bytes + ); let msg = vls_protocol::msgs::GetHeartbeat {}; use vls_protocol::msgs::SerBolt; let req = crate::pb::HsmRequest { // Notice that the request_counter starts at 1000, to // avoid collisions. request_id: 0, - signer_state: state, + signer_state: state_entries, raw: msg.as_vec(), requests: vec![], // No pending requests yet, nothing to authorize. context: None, @@ -364,6 +376,8 @@ impl Node for PluginNodeServer { if let Err(e) = tx.send(Ok(req)).await { log::warn!("Failed to send heartbeat message to signer: {}", e); + } else { + last_sent_sketch = state_snapshot.sketch(); } } @@ -384,11 +398,23 @@ impl Node for PluginNodeServer { hsm_id ); - let state = signer_state.lock().await.clone(); - let state: Vec = state.into(); + + let state_snapshot = signer_state.lock().await.clone(); + // Estimate the size of the full state to calculate the bandwidth savings of sending diffs + let full_entries: Vec = + state_snapshot.omit_tombstones().into(); + let full_wire_bytes = signer_state_request_wire_bytes(&full_entries); + + // Send only the changes since the last time we sent state to this signer. + let diff_state = last_sent_sketch.diff_state(&state_snapshot); + let outgoing_entries: Vec = + diff_state.clone().into(); + let outgoing_wire_bytes = signer_state_request_wire_bytes(&outgoing_entries); + let outgoing_entry_count = outgoing_entries.len(); + last_sent_sketch.apply_state(&diff_state); // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map. - let state: Vec = state + let outgoing_entries: Vec = outgoing_entries .into_iter() .map(|s| pb::SignerStateEntry { key: s.key, @@ -397,7 +423,18 @@ impl Node for PluginNodeServer { }) .collect(); - req.request.signer_state = state.into(); + let saved_percent = savings_percent(full_wire_bytes, outgoing_wire_bytes); + trace!( + "Signer state diff to hsm_id={} request_id={} entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", + hsm_id, + req.request.request_id, + outgoing_entry_count, + outgoing_wire_bytes, + full_wire_bytes, + saved_percent + ); + + req.request.signer_state = outgoing_entries; req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect(); let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await; @@ -446,6 +483,8 @@ impl Node for PluginNodeServer { } eprintln!("WIRE: signer -> plugin: {:?}", req); + // Merge diff entries returned by signer. + // Create a state from the key-value-version tuples. Need to // convert here, since `pb` is duplicated in the two different // crates. @@ -462,12 +501,18 @@ impl Node for PluginNodeServer { // Apply state changes to the in-memory state let mut state = self.signer_state.lock().await; - state.merge(&new_state).map_err(|e| { + let merge_res = state.merge(&new_state).map_err(|e| { Status::new( Code::Internal, format!("Error updating internal state: {e}"), ) })?; + if merge_res.has_conflicts() { + debug!( + "State merge ignored stale versions (count={})", + merge_res.conflict_count + ); + } // Send changes to the signer_state_store for persistence let store = self.signer_state_store.lock().await; diff --git a/libs/proto/glclient/greenlight.proto b/libs/proto/glclient/greenlight.proto index 63dd182c0..76ea1ef12 100644 --- a/libs/proto/glclient/greenlight.proto +++ b/libs/proto/glclient/greenlight.proto @@ -179,8 +179,8 @@ message NodeConfig { // The `GlConfig` is used to pass greenlight-specific startup parameters -// to the node. The `gl-plugin` will look for a serialized config object in -// the node's datastore to load these values from. Please refer to the +// to the node. The `gl-plugin` will look for a serialized config object in +// the node's datastore to load these values from. Please refer to the // individual fields to learn what they do. message GlConfig { string close_to_addr = 1; @@ -232,7 +232,7 @@ message LspInvoiceRequest { // Optional: for discounts/API keys string token = 2; // len=0 => None // Pass-through of cln invoice rpc params - uint64 amount_msat = 3; // 0 => Any + uint64 amount_msat = 3; // 0 => Any string description = 4; string label = 5; }