From 9fcc2722dd79989ba6d6df6118e3ff22ada221da Mon Sep 17 00:00:00 2001 From: cDc Date: Sat, 30 May 2026 22:51:45 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20fold=20resume=20into=20`send`=20?= =?UTF-8?q?=E2=80=94=20auto-detect=20prior=20incomplete=20transfers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-running the identical `send` of an interrupted transfer now auto-detects the prior on-disk state and continues it; the explicit `resume` subcommand is removed. Detection is keyed by (peer fingerprint, file list) rather than a random UUID: - FolderTransferState records the negotiated peer_fingerprint; send_path stamps it and persists a checkpoint on every file completion (so an abrupt kill, not just a recoverable network error, leaves resumable state). - New p2p-cli util helpers: default_state_dir() (per-user data dir via the directories crate) and find_resumable_state() (strict, order-independent match on every file's (path, size, mtime); newest-wins on conflict). - handle_send locates resumable state after pairing and logs "Resuming transfer …"; --no-resume forces a fresh transfer. - enumerate_files extracted in p2p-core so the live send and resume-detection build identical file lists. Removed: the resume subcommand, p2p-cli/src/resume.rs, and the Resume CLI variant/dispatch. Resume state moved from CWD to a per-user data dir by default (--state-dir still overrides). Tests: rewrote the rendezvous disconnect/resume integration test to drive phase 2 through handle_send; added find_resumable_state unit tests for every match/reject branch. Docs (README/DESIGN/CHANGELOG/AGENTS) and the stress.sh T10 smoke step updated for the new flow. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 5 +- CHANGELOG.md | 17 ++ Cargo.toml | 6 +- DESIGN.md | 22 +- README.md | 39 +-- p2p-cli/AGENTS.md | 10 +- p2p-cli/Cargo.toml | 1 + p2p-cli/src/cli.rs | 44 +-- p2p-cli/src/lib.rs | 24 +- p2p-cli/src/resume.rs | 190 ------------ p2p-cli/src/send.rs | 95 +++++- p2p-cli/src/util.rs | 282 ++++++++++++++++-- p2p-core/src/session.rs | 28 ++ p2p-core/src/transfer_folder.rs | 156 +++++----- smoke/src/stress.sh | 27 +- tests/rendezvous_disconnect_resume_test.rs | 330 +++++++++++---------- 16 files changed, 726 insertions(+), 550 deletions(-) delete mode 100644 p2p-cli/src/resume.rs diff --git a/AGENTS.md b/AGENTS.md index f13ce4a..e29d8a4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -26,8 +26,7 @@ cargo build --release --features gui --no-default-features ./target/release/p2p-transfer receive --output ./downloads --port 14567 --auto-accept ./target/release/p2p-transfer receive --output ./downloads --rendezvous host:14570 --code ABC123 ./target/release/p2p-transfer discover -./target/release/p2p-transfer resume --path --peer --peer-fingerprint -./target/release/p2p-transfer resume --path --rendezvous host:14570 --code ABC123 +# Resume is automatic: re-run the same `send` to continue an interrupted transfer (or --no-resume to start over) ./target/release/p2p-transfer nat-test ./target/release/p2p-transfer nat-test --rendezvous host:14570 # self-loop punch test ./target/release/p2p-transfer history @@ -184,7 +183,7 @@ python3 test_transfer.py --size 50 --compressible # ratio > 100× - **Don't nest Tokio runtimes.** Anything that calls `Iced::run` must be reached *outside* `block_on`; that's why `run_cli_sync` returns early for the GUI cases. - **The QUIC bidi control stream only materialises on the responder once the initiator writes to it.** Real handshake code does this immediately; tests that don't exchange messages must either send a marker first or use the same `oneshot` "hold the connection" pattern the existing tests use. - **Adaptive compression accounting**: track uncompressed size from `chunk_data.len()` *before* compression, not from the compressed payload, otherwise stats and SHA-256 boundaries break. -- **Resume state files** are written as `transfer_.json` in the working directory at the time of the transfer. Resume requires the original `--path`, `--peer`, and `--peer-fingerprint` because the file doesn't store any of them. +- **Resume is automatic — there is no `resume` subcommand.** Re-running the same `send` finds a prior incomplete `transfer_.json` by `(peer fingerprint, file list)` and continues it; `--no-resume` forces a fresh transfer. The state file records the negotiated `peer_fingerprint`, and lookup matches the source's `(path, size, mtime)` strictly. State lives in a per-user data dir by default (`p2p-cli`'s `default_state_dir`); `--state-dir` overrides. - **Receiver event loop**: the receiver stays alive after a transfer finishes and accepts further transfers on the same connection until the peer disconnects — don't add logic that exits after the first transfer. - **Chunk indices are `u64` end-to-end**. `ChunkReader::total_chunks`, `read_chunk`, `fold_chunk`, `ChunkWriter::write_chunk` and the wire format all use `u64`. Do not narrow back to `u32` anywhere on the chunk path — that's what previously truncated large files at `2^32` chunks. - **Sanitize before joining paths.** Anything written under the output directory goes through `transfer_folder::sanitize_relative_path` first — adding a new write site means routing it through the same sanitizer. diff --git a/CHANGELOG.md b/CHANGELOG.md index f3a94a9..15fc61e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed — 2026-05-30 — Resume folded into `send` +- Resume is no longer a separate subcommand — the `resume` command is + **removed**. Re-running the same `send` auto-detects and continues a + prior incomplete transfer to the same peer. +- Detection is keyed by `(peer fingerprint, file list)`: the state file + now records the negotiated peer fingerprint, and `send` enumerates the + source and matches it strictly on every file's `(path, size, mtime)`. + Any drift starts a fresh transfer; `--no-resume` forces one. +- Resume state moved from the current working directory to a per-user + data dir by default (`%APPDATA%\p2p-transfer\state`, + `$XDG_DATA_HOME/p2p-transfer/state`, or + `~/Library/Application Support/p2p-transfer/state`), so a re-run from + any directory finds it. `--state-dir` still overrides. +- `send_path` now persists a checkpoint each time a file completes, so an + abrupt kill (not just a recoverable network error) leaves resumable + state. + ### Fixed — 2026-05-23 — Security & robustness audit (16 findings) Landed all 16 findings from a code review on the `quic` branch (4 diff --git a/Cargo.toml b/Cargo.toml index 46cbe85..de07a58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,12 +32,16 @@ anyhow = "1.0" # punch primitives directly (no STUN, no NAT — pure localhost smoke). p2p-rendezvous = { path = "./p2p-rendezvous" } # Used by tests/rendezvous_disconnect_resume_test.rs to exercise the -# CLI-level send/receive/resume handlers end-to-end through a localhost +# CLI-level send/receive handlers end-to-end through a localhost # rendezvous. p2p-cli = { path = "./p2p-cli" } tokio = { version = "1.40", features = ["full"] } tempfile = "3.12" sha2 = "0.10" +# Capture the "Resuming transfer" log line emitted by handle_send so the +# disconnect-resume test can assert phase 2 resumed rather than restarted. +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["fmt"] } [features] # Default: CLI only (small binary, ~5-10 MB) diff --git a/DESIGN.md b/DESIGN.md index 7c0805a..a3f99c2 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -142,11 +142,23 @@ discovery toggle use this to pick the first responding peer. ## Resume -Chunk-level resume uses `state::TransferState` (a `BitVec` of completed -chunk indices per file) persisted to JSON. `P2PSession::send_path` loops -on a recoverable error (network/timeout/QUIC), re-establishes the -connection via `reconnect()`, and re-runs the folder send — which skips -any chunk index already in the bitmap. +Resume state is a `FolderTransferState` (the per-file completed-chunk +bitmap, the completed-files index, and a snapshot of the negotiated +`ConfigMessage`) persisted to `transfer_.json`. The file also +records the **peer fingerprint** it was negotiated with. `send_path` +persists a fresh checkpoint each time a file completes (so an abrupt +kill still leaves resumable state) and on every recoverable-error retry; +on retry it re-establishes the connection via `reconnect()` and re-runs +the folder send, skipping any chunk index already in the bitmap. + +There is no `resume` subcommand. Re-running the identical `send` finds +the prior state by `(peer_fingerprint, file list)` rather than by UUID: +`p2p-cli`'s `find_resumable_state` enumerates the source as a fresh send +would, then scans the per-user state dir for a `transfer_*.json` whose +stamped peer matches and whose recorded `(path, size, mtime)` list +matches the source exactly. A strict match resumes; any drift (or +`--no-resume`) starts fresh. State files live in a per-user data dir by +default so a re-run from any working directory still finds them. ## Bandwidth diff --git a/README.md b/README.md index bea6852..9fb66a9 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,10 @@ NATs. Ships with a CLI and an optional Iced GUI. * **Relay fallback** — symmetric NATs that can't be punched directly fall through to a UDP forwarder; QUIC TLS still terminates end-to-end (the relay sees ciphertext only). -* **Resume** — chunk-level bitmap persisted per transfer; reconnects - pick up where they left off. Chunk indices are `u64` end-to-end — - very large files transfer correctly. +* **Resume** — interrupted transfers are persisted per (peer, source); + re-running the same `send` automatically continues where it left off + (`--no-resume` forces a fresh start). Chunk indices are `u64` + end-to-end — very large files transfer correctly. * **Integrity** — per-file SHA-256 exchanged both ways; receiver mismatch is a hard failure (no silent acceptance). * **Path safety** — every incoming relative path is sanitized; the @@ -72,6 +73,11 @@ p2p-transfer send ./bigfile.bin \ `--peer-fingerprint` is required and is the 64-hex-char SHA-256 of the receiver's cert (printed when the receiver starts up). +If a previous `send` of the same source to the same peer was interrupted, +`send` automatically picks up where it left off; pass `--no-resume` to +force a fresh transfer. Resume state lives in a per-user directory by +default (override with `--state-dir`). + ### Send (LAN auto-discovery) ``` @@ -203,33 +209,6 @@ installed copy and only restarts the service when it actually changed, so no-op re-runs don't interrupt active pairings. A `clean-build` + later `install` works fine — cargo just rebuilds `target/` from scratch. -### Resume - -`resume` accepts the same pairing flags as `send`/`receive` — either -direct addressing or rendezvous-mediated. Pick whichever matches how the -original `send` reached the peer. - -``` -# Direct (same LAN, or a stable port-forwarded receiver) -p2p-transfer resume \ - --path ./bigfile.bin \ - --peer 192.168.1.42:14567 \ - --peer-fingerprint - -# Cross-NAT (the receiver is still listening through the same rendezvous + code) -p2p-transfer resume \ - --path ./bigfile.bin \ - --rendezvous rendezvous.example.com:14570 \ - --code ABC123 -``` - -Reads `transfer_.json` (written when a transfer is -interrupted) and continues from the chunk bitmap. The state file lives -in the working directory where the transfer started — pass -`--state-dir` if you started the original `send` from somewhere else. -The original `--path` and pairing flags aren't stored, so you have to -supply them again on resume. - ### History ``` diff --git a/p2p-cli/AGENTS.md b/p2p-cli/AGENTS.md index 053ab49..42a26b3 100644 --- a/p2p-cli/AGENTS.md +++ b/p2p-cli/AGENTS.md @@ -19,14 +19,20 @@ If you add a new command, add it to the `Commands` enum in `cli.rs` and its matc src/ ├── lib.rs # run_cli_sync, run_cli_async, init_logging ├── cli.rs # clap definitions: Cli, Commands, SessionParams, TransferParams -├── send.rs # handle_send +├── send.rs # handle_send (auto-resumes a prior interrupted send) ├── receive.rs # handle_receive ├── discover.rs # handle_discover ├── nat_test.rs # handle_nat_test -├── resume.rs # handle_resume +├── util.rs # base-name + resume-state location (find_resumable_state) └── history.rs # handle_history ``` +There is no `resume` command. A re-run of `send` finds a prior incomplete +transfer via `util::find_resumable_state` — matching the source against a +`transfer_*.json` by `(peer fingerprint, file list)` — and continues it. +`--no-resume` forces a fresh transfer; `--state-dir` overrides the +per-user default state location. + Each command module exposes a single `handle_*` entry point taking the parsed args. Keep CLI translation (prompts, progress bars, formatting) in these files; push protocol/transfer logic into `p2p-core`. ## Shared arg groups diff --git a/p2p-cli/Cargo.toml b/p2p-cli/Cargo.toml index 9dc1f9e..0e49067 100644 --- a/p2p-cli/Cargo.toml +++ b/p2p-cli/Cargo.toml @@ -20,6 +20,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } console = "0.15" dialoguer = "0.11" chrono = "0.4" +directories = "5" [dev-dependencies] tempfile = "3.12" diff --git a/p2p-cli/src/cli.rs b/p2p-cli/src/cli.rs index cd28b09..2f617e3 100644 --- a/p2p-cli/src/cli.rs +++ b/p2p-cli/src/cli.rs @@ -154,13 +154,21 @@ pub enum Commands { /// File or folder to send path: PathBuf, - /// Directory to write the resume state file into. Defaults to the - /// current working directory. Pass an absolute path here so - /// `p2p-transfer resume --state-dir ` works regardless - /// of where the user runs the resume command from. + /// Directory holding resume state files. Defaults to a per-user + /// location (`%APPDATA%\p2p-transfer\state` on Windows, + /// `$XDG_DATA_HOME/p2p-transfer/state` on Linux, + /// `~/Library/Application Support/p2p-transfer/state` on macOS) so + /// a re-run from any working directory finds a prior incomplete + /// transfer. Override only if you want the state kept elsewhere. #[arg(long)] state_dir: Option, + /// Force a fresh transfer even if a matching incomplete transfer + /// to the same peer exists. Use when the source content changed in + /// a way the size+mtime resume check can't detect. + #[arg(long)] + no_resume: bool, + #[command(flatten)] session: SessionParams, @@ -213,34 +221,6 @@ pub enum Commands { rendezvous: Option, }, - /// Resume a previous transfer - /// - /// Reconnects to the original receiver and continues from the last - /// persisted chunk boundary. Use the same pairing flags you used for - /// the original `send`: either `--peer` + `--peer-fingerprint` (direct - /// mode) or `--rendezvous` + `--code` (cross-NAT). - Resume { - /// Transfer ID to resume (or state file path) - transfer_id: String, - - /// Original file or folder path to resume from - #[arg(long)] - path: PathBuf, - - /// Directory the resume state file lives in. Must match whatever - /// `--state-dir` the original `send` used; defaults to the current - /// working directory. - #[arg(long)] - state_dir: Option, - - /// Max reconnect attempts after a connection drop (0 = retry forever) - #[arg(long, default_value = "5")] - max_reconnect_attempts: u32, - - #[command(flatten)] - session: SessionParams, - }, - /// View transfer history History { /// Show only recent N transfers diff --git a/p2p-cli/src/lib.rs b/p2p-cli/src/lib.rs index 5e226f8..5d9fa4e 100644 --- a/p2p-cli/src/lib.rs +++ b/p2p-cli/src/lib.rs @@ -5,9 +5,8 @@ //! - `send`: Send operations for files and folders //! - `receive`: Receive operations //! - `discover`: Peer discovery functionality -//! - `resume`: Resume interrupted transfers -// `cli`, `send`, `receive`, and `resume` are `pub` so the workspace-level +// `cli`, `send`, and `receive` are `pub` so the workspace-level // integration test in `tests/rendezvous_disconnect_resume_test.rs` can // drive the same handler functions the binary dispatches to. The rest // stay private — they're not stable surface for external consumers. @@ -17,7 +16,6 @@ mod history; mod nat_test; pub mod receive; mod rendezvous; -pub mod resume; pub mod send; mod util; @@ -132,10 +130,11 @@ async fn run_cli_async(cli: Cli) -> Result<()> { Some(cli::Commands::Send { path, state_dir, + no_resume, session, transfer, }) => { - send::handle_send(path, state_dir, session, transfer, identity_dir).await?; + send::handle_send(path, state_dir, no_resume, session, transfer, identity_dir).await?; } Some(cli::Commands::Receive { output, @@ -153,23 +152,6 @@ async fn run_cli_async(cli: Cli) -> Result<()> { }) => { nat_test::handle_nat_test(stun_server, rendezvous).await?; } - Some(cli::Commands::Resume { - transfer_id, - path, - state_dir, - max_reconnect_attempts, - session, - }) => { - resume::handle_resume( - transfer_id, - path, - state_dir, - max_reconnect_attempts, - session, - identity_dir, - ) - .await?; - } Some(cli::Commands::History { limit, direction, diff --git a/p2p-cli/src/resume.rs b/p2p-cli/src/resume.rs deleted file mode 100644 index 9105f46..0000000 --- a/p2p-cli/src/resume.rs +++ /dev/null @@ -1,190 +0,0 @@ -//! Resume operations. - -use std::path::PathBuf; -use std::sync::Arc; - -use anyhow::Result; -use tokio::signal; -use tracing::{debug, info, warn}; - -use p2p_core::{ - identity::Identity, progress::ProgressState, reconnect::ReconnectConfig, - transfer_folder::FolderTransferState, Uuid, -}; - -use crate::cli::SessionParams; -use crate::rendezvous::establish_session; - -pub async fn handle_resume( - transfer_id: String, - path: PathBuf, - state_dir: Option, - max_reconnect_attempts: u32, - session_params: SessionParams, - identity_dir: Option, -) -> Result<()> { - info!("Resuming transfer"); - info!(" Transfer ID: {}", transfer_id); - info!(" Path: {}", path.display()); - - if !path.exists() { - anyhow::bail!("Path does not exist: {}", path.display()); - } - - let state_path = crate::util::resolve_state_file(state_dir.as_deref(), &transfer_id)?; - if !state_path.exists() { - anyhow::bail!( - "State file not found: {}. (If the original `send` ran with --state-dir, pass the same value here.)", - state_path.display() - ); - } - - info!("Loading transfer state..."); - let state = FolderTransferState::load_from_file(&state_path).await?; - debug!( - "Progress: {}/{} files ({:.1}%)", - state.completed_files.len(), - state.files.len(), - state.progress_percentage() - ); - - let identity = Arc::new(Identity::load_or_generate(identity_dir.as_deref())?); - - info!("Reconnecting to peer..."); - // Resume the original negotiated config — using ConfigMessage::default - // here would mis-align the .partial on disk because the receiver and - // ChunkWriter compute offsets from this chunk_size. - let mut session = establish_session( - &session_params, - "client", - identity, - Uuid::new_v4(), - Some(state.config.clone()), - ) - .await?; - info!("Session established"); - - let mut progress = ProgressState::new(state.total_bytes); - progress.add_bytes(state.transferred_bytes); - - let reconnect_config = ReconnectConfig { - max_attempts: max_reconnect_attempts, - ..Default::default() - }; - - info!("Resuming folder transfer..."); - tokio::select! { - result = session.send_path(&path, &reconnect_config, Some(&state_path), Some(&mut progress)) => { - result?; - let _ = tokio::fs::remove_file(&state_path).await; - info!("Transfer resumed and completed!"); - } - _ = signal::ctrl_c() => { - // The on-disk state is up to date as of the last completed - // file (sender persists per-file via the FolderTransferSession - // state callback wired in send_path's error path). Chunks - // completed mid-file since the last file boundary will be - // re-sent on the next resume. - warn!("Transfer interrupted. State persisted up to the most recent file boundary."); - warn!( - "Re-run the same `p2p-transfer resume` command to continue from where this stopped." - ); - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn empty_session_params() -> SessionParams { - SessionParams { - role: None, - peer: Some("127.0.0.1:1".into()), - peer_fingerprint: Some("0".repeat(64)), - port: 14567, - discover: false, - rendezvous: None, - code: None, - force_relay: false, - } - } - - #[tokio::test] - async fn rejects_nonexistent_path() { - let tid = Uuid::new_v4().to_string(); - let result = handle_resume( - tid, - PathBuf::from("definitely/does/not/exist"), - None, - 1, - empty_session_params(), - None, - ) - .await; - let err = result.expect_err("nonexistent path must error").to_string(); - assert!(err.contains("does not exist"), "got: {err}"); - } - - /// Finding 3.4: when --state-dir is supplied, handle_resume reads - /// the state file from there rather than the current working - /// directory. Without the flag, users who `cd`-ed between failure - /// and resume saw "State file not found" with no recovery hint. - #[tokio::test] - async fn finds_state_file_via_state_dir_flag_from_unrelated_cwd() { - let tmp = tempfile::tempdir().unwrap(); - let state_dir = tmp.path().join("state"); - let file_path = tmp.path().join("payload.bin"); - tokio::fs::write(&file_path, b"hi").await.unwrap(); - - // The state file just needs to exist for handle_resume to get - // past the early "State file not found" bail; it will then fail - // later trying to deserialise — but that's after we've proven - // the path resolution honours --state-dir. - let tid = Uuid::new_v4().to_string(); - tokio::fs::create_dir_all(&state_dir).await.unwrap(); - tokio::fs::write( - state_dir.join(format!("transfer_{tid}.json")), - b"{}", // empty JSON object — will fail to deserialise later - ) - .await - .unwrap(); - - let result = handle_resume( - tid, - file_path, - Some(state_dir.clone()), - 1, - empty_session_params(), - None, - ) - .await; - - let err = result - .expect_err("should fail later for unrelated reasons") - .to_string(); - assert!( - !err.contains("State file not found"), - "--state-dir must let resume locate the file; got: {err}" - ); - } - - #[tokio::test] - async fn accepts_file_path() { - let tmp = tempfile::tempdir().unwrap(); - let file_path = tmp.path().join("payload.bin"); - tokio::fs::write(&file_path, b"hello").await.unwrap(); - - let tid = Uuid::new_v4().to_string(); - let result = handle_resume(tid, file_path, None, 1, empty_session_params(), None).await; - let err = result - .expect_err("no state file → should error later") - .to_string(); - assert!( - !err.contains("not a directory"), - "resume must accept file paths; got: {err}" - ); - } -} diff --git a/p2p-cli/src/send.rs b/p2p-cli/src/send.rs index 8d0a7f3..da7edb3 100644 --- a/p2p-cli/src/send.rs +++ b/p2p-cli/src/send.rs @@ -17,11 +17,12 @@ use p2p_core::{ use crate::cli::{SessionParams, TransferParams}; use crate::rendezvous::establish_session; -use crate::util::{derive_base_name, resolve_state_file}; +use crate::util::{default_state_dir, derive_base_name, find_resumable_state, resolve_state_file}; pub async fn handle_send( path: PathBuf, state_dir: Option, + no_resume: bool, session_params: SessionParams, transfer_params: TransferParams, identity_dir: Option, @@ -72,18 +73,83 @@ pub async fn handle_send( hex::encode(session.peer_fingerprint()) ); + let peer_fp = session.peer_fingerprint(); let peer_addr = session.peer_addr().to_string(); + // Resume bridge: with no `--state-dir`, prior state lives in a stable + // per-user location so a re-run from any working directory finds it. + let state_dir = state_dir.unwrap_or_else(default_state_dir); + + // Auto-detect a prior incomplete transfer of this exact source to this + // exact peer and pick up where it left off. `--no-resume` forces a + // fresh transfer (e.g. when the source content changed in a way the + // size+mtime check can't see). + let (transfer_id, state_file, resume_from_bytes) = + match resolve_resume(&state_dir, &path, peer_fp, no_resume).await? { + Some((id, file, bytes)) => (id, file, bytes), + None => { + let id = Uuid::new_v4(); + ( + id, + resolve_state_file(Some(&state_dir), &id.to_string())?, + 0, + ) + } + }; + tokio::select! { - result = send(&mut session, &path, state_dir.as_deref(), transfer_params.max_reconnect_attempts, &peer_addr) => result, + result = send( + &mut session, + &path, + &state_file, + transfer_id, + resume_from_bytes, + transfer_params.max_reconnect_attempts, + &peer_addr, + ) => result, _ = signal::ctrl_c() => Err(anyhow::anyhow!("Transfer interrupted by user (Ctrl+C)")), } } +/// Returns `Some((transfer_id, state_file, already_transferred_bytes))` +/// when a prior incomplete transfer should be resumed, or `None` for a +/// fresh transfer (also `None` when `--no-resume` is set). +async fn resolve_resume( + state_dir: &Path, + path: &Path, + peer_fp: [u8; 32], + no_resume: bool, +) -> Result> { + if no_resume { + return Ok(None); + } + let Some((existing_path, existing_state)) = + find_resumable_state(state_dir, path, peer_fp).await? + else { + return Ok(None); + }; + let pct = 100 * existing_state.transferred_bytes / existing_state.total_bytes.max(1); + info!( + "Resuming transfer {} ({}/{} bytes, {}% done)", + existing_state.transfer_id, + existing_state.transferred_bytes, + existing_state.total_bytes, + pct, + ); + Ok(Some(( + existing_state.transfer_id, + existing_path, + existing_state.transferred_bytes, + ))) +} + +#[allow(clippy::too_many_arguments)] async fn send( session: &mut P2PSession, path: &Path, - state_dir: Option<&Path>, + state_file: &Path, + transfer_id: Uuid, + resume_from_bytes: u64, max_reconnect_attempts: u32, peer_addr: &str, ) -> Result<()> { @@ -94,9 +160,12 @@ async fn send( info!("Sending folder: {}", base_name); } - let transfer_id = Uuid::new_v4(); - let state_file = resolve_state_file(state_dir, &transfer_id.to_string())?; let mut progress = p2p_core::progress::ProgressState::new(0); + // Pre-account bytes a prior session already moved so the progress bar + // (and the recorded total) start at the resumed percentage rather than 0. + if resume_from_bytes > 0 { + progress.add_bytes(resume_from_bytes); + } let reconnect_config = p2p_core::reconnect::ReconnectConfig { max_attempts: max_reconnect_attempts, ..Default::default() @@ -108,7 +177,7 @@ async fn send( .send_path( path, &reconnect_config, - Some(&state_file), + Some(state_file), Some(&mut progress), ) .await; @@ -116,7 +185,7 @@ async fn send( match result { Ok(summary) => { if state_file.exists() { - let _ = tokio::fs::remove_file(&state_file).await; + let _ = tokio::fs::remove_file(state_file).await; } // Prefer the per-file list from the summary so folder // transfers record every file rather than just the folder @@ -136,13 +205,13 @@ async fn send( } Err(e) => { if state_file.exists() { - warn!("Transfer interrupted"); - warn!("State saved to: {}", state_file.display()); warn!( - "Resume with: p2p-transfer resume {} --path \ - (then your original pairing flags: --peer + --peer-fingerprint, \ - or --rendezvous + --code)", - transfer_id + "Transfer interrupted; state saved to {}", + state_file.display() + ); + warn!( + "Re-run the same `send` command to resume from here \ + (or pass --no-resume to start over)." ); } record.fail(e.to_string()); diff --git a/p2p-cli/src/util.rs b/p2p-cli/src/util.rs index 8279c7c..0696292 100644 --- a/p2p-cli/src/util.rs +++ b/p2p-cli/src/util.rs @@ -1,8 +1,14 @@ -//! Small CLI helpers shared by send/resume. +//! Small CLI helpers for the send path: base-name derivation and locating +//! resume state on disk. use std::path::{Path, PathBuf}; +use std::time::SystemTime; use anyhow::{Context, Result}; +use tracing::warn; + +use p2p_core::protocol::FileMetadata; +use p2p_core::transfer_folder::{enumerate_files, FolderTransferState}; /// Derive a human-readable "base name" from a path, even when the path is /// `.`, `..`, or ends with a trailing separator. `Path::file_name` returns @@ -28,30 +34,140 @@ pub fn derive_base_name(path: &Path) -> Result { Ok(canonical.display().to_string()) } -/// Build the on-disk path for a resume state file. Honours an explicit -/// `--state-dir` from the caller and falls back to the current working -/// directory (the historical default). When `state_dir` is `Some`, the -/// directory is created on demand so the caller doesn't have to. +/// Per-user default directory for resume state files, used whenever +/// `--state-dir` is not supplied. Keying off a stable per-user location +/// (not the CWD) lets a `send` re-run from any working directory still +/// find the prior incomplete transfer for the same (peer, source). /// -/// Without `--state-dir`, `p2p-transfer resume ` was implicitly -/// scoped to the CWD; users who `cd`-ed between failure and resume saw -/// "State file not found" with no recovery hint (review finding 3.4). +/// * Windows: `%APPDATA%\p2p-transfer\state` +/// * Linux: `$XDG_DATA_HOME/p2p-transfer/state` (`~/.local/share/...`) +/// * macOS: `~/Library/Application Support/p2p-transfer/state` +pub fn default_state_dir() -> PathBuf { + match directories::BaseDirs::new() { + Some(base) => base.data_dir().join("p2p-transfer").join("state"), + // No home directory (extremely unusual — e.g. a stripped service + // account). Fall back to a relative dir so resume still works + // within a single working directory. + None => PathBuf::from("p2p-transfer").join("state"), + } +} + +/// Build the on-disk path for a resume state file. An explicit +/// `--state-dir` wins; otherwise the per-user [`default_state_dir`] is +/// used. The directory is created on demand so the caller can write the +/// state file straight away. pub fn resolve_state_file(state_dir: Option<&Path>, transfer_id: &str) -> Result { let file_name = format!("transfer_{transfer_id}.json"); - match state_dir { - Some(dir) => { - std::fs::create_dir_all(dir) - .with_context(|| format!("failed to create state dir {}", dir.display()))?; - Ok(dir.join(file_name)) + let dir = match state_dir { + Some(dir) => dir.to_path_buf(), + None => default_state_dir(), + }; + std::fs::create_dir_all(&dir) + .with_context(|| format!("failed to create state dir {}", dir.display()))?; + Ok(dir.join(file_name)) +} + +/// Find a prior incomplete transfer that a fresh `send` of `source_root` +/// to `peer_fingerprint` should resume, by scanning `state_dir` for a +/// `transfer_*.json` whose stamped peer matches and whose recorded file +/// list matches the source as it exists on disk right now. +/// +/// Matching is **strict**: every file must agree on `(relative path, size, +/// modified time)`. Any drift — a resized file, a touched mtime, a file +/// added or removed — is treated as a different transfer and yields `None` +/// (i.e. a fresh transfer). When more than one state file matches, the +/// newest (by file mtime) is chosen and a warning is logged; the stale +/// ones are removed by their own success path on a later run. +pub async fn find_resumable_state( + state_dir: &Path, + source_root: &Path, + peer_fingerprint: [u8; 32], +) -> Result> { + // Enumerate the source exactly as a fresh send would. If it can't be + // enumerated (empty folder, vanished path) there's nothing to resume; + // let the main send path surface the real error. + let Ok(current) = enumerate_files(source_root).await else { + return Ok(None); + }; + let current_key = sorted_file_keys(¤t); + + let mut entries = match tokio::fs::read_dir(state_dir).await { + Ok(entries) => entries, + // State dir doesn't exist yet → no prior transfers. + Err(_) => return Ok(None), + }; + + let mut best: Option<(PathBuf, SystemTime, FolderTransferState)> = None; + let mut matches = 0usize; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if !is_transfer_state_file(&path) { + continue; + } + // Unparseable files (corrupt, or written by an older format) are + // ignored rather than migrated — per the no-backcompat rule, an + // unreadable checkpoint just means "start fresh". + let Ok(state) = FolderTransferState::load_from_file(&path).await else { + continue; + }; + if state.peer_fingerprint != peer_fingerprint { + continue; + } + if sorted_file_keys(&state.files) != current_key { + continue; + } + matches += 1; + let mtime = entry + .metadata() + .await + .ok() + .and_then(|m| m.modified().ok()) + .unwrap_or(SystemTime::UNIX_EPOCH); + let is_newer = match &best { + Some((_, best_mtime, _)) => mtime > *best_mtime, + None => true, + }; + if is_newer { + best = Some((path, mtime, state)); } - None => Ok(PathBuf::from(file_name)), } + + if matches > 1 { + warn!( + "{} resumable state files match this source and peer; resuming the newest. \ + Stale ones are removed when their transfer next completes.", + matches + ); + } + + Ok(best.map(|(path, _, state)| (path, state))) +} + +fn is_transfer_state_file(path: &Path) -> bool { + path.file_name() + .and_then(|n| n.to_str()) + .is_some_and(|n| n.starts_with("transfer_") && n.ends_with(".json")) +} + +/// `(path, size, modified)` triples sorted by path, so the match in +/// [`find_resumable_state`] is independent of directory-enumeration order +/// (which is not guaranteed stable across runs). The stored `state.files` +/// order is left untouched — only these throwaway comparison keys are +/// sorted — so the resume's file indices stay valid. +fn sorted_file_keys(files: &[FileMetadata]) -> Vec<(String, u64, u64)> { + let mut keys: Vec<(String, u64, u64)> = files + .iter() + .map(|f| (f.path.clone(), f.size, f.modified)) + .collect(); + keys.sort(); + keys } #[cfg(test)] mod tests { use super::*; - use std::path::PathBuf; + use p2p_core::protocol::ConfigMessage; + use p2p_core::Uuid; /// Finding 3.1: `derive_base_name` must not panic on `.`, `..`, or /// trailing separators. The pre-fix code used `path.file_name().unwrap()` @@ -74,9 +190,22 @@ mod tests { assert_eq!(name, "hello.bin"); } - /// Finding 3.4: when `--state-dir` is supplied, the resume state - /// file lives under that directory regardless of the user's CWD. - /// The directory is auto-created. + /// `default_state_dir` always ends with `p2p-transfer/state`, wherever + /// the per-user data dir lands on this platform. + #[test] + fn default_state_dir_targets_per_user_p2p_transfer_state() { + let dir = default_state_dir(); + let mut tail = dir + .components() + .rev() + .map(|c| c.as_os_str().to_string_lossy().to_string()); + assert_eq!(tail.next().as_deref(), Some("state")); + assert_eq!(tail.next().as_deref(), Some("p2p-transfer")); + } + + /// Finding 3.4: when `--state-dir` is supplied, the resume state file + /// lives under that directory regardless of the user's CWD. The + /// directory is auto-created. #[test] fn resolve_state_file_honours_explicit_state_dir() { let tmp = tempfile::tempdir().unwrap(); @@ -86,9 +215,118 @@ mod tests { assert!(dir.exists(), "state dir must be auto-created"); } - #[test] - fn resolve_state_file_defaults_to_cwd_when_state_dir_absent() { - let path = resolve_state_file(None, "abc-123").unwrap(); - assert_eq!(path, PathBuf::from("transfer_abc-123.json")); + // ---- find_resumable_state ------------------------------------------- + + /// Create `tmp/src/` populated with the given files and return the + /// `TempDir` (kept alive by the caller so it isn't reaped). + async fn make_source(files: &[(&str, &[u8])]) -> tempfile::TempDir { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().join("src"); + tokio::fs::create_dir_all(&root).await.unwrap(); + for (name, content) in files { + tokio::fs::write(root.join(name), content).await.unwrap(); + } + tmp + } + + /// Persist a state file (stamped with `peer_fp`) describing `files`. + async fn save_state(state_dir: &Path, files: Vec, peer_fp: [u8; 32]) -> PathBuf { + tokio::fs::create_dir_all(state_dir).await.unwrap(); + let mut state = FolderTransferState::new( + Uuid::new_v4(), + "src".into(), + files, + &ConfigMessage::default(), + ); + state.peer_fingerprint = peer_fp; + let path = state_dir.join(format!("transfer_{}.json", state.transfer_id)); + state.save_to_file(&path).await.unwrap(); + path + } + + #[tokio::test] + async fn matches_when_peer_and_files_agree() { + let tmp = make_source(&[("a.txt", b"hello"), ("b.txt", b"world!!")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let fp = [7u8; 32]; + let files = enumerate_files(&src).await.unwrap(); + let saved = save_state(&state_dir, files, fp).await; + + let (path, state) = find_resumable_state(&state_dir, &src, fp) + .await + .unwrap() + .expect("matching peer + file list should resume"); + assert_eq!(path, saved); + assert_eq!(state.peer_fingerprint, fp); + } + + #[tokio::test] + async fn rejects_when_peer_fingerprint_differs() { + let tmp = make_source(&[("a.txt", b"hello")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let files = enumerate_files(&src).await.unwrap(); + save_state(&state_dir, files, [1u8; 32]).await; + + let found = find_resumable_state(&state_dir, &src, [2u8; 32]) + .await + .unwrap(); + assert!(found.is_none(), "a different peer must not match"); + } + + #[tokio::test] + async fn rejects_when_a_file_size_differs() { + let tmp = make_source(&[("a.txt", b"hello")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let fp = [3u8; 32]; + let mut files = enumerate_files(&src).await.unwrap(); + files[0].size += 1; // pretend the file used to be a different size + save_state(&state_dir, files, fp).await; + + let found = find_resumable_state(&state_dir, &src, fp).await.unwrap(); + assert!(found.is_none(), "size drift must not match"); + } + + #[tokio::test] + async fn rejects_when_a_file_mtime_differs() { + let tmp = make_source(&[("a.txt", b"hello")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let fp = [4u8; 32]; + let mut files = enumerate_files(&src).await.unwrap(); + files[0].modified ^= 0xFFFF; // perturb the recorded mtime + save_state(&state_dir, files, fp).await; + + let found = find_resumable_state(&state_dir, &src, fp).await.unwrap(); + assert!(found.is_none(), "mtime drift must not match"); + } + + #[tokio::test] + async fn rejects_when_a_file_is_added_or_removed() { + let tmp = make_source(&[("a.txt", b"hello"), ("b.txt", b"there")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let fp = [5u8; 32]; + let mut files = enumerate_files(&src).await.unwrap(); + files.pop(); // state knows about fewer files than exist now + save_state(&state_dir, files, fp).await; + + let found = find_resumable_state(&state_dir, &src, fp).await.unwrap(); + assert!(found.is_none(), "file-set drift must not match"); + } + + #[tokio::test] + async fn returns_none_on_empty_state_dir() { + let tmp = make_source(&[("a.txt", b"hello")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + tokio::fs::create_dir_all(&state_dir).await.unwrap(); + + let found = find_resumable_state(&state_dir, &src, [9u8; 32]) + .await + .unwrap(); + assert!(found.is_none(), "no state files → no match"); } } diff --git a/p2p-core/src/session.rs b/p2p-core/src/session.rs index 418f087..f902da7 100644 --- a/p2p-core/src/session.rs +++ b/p2p-core/src/session.rs @@ -262,6 +262,13 @@ impl P2PSession { ))); } + // Stamp the negotiated peer into every persisted checkpoint so a + // future `send` of the same source can locate this state by + // (peer, file list) instead of a random UUID. Available the moment + // the session is up; `[u8; 32]` and Copy, so capturing it here + // doesn't hold a borrow across the `&mut self.connection` below. + let peer_fp = self.peer_fingerprint(); + let mut attempt = 0; let fresh_state = || { @@ -315,11 +322,32 @@ impl P2PSession { transfer_id, ); + // Persist a fresh checkpoint each time a file completes, so + // an abrupt termination (Ctrl+C, kill, crash) still leaves a + // resumable state — not just the recoverable-error path + // below. The snapshot is stamped with the peer fingerprint + // so `find_resumable_state` can match it on the next run. + if let Some(state_file) = state_path { + let path_buf = state_file.to_path_buf(); + folder_session.set_state_callback(Arc::new(move |st: &FolderTransferState| { + let mut snapshot = st.clone(); + snapshot.peer_fingerprint = peer_fp; + if let Ok(json) = serde_json::to_string_pretty(&snapshot) { + let _ = std::fs::write(&path_buf, json); + } + })); + } + folder_session .send(path, &mut state, progress.as_deref_mut()) .await }; + // Make sure the error-path saves below also carry the peer + // identity (the in-memory `state` may have been (re)built inside + // `send` with the zero-fingerprint default). + state.peer_fingerprint = peer_fp; + match result { Ok(_) => { if let Some(state_file) = state_path { diff --git a/p2p-core/src/transfer_folder.rs b/p2p-core/src/transfer_folder.rs index 500d5d4..383a02a 100644 --- a/p2p-core/src/transfer_folder.rs +++ b/p2p-core/src/transfer_folder.rs @@ -244,38 +244,7 @@ impl<'a> FolderTransferSession<'a> { .to_string_lossy() .to_string(); - let files = if path.is_file() { - let metadata = fs::metadata(path).await?; - let size = metadata.len(); - let modified = metadata - .modified() - .unwrap_or(SystemTime::UNIX_EPOCH) - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - let file_name = path.file_name().unwrap().to_string_lossy().to_string(); - vec![( - PathBuf::from(file_name.clone()), - FileMetadata { - path: file_name, - size, - modified, - checksum: [0u8; 32], - }, - )] - } else if path.is_dir() { - let files = self.scan_folder(path).await?; - if files.is_empty() { - return Err(Error::Protocol("Folder is empty".to_string())); - } - files - } else { - return Err(Error::Protocol( - "Path is neither a file nor a directory".to_string(), - )); - }; - - let file_list: Vec = files.iter().map(|(_, m)| m.clone()).collect(); + let file_list = enumerate_files(path).await?; *state = FolderTransferState::new(self.transfer_id, base_name, file_list, &self.config); None }; @@ -654,47 +623,88 @@ impl<'a> FolderTransferSession<'a> { } Ok(()) } +} - async fn scan_folder(&self, folder_path: &Path) -> Result> { - let mut files = Vec::new(); - let base_path = folder_path.parent().unwrap_or(folder_path); - let mut stack: std::collections::VecDeque = std::collections::VecDeque::new(); - stack.push_back(folder_path.to_path_buf()); - while let Some(current) = stack.pop_front() { - let mut entries = fs::read_dir(¤t).await?; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - let metadata = entry.metadata().await?; - if metadata.is_file() { - let relative_path = path - .strip_prefix(base_path) - .map_err(|e| Error::Protocol(format!("Invalid path: {}", e)))? - .to_path_buf(); - let relative_path = sanitize_relative_path(&relative_path)?; - let size = metadata.len(); - let modified = metadata - .modified() - .unwrap_or(SystemTime::UNIX_EPOCH) - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - files.push(( - relative_path.clone(), - FileMetadata { - path: relative_path.to_string_lossy().to_string(), - size, - modified, - checksum: [0u8; 32], - }, - )); - trace!("Found file: {} ({} bytes)", path.display(), size); - } else if metadata.is_dir() { - stack.push_back(path); - } +/// Enumerate the files a `send` of `path` would transfer, in the same shape +/// (relative path + size + mtime) that [`FolderTransferState`] records. +/// +/// Shared by the live send and by the CLI's resume-detection +/// (`find_resumable_state`) so the two always produce byte-identical file +/// lists to compare against. A single file yields a one-element list keyed +/// by its bare name; a directory is walked recursively with paths relative +/// to its parent (so the top-level folder name is preserved on the wire). +pub async fn enumerate_files(path: &Path) -> Result> { + if path.is_file() { + let metadata = fs::metadata(path).await?; + let modified = metadata + .modified() + .unwrap_or(SystemTime::UNIX_EPOCH) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let file_name = path + .file_name() + .ok_or_else(|| Error::Protocol("Invalid path".to_string()))? + .to_string_lossy() + .to_string(); + Ok(vec![FileMetadata { + path: file_name, + size: metadata.len(), + modified, + checksum: [0u8; 32], + }]) + } else if path.is_dir() { + let files = scan_folder(path).await?; + if files.is_empty() { + return Err(Error::Protocol("Folder is empty".to_string())); + } + Ok(files.into_iter().map(|(_, m)| m).collect()) + } else { + Err(Error::Protocol( + "Path is neither a file nor a directory".to_string(), + )) + } +} + +async fn scan_folder(folder_path: &Path) -> Result> { + let mut files = Vec::new(); + let base_path = folder_path.parent().unwrap_or(folder_path); + let mut stack: std::collections::VecDeque = std::collections::VecDeque::new(); + stack.push_back(folder_path.to_path_buf()); + while let Some(current) = stack.pop_front() { + let mut entries = fs::read_dir(¤t).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + let metadata = entry.metadata().await?; + if metadata.is_file() { + let relative_path = path + .strip_prefix(base_path) + .map_err(|e| Error::Protocol(format!("Invalid path: {}", e)))? + .to_path_buf(); + let relative_path = sanitize_relative_path(&relative_path)?; + let size = metadata.len(); + let modified = metadata + .modified() + .unwrap_or(SystemTime::UNIX_EPOCH) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + files.push(( + relative_path.clone(), + FileMetadata { + path: relative_path.to_string_lossy().to_string(), + size, + modified, + checksum: [0u8; 32], + }, + )); + trace!("Found file: {} ({} bytes)", path.display(), size); + } else if metadata.is_dir() { + stack.push_back(path); } } - Ok(files) } + Ok(files) } /// On-disk state for chunk-level resume. Embeds the negotiated @@ -715,6 +725,13 @@ pub struct FolderTransferState { /// Negotiated config snapshot — must match what the `.partial` on /// disk was laid out with. Resume reads `config.chunk_size` directly. pub config: ConfigMessage, + /// SHA-256 fingerprint of the peer this transfer was negotiated with. + /// Stamped by `P2PSession::send_path` once the session is up. Lets a + /// fresh `send` find the right prior state to resume by matching on + /// `(peer, file list)` rather than a random UUID. `[0u8; 32]` until + /// the first save; never collides with a real fingerprint. + #[serde(default)] + pub peer_fingerprint: [u8; 32], } impl FolderTransferState { @@ -735,6 +752,7 @@ impl FolderTransferState { transferred_bytes: 0, file_chunks: HashMap::new(), config: config.clone(), + peer_fingerprint: [0u8; 32], } } diff --git a/smoke/src/stress.sh b/smoke/src/stress.sh index 5d33906..4f18caf 100644 --- a/smoke/src/stress.sh +++ b/smoke/src/stress.sh @@ -4,7 +4,7 @@ # Uses the new capabilities: # --identity-dir distinct identities per process # --max-reconnect-attempts N finite retries (default 5) -# resume --path FILE works for single files now +# send … (re-run) auto-resumes a prior interrupted transfer # history --limit N works at any -v level + records from CLI # # Run from repo root: bash smoke/src/stress_v4.sh @@ -165,9 +165,9 @@ sleep 1; killtree "$RECV"; wait "$RECV" 2>/dev/null killtree "$RV"; wait "$RV" 2>/dev/null ############################################################ -# T10 — single-file resume (now possible because resume accepts files) -note "T10 single-file resume + bounded retries" -mkdir -p t10/in t10/out +# T10 — single-file auto-resume (re-running the same `send` continues it) +note "T10 single-file auto-resume + bounded retries" +mkdir -p t10/in t10/out t10/state head -c 8388608 /dev/urandom > t10/in/resume.bin # 8 MB at 1 MB/s = 8 s SH_IN=$(sha256 t10/in/resume.bin) "$BIN" -v info --identity-dir "$ID_R" receive --port 26590 --auto-accept --output t10/out > t10r.log 2>&1 & @@ -176,7 +176,8 @@ FP=$(grep -oE '[0-9a-f]{64}' t10r.log | head -1) # default max_reconnect_attempts=5 with 3+6+12+24+48 backoff = ~93s total max, # but we kill the receiver permanently so each reconnect attempt fails fast. -"$BIN" -v info --identity-dir "$ID_S" send t10/in/resume.bin --peer 127.0.0.1:26590 --peer-fingerprint "$FP" --max-speed 1M > t10s.log 2>&1 & +# --state-dir pins the checkpoint location so the re-run below finds it. +"$BIN" -v info --identity-dir "$ID_S" send t10/in/resume.bin --peer 127.0.0.1:26590 --peer-fingerprint "$FP" --max-speed 1M --state-dir t10/state > t10s.log 2>&1 & SEND=$! sleep 3 # ~3 MB in echo "T10 killing receiver (sender must persist state, then bounded retries)…" @@ -186,24 +187,26 @@ wait "$SEND" 2>/dev/null SEND_RC=$? echo "T10 sender exited rc=$SEND_RC" -STATE=$(ls transfer_*.json 2>/dev/null | head -1) +STATE=$(ls t10/state/transfer_*.json 2>/dev/null | head -1) if [[ -n "$STATE" ]]; then ok "T10a state file written ($STATE)" - TID=$(echo "$STATE" | sed -E 's/transfer_(.+)\.json/\1/') "$BIN" -v info --identity-dir "$ID_R" receive --port 26590 --auto-accept --output t10/out > t10r2.log 2>&1 & RECV2=$!; sleep 3 FP2=$(grep -oE '[0-9a-f]{64}' t10r2.log | head -1) - # Resume with a FILE path — this is the bug we fixed. - "$BIN" -v info --identity-dir "$ID_S" resume "$TID" --to 127.0.0.1:26590 --peer-fingerprint "$FP2" --path t10/in/resume.bin > t10res.log 2>&1 + # Re-run the IDENTICAL send (same source, same peer, same --state-dir). + # It must auto-detect the prior checkpoint and resume — no `resume` + # subcommand, no transfer id. + "$BIN" -v info --identity-dir "$ID_S" send t10/in/resume.bin --peer 127.0.0.1:26590 --peer-fingerprint "$FP2" --state-dir t10/state > t10res.log 2>&1 RC=$? sleep 1; killtree "$RECV2"; wait "$RECV2" 2>/dev/null - if [[ $RC -eq 0 && -f t10/out/resume.bin && "$SH_IN" == "$(sha256 t10/out/resume.bin)" ]]; then - ok "T10b single-file resume completed, sha256 match" + RESUMED=$(grep -c "Resuming transfer" t10res.log) + if [[ $RC -eq 0 && -f t10/out/resume.bin && "$SH_IN" == "$(sha256 t10/out/resume.bin)" && $RESUMED -ge 1 ]]; then + ok "T10b single-file auto-resume completed, sha256 match" else - bad "T10b rc=$RC file_present=$([[ -f t10/out/resume.bin ]] && echo yes || echo no)" + bad "T10b rc=$RC file_present=$([[ -f t10/out/resume.bin ]] && echo yes || echo no) resumed=$RESUMED" tail -10 t10res.log fi else diff --git a/tests/rendezvous_disconnect_resume_test.rs b/tests/rendezvous_disconnect_resume_test.rs index ad25e9a..d03e30a 100644 --- a/tests/rendezvous_disconnect_resume_test.rs +++ b/tests/rendezvous_disconnect_resume_test.rs @@ -1,89 +1,122 @@ -//! End-to-end test: rendezvous pairing → first sender closes → receiver -//! re-pairs through the same rendezvous → second sender uses -//! `handle_resume` with `--rendezvous` → destination matches source. +//! End-to-end test: rendezvous pairing → receiver re-pairs after the first +//! sender disconnects → a second `send` of the *same source* auto-resumes a +//! prior incomplete transfer → destination matches source. //! -//! This test exists to prevent two structural regressions that landed -//! before any test caught them: +//! This guards two structural behaviours: //! -//! 1. **Receiver re-pair under rendezvous.** Post-rendezvous, the QUIC -//! role (initiator vs responder) is decided by a UUID compare; the -//! receiver wins only ~half the time, so `session.reaccept()` is -//! structurally wrong half the time. The fix re-pairs through the -//! rendezvous on disconnect. Here we drive the same receiver instance -//! through TWO consecutive pairings — if `reaccept()` were still on -//! the disconnect path, the second pairing would fail with -//! "reaccept() is only valid for responder sessions" half the time. +//! 1. **Receiver re-pair under rendezvous.** Post-rendezvous, the QUIC role +//! (initiator vs responder) is decided by a UUID compare; the receiver +//! wins only ~half the time, so `session.reaccept()` is structurally +//! wrong half the time. The fix re-pairs through the rendezvous on +//! disconnect. A single long-lived receiver is driven through TWO +//! consecutive pairings — if `reaccept()` were on the disconnect path it +//! would fail half the time on the second pair. //! -//! 2. **Resume over rendezvous.** The original `resume` CLI only accepted -//! `--to `, making cross-NAT resume impossible. The fix -//! flattens `SessionParams` into the `resume` command; phase 2 of -//! this test calls `handle_resume` with `--rendezvous` + `--code` so -//! a regression would surface as a CLI-parse failure or a -//! session-establish failure. +//! 2. **Resume folded into `send`.** There is no longer a `resume` +//! subcommand. Re-running the identical `send` of an interrupted +//! transfer must auto-detect the prior state (by peer fingerprint + file +//! list) and continue it. Phase 2 calls `handle_send` again — a +//! regression would either restart from zero (no "Resuming transfer" +//! log, prior state file left orphaned) or fail to pair / skip the +//! already-completed file. +//! +//! The mid-transfer checkpoint is *reconstructed* rather than produced by +//! killing a live transfer: in rendezvous mode the QUIC initiator/responder +//! — and therefore which side's `ConfigMessage` (and bandwidth limit) wins +//! — is decided by a UUID race, so a live transfer's duration is not stable +//! enough to interrupt it deterministically. Seeding the checkpoint keeps +//! the test exercising the real resume path (find_resumable_state → +//! handle_send → receiver skips completed files → rendezvous re-pair) +//! without a timing race. use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::{Path, PathBuf}; -use std::time::{Duration, Instant}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use sha2::{Digest, Sha256}; use tokio::time::{sleep, timeout}; use p2p_cli::cli::{SessionParams, TransferParams}; -use p2p_core::{ - protocol::{ConfigMessage, FileMetadata}, - transfer_folder::FolderTransferState, - Uuid, -}; +use p2p_core::identity::Identity; +use p2p_core::protocol::ConfigMessage; +use p2p_core::transfer_folder::{enumerate_files, FolderTransferState}; +use p2p_core::Uuid; use p2p_rendezvous::Server; const PAIRING_CODE: &str = "RZRTEST"; -const PAYLOAD_SIZE: usize = 1_048_576; // 1 MiB +const PAYLOAD_SIZE: usize = 256 * 1024; // 256 KiB per file const PHASE_DEADLINE: Duration = Duration::from_secs(45); -const POLL_INTERVAL: Duration = Duration::from_millis(100); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn receiver_re_pairs_after_sender_disconnect_and_resume_uses_rendezvous() { +async fn interrupted_send_auto_resumes_on_rerun_over_rendezvous() { + let logs = install_log_capture(); + let tmp = tempfile::tempdir().expect("tmpdir"); let dirs = Dirs::lay_out(tmp.path()).await; let payloads = Payloads::create(&dirs).await; + let dst_root = dirs.dst.join("src"); // folder name is preserved on the wire + + // Pre-generate the receiver identity so we can compute the fingerprint + // the sender will see as its peer — used to stamp the seeded checkpoint. + // The receiver task loads this same persisted identity on startup. + let peer_fp = Identity::load_or_generate(Some(&dirs.receiver_identity)) + .expect("receiver identity") + .fingerprint(); let rzv_addr = start_local_rendezvous().await; - // The receiver instance must survive the first sender disconnecting - // and accept a second sender that arrives through resume. Both - // pairings are through the same rendezvous + code. + // One receiver instance must survive the first sender disconnecting and + // accept the resuming sender. Both pairings go through the same + // rendezvous + code. let receiver = spawn_receiver(rzv_addr, &dirs); + sleep(Duration::from_millis(200)).await; // let the receiver register first - // PHASE 1 — fresh send via rendezvous. Completes naturally. When the - // sender exits, the receiver's QUIC connection closes; this is the - // point at which the receive loop must successfully re-pair through - // the rendezvous (and not call `reaccept()`). - // - // Give the receiver a head-start so its register arrives first; the - // rendezvous treats whichever side arrives second as the match. - // Without this both can race for the "first peer" slot and the - // loser sees "code already in use". - sleep(Duration::from_millis(200)).await; - phase1_send_file(rzv_addr, &dirs, &payloads.a).await; - wait_until_file_at(&dirs.dst, &payloads.a.name, PAYLOAD_SIZE).await; - assert_file_matches(&payloads.a, &dirs.dst.join(&payloads.a.name)).await; - - // PHASE 2 — resume via rendezvous. We synthesise a fresh state file - // describing a not-yet-started transfer of file B, then drive - // `handle_resume` with `--rendezvous` + `--code`. Pre-fix this would - // fail at CLI signature or session establish; post-fix it pairs - // through the rendezvous (the receiver is now in re-pair after - // phase 1) and transfers file B. - // - // Same ordering caveat as phase 1: the receiver loops back into a - // fresh rendezvous registration after the phase-1 sender disconnects; - // give it a moment to land in the waiter slot before the phase-2 - // sender arrives. - let resume_id = synthesize_state_for_resume(&dirs, &payloads.b).await; - sleep(Duration::from_millis(500)).await; - phase2_resume_file(rzv_addr, &dirs, &payloads.b, resume_id).await; - wait_until_file_at(&dirs.dst, &payloads.b.name, PAYLOAD_SIZE).await; - assert_file_matches(&payloads.b, &dirs.dst.join(&payloads.b.name)).await; + // PHASE 1 — a real folder send that completes. The sender then exits, + // closing the QUIC connection; the receiver must recover by re-pairing + // through the rendezvous (not reaccept()). + timeout(PHASE_DEADLINE, run_send(rzv_addr, &dirs)) + .await + .expect("phase 1 send timed out") + .expect("phase 1 send failed"); + assert_file_matches(&payloads.a, &dst_root.join(&payloads.a.name)).await; + assert_file_matches(&payloads.b, &dst_root.join(&payloads.b.name)).await; + + // Reconstruct a mid-transfer checkpoint: file A done, file B still + // pending, stamped with the real peer fingerprint. (See the module doc + // for why this is seeded rather than produced by a live interruption.) + clear_state_dir(&dirs.state).await; // drop phase 1's own (now-complete) state + let seed = seed_partial_state(&dirs, peer_fp, &payloads.b.name).await; + // Drop B from the destination so a successful resume must re-deliver it. + tokio::fs::remove_file(dst_root.join(&payloads.b.name)) + .await + .expect("remove dst B"); + + // PHASE 2 — re-run the identical `send`. It must locate the checkpoint + // (same peer + file list), pair again through the rendezvous, skip the + // completed file A, and re-deliver B. + sleep(Duration::from_millis(500)).await; // let the receiver re-register + timeout(PHASE_DEADLINE, run_send(rzv_addr, &dirs)) + .await + .expect("phase 2 send timed out") + .expect("phase 2 send failed"); + + assert_file_matches(&payloads.a, &dst_root.join(&payloads.a.name)).await; + assert_file_matches(&payloads.b, &dst_root.join(&payloads.b.name)).await; + + // Resume, not restart: phase 2 must have consumed the checkpoint (a + // fresh transfer would have minted a new UUID and left it orphaned), and + // the "Resuming transfer" line must have been logged. + assert!( + !seed.exists(), + "resume should consume the checkpoint, not orphan it: {}", + seed.display() + ); + let captured = String::from_utf8_lossy(&logs.lock().unwrap()).to_string(); + assert!( + captured.contains("Resuming transfer"), + "phase 2 should have resumed (no 'Resuming transfer' in logs)" + ); receiver.abort(); } @@ -120,8 +153,7 @@ impl Dirs { } } -/// A single source file's name, on-disk path, and SHA-256 — small bundle -/// so the test body doesn't juggle three parallel variables per payload. +/// A single source file's name, on-disk path, and SHA-256. struct Payload { name: String, path: PathBuf, @@ -165,14 +197,14 @@ fn rendezvous_session_params(rzv_addr: SocketAddr) -> SessionParams { } } -fn transfer_params_no_compression() -> TransferParams { +fn transfer_params() -> TransferParams { TransferParams { compress: false, // random payload is incompressible; skip the work compress_level: 3, adaptive: true, chunk_size: 1024, // KB → 1 MiB chunks → one chunk per file max_speed: 0, // unlimited; localhost is fast - max_reconnect_attempts: 0, // don't auto-reconnect across the loop + max_reconnect_attempts: 1, // single attempt; the test re-pairs explicitly } } @@ -182,80 +214,96 @@ fn spawn_receiver(rzv_addr: SocketAddr, dirs: &Dirs) -> tokio::task::JoinHandle< let identity_dir = dirs.receiver_identity.clone(); tokio::spawn(async move { // Auto-accept so the y/N prompt doesn't block the test. - // `handle_receive` runs an infinite loop; it returns only on a - // fatal (non-disconnect) error or when the task is aborted. + // `handle_receive` loops; it returns only on a fatal (non-disconnect) + // error or when the task is aborted. let _ = p2p_cli::receive::handle_receive(output, true, params, Some(identity_dir)).await; }) } -async fn phase1_send_file(rzv_addr: SocketAddr, dirs: &Dirs, payload: &Payload) { - let params = rendezvous_session_params(rzv_addr); - let transfer = transfer_params_no_compression(); - let identity_dir = dirs.sender_identity.clone(); - let result = timeout( - PHASE_DEADLINE, - p2p_cli::send::handle_send( - payload.path.clone(), - Some(dirs.state.clone()), - params, - transfer, - Some(identity_dir), - ), - ) - .await - .expect("phase 1 send timed out"); - result.expect("phase 1 send failed"); -} - -async fn phase2_resume_file( - rzv_addr: SocketAddr, - dirs: &Dirs, - payload: &Payload, - transfer_id: Uuid, -) { - let params = rendezvous_session_params(rzv_addr); - let identity_dir = dirs.sender_identity.clone(); - let result = timeout( - PHASE_DEADLINE, - p2p_cli::resume::handle_resume( - transfer_id.to_string(), - payload.path.clone(), - Some(dirs.state.clone()), - 0, - params, - Some(identity_dir), - ), +/// Drive `handle_send` of the whole `src` folder to completion (resume +/// enabled). +async fn run_send(rzv_addr: SocketAddr, dirs: &Dirs) -> anyhow::Result<()> { + p2p_cli::send::handle_send( + dirs.src.clone(), + Some(dirs.state.clone()), + false, // allow resume + rendezvous_session_params(rzv_addr), + transfer_params(), + Some(dirs.sender_identity.clone()), ) .await - .expect("phase 2 resume timed out"); - result.expect("phase 2 resume failed"); } -/// Build a `FolderTransferState` describing a not-yet-started transfer of -/// `payload`, save it as `transfer_.json` in `dirs.state`, and -/// return the transfer id. `handle_resume` will load this file, see -/// `completed_files` is empty + `file_chunks` is empty, and stream the -/// whole payload — exactly the same wire path a real "resume from -/// scratch" would take. -async fn synthesize_state_for_resume(dirs: &Dirs, payload: &Payload) -> Uuid { - let transfer_id = Uuid::new_v4(); - let state = FolderTransferState::new( - transfer_id, - "src".to_string(), - vec![FileMetadata { - path: payload.name.clone(), - size: PAYLOAD_SIZE as u64, - modified: 0, - checksum: [0u8; 32], - }], +/// Persist a checkpoint describing the `src` folder with every file marked +/// complete *except* `pending_name`, stamped with `peer_fp`. Returns the +/// state file path. +async fn seed_partial_state(dirs: &Dirs, peer_fp: [u8; 32], pending_name: &str) -> PathBuf { + let files = enumerate_files(&dirs.src).await.expect("enumerate source"); + let mut state = FolderTransferState::new( + Uuid::new_v4(), + "src".into(), + files.clone(), &ConfigMessage::default(), ); - let state_path = dirs.state.join(format!("transfer_{transfer_id}.json")); - state - .save_to_file(&state_path) - .await - .expect("save synthetic state"); - transfer_id + state.peer_fingerprint = peer_fp; + for (i, f) in files.iter().enumerate() { + if !f.path.ends_with(pending_name) { + state.mark_file_complete(i); + } + } + let path = dirs + .state + .join(format!("transfer_{}.json", state.transfer_id)); + state.save_to_file(&path).await.expect("save checkpoint"); + path +} + +/// Remove any `transfer_*.json` left in the state dir so the only resumable +/// state the next `send` can find is the one the test seeds. +async fn clear_state_dir(state_dir: &Path) { + let mut entries = match tokio::fs::read_dir(state_dir).await { + Ok(entries) => entries, + Err(_) => return, + }; + while let Ok(Some(entry)) = entries.next_entry().await { + let _ = tokio::fs::remove_file(entry.path()).await; + } +} + +// ---- log capture ------------------------------------------------------------ + +#[derive(Clone)] +struct SharedBuf(Arc>>); + +impl std::io::Write for SharedBuf { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SharedBuf { + type Writer = SharedBuf; + fn make_writer(&'a self) -> Self::Writer { + self.clone() + } +} + +/// Install a process-global subscriber that mirrors INFO logs into a buffer +/// the test can inspect. This binary has a single test, so setting the +/// global default once is safe. +fn install_log_capture() -> Arc>> { + let buf = Arc::new(Mutex::new(Vec::new())); + let subscriber = tracing_subscriber::fmt() + .with_writer(SharedBuf(buf.clone())) + .with_ansi(false) + .with_max_level(tracing::Level::INFO) + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + buf } // ---- payload generation + verification -------------------------------------- @@ -285,28 +333,10 @@ fn fill_pseudo_random(buf: &mut [u8], seed: u64) { } } -async fn wait_until_file_at(dir: &Path, name: &str, expected_size: usize) { - let target = dir.join(name); - let deadline = Instant::now() + PHASE_DEADLINE; - loop { - match tokio::fs::metadata(&target).await { - Ok(m) if m.len() as usize == expected_size => return, - _ => {} - } - if Instant::now() >= deadline { - panic!( - "{} never reached {} bytes within {:?}", - target.display(), - expected_size, - PHASE_DEADLINE - ); - } - sleep(POLL_INTERVAL).await; - } -} - async fn assert_file_matches(expected: &Payload, actual: &Path) { - let bytes = tokio::fs::read(actual).await.expect("read destination"); + let bytes = tokio::fs::read(actual) + .await + .unwrap_or_else(|e| panic!("read destination {}: {e}", actual.display())); let got: [u8; 32] = Sha256::digest(&bytes).into(); assert_eq!( got, From 212e39fae65877f444991a325b5a798bbb876bb3 Mon Sep 17 00:00:00 2001 From: cDc Date: Wed, 3 Jun 2026 10:05:42 +0300 Subject: [PATCH 2/2] fix: address PR #4 review findings on auto-resume - Guard resume on chunk_size: a re-run with a different --chunk-size has an incompatible .partial layout, so resume now requires the saved config.chunk_size to match the current negotiated size and otherwise starts a fresh transfer with a warning (no skipped/overwritten ranges). - Clean up stale duplicate state files: when several checkpoints match the same source+peer, resume the newest and delete the rest so a later identical send can't pick up a stale checkpoint. - Make checkpointing cheap: throttle to <=1 write/2s, serialize compact, and write via spawn_blocking with atomic temp+rename instead of a full pretty-JSON serialize + blocking std::fs::write per completed file (was O(files^2) serialization and blocked a Tokio worker). - Correct the now-accurate FolderTransferState.config doc comment. - Add unit tests for chunk-size rejection and stale-duplicate cleanup. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 22 +++- p2p-cli/src/send.rs | 34 +++--- p2p-cli/src/util.rs | 176 ++++++++++++++++++++++++++------ p2p-core/src/session.rs | 50 +++++++-- p2p-core/src/transfer_folder.rs | 16 +-- 5 files changed, 233 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15fc61e..130bcab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,9 +20,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `$XDG_DATA_HOME/p2p-transfer/state`, or `~/Library/Application Support/p2p-transfer/state`), so a re-run from any directory finds it. `--state-dir` still overrides. -- `send_path` now persists a checkpoint each time a file completes, so an - abrupt kill (not just a recoverable network error) leaves resumable - state. +- `send_path` now persists a checkpoint as files complete (throttled to + at most once every 2s, written off the async runtime), so an abrupt + kill — not just a recoverable network error — leaves resumable state. + +### Fixed — 2026-06-03 — PR #4 review (auto-resume) +- **Chunk-size mismatch no longer corrupts resume.** Resume detection now + also requires the saved `config.chunk_size` to match the current + invocation's; a re-run with a different `--chunk-size` (whose `.partial` + layout is incompatible) starts a fresh transfer with a warning instead + of skipping or overwriting the wrong byte ranges. +- **Stale duplicate state files are cleaned up.** When more than one + checkpoint matches the same source and peer, `send` resumes the newest + and deletes the older duplicates immediately, so a later identical + `send` can't pick up a stale checkpoint after the chosen one completes. +- **Checkpoint write is no longer O(files²) / blocking.** The per-file + checkpoint is throttled (≤ once per 2s), serialized compactly, and + written via `spawn_blocking` with an atomic temp+rename, instead of a + full pretty-JSON serialize and blocking `std::fs::write` on a Tokio + worker after every completed file. ### Fixed — 2026-05-23 — Security & robustness audit (16 findings) diff --git a/p2p-cli/src/send.rs b/p2p-cli/src/send.rs index da7edb3..5419d79 100644 --- a/p2p-cli/src/send.rs +++ b/p2p-cli/src/send.rs @@ -84,18 +84,25 @@ pub async fn handle_send( // exact peer and pick up where it left off. `--no-resume` forces a // fresh transfer (e.g. when the source content changed in a way the // size+mtime check can't see). - let (transfer_id, state_file, resume_from_bytes) = - match resolve_resume(&state_dir, &path, peer_fp, no_resume).await? { - Some((id, file, bytes)) => (id, file, bytes), - None => { - let id = Uuid::new_v4(); - ( - id, - resolve_state_file(Some(&state_dir), &id.to_string())?, - 0, - ) - } - }; + let (transfer_id, state_file, resume_from_bytes) = match resolve_resume( + &state_dir, + &path, + peer_fp, + session.config().chunk_size, + no_resume, + ) + .await? + { + Some((id, file, bytes)) => (id, file, bytes), + None => { + let id = Uuid::new_v4(); + ( + id, + resolve_state_file(Some(&state_dir), &id.to_string())?, + 0, + ) + } + }; tokio::select! { result = send( @@ -118,13 +125,14 @@ async fn resolve_resume( state_dir: &Path, path: &Path, peer_fp: [u8; 32], + chunk_size: u32, no_resume: bool, ) -> Result> { if no_resume { return Ok(None); } let Some((existing_path, existing_state)) = - find_resumable_state(state_dir, path, peer_fp).await? + find_resumable_state(state_dir, path, peer_fp, chunk_size).await? else { return Ok(None); }; diff --git a/p2p-cli/src/util.rs b/p2p-cli/src/util.rs index 0696292..6321f7f 100644 --- a/p2p-cli/src/util.rs +++ b/p2p-cli/src/util.rs @@ -73,15 +73,22 @@ pub fn resolve_state_file(state_dir: Option<&Path>, transfer_id: &str) -> Result /// list matches the source as it exists on disk right now. /// /// Matching is **strict**: every file must agree on `(relative path, size, -/// modified time)`. Any drift — a resized file, a touched mtime, a file -/// added or removed — is treated as a different transfer and yields `None` -/// (i.e. a fresh transfer). When more than one state file matches, the -/// newest (by file mtime) is chosen and a warning is logged; the stale -/// ones are removed by their own success path on a later run. +/// modified time)` and the saved `config.chunk_size` must equal +/// `current_chunk_size`. Any drift — a resized file, a touched mtime, a +/// file added or removed, or a changed `--chunk-size` — is treated as a +/// different transfer and yields `None` (i.e. a fresh transfer). The chunk +/// size matters because the `.partial` byte offsets on disk were laid out +/// under the saved size; resuming under a different size would skip or +/// overwrite the wrong ranges. +/// +/// When more than one state file matches, the newest (by file mtime) is +/// resumed and the older duplicates are deleted on the spot, so a later +/// identical `send` can't accidentally pick up a stale checkpoint. pub async fn find_resumable_state( state_dir: &Path, source_root: &Path, peer_fingerprint: [u8; 32], + current_chunk_size: u32, ) -> Result> { // Enumerate the source exactly as a fresh send would. If it can't be // enumerated (empty folder, vanished path) there's nothing to resume; @@ -97,8 +104,12 @@ pub async fn find_resumable_state( Err(_) => return Ok(None), }; - let mut best: Option<(PathBuf, SystemTime, FolderTransferState)> = None; - let mut matches = 0usize; + // Every state file that matches peer + file list + chunk size, paired + // with its mtime so we can pick the newest and delete the rest. + let mut matches: Vec<(PathBuf, SystemTime, FolderTransferState)> = Vec::new(); + // A candidate matched peer + file list but differed only on chunk size: + // worth a warning so the user understands why resume restarted. + let mut chunk_size_mismatch = false; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); if !is_transfer_state_file(&path) { @@ -116,31 +127,60 @@ pub async fn find_resumable_state( if sorted_file_keys(&state.files) != current_key { continue; } - matches += 1; + if state.config.chunk_size != current_chunk_size { + chunk_size_mismatch = true; + continue; + } let mtime = entry .metadata() .await .ok() .and_then(|m| m.modified().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); - let is_newer = match &best { - Some((_, best_mtime, _)) => mtime > *best_mtime, - None => true, - }; - if is_newer { - best = Some((path, mtime, state)); - } + matches.push((path, mtime, state)); } - if matches > 1 { + if chunk_size_mismatch && matches.is_empty() { + warn!( + "A prior transfer of this source to this peer used a different chunk size; \ + its on-disk layout is incompatible, so starting a fresh transfer." + ); + } + + // Pick the newest match; delete every other match so it can't resurface + // as the "newest" on a later run once this one completes and is removed. + let newest_idx = matches + .iter() + .enumerate() + .max_by_key(|(_, (_, mtime, _))| *mtime) + .map(|(i, _)| i); + let Some(newest_idx) = newest_idx else { + return Ok(None); + }; + + if matches.len() > 1 { warn!( - "{} resumable state files match this source and peer; resuming the newest. \ - Stale ones are removed when their transfer next completes.", - matches + "{} resumable state files match this source and peer; resuming the newest \ + and removing the {} stale duplicate(s).", + matches.len(), + matches.len() - 1, ); } - Ok(best.map(|(path, _, state)| (path, state))) + let mut chosen = None; + for (i, (path, _, state)) in matches.into_iter().enumerate() { + if i == newest_idx { + chosen = Some((path, state)); + } else { + // Best-effort: a stale duplicate we fail to delete is harmless + // until the chosen transfer completes, by which point it would + // be the newest — but a delete failure here is rare and the + // strict (peer, files, chunk_size) match still guards resume. + let _ = tokio::fs::remove_file(&path).await; + } + } + + Ok(chosen) } fn is_transfer_state_file(path: &Path) -> bool { @@ -229,15 +269,30 @@ mod tests { tmp } - /// Persist a state file (stamped with `peer_fp`) describing `files`. + /// Default chunk size every helper saves under unless told otherwise; + /// the matcher must be queried with the same value to find a match. + const CHUNK: u32 = p2p_core::DEFAULT_CHUNK_SIZE; + + /// Persist a state file (stamped with `peer_fp`, chunked at `CHUNK`) + /// describing `files`. async fn save_state(state_dir: &Path, files: Vec, peer_fp: [u8; 32]) -> PathBuf { + save_state_chunked(state_dir, files, peer_fp, CHUNK).await + } + + /// Like [`save_state`] but with an explicit `chunk_size`, for exercising + /// the chunk-size match guard. + async fn save_state_chunked( + state_dir: &Path, + files: Vec, + peer_fp: [u8; 32], + chunk_size: u32, + ) -> PathBuf { tokio::fs::create_dir_all(state_dir).await.unwrap(); - let mut state = FolderTransferState::new( - Uuid::new_v4(), - "src".into(), - files, - &ConfigMessage::default(), - ); + let config = ConfigMessage { + chunk_size, + ..ConfigMessage::default() + }; + let mut state = FolderTransferState::new(Uuid::new_v4(), "src".into(), files, &config); state.peer_fingerprint = peer_fp; let path = state_dir.join(format!("transfer_{}.json", state.transfer_id)); state.save_to_file(&path).await.unwrap(); @@ -253,7 +308,7 @@ mod tests { let files = enumerate_files(&src).await.unwrap(); let saved = save_state(&state_dir, files, fp).await; - let (path, state) = find_resumable_state(&state_dir, &src, fp) + let (path, state) = find_resumable_state(&state_dir, &src, fp, CHUNK) .await .unwrap() .expect("matching peer + file list should resume"); @@ -269,7 +324,7 @@ mod tests { let files = enumerate_files(&src).await.unwrap(); save_state(&state_dir, files, [1u8; 32]).await; - let found = find_resumable_state(&state_dir, &src, [2u8; 32]) + let found = find_resumable_state(&state_dir, &src, [2u8; 32], CHUNK) .await .unwrap(); assert!(found.is_none(), "a different peer must not match"); @@ -285,7 +340,9 @@ mod tests { files[0].size += 1; // pretend the file used to be a different size save_state(&state_dir, files, fp).await; - let found = find_resumable_state(&state_dir, &src, fp).await.unwrap(); + let found = find_resumable_state(&state_dir, &src, fp, CHUNK) + .await + .unwrap(); assert!(found.is_none(), "size drift must not match"); } @@ -299,7 +356,9 @@ mod tests { files[0].modified ^= 0xFFFF; // perturb the recorded mtime save_state(&state_dir, files, fp).await; - let found = find_resumable_state(&state_dir, &src, fp).await.unwrap(); + let found = find_resumable_state(&state_dir, &src, fp, CHUNK) + .await + .unwrap(); assert!(found.is_none(), "mtime drift must not match"); } @@ -313,7 +372,9 @@ mod tests { files.pop(); // state knows about fewer files than exist now save_state(&state_dir, files, fp).await; - let found = find_resumable_state(&state_dir, &src, fp).await.unwrap(); + let found = find_resumable_state(&state_dir, &src, fp, CHUNK) + .await + .unwrap(); assert!(found.is_none(), "file-set drift must not match"); } @@ -324,9 +385,58 @@ mod tests { let state_dir = tmp.path().join("state"); tokio::fs::create_dir_all(&state_dir).await.unwrap(); - let found = find_resumable_state(&state_dir, &src, [9u8; 32]) + let found = find_resumable_state(&state_dir, &src, [9u8; 32], CHUNK) .await .unwrap(); assert!(found.is_none(), "no state files → no match"); } + + /// Finding 1: a state saved under a different chunk size must NOT + /// resume — its `.partial` byte offsets are laid out for that size, so + /// resuming under the current size would corrupt the file. + #[tokio::test] + async fn rejects_when_chunk_size_differs() { + let tmp = make_source(&[("a.txt", b"hello")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let fp = [6u8; 32]; + let files = enumerate_files(&src).await.unwrap(); + save_state_chunked(&state_dir, files, fp, CHUNK).await; + + let found = find_resumable_state(&state_dir, &src, fp, CHUNK * 2) + .await + .unwrap(); + assert!(found.is_none(), "a different chunk size must not resume"); + } + + /// Finding 2: when several state files match, resume the newest and + /// delete the stale duplicates so a later `send` can't pick one up. + #[tokio::test] + async fn keeps_only_newest_match_and_deletes_stale() { + let tmp = make_source(&[("a.txt", b"hello")]).await; + let src = tmp.path().join("src"); + let state_dir = tmp.path().join("state"); + let fp = [8u8; 32]; + let files = enumerate_files(&src).await.unwrap(); + + let older = save_state(&state_dir, files.clone(), fp).await; + let newer = save_state(&state_dir, files, fp).await; + // Force a strictly newer mtime on the second file so "newest" is + // unambiguous regardless of filesystem mtime granularity. + let later = SystemTime::now() + std::time::Duration::from_secs(10); + std::fs::File::options() + .write(true) + .open(&newer) + .unwrap() + .set_modified(later) + .unwrap(); + + let (path, _) = find_resumable_state(&state_dir, &src, fp, CHUNK) + .await + .unwrap() + .expect("a match should resume"); + assert_eq!(path, newer, "the newest state file should be chosen"); + assert!(!older.exists(), "the stale duplicate should be deleted"); + assert!(newer.exists(), "the chosen state file should remain"); + } } diff --git a/p2p-core/src/session.rs b/p2p-core/src/session.rs index f902da7..a893767 100644 --- a/p2p-core/src/session.rs +++ b/p2p-core/src/session.rs @@ -10,8 +10,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::Path; -use std::sync::Arc; -use std::time::Duration; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use tracing::{debug, info, trace, warn}; use uuid::Uuid; @@ -27,6 +27,12 @@ use crate::transfer_folder::{ }; use crate::traversal::{establish_via_rendezvous, RendezvousParams, DEFAULT_STUN_SERVERS}; +/// Minimum wall-clock gap between on-disk resume checkpoints during a +/// transfer. Bounds checkpoint writes to ~one per interval regardless of +/// file count (otherwise persisting after every completed file is +/// O(files²) serialization plus a blocking write per file). +const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(2); + /// An established connection plus the parameters needed to resurrect it. pub struct P2PSession { endpoint: QuicEndpoint, @@ -322,19 +328,43 @@ impl P2PSession { transfer_id, ); - // Persist a fresh checkpoint each time a file completes, so - // an abrupt termination (Ctrl+C, kill, crash) still leaves a - // resumable state — not just the recoverable-error path - // below. The snapshot is stamped with the peer fingerprint - // so `find_resumable_state` can match it on the next run. + // Persist a checkpoint as files complete, so an abrupt + // termination (Ctrl+C, kill, crash) still leaves a resumable + // state — not just the recoverable-error path below. The + // snapshot is stamped with the peer fingerprint so + // `find_resumable_state` can match it on the next run. + // + // Throttled to at most once per `CHECKPOINT_INTERVAL`: a + // write after *every* file would serialize the whole + // (growing) state each time — O(files²) — and block a Tokio + // worker on disk I/O per file. The gate is checked before any + // clone/serialize, so skipped ticks are nearly free; the + // actual write runs off-worker via `spawn_blocking` and is + // atomic (tmp + rename) so a reader never sees a torn file. if let Some(state_file) = state_path { let path_buf = state_file.to_path_buf(); + let last_checkpoint = Arc::new(Mutex::new(Instant::now())); folder_session.set_state_callback(Arc::new(move |st: &FolderTransferState| { + { + let mut last = last_checkpoint.lock().unwrap(); + if last.elapsed() < CHECKPOINT_INTERVAL { + return; + } + *last = Instant::now(); + } let mut snapshot = st.clone(); snapshot.peer_fingerprint = peer_fp; - if let Ok(json) = serde_json::to_string_pretty(&snapshot) { - let _ = std::fs::write(&path_buf, json); - } + let Ok(json) = serde_json::to_vec(&snapshot) else { + return; + }; + let dest = path_buf.clone(); + tokio::task::spawn_blocking(move || { + let mut tmp = dest.clone(); + tmp.set_extension("json.tmp"); + if std::fs::write(&tmp, &json).is_ok() { + let _ = std::fs::rename(&tmp, &dest); + } + }); })); } diff --git a/p2p-core/src/transfer_folder.rs b/p2p-core/src/transfer_folder.rs index 383a02a..cd19c92 100644 --- a/p2p-core/src/transfer_folder.rs +++ b/p2p-core/src/transfer_folder.rs @@ -708,10 +708,12 @@ async fn scan_folder(folder_path: &Path) -> Result> } /// On-disk state for chunk-level resume. Embeds the negotiated -/// [`ConfigMessage`] verbatim so resume rehydrates the same chunk_size and -/// compression settings the original session used — without this the -/// `.partial` on disk (laid out under the original chunk_size) and the -/// resumed session's offsets disagree, silently corrupting the file. +/// [`ConfigMessage`] verbatim so a resuming `send` can *validate* that its +/// current chunk_size still matches the one the `.partial` files were laid +/// out under. If it differs (the user re-ran with a different +/// `--chunk-size`), the offsets would disagree and silently corrupt the +/// file, so the resume scanner rejects the mismatch and starts fresh +/// instead (see `find_resumable_state` in p2p-cli). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FolderTransferState { pub transfer_id: Uuid, @@ -722,8 +724,10 @@ pub struct FolderTransferState { pub total_bytes: u64, pub transferred_bytes: u64, pub file_chunks: HashMap>, - /// Negotiated config snapshot — must match what the `.partial` on - /// disk was laid out with. Resume reads `config.chunk_size` directly. + /// Negotiated config snapshot — must match what the `.partial` on disk + /// was laid out with. A resuming `send` compares `config.chunk_size` + /// against its current negotiated size and refuses to resume on a + /// mismatch (starting fresh) rather than corrupting the file. pub config: ConfigMessage, /// SHA-256 fingerprint of the peer this transfer was negotiated with. /// Stamped by `P2PSession::send_path` once the session is up. Lets a