diff --git a/crates/buzz-acp/src/leader.rs b/crates/buzz-acp/src/leader.rs index 3666ed375..144e70df6 100644 --- a/crates/buzz-acp/src/leader.rs +++ b/crates/buzz-acp/src/leader.rs @@ -47,10 +47,25 @@ use std::collections::HashMap; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; +use std::time::{Duration, Instant}; use serde::Deserialize; +/// Stand-down timeout: how long `acquire()` is suppressed after `stand_down()`. +/// +/// 2× the 5s refresh tick guarantees the target gets at least one full tick to +/// acquire before the old leader resumes. If the target is alive, it acquires +/// within 5s. If the target crashed, the old leader recovers after this timeout +/// — zero-leader window is bounded and self-healing. +#[cfg(not(test))] +const STAND_DOWN_TIMEOUT: Duration = Duration::from_secs(10); + +/// Shortened for tests so we can exercise the auto-expire path without sleeping. +#[cfg(test)] +const STAND_DOWN_TIMEOUT: Duration = Duration::from_millis(5); + /// Environment variable carrying this window's leader-election identity. /// /// Per-window election identity. The desktop may inject a process-unique value @@ -109,6 +124,13 @@ pub struct FileLeaderCheck { /// Cached leader status per agent pubkey hex. Seeded read-through on first /// `is_leader`, refreshed in place by `refresh`. cache: Mutex>, + /// When set, `acquire()` becomes a no-op (returns false without touching + /// the lock file) until the stand-down is cleared by `resume()` or after + /// [`STAND_DOWN_TIMEOUT`] expires. + standing_down: AtomicBool, + /// When the stand-down was entered. Used by `is_standing_down()` to + /// auto-expire the suppression after [`STAND_DOWN_TIMEOUT`]. + stood_down_at: Mutex>, } impl FileLeaderCheck { @@ -149,6 +171,8 @@ impl FileLeaderCheck { instance_id, lock_dir, cache: Mutex::new(HashMap::new()), + standing_down: AtomicBool::new(false), + stood_down_at: Mutex::new(None), } } @@ -171,6 +195,47 @@ impl FileLeaderCheck { } } + /// Returns this instance's election id, or `None` for always-leader mode. + pub fn instance_id(&self) -> Option<&str> { + self.instance_id.as_deref() + } + + /// Enter stand-down: suppress all `acquire()` calls until `resume()` or + /// [`STAND_DOWN_TIMEOUT`] expires. Called when this instance receives a + /// `claim_leadership` targeting a different instance and this instance is + /// the current leader. + pub fn stand_down(&self) { + self.standing_down.store(true, Ordering::Release); + *self.stood_down_at.lock().unwrap_or_else(|e| e.into_inner()) = Some(Instant::now()); + } + + /// Exit stand-down. Called by the target after successful acquire, or + /// automatically when [`STAND_DOWN_TIMEOUT`] expires. + #[allow(dead_code)] + pub fn resume(&self) { + self.standing_down.store(false, Ordering::Release); + *self.stood_down_at.lock().unwrap_or_else(|e| e.into_inner()) = None; + } + + /// Whether this instance is currently standing down (suppressing acquire). + /// Auto-expires after [`STAND_DOWN_TIMEOUT`] to prevent permanent + /// zero-leader on a lost handoff. + fn is_standing_down(&self) -> bool { + if !self.standing_down.load(Ordering::Acquire) { + return false; + } + let mut guard = self.stood_down_at.lock().unwrap_or_else(|e| e.into_inner()); + if let Some(at) = *guard { + if at.elapsed() > STAND_DOWN_TIMEOUT { + // Auto-expire: clear stand-down inline to avoid re-locking. + self.standing_down.store(false, Ordering::Release); + *guard = None; + return false; + } + } + true + } + /// Claim the leader lock for `pubkey_hex`, returning whether this process /// now holds it (and may act as leader). /// @@ -187,6 +252,11 @@ impl FileLeaderCheck { use nix::fcntl::{Flock, FlockArg}; use std::io::{Read, Seek, SeekFrom, Write}; + // Stand-down suppresses re-claim during cooperative handoff. + if self.is_standing_down() { + return false; + } + // No election id → nothing to claim with; mirror the read-side // always-leader contract. let Some(self_id) = self.instance_id.as_deref() else { @@ -308,6 +378,9 @@ impl FileLeaderCheck { /// `#[cfg(not(unix))]` pattern. The desktop targets macOS/Linux only. #[cfg(not(unix))] pub fn acquire(&self, _pubkey_hex: &str) -> bool { + if self.is_standing_down() { + return false; + } true } @@ -684,5 +757,71 @@ mod tests { "exactly one concurrent writer may win the lock" ); } + + #[test] + fn test_stand_down_suppresses_acquire() { + let dir = TmpDir::new(); + let a = checker(&dir, SELF_ID); + let b = checker(&dir, OTHER_ID); + assert!(a.acquire(PUBKEY), "A claims the free lock"); + // A stands down and releases — simulating cooperative handoff. + a.stand_down(); + a.release(PUBKEY); + // A's next tick: acquire returns false (standing down). + assert!( + !a.acquire(PUBKEY), + "standing-down instance must not re-claim" + ); + // B can now take the free lock. + assert!(b.acquire(PUBKEY), "B must acquire the released lock"); + } + + #[test] + fn test_stand_down_auto_expires() { + let dir = TmpDir::new(); + let a = checker(&dir, SELF_ID); + assert!(a.acquire(PUBKEY)); + a.stand_down(); + a.release(PUBKEY); + // acquire is suppressed while standing down. + assert!(!a.acquire(PUBKEY)); + // Wait for the test-configured timeout (5ms) to expire. + std::thread::sleep(std::time::Duration::from_millis(10)); + // After timeout, stand-down auto-expires and acquire succeeds. + assert!( + a.acquire(PUBKEY), + "acquire must succeed after stand-down timeout expires" + ); + } + + #[test] + fn test_cooperative_handoff_transfers_leadership() { + let dir = TmpDir::new(); + let a = checker(&dir, SELF_ID); + let b = checker(&dir, OTHER_ID); + assert!(a.acquire(PUBKEY), "A is leader"); + assert!(!b.acquire(PUBKEY), "B blocked while A holds it"); + + // Cooperative handoff: A stands down + releases. + a.stand_down(); + a.release(PUBKEY); + // B acquires on its next tick. + assert!(b.acquire(PUBKEY), "B must win after A stands down"); + // A's next tick: still standing down, can't re-grab. + assert!( + !a.acquire(PUBKEY), + "A must not re-claim while standing down" + ); + } + + #[test] + fn test_instance_id_accessor() { + let dir = TmpDir::new(); + let with_id = FileLeaderCheck::new(Some("test-id-123".into()), dir.0.clone()); + assert_eq!(with_id.instance_id(), Some("test-id-123")); + + let without_id = FileLeaderCheck::new(None, dir.0.clone()); + assert_eq!(without_id.instance_id(), None); + } } } diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index 97b1d163e..9e46961d9 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -10,6 +10,7 @@ mod pool; mod queue; mod relay; +use leader::LeaderCheck; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -489,6 +490,8 @@ fn handle_relay_observer_control_event( pool: &mut AgentPool, observer: Option<&observer::ObserverHandle>, owner_pubkey_hex: &str, + leader: &std::sync::Arc, + agent_pubkey_hex: &str, ) { // Defense-in-depth: verify signature even though the relay already checked. if let Err(e) = buzz_core::verify_event(&event) { @@ -527,36 +530,75 @@ fn handle_relay_observer_control_event( }; let command_type = payload.get("type").and_then(|value| value.as_str()); - if command_type != Some("cancel_turn") { - tracing::debug!(payload = %payload, "ignoring unknown observer control frame"); - return; - } - - let Some(channel_id) = payload - .get("channelId") - .and_then(|value| value.as_str()) - .and_then(|value| value.parse::().ok()) - else { - tracing::warn!("observer cancel_turn control frame missing valid channelId"); - return; - }; + if command_type == Some("cancel_turn") { + let Some(channel_id) = payload + .get("channelId") + .and_then(|value| value.as_str()) + .and_then(|value| value.parse::().ok()) + else { + tracing::warn!("observer cancel_turn control frame missing valid channelId"); + return; + }; - let fired = signal_in_flight_task(pool, channel_id, ControlSignal::Cancel); - let status = if fired { "sent" } else { "no_active_turn" }; - if let Some(observer) = observer { - observer.emit( - "control_result", - None, - &observer::ObserverContext { - channel_id: Some(channel_id.to_string()), - session_id: None, - turn_id: None, - }, - serde_json::json!({ - "type": "cancel_turn", - "status": status, - }), - ); + let fired = signal_in_flight_task(pool, channel_id, ControlSignal::Cancel); + let status = if fired { "sent" } else { "no_active_turn" }; + if let Some(observer) = observer { + observer.emit( + "control_result", + None, + &observer::ObserverContext { + channel_id: Some(channel_id.to_string()), + session_id: None, + turn_id: None, + }, + serde_json::json!({ + "type": "cancel_turn", + "status": status, + }), + ); + } + } else if command_type == Some("claim_leadership") { + let target_id = payload.get("targetInstanceId").and_then(|v| v.as_str()); + let self_id = leader.instance_id(); + + match (target_id, self_id) { + (Some(target), Some(self_instance)) if target == self_instance => { + // We are the target — attempt to acquire. + let acquired = leader.acquire(agent_pubkey_hex); + if acquired { + if let Some(obs) = observer { + obs.emit( + "control_result", + None, + &observer::ObserverContext { + channel_id: None, + session_id: None, + turn_id: None, + }, + serde_json::json!({ + "type": "claim_leadership", + "status": "claimed", + }), + ); + } + } + // If !acquired: do nothing. Next 5s tick will succeed + // (old leader standing down). leadership_status stream + // is the desktop's source of truth. + } + (Some(_target), Some(_self_instance)) => { + // We are NOT the target — if we're leader, stand down + release. + if leader.is_leader(agent_pubkey_hex) { + leader.stand_down(); + leader.release(agent_pubkey_hex); + } + } + _ => { + tracing::warn!("claim_leadership frame missing targetInstanceId or no self id"); + } + } + } else { + tracing::debug!(payload = %payload, "ignoring unknown observer control frame"); } } @@ -1364,7 +1406,7 @@ async fn tokio_main() -> Result<()> { match control_event { Some(event) => { if let Some(ref owner_hex) = owner_cache.pubkey { - handle_relay_observer_control_event(&config.keys, event, &mut pool, observer.as_ref(), owner_hex); + handle_relay_observer_control_event(&config.keys, event, &mut pool, observer.as_ref(), owner_hex, &leader, &agent_pubkey_hex); } else { tracing::warn!("observer control frame received but no owner resolved — dropping"); } @@ -1791,6 +1833,20 @@ async fn tokio_main() -> Result<()> { // a restart. leader.acquire(&agent_pubkey_hex); ctx.leader.refresh(); + // Emit leadership_status so the desktop can track which + // instances are alive and who currently leads. + if let (Some(obs), Some(instance_id)) = (observer.as_ref(), leader.instance_id()) { + obs.emit( + "leadership_status", + None, + &observer::ObserverContext { channel_id: None, session_id: None, turn_id: None }, + serde_json::json!({ + "type": "leadership_status", + "instanceId": instance_id, + "isLeader": ctx.leader.is_leader(&agent_pubkey_hex), + }), + ); + } None } _ = shutdown_rx.changed() => { diff --git a/docs/nips/NIP-LE.md b/docs/nips/NIP-LE.md index ecf813cbc..5828ffd12 100644 --- a/docs/nips/NIP-LE.md +++ b/docs/nips/NIP-LE.md @@ -135,6 +135,57 @@ refresh interval (e.g. 10s). A live leader rewrites `claimed_at` on every refres tick, so only an abandoned claim ages past the bound — distinguishing a recycled pid from a genuinely active leader without evicting the latter. +### Cooperative Steal (Manual Leadership Transfer) + +A deliberate steal — initiated from the desktop sidebar UI — uses a +**cooperative** mechanism: the current leader voluntarily stands down rather than +being forcibly evicted. This preserves the split-brain guard (`lock_is_takeable`) +that prevents two leaders from coexisting. + +**Stand-down primitive.** When the current leader receives a `claim_leadership` +control frame targeting a different instance, it enters *stand-down*: a state +that suppresses `acquire()` (returns false without touching the lock file) until +either `resume()` is called or a timeout expires. The leader then releases its +lock, making it free for the target. + +**Timeout.** Stand-down auto-expires after 10 seconds (2× the 5s refresh tick). +This bounds the zero-leader window: if the target instance crashes or fails to +acquire, the old leader resumes and re-claims on its next tick. The worst case +is a brief gap with no leader (bounded, self-healing) — never a split-brain. + +**Handoff sequence:** + +1. Desktop sends a `claim_leadership` control frame (NIP-AO kind 24200) with + `{ "type": "claim_leadership", "targetInstanceId": "" }`. +2. All co-located instances receive the broadcast. +3. The **current leader** (non-target): calls `stand_down()` then `release()`. + Its next refresh tick's `acquire()` returns false (standing down), preventing + re-grab. +4. The **target instance** (match): calls `acquire()`. If the lock is now free + (old leader released), it succeeds and emits a `control_result` with + `status: "claimed"`. If the lock is not yet free (race — old leader hasn't + processed the frame yet), it does nothing; its next 5s refresh tick will + succeed once the old leader stands down. +5. **Other observers** (non-target, non-leader): ignore the frame. + +**Failure direction.** Cooperative steal never bypasses the live-leader guard. +The worst case is zero leaders briefly (stand-down entered but target didn't +acquire), which self-heals via the 10s timeout. This is preferable to force-steal +which would risk two leaders briefly. + +### Scope Boundary + +Leader election is **single-host only**. The lock file lives at +`~/.buzz/leader-locks/.lock` — a local filesystem artifact. Instances on +different machines have different `~/.buzz/` directories and cannot see each +other's locks. + +The `claim_leadership` control frame (sent via the relay) reaches all subscribed +instances regardless of host, but the `acquire()` that follows only contends with +co-located processes sharing the same lock file. Cross-machine coordination +(relay-mediated lock or consensus) is a separate future change, explicitly out of +scope. + Hard-steal: claiming an agent in window B immediately aborts window A's in-flight turn. The abort reuses NIP-AO's `cancel_turn` control frame (kind 24200); this NIP does not define a separate cancel mechanism. See NIP-AO