Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions hyperdb-api-core/src/client/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ impl AsyncClient {
let sock = socket2::SockRef::from(&tcp_stream);
sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
sock.set_send_buffer_size(4 * 1024 * 1024).ok();
// TCP keepalive: detect a half-open peer (laptop sleep, network blip,
// a hyperd that vanished without a FIN) in ~90s instead of the 2h OS
// idle default. See the rationale on `super::client::apply_tcp_keepalive`
// (the sync mirror). Best-effort: a rejected knob leaves OS defaults.
{
let keepalive = socket2::TcpKeepalive::new()
.with_time(std::time::Duration::from_secs(60))
.with_interval(std::time::Duration::from_secs(10));
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
let keepalive = keepalive.with_retries(3);
sock.set_tcp_keepalive(&keepalive).ok();
}

let stream = AsyncStream::tcp(tcp_stream);
let mut connection = AsyncRawConnection::new(stream);
Expand Down
25 changes: 25 additions & 0 deletions hyperdb-api-core/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,33 @@

use std::net::TcpStream;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;

use tracing::{debug, info, trace, warn};

/// Turn on TCP keepalive probing for a connection socket.
///
/// A peer that goes away without closing the connection (laptop sleep, a
/// network blip, a hyperd that vanished without a FIN) leaves the socket
/// half-open. With the OS default idle timeout (7200s / 2h on macOS and
/// Linux) a blocking `read()` on such a socket can hang for hours; keepalive
/// probes let the kernel notice the dead peer much sooner.
///
/// This is most relevant to long-lived idle connections, such as an MCP
/// client that keeps a connection to a resident daemon's hyperd open across
/// a laptop suspend.
///
/// Settings applied:
/// - 60s of idle time before the first probe,
/// - 10s between probes,
/// - 3 unanswered probes before the peer is declared dead, giving roughly
///   90s from silence to failure. The probe count is not set on macOS or
///   Windows (see the `cfg` below), so those targets use the OS default
///   probe count together with the idle and interval values above.
///
/// The call is best-effort: if the kernel rejects the settings, the error is
/// discarded and the connection keeps working with OS defaults.
fn apply_tcp_keepalive(sock: &socket2::SockRef<'_>) {
    // Idle time on the connection before the first keepalive probe.
    const IDLE_BEFORE_FIRST_PROBE: Duration = Duration::from_secs(60);
    // Spacing between successive keepalive probes.
    const PROBE_INTERVAL: Duration = Duration::from_secs(10);

    let params = socket2::TcpKeepalive::new()
        .with_time(IDLE_BEFORE_FIRST_PROBE)
        .with_interval(PROBE_INTERVAL);
    // The probe-count knob is skipped on macOS and Windows.
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let params = params.with_retries(3);

    // Deliberately ignore failure: keepalive is an optimisation, not a
    // precondition for a working connection.
    let _ = sock.set_tcp_keepalive(&params);
}

#[cfg(unix)]
use std::os::unix::net::UnixStream;

Expand Down Expand Up @@ -192,6 +216,7 @@
let sock = socket2::SockRef::from(&tcp_stream);
sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
sock.set_send_buffer_size(4 * 1024 * 1024).ok();
apply_tcp_keepalive(&sock);

let stream = SyncStream::tcp(tcp_stream);
let mut connection = RawConnection::new(stream);
Expand Down Expand Up @@ -307,7 +332,7 @@
#[cfg(windows)]
pub fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
use std::fs::OpenOptions;
use std::time::{Duration, Instant};

Check warning on line 335 in hyperdb-api-core/src/client/client.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

the item `Duration` is imported redundantly

Check warning on line 335 in hyperdb-api-core/src/client/client.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

the item `Duration` is imported redundantly

Check warning on line 335 in hyperdb-api-core/src/client/client.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

the item `Duration` is imported redundantly

Check warning on line 335 in hyperdb-api-core/src/client/client.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

the item `Duration` is imported redundantly

Check warning on line 335 in hyperdb-api-core/src/client/client.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

the item `Duration` is imported redundantly

Check warning on line 335 in hyperdb-api-core/src/client/client.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

the item `Duration` is imported redundantly

info!(
target: "hyperdb_api",
Expand Down
14 changes: 14 additions & 0 deletions hyperdb-mcp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ All notable changes to the `hyperdb-mcp` crate will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/),
and this project adheres to [Semantic Versioning](https://semver.org/).

## [Unreleased]

### Fixed

- **TCP keepalive on the `hyperd` connection.** Connections to `hyperd` now
enable TCP keepalive (60s idle, 10s between probes; with the 3-probe limit
that is set on platforms other than macOS and Windows, a dead peer is
declared after ~90s) instead of relying on the OS default 2-hour idle
timeout. Without it, a long-lived idle connection that went half-open —
laptop sleep, a network blip, or a `hyperd` that vanished without sending a
FIN — would make the next query (including the `status` tool) block for up to
two hours instead of failing fast and reconnecting. This is most visible
since the daemon became resident-by-default in 0.5.0, which made long-lived
idle connections the norm. (Fixed in `hyperdb-api-core` for both the sync and
async clients.)

## [0.5.0] - 2026-06-07

### Fixed
Expand Down
25 changes: 24 additions & 1 deletion hyperdb-mcp/src/daemon/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,30 @@ pub fn ensure_daemon(scan: PortScan) -> io::Result<DaemonInfo> {
ScanOutcome::FreePort(port) => {
info!(port, "no running daemon detected, spawning on free port");
spawn_detached(port)?;
wait_for_daemon()
let info = wait_for_daemon()?;
// If the daemon we just spawned bound a port above the scan base (because
// concurrent clients raced and one of them grabbed the base port first),
// prefer the lower-port daemon so we don't accumulate redundant
// daemon+hyperd pairs on adjacent ports. The lower-port daemon wins
// because it bound first and is the canonical single instance.
if info.health_port > scan.base {
let lower_scan = PortScan {
base: scan.base,
span: info.health_port.saturating_sub(scan.base),
};
if let ScanOutcome::Found(lower_info) = discovery::scan_for_daemon(lower_scan) {
debug!(
prefer_port = lower_info.health_port,
stop_port = info.health_port,
"found lower-port daemon from concurrent spawn; stopping off-base daemon"
);
// Best-effort STOP — if it fails the off-base daemon idles
// harmlessly (it has no clients and will only cost background CPU).
let _ = super::health::send_command(info.health_port, "STOP");
return Ok(*lower_info);
}
}
Ok(info)
}
ScanOutcome::AllOccupied => Err(io::Error::new(
io::ErrorKind::AddrInUse,
Expand Down
34 changes: 34 additions & 0 deletions hyperdb-mcp/tests/daemon_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,40 @@ fn discover_finds_live_daemon() {
assert_eq!(discovered.health_port, port);
}

// ─── Unit tests: concurrent-spawn dedup (no env vars, safe parallel) ────────────

#[test]
fn scan_for_daemon_prefers_lower_port_daemon() {
    // Scenario: a concurrent-spawn race has left two daemons listening at
    // the same time. A client re-scanning from the lower of the two ports
    // must be handed the daemon on that lower port.
    //
    // Two real health listeners stand in for the daemons. Their handles and
    // state are bound to named (underscore-prefixed) locals so they stay
    // alive until the end of the test.
    let (first_port, _first_handle, _first_state) = start_health_listener();
    let (second_port, _second_handle, _second_state) = start_health_listener();

    // The listeners pick their own ports, so either one may be the lower;
    // start the scan at whichever is smaller.
    let base = first_port.min(second_port);

    match discovery::scan_for_daemon(PortScan { base, span: 2 }) {
        discovery::ScanOutcome::Found(info) => assert_eq!(
            info.health_port, base,
            "scan should return the LOWEST-port daemon (the first one bound)"
        ),
        other => panic!("expected Found, got {other:?}"),
    }
}

// ─── Unit tests: Version takeover decision (no env vars, safe parallel) ─────────

#[test]
Expand Down
Loading