fix(persistence): multi-shard AOF gate + per-shard AOF foundation (Option B step 1)#129
pilotspacex-byte wants to merge 22 commits into
…OF (P0-FIX-01a/b)
Empirical re-verification on HEAD 6e49050 (2026-05-26) found that
`--shards >= 2 + --appendonly yes` silently loses ~50 % of writes on
SIGKILL, independent of `--appendfsync` and `--disk-offload`. The
original 33-day-old bug memory had narrowed the loss to BGREWRITEAOF +
disk-offload; the discriminator matrix below shows the bug is in the
multi-shard AOF durability path itself.
| Configuration                                                                  | Recovered      |
|--------------------------------------------------------------------------------|----------------|
| --shards 1 --appendonly yes --appendfsync always                               | 5000 / 5000    |
| --shards 1 --disk-offload enable --appendonly yes                              | 12714 / 12714  |
| --shards 2 --disk-offload enable --appendonly yes (BGREWRITEAOF + SIGKILL)     | 7892 / 12662   |
| --shards 2 --disk-offload enable --appendonly yes (plain SIGKILL, no rewrite)  | 7888 / 12655   |
| --shards 2 --disk-offload enable --appendonly yes --appendfsync always         | 2474 / 5000    |
| --shards 2 --disk-offload disable --appendonly yes --appendfsync always        | 2453 / 5000    |
Two complementary gates ship in this commit; both lift in v2.0 when
multi-shard AOF replay walks every shard's segment manifest on recovery
(see docs/runbooks/multi-shard-aof-rewrite.md):
P0-FIX-01a (defence-in-depth, command-level)
  bgrewriteaof_start_sharded refuses with a clear ERR when the
  multi-shard + disk-offload + AOF combo is active. Gated by
  MULTI_SHARD_AOF_REWRITE_UNSAFE: AtomicBool, set once in main.rs.
  Unit test test_bgrewriteaof_sharded_refuses_under_unsafe_config
  covers gate-on + gate-off paths and asserts the gate does not flip
  AOF_REWRITE_IN_PROGRESS.
P0-FIX-01b (load-bearing, startup)
  main.rs aborts with exit code 2 if `--shards >= 2 + --appendonly yes`
  without `--unsafe-multishard-aof`. The new flag is the explicit
  escape hatch for cache-only deployments where the loss window is
  acceptable.
Boundary tests verified live on OrbStack:
  PASS    --shards 1 + AOF starts cleanly (no false positives)
  PASS    --shards 2 + AOF + --unsafe-multishard-aof starts
  PASS    --shards 2 + --appendonly no starts (cache-only)
  REFUSED --shards 2 + AOF without escape hatch
Files
  src/command/persistence.rs                + gate + unit test
  src/main.rs                               + startup refusal + BGREWRITEAOF gate set
  src/config.rs                             + --unsafe-multishard-aof flag
  docs/runbooks/multi-shard-aof-rewrite.md  + operator runbook
Reproducer scripts live in tmp/ (gitignored): p0-repro.sh,
p0-no-rewrite.sh, p0-always.sh, p0-multishard-no-offload.sh,
p0-shards1-exact.sh. Encoding them as #[ignore] crash-matrix tests is
tracked as CRASH-01-LITE in the ship plan.
Multi-shard masters with AOF are now explicitly cache-only in v1.0.
Root-cause investigation P0-INVEST-01 (1-2 wk) is the prerequisite to
lifting the startup gate in v2.0.
author: Tin Dang
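The command-level gate (P0-FIX-01a) described in this commit can be pictured roughly as follows. This is a minimal sketch, not the code in src/command/persistence.rs: the two static names come from the commit message, while `rewrite_is_unsafe`, the return types, the success string, and the "already in progress" error text are assumptions made for illustration. The refusal text uses only the prefix visible in the runbook excerpt; the rest of that message is elided in the source and is not reproduced here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Set once at startup when the unsafe configuration is active.
static MULTI_SHARD_AOF_REWRITE_UNSAFE: AtomicBool = AtomicBool::new(false);
/// Flipped only when a rewrite actually starts.
static AOF_REWRITE_IN_PROGRESS: AtomicBool = AtomicBool::new(false);

/// Hypothetical helper: the combination the commit message names
/// (multi-shard + disk-offload + AOF).
fn rewrite_is_unsafe(num_shards: usize, disk_offload: bool, appendonly: bool) -> bool {
    num_shards >= 2 && disk_offload && appendonly
}

/// Sketch of the gated entry point. The gate is checked before the
/// in-progress flag is touched, so a refusal leaves that flag unchanged.
fn bgrewriteaof_start_sharded() -> Result<&'static str, String> {
    if MULTI_SHARD_AOF_REWRITE_UNSAFE.load(Ordering::Acquire) {
        return Err(
            "ERR BGREWRITEAOF is unsafe with --shards >= 2 + --disk-offload enable".to_string(),
        );
    }
    // swap returns the previous value: true means a rewrite is already running.
    if AOF_REWRITE_IN_PROGRESS.swap(true, Ordering::AcqRel) {
        // Assumed wording; the real message is not shown in the source.
        return Err("ERR AOF rewrite already in progress".to_string());
    }
    Ok("rewrite started")
}
```

Checking the gate first is what makes the unit test's second assertion possible: a refused call must not leave AOF_REWRITE_IN_PROGRESS set, or a later legitimate rewrite would be blocked.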
…lpha-leak qualifiers
README
* Bumps version badge v0.1.10 → v0.1.12 and replaces the
"experimental" status with "single-node production-grade" plus a
"cluster v0.2 alpha" badge, mirroring the new ship plan posture.
* Replaces the blanket experimental warning with a "production-grade
architecture, pre-1.0 maturity" framing that points at the new
Production readiness section for the honest GA matrix.
* Reconciles platform support — macOS is a supported development
platform per the PRODUCTION-CONTRACT Tier table; production
deployments target Linux.
* Adds a Valkey 9.1.0 column to the peak-throughput tables (honest
"not yet benched" placeholders) and a new Moon vs Redis vs Valkey
section: a three-way comparison table plus "when to choose"
guidance, all traced to docs/comparison-valkey.md.
* Rewrites the trailing roadmap into a Production readiness section
with what's GA today, what's not, operator gotchas, and a roadmap
table.
Alpha-leak qualifiers added so v0.1.12 framing does not implicitly
promise v0.2.0-alpha features:
* Quick-start HEXPIRE / HTTL lines annotated "(v0.2.0-alpha; build
from main)".
* Hash-field TTL benchmark section retitled "v0.2.0-alpha preview"
with a callout that the latest tag (v0.1.12) does not include it.
* "What's already in main" list split into v0.1.12 (latest tag,
single-node production-grade) and v0.2.0-alpha additions
(hash-field TTL, PITR, CDC, multi-node cluster soak).
* Comparison-table row for hash-field TTL qualified as
"v0.2-alpha".
CHANGELOG
* Adds v0.1.12 entry covering Phase 189 (DashTable pre-sizing +
--initial-keyspace-hint, PERF-07/09), Phase 190 (moon_memory_bytes
Prometheus gauge with 7 subsystem kinds, MEMORY DOCTOR schema,
resident_bytes trait), Phase 191 (jemalloc narenas:8 cap,
--memory-arenas-cap, mimalloc-alt feature, OPERATOR-GUIDE Memory
Accounting), Phase 177 dispatch observability, text-index default
feature, SDK validate.{py,rs}, Python SDK graph parser fix, CI
hygiene.
* Adds v0.1.10 entry (single-shard PSYNC2 wired end-to-end).
* Adds v0.1.9 Lunaris Retriever Gap Closure entry.
* Consolidates three orphan Unreleased blocks under v0.1.3.
* Sharpens v0.2.0-alpha entry with TL;DR headline capabilities
(hash-field TTL stack, PITR, CDC, multi-node cluster soak).
* Fixes version ordering so v0.1.12 sits above v0.1.11.
No code changes; this is purely documentation framing aligned to the
v1.0-rc1 single-node ship plan in tmp/SHIP-PLAN-v1.0-rc1-single-node.md.
author: Tin Dang
📝 Walkthrough
This PR enforces a startup/runtime safety gate for a known multi-shard AOF rewrite durability bug (CLI override added), adds v2 PerShard AOF manifest format and migration/replay, implements AofWriterPool and per-shard writers, migrates connections/handlers to the pool with per-entry LSNs, and updates tests, runbook, README, and CHANGELOG.
Changes
Multi-Shard AOF Safety Gates
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
README.md (1)
229-233: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Quick-start production flags now conflict with startup safety gate.
This command should fail under the new startup refusal (`--shards >= 2 + --appendonly yes` without override), so the README is currently instructing an invalid config.
Suggested README correction
 # Or with production flags
 ./target/release/moon \
   --port 6379 \
-  --shards 8 \
-  --appendonly yes --appendfsync everysec \
+  --shards 1 \
+  --appendonly yes --appendfsync everysec \
   --maxmemory 8g --maxmemory-policy allkeys-lfu
+
+# Multi-shard cache-only alternative
+# ./target/release/moon --shards 8 --appendonly no ...
+
+# Unsafe override (not recommended; known durability risk)
+# ./target/release/moon --shards 8 --appendonly yes --unsafe-multishard-aof ...
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@README.md` around lines 229 - 233, The README's quick-start example uses conflicting flags (--shards 8 together with --appendonly yes) which will trigger the new startup safety gate and refuse to start; update the example command under the block that contains the flags (--port, --shards, --appendonly, --appendfsync, --maxmemory, --maxmemory-policy) to a valid configuration (e.g., set --shards 1 or remove/disable --appendonly) or explicitly show the required override flag and text that allows bypassing the safety gate (add a clear placeholder like --<startup-override> if an override exists) so the documented command actually starts successfully.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/runbooks/multi-shard-aof-rewrite.md`:
- Around line 10-16: Three fenced code blocks in
docs/runbooks/multi-shard-aof-rewrite.md are missing language identifiers
(markdownlint MD040). Edit the three blocks shown (the startup refusal block
starting "REFUSING TO START: --shards 2 + --appendonly yes...", the BGREWRITEAOF
interaction block containing "BGREWRITEAOF" and "(error) ERR BGREWRITEAOF...",
and the final explanatory block starting "BGREWRITEAOF gated for this
config...") and add the language tags: use ```text for the two plain-text blocks
and ```redis for the BGREWRITEAOF example so markdownlint MD040 is satisfied.
In `@src/main.rs`:
- Around line 273-289: The --check-config path currently returns before the
multishard-AOF safety gate runs, so add the same refusal logic used at startup
into the check_config branch: detect the condition (num_shards >= 2 &&
config.appendonly == "yes" && !config.unsafe_multishard_aof) inside the
check_config handling and print the identical error message and exit non‑zero
(or return an error) so preflight fails the same way real startup would; use the
same symbols/strings (num_shards, config.appendonly,
config.unsafe_multishard_aof) and the same message text used near the startup
gate to keep behavior consistent.
📒 Files selected for processing (6)
CHANGELOG.md
README.md
docs/runbooks/multi-shard-aof-rewrite.md
src/command/persistence.rs
src/config.rs
src/main.rs
```
REFUSING TO START: --shards 2 + --appendonly yes has a known data-loss
bug on SIGKILL (~50 % loss verified 2026-05-26). Fix: use --shards 1,
or pass --appendonly no for cache-only deployments, or pass
--unsafe-multishard-aof to acknowledge the risk and start anyway. See
docs/runbooks/multi-shard-aof-rewrite.md.
```
Add fenced code languages to satisfy markdownlint MD040.
These three fenced blocks are missing language identifiers and will keep markdownlint warnings active.
Suggested doc-only fix
-```
+```text
REFUSING TO START: --shards 2 + --appendonly yes has a known data-loss
...
-```
+```
-```
+```redis
> BGREWRITEAOF
(error) ERR BGREWRITEAOF is unsafe with --shards >= 2 + --disk-offload enable
...
-```
+```
-```
+```text
BGREWRITEAOF gated for this config (known data-loss path; see
docs/runbooks/multi-shard-aof-rewrite.md). Use --shards 1 or
--disk-offload disable to re-enable rewrite.
-```
+```
Also applies to: 20-26, 88-92
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 10-10: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
    // P0-FIX-01b: refuse to start under the known durability bug
    // (`shards >= 2 + appendonly yes` loses ~50 % of writes on SIGKILL,
    // verified 2026-05-26 on HEAD `6e49050`; reproducer in
    // `tmp/p0-no-rewrite.sh` and `tmp/p0-always.sh`). The bug is
    // independent of `--appendfsync` and `--disk-offload` settings. An
    // operator can override via `--unsafe-multishard-aof` if the
    // deployment is cache-only and the loss window is acceptable.
    if num_shards >= 2 && config.appendonly == "yes" && !config.unsafe_multishard_aof {
        eprintln!(
            "REFUSING TO START: --shards {num_shards} + --appendonly yes has a known data-loss \
             bug on SIGKILL (~50 % loss verified 2026-05-26). Fix: use --shards 1, or pass \
             --appendonly no for cache-only deployments, or pass --unsafe-multishard-aof to \
             acknowledge the risk and start anyway. See \
             docs/runbooks/multi-shard-aof-rewrite.md."
        );
        std::process::exit(2);
    }
Mirror this refusal in --check-config validation.
Line 143 returns from --check-config before Line 280 runs, so preflight can pass a config that real startup immediately refuses. Please enforce the same gate in the check_config branch.
Suggested patch
@@
if config.check_config {
+ if config.shards >= 2 && config.appendonly == "yes" && !config.unsafe_multishard_aof {
+ return Err(anyhow::anyhow!(
+ "--shards {} + --appendonly yes is refused unless --unsafe-multishard-aof is set (or use --shards 1 / --appendonly no)",
+ config.shards
+ ));
+ }
// Validate shard count is reasonable
if config.shards == 0 {
return Err(anyhow::anyhow!("--shards must be >= 1"));
}
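One way to keep preflight and real startup from drifting apart is to route both through a single gate function. The sketch below is an illustration under stated assumptions rather than the patch applied in this PR: `Config` is a stand-in struct carrying only the three fields the gate reads, and `multishard_aof_gate`, `check_config`, and `startup_exit_code` are hypothetical names. The message text and exit code 2 are taken from the startup gate quoted in this review.

```rust
/// Stand-in for the real config struct; only the gate-relevant fields.
struct Config {
    shards: usize,
    appendonly: String,
    unsafe_multishard_aof: bool,
}

/// Hypothetical shared helper: one predicate and one message for both paths.
fn multishard_aof_gate(cfg: &Config) -> Result<(), String> {
    if cfg.shards >= 2 && cfg.appendonly == "yes" && !cfg.unsafe_multishard_aof {
        return Err(format!(
            "REFUSING TO START: --shards {} + --appendonly yes has a known data-loss \
             bug on SIGKILL (~50 % loss verified 2026-05-26). Fix: use --shards 1, or pass \
             --appendonly no for cache-only deployments, or pass --unsafe-multishard-aof to \
             acknowledge the risk and start anyway. See \
             docs/runbooks/multi-shard-aof-rewrite.md.",
            cfg.shards
        ));
    }
    Ok(())
}

/// Preflight path: surface the refusal as an error instead of starting.
fn check_config(cfg: &Config) -> Result<(), String> {
    if cfg.shards == 0 {
        return Err("--shards must be >= 1".to_string());
    }
    multishard_aof_gate(cfg)
}

/// Startup path: map the same refusal to the exit code the PR uses.
fn startup_exit_code(cfg: &Config) -> i32 {
    match multishard_aof_gate(cfg) {
        Ok(()) => 0,
        Err(msg) => {
            eprintln!("{msg}");
            2
        }
    }
}
```

With this shape, a config that `--check-config` accepts cannot be refused by the startup gate, because both paths evaluate the same condition.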
First implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Closes Hypothesis 2 of the
P0-INVEST-01 root cause: multi-part AOF replay is currently skipped
for num_shards >= 2 because there is no manifest structure that can
describe per-shard segments. This commit lays the foundation by
introducing a manifest v2 format that carries per-shard metadata; the
writer, replay, and lift-the-gate work follows in steps 2-9.
The change is purely additive at the file-system level — v1 manifests
continue to load as TopLevel single-shard with shard_id=0, no
in-place migration is triggered, and no behavior is altered for any
existing deployment. The escape-hatch gate
(--unsafe-multishard-aof) from commit ce05fa9 remains the load-bearing
safety net until step 9 lands.
New types
AofLayout { TopLevel, PerShard }
Discriminates v1 top-level layout from v2 per-shard layout.
A directory holds one layout exclusively — never a mix.
ShardManifest { shard_id: u16, max_lsn: u64 }
Per-shard entry. The max_lsn semantics are deliberately deferred
to step 3 (LSN tagging); until then it is always 0 and recovery
does not consult it. This avoids locking in an LSN namespace
contract before v0.2 S1.3 (REPLCONF ACK / WAIT) lands and
clarifies what LSN MEANS in the multi-shard AOF context.
AofManifest extensions
+ layout: AofLayout
+ shards: Vec<ShardManifest> // length == num_shards
+ initialize_multi(dir, num_shards) — v2 PerShard constructor
+ shard_dir / shard_base_path / shard_incr_path (+ _seq variants)
+ global_max_lsn() — computed accessor, not stored (per advisor's
note: a stored mirror invites drift with the per-shard records)
+ verify_shard_count(expected) — returns the exact RFC § 3 verbatim
error string ("ERR shard count changed (manifest=N, config=M)…")
so operator-facing wording is uniform across boot, BGREWRITEAOF,
and the migration tool.
+ is_legacy_top_level_layout(dir) — pure detection helper for
callers that want to decide whether to migrate. NOT called from
load() — side effects belong in explicit migrate_* methods.
+ migrate_top_level_to_per_shard() — in-place rename for RFC § 5
case 1 (single-shard v0.1.x → v2 single-shard). Idempotent.
Case 2 (legacy multi-shard with the gate engaged) ships in step
6 as the `moon migrate-aof` subcommand.
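Two of the accessors listed above are small enough to sketch. The following is an illustrative reduction, assuming a manifest that holds nothing but its shard list; the real AofManifest carries more fields. The error string reproduces only the prefix quoted in this commit message, because the remainder of the RFC § 3 wording is elided in the source.

```rust
#[derive(Debug, Clone, PartialEq)]
struct ShardManifest {
    shard_id: u16,
    max_lsn: u64,
}

/// Reduced stand-in for AofManifest: only the per-shard records.
struct AofManifest {
    shards: Vec<ShardManifest>,
}

impl AofManifest {
    /// Computed on demand from the per-shard records, so there is no
    /// stored mirror that could drift. An empty shard list yields 0.
    fn global_max_lsn(&self) -> u64 {
        self.shards.iter().map(|s| s.max_lsn).max().unwrap_or(0)
    }

    /// Compares the manifest's shard count with the configured one.
    /// Only the quoted prefix of the RFC error is used here; the
    /// trailing part of the message is not shown in the source.
    fn verify_shard_count(&self, expected: usize) -> Result<(), String> {
        if self.shards.len() != expected {
            return Err(format!(
                "ERR shard count changed (manifest={}, config={})",
                self.shards.len(),
                expected
            ));
        }
        Ok(())
    }
}
```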
Manifest text format
v1 (unchanged, preserves backcompat):
seq <N>
base moon.aof.<N>.base.rdb
incr moon.aof.<N>.incr.aof
v2 (new):
version 2
seq <N>
shards <K>
shard 0 max_lsn <lsn0>
shard 1 max_lsn <lsn1>
...
Paths are derived from shard_id + seq rather than stored explicitly.
The layout is canonical, so a stored path could drift from the
computed location and silently shadow real files on disk.
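To make the v2 text format and the derived-path rule concrete, here is a minimal parser sketch. It assumes the line order shown above (version, seq, shards, then one line per shard) and enforces the two rejections named in the test list: a shard count that disagrees with the number of shard lines, and non-contiguous shard ids. `ParsedV2`, `parse_v2`, `field`, and the error wording are hypothetical; the actual parser in aof_manifest.rs may differ. The path helper follows the `shard-{id}/moon.aof.{seq}.incr.aof` layout described in this PR.

```rust
use std::path::{Path, PathBuf};

#[derive(Debug, PartialEq)]
struct ParsedV2 {
    seq: u64,
    /// max_lsns[i] is the max_lsn recorded for shard i.
    max_lsns: Vec<u64>,
}

/// Parses a `<key> <N>` line and returns N.
fn field(line: Option<&str>, key: &str) -> Result<u64, String> {
    let line = line.ok_or_else(|| format!("missing `{key}` line"))?;
    let rest = line
        .strip_prefix(key)
        .ok_or_else(|| format!("expected `{key} <N>`, got `{line}`"))?;
    rest.trim().parse::<u64>().map_err(|e| format!("bad `{key}` value: {e}"))
}

fn parse_v2(text: &str) -> Result<ParsedV2, String> {
    let mut lines = text.lines().map(str::trim).filter(|l| !l.is_empty());
    if lines.next() != Some("version 2") {
        return Err("expected `version 2` header".to_string());
    }
    let seq = field(lines.next(), "seq")?;
    let declared = field(lines.next(), "shards")? as usize;
    let mut max_lsns: Vec<u64> = Vec::new();
    for line in lines {
        let parts: Vec<&str> = line.split_whitespace().collect();
        match parts.as_slice() {
            ["shard", id, "max_lsn", lsn] => {
                let id = id.parse::<usize>().map_err(|e| format!("bad shard id: {e}"))?;
                // Ids must appear as 0, 1, 2, ... with no gaps.
                if id != max_lsns.len() {
                    return Err(format!("non-contiguous shard id {id}"));
                }
                max_lsns.push(lsn.parse::<u64>().map_err(|e| format!("bad max_lsn: {e}"))?);
            }
            _ => return Err(format!("unrecognized line: `{line}`")),
        }
    }
    if max_lsns.len() != declared {
        return Err(format!(
            "shard count mismatch: declared {declared}, found {}",
            max_lsns.len()
        ));
    }
    Ok(ParsedV2 { seq, max_lsns })
}

/// Paths are computed from shard_id + seq, never read from the manifest.
fn shard_incr_path(dir: &Path, shard_id: u16, seq: u64) -> PathBuf {
    dir.join(format!("shard-{shard_id}"))
        .join(format!("moon.aof.{seq}.incr.aof"))
}
```

Because the manifest stores no paths, the only way a file location can change is by changing `shard_id` or `seq`, which is the drift-avoidance argument made above.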
Tests (9 new, in src/persistence/aof_manifest.rs tests_v2 module)
PASS v1_manifest_loads_as_top_level_single_shard
PASS v2_manifest_round_trips
PASS verify_shard_count_emits_rfc_error_verbatim
PASS migrate_top_level_to_per_shard_moves_files_and_rewrites_manifest
PASS global_max_lsn_returns_max_across_shards
PASS is_legacy_top_level_layout_detects_v1_files
PASS is_legacy_top_level_layout_returns_false_for_v2
PASS parse_v2_rejects_shard_count_mismatch_in_file
PASS parse_v2_rejects_non_contiguous_shard_ids
All 21 existing persistence::aof tests remain green. cargo check
(runtime-tokio,jemalloc) clean.
What this does NOT do (in scope for later steps)
Step 2 — per-shard AofWriter task; aof_tx becomes Vec<Sender>
Step 3 — LSN tagging in AofMessage::Append (after v0.2 S1.3)
Step 4 — Replace `Multi-part AOF skipped` skip branch (closes H2)
Step 5 — Cross-shard ordering merge (TXN + SCRIPT)
Step 6 — `moon migrate-aof` subcommand for case 2 migration
Step 7 — AppendSync rendezvous for appendfsync=always (closes H1)
Step 8 — CRASH-01-LITE matrix in tests/crash_matrix.rs
Step 9 — Lift --unsafe-multishard-aof gate
Refs
tmp/rfc-per-shard-aof-v02.md (RFC)
tmp/P0-INVEST-01-multishard-aof-rootcause.md (root cause)
PR #129 (P0 escape-hatch gate this work lifts)
author: Tin Dang
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/persistence/aof_manifest.rs`:
- Around line 669-775: migrate_top_level_to_per_shard and initialize_multi are
making AofLayout::PerShard visible before the rest of the I/O path
(replay_multi_part, manifest.base_path/incr_path, manifest.incr_path(),
manifest.advance()) understands per-shard locations; this causes subsequent
boots/writes to look in the wrong place. Fix by deferring setting self.layout =
AofLayout::PerShard (and any manifest persisted as PerShard via
write_manifest()) until after you have created/moved the per-shard files and
ensured callers will open the new paths: in migrate_top_level_to_per_shard move
the layout assignment to after the rename/create operations and only call
write_manifest() once layout is set; in initialize_multi avoid persisting a
PerShard manifest or exposing PerShard paths until all shard dirs/files are
created (set layout and call write_manifest() last). Alternatively, make
replay_multi_part, base_path(), incr_path(), manifest.incr_path(), and
manifest.advance() layout-aware so they resolve per-shard paths immediately;
pick one approach and apply it consistently.
- Around line 688-717: After renaming old_base→new_base and optionally
old_incr→new_incr, add a rollback guard so any subsequent error (including
write_manifest() failing) moves the files back and restores self.layout to
AofLayout::TopLevel; implement this by tracking that the base (and possibly
incr) have been moved and on any error attempt std::fs::rename(new_base,
old_base) and, if incr was moved, std::fs::rename(new_incr, old_incr) (or remove
created new_incr if it was created), then set self.layout = AofLayout::TopLevel
before returning the error. Ensure the guard runs for failures after the first
rename but not if everything succeeds (write_manifest() completes), and
reference the existing symbols new_base, new_incr, old_base, old_incr,
self.layout, and write_manifest() to locate where to add the rollback.
📒 Files selected for processing (1)
src/persistence/aof_manifest.rs
Second implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Step 2 is split into six sub-steps
(2a-2f) to keep the blast radius reviewable; this commit ships 2a.
2a is purely additive — a new public type and tests, zero call-site
changes. The pool's API mirrors the patterns the call sites already
use (try_send append, broadcast Shutdown), so steps 2c-2f reduce to a
mechanical type-plumbing pass.
New type
AofWriterPool {
senders: Vec<MpscSender<AofMessage>>,
layout: AofLayout,
}
Constructors:
top_level(sender) -> Arc<Self>
One sender; every shard multiplexes onto it. Used for legacy v1
deployments and `--shards 1` v2 deployments.
per_shard(senders) -> Arc<Self>
One sender per shard. senders[i] MUST be the writer task that
owns appendonlydir/shard-{i}/. debug_assert rejects a length-1
vector (use top_level instead).
Dispatch:
sender(shard_id) -> &MpscSender<AofMessage>
TopLevel: ignores shard_id, returns senders[0].
PerShard: returns senders[shard_id]. debug_assert on out-of-range.
try_send_append(shard_id, bytes)
Convenience for the `let _ = tx.try_send(AofMessage::Append(bytes))`
pattern at 12 call sites today. Fire-and-forget, matches current
hot-path semantics (H1 fix is step 7's AppendSync rendezvous).
try_send_rewrite(msg) -> Result<(), AofPoolSendError>
Only legal for TopLevel pools; PerShard rejects with
AofPoolSendError::RewriteUnsupportedInPerShard. BGREWRITEAOF in
the per-shard layout becomes a per-shard operation in step 6 —
the legacy single-writer rewrite enum variant has no meaning
once the writer is one-per-shard.
broadcast_shutdown()
Sends Shutdown to every writer. Used by orchestrated shutdown
in main.rs / embedded.rs (wired in step 2f).
New error type
AofPoolSendError {
RewriteUnsupportedInPerShard,
SendFailed,
}
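The pool's dispatch rules can be sketched in a few dozen lines. This is an approximation for illustration only: it substitutes `std::sync::mpsc::SyncSender` for the project's `MpscSender`, takes `usize` shard ids, and reduces `AofMessage` to three variants with a unit `Rewrite`, all of which are assumptions. The constructor and method names follow the commit message.

```rust
use std::sync::mpsc::SyncSender;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum AofLayout {
    TopLevel,
    PerShard,
}

/// Reduced stand-in for the real message enum.
#[derive(Debug, PartialEq)]
enum AofMessage {
    Append(Vec<u8>),
    Rewrite,
    Shutdown,
}

#[derive(Debug, PartialEq)]
enum AofPoolSendError {
    RewriteUnsupportedInPerShard,
    SendFailed,
}

struct AofWriterPool {
    senders: Vec<SyncSender<AofMessage>>,
    layout: AofLayout,
}

impl AofWriterPool {
    /// One writer; every shard multiplexes onto it.
    fn top_level(sender: SyncSender<AofMessage>) -> Arc<Self> {
        Arc::new(Self { senders: vec![sender], layout: AofLayout::TopLevel })
    }

    /// One writer per shard; a single sender should use top_level instead.
    fn per_shard(senders: Vec<SyncSender<AofMessage>>) -> Arc<Self> {
        debug_assert!(senders.len() > 1, "use top_level for a single writer");
        Arc::new(Self { senders, layout: AofLayout::PerShard })
    }

    fn sender(&self, shard_id: usize) -> &SyncSender<AofMessage> {
        match self.layout {
            AofLayout::TopLevel => &self.senders[0],
            AofLayout::PerShard => {
                debug_assert!(shard_id < self.senders.len());
                &self.senders[shard_id]
            }
        }
    }

    /// Fire-and-forget: a full or closed channel is silently ignored.
    fn try_send_append(&self, shard_id: usize, bytes: Vec<u8>) {
        let _ = self.sender(shard_id).try_send(AofMessage::Append(bytes));
    }

    fn try_send_rewrite(&self, msg: AofMessage) -> Result<(), AofPoolSendError> {
        if self.layout == AofLayout::PerShard {
            return Err(AofPoolSendError::RewriteUnsupportedInPerShard);
        }
        self.senders[0].try_send(msg).map_err(|_| AofPoolSendError::SendFailed)
    }

    fn broadcast_shutdown(&self) {
        for tx in &self.senders {
            let _ = tx.try_send(AofMessage::Shutdown);
        }
    }
}
```

Because `sender` hides the layout, a call site can pass its shard id unconditionally; under TopLevel the id is ignored, which is why the later call-site migration can be mechanical.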
Tests (5 new, in src/persistence/aof.rs pool_tests module)
PASS top_level_pool_routes_all_shards_to_writer_zero
PASS per_shard_pool_routes_each_shard_to_its_own_writer
PASS per_shard_pool_rejects_rewrite_with_explicit_error
PASS top_level_pool_accepts_rewrite
PASS broadcast_shutdown_reaches_every_writer
All 21 existing persistence::aof tests + 9 manifest tests from step 1
remain green (26 total in persistence::aof). cargo check + clippy
(runtime-tokio,jemalloc) clean.
What this does NOT do (in scope for later sub-steps)
Step 2b — per-shard writer task body (reads from
manifest.shard_incr_path(shard_id) for PerShard,
manifest.incr_path() for TopLevel)
Step 2c — type plumbing: aof_tx: Option<MpscSender> →
aof_pool: Option<Arc<AofWriterPool>> in conn_state.rs
and conn/core.rs
Step 2d — handler_monoio call sites use ctx.aof_pool.sender(ctx.shard_id)
Step 2e — handler_sharded call sites (same pattern)
Step 2f — spawn sites (main.rs, listener.rs, embedded.rs) build
the pool via top_level() or per_shard() based on layout
Refs
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
PR #129 (P0 escape-hatch gate this work lifts in step 9)
Commit 3bb4790 (step 1 — manifest v2 format)
author: Tin Dang
Third implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Adds the per-shard writer task body as
an additive function alongside the existing `aof_writer_task`. Zero
call sites changed in this commit — wiring lands in step 2f.
New function
per_shard_aof_writer_task(rx, base_dir, shard_id, fsync, cancel)
One instance is spawned per shard in PerShard layout. Each instance
owns appendonlydir/shard-{shard_id}/moon.aof.{seq}.incr.aof
exclusively, so there is no per-file locking. Mirrors the production
monoio path of the existing aof_writer_task (60s bounded wait for
manifest, hard fail on corrupt manifest, per-fsync-policy cadence).
Differences from aof_writer_task (TopLevel):
- Opens manifest.shard_incr_path(shard_id) instead of
manifest.incr_path(). Defensive `create_dir_all` of the parent
`shard-{N}/` directory in case a manual deletion or older binary
left it missing.
- Rejects Rewrite/RewriteSharded variants with a `warn!` and drops
the message. The legacy single-writer rewrite enum has no meaning
when each shard owns its own files; per-shard BGREWRITEAOF will be
a separate per-shard operation in a later step.
- Refuses to start if the loaded manifest's layout is TopLevel — the
spawn site (step 2f) must only invoke this body for PerShard
layouts. Layout mismatch is a programmer error and logs at error
level before exiting.
- Refuses to start if shard_id is out of range for the manifest's
`shards.len()` (defensive against config drift between manifest
write and writer spawn).
- Every log line includes `shard {shard_id}` so operators can map
log lines to filesystem state without ambiguity.
Both runtimes (runtime-tokio async I/O via tokio::fs + BufWriter +
tokio::select!, runtime-monoio sync I/O via std::fs in a blocking
recv loop) are covered with feature-gated blocks. The shape mirrors
aof_writer_task closely so future fixes to fsync handling or shutdown
flush can be applied uniformly to both functions.
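The blocking (monoio-style) shape of the per-shard loop might look like the sketch below. It is heavily simplified and rests on several assumptions: `Msg` stands in for AofMessage, `seq` is passed in directly instead of being read from a loaded manifest, the manifest wait, layout check, range check, and cancellation token are omitted, and the fsync policy is reduced to a single `sync_all` at exit. `run_shard_writer` is a hypothetical name, not the function added in this commit.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use std::sync::mpsc::Receiver;

/// Reduced stand-in for the real message enum.
enum Msg {
    Append(Vec<u8>),
    Rewrite,
    Shutdown,
}

/// Drains `rx` into this shard's incr file and returns how many
/// rewrite messages were dropped along the way.
fn run_shard_writer(
    rx: Receiver<Msg>,
    base_dir: &Path,
    shard_id: u16,
    seq: u64,
) -> io::Result<usize> {
    // Defensive: recreate shard-{N}/ if it has gone missing.
    let shard_dir = base_dir.join(format!("shard-{shard_id}"));
    fs::create_dir_all(&shard_dir)?;
    let path = shard_dir.join(format!("moon.aof.{seq}.incr.aof"));
    let mut file = OpenOptions::new().create(true).append(true).open(&path)?;

    let mut dropped_rewrites = 0usize;
    // recv() also returns Err once every sender is gone, which ends the loop.
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::Append(bytes) => file.write_all(&bytes)?,
            Msg::Rewrite => {
                // The single-writer rewrite has no meaning per shard: warn and drop.
                eprintln!("shard {shard_id}: dropping unsupported rewrite message");
                dropped_rewrites += 1;
            }
            Msg::Shutdown => break,
        }
    }
    // Make everything written so far durable before the task exits.
    file.sync_all()?;
    Ok(dropped_rewrites)
}
```

Since each instance opens a file under its own `shard-{shard_id}/` directory, no two writers share a file handle, which matches the "no per-file locking" point above.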
What this does NOT do (in scope for later sub-steps)
Step 2c — type plumbing: aof_tx: Option<MpscSender> →
aof_pool: Option<Arc<AofWriterPool>> in conn_state.rs
and conn/core.rs
Step 2d — handler_monoio call sites use ctx.aof_pool.sender(ctx.shard_id)
Step 2e — handler_sharded / handler_single / blocking call sites
Step 2f — spawn sites (main.rs, listener.rs, embedded.rs) build the
pool via top_level()/per_shard() and spawn N
per_shard_aof_writer_task instances for PerShard layouts
Tests
No new tests in this commit. The function body mirrors the message
loop in aof_writer_task line-for-line (with the per-shard differences
above), which already has 21 unit tests covering Append, Rewrite, and
Shutdown handling. An end-to-end integration test that spawns N
writers, drives appends through them, kills the process, and verifies
per-shard files reload cleanly lands as an #[ignore]-by-default test
in tests/ alongside step 2f.
Verification
cargo check + cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
All 21 existing persistence::aof tests + 5 pool tests from step 2a
+ 9 manifest tests from step 1 remain green (35 in persistence).
Refs
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
Commit 3bb4790 (step 1 — manifest v2 format)
Commit 5a546ff (step 2a — AofWriterPool type)
author: Tin Dang
…ack)
Two reviewer-flagged bugs in the step 1 manifest work (commit 3bb4790):
1. base_path/incr_path/base_path_seq/incr_path_seq were NOT layout-aware
2. migrate_top_level_to_per_shard flipped self.layout = PerShard BEFORE
any I/O succeeded and had no rollback for failures after the first
rename
Both verified against current code before fixing. A third reviewer
suggestion (initialize_multi) was reviewed and skipped; see "Note on
initialize_multi" below.
Bug 1: Layout-aware path helpers (replay/advance routed to wrong dir)
---------------------------------------------------------------------
Before: base_path(), incr_path(), base_path_seq(), incr_path_seq()
unconditionally computed TopLevel paths (`appendonlydir/moon.aof.*`).
After migrate_top_level_to_per_shard flips layout to PerShard,
replay_multi_part (aof_manifest.rs:871, 895, 916) and advance() (lines
796, 821, 836-837) still asked these helpers for paths and got TopLevel
locations, while the actual files now lived under shard-0/.
Symptom: post-migration boot fails recovery with "AOF base RDB missing";
BGREWRITEAOF after migration writes new files to TopLevel locations the
per-shard writer never reads.
Fix: route PerShard layout through the existing shard_*_path_seq
helpers, with debug_assert that shards.len() == 1 (these single-file
helpers are by definition meaningful only for single-shard layouts;
multi-shard PerShard callers MUST use shard_*_path[_seq] explicitly).
Release builds fall back to shard-0 paths rather than panicking so
production stays recoverable on a stale call site.
No callers need to change: same signatures, layout-correct results.
Bug 2: Migrate rollback on partial failure
------------------------------------------
Before the fix, migrate_top_level_to_per_shard did:
1. self.layout = PerShard (line 689; in-memory flip)
2. create_dir_all(new_dir) (line 691; may fail)
3. rename(old_base → new_base) (line 708; may fail)
4. rename or create incr (lines 709-714; may fail)
5. write_manifest() (line 717; may fail)
Only step 2's `!old_base.exists()` branch (lines 698-707) reset the
layout flag on error. Any failure at steps 4 or 5 left the base file
moved with no rollback AND left self.layout out of sync with the
on-disk manifest (which still claimed v1 if write_manifest had not yet
run, or claimed v2 with the wrong file locations if it had).
Fix: defer the layout flip until everything on disk is in the new
shape; explicit per-step rollback on every failure path:
- rename(old_base) failure: nothing moved, plain ? return
- rename(old_incr) or create(new_incr) failure: rename base back,
return original error (rollback errors logged but do not mask the
cause)
- write_manifest() failure: revert layout flag, remove created incr
or rename incr back, rename base back
After this fix the migration is atomic from the loader's perspective:
either everything is in shard-0/ AND the v2 manifest is on disk, or
everything is at the top level AND the v1 manifest is on disk. No
intermediate state survives a crash mid-migration.
Note on initialize_multi
------------------------
The reviewer also flagged initialize_multi (lines 733-776) for the same
"layout flipped before I/O" pattern. Verified; it does NOT apply:
initialize_multi constructs the struct with `layout: PerShard` in local
scope only (no manifest on disk yet), creates all dirs/files via the
shard_* helpers (which don't depend on self.layout), and calls
write_manifest() LAST. Any failure aborts before any caller observes
the half-built state. Orphan shard-{N}/ dirs left on disk on failure
are harmless (next boot's load() returns Ok(None) and recovery treats
as fresh init). Skipped; no change needed.
Tests (3 new)
base_incr_paths_route_to_shard_zero_after_migration
Pre-migration: base_path() and incr_path() return TopLevel paths.
Post-migration: they route to shard-0/ AND the file exists there.
migrate_rolls_back_filesystem_when_incr_rename_fails
Pre-creates shard-0/moon.aof.1.incr.aof as a DIRECTORY (rename onto
a non-empty dir fails on every supported OS), forcing the rename
after-base-already-moved path. Verifies: layout reverts to TopLevel,
base file restored, base contents intact, on-disk manifest still v1.
migrate_does_not_mutate_on_missing_base
Pre-flight check path: layout never flips, no rollback needed,
NotFound error surfaced.
Verification
379 persistence tests pass on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
cargo clippy clean on both. cargo check clean on both.
Refs
Reviewer comments on aof_manifest.rs:669-775 and :688-717
Commit 3bb4790 (step 1 introduced the bugs)
tmp/rfc-per-shard-aof-v02.md (RFC § 5 case 1 migration)
author: Tin Dang
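The deferred layout flip and per-step rollback described in the review-fix commit above can be sketched roughly as follows. This is a simplified illustration, not the code in aof_manifest.rs: the `Manifest` struct, the file names `base.rdb` / `incr.aof` / `manifest`, and the `write_manifest(layout)` signature are all assumptions made for the example.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical stand-in for the manifest's layout flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Layout {
    TopLevel,
    PerShard,
}

/// Hypothetical, heavily simplified manifest: one base file, one incr file.
pub struct Manifest {
    pub dir: PathBuf,
    pub layout: Layout,
}

impl Manifest {
    fn top(&self, name: &str) -> PathBuf {
        self.dir.join(name)
    }

    fn shard0(&self, name: &str) -> PathBuf {
        self.dir.join("shard-0").join(name)
    }

    /// Stand-in for write_manifest(): records the layout version on disk.
    fn write_manifest(&self, layout: Layout) -> io::Result<()> {
        let version = if layout == Layout::PerShard { "v2" } else { "v1" };
        fs::write(self.dir.join("manifest"), version)
    }

    pub fn migrate(&mut self) -> io::Result<()> {
        let (old_base, new_base) = (self.top("base.rdb"), self.shard0("base.rdb"));
        let (old_incr, new_incr) = (self.top("incr.aof"), self.shard0("incr.aof"));

        // Pre-flight: nothing has been touched yet, so a plain return is safe.
        if !old_base.exists() {
            return Err(io::Error::new(io::ErrorKind::NotFound, "AOF base missing"));
        }
        fs::create_dir_all(self.dir.join("shard-0"))?;
        fs::rename(&old_base, &new_base)?; // on failure nothing has moved

        if let Err(e) = fs::rename(&old_incr, &new_incr) {
            // Undo the base move; a rollback error must not mask the cause.
            let _ = fs::rename(&new_base, &old_base);
            return Err(e);
        }
        if let Err(e) = self.write_manifest(Layout::PerShard) {
            let _ = fs::rename(&new_incr, &old_incr);
            let _ = fs::rename(&new_base, &old_base);
            return Err(e);
        }
        // Flip the in-memory flag only after the disk is in the new shape.
        self.layout = Layout::PerShard;
        Ok(())
    }
}
```

Blocking the incr rename with a pre-created directory (the same trick the new test uses) should leave the layout at `TopLevel` and the base file back in its original location.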
Fourth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Adds `aof_pool: Option<Arc<AofWriterPool>>`
to ConnectionContext as a **compat alias** alongside the existing
`aof_tx: Option<MpscSender<AofMessage>>`. Zero call-site behavior change.
Why compat alias (and not a single big-bang refactor)
-----------------------------------------------------
The aof_tx → aof_pool transition touches 16 call sites across 10 files
(handler_monoio, handler_sharded, handler_single, blocking,
command/persistence, shard/conn_accept, shard/event_loop, main, listener,
embedded), AND one of those sites carries a load-bearing correctness fix
for cross-shard routing (handler_sharded/mod.rs:1651 — owner shard must
be `target`, not `ctx.shard_id`, otherwise per-shard AOF writes land in
the wrong file).
Splitting plumbing from call-site migration:
- 2c (this commit) adds the field; ConnectionContext::new takes both
aof_tx and aof_pool; spawn sites build the pool via
AofWriterPool::top_level(tx). All four ConnectionContext::new call
sites in shard/conn_accept.rs updated. No behavior change — pool
just wraps the same single sender.
- 2d migrates handler_monoio + handler_monoio/dispatch +
handler_single + blocking.rs call sites (owner = ctx.shard_id /
shard_id / 0; all uncontroversial).
- 2e migrates handler_sharded + handler_sharded/dispatch +
command/persistence call sites. **Includes the cross-shard routing
fix at mod.rs:1651** (target, not ctx.shard_id) with the audit
table pasted into its commit body for posterity, plus removal of
the legacy aof_tx field.
Each commit compiles and passes its tests. Bisect remains useful because
the type system always has a consistent shape (both fields present
during 2c-2e, only pool present after 2e).
Pre-refactor audit (16 sites mapped to owner shard)
---------------------------------------------------
| Site                                                | Owner shard             |
|-----------------------------------------------------|-------------------------|
| handler_sharded/mod.rs:1175 MOVE                    | ctx.shard_id            |
| handler_sharded/mod.rs:1219 COPY                    | ctx.shard_id            |
| handler_sharded/mod.rs:1430 local write             | ctx.shard_id            |
| handler_sharded/mod.rs:1651 x-shard reply           | **target**              |
| handler_sharded/dispatch.rs:356 BGREWRITEAOF        | (Rewrite; pool rejects) |
| handler_monoio/mod.rs:486,1124,1189,1538,1937       | ctx.shard_id            |
| handler_monoio/dispatch.rs:981 BGREWRITEAOF         | (Rewrite; pool rejects) |
| handler_single.rs (5)                               | 0                       |
| blocking.rs:1349 inline SET                         | shard_id (param)        |
| command/persistence.rs:233,263 BGREWRITEAOF helpers | (Rewrite)               |
| shard/conn_accept.rs + event_loop.rs                | plumbing only           |
Verified by reading the binding scope at each site:
- mod.rs:1175/1219 inside `if is_local` (line 1125) → home shard.
- mod.rs:1430 inside `if is_local` + write-path branch → home shard.
- mod.rs:1651 inside `for (meta, target) in reply_futures` where meta
was built per-target by remote_groups.entry(target).or_default()
(line 1610) — every entry's aof_bytes belongs to that target's shard.
- handler_monoio is shared-nothing per-shard; ctx.shard_id is the
handler's home shard which also owns the Database being mutated.
- blocking.rs::try_inline_dispatch takes shard_id as a parameter.
Changes in this commit
----------------------
src/server/conn/core.rs (ConnectionContext)
+ import AofWriterPool
+ aof_pool: Option<Arc<AofWriterPool>> (with #[allow(dead_code)]
explaining 2d/2e are the readers)
+ ConnectionContext::new signature gains aof_pool parameter
src/server/conn_state.rs (ConnectionContext — definition-only twin)
+ import AofWriterPool, mirror field for type-system consistency.
This struct is #[allow(dead_code)] at the struct level (Phase 44
placeholder, not constructed anywhere); no constructor changes.
src/shard/conn_accept.rs (4 ConnectionContext::new call sites)
At each site: compute `aof_pool = aof.as_ref().map(|tx|
AofWriterPool::top_level(tx.clone()))` and pass it into the new
parameter. Wrapping the same sender means pool.try_send_append(N, b)
is identical to tx.try_send(AofMessage::Append(b)) for any N — no
routing change yet.
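As a rough illustration of why the step 2c wrapper changes no behaviour, the sketch below models a pool whose TopLevel form ignores the shard id. The enum shape, the use of `std::sync::mpsc::SyncSender` in place of the project's `MpscSender`, the simplified `AofMessage`, and the `build_pool` helper name are assumptions for the example only.

```rust
use std::sync::mpsc::SyncSender;
use std::sync::Arc;

/// Hypothetical message type; the real AofMessage has more variants.
#[derive(Debug, PartialEq)]
pub enum AofMessage {
    Append(Vec<u8>),
}

/// Hypothetical pool shape: one shared writer, or one writer per shard.
pub enum AofWriterPool {
    TopLevel(SyncSender<AofMessage>),
    PerShard(Vec<SyncSender<AofMessage>>),
}

impl AofWriterPool {
    pub fn top_level(tx: SyncSender<AofMessage>) -> Arc<Self> {
        Arc::new(AofWriterPool::TopLevel(tx))
    }

    /// Fire-and-forget append: a full or closed channel drops the record.
    pub fn try_send_append(&self, shard_id: usize, bytes: Vec<u8>) {
        let tx = match self {
            // Under TopLevel the shard id is ignored: one writer takes everything.
            AofWriterPool::TopLevel(tx) => tx,
            AofWriterPool::PerShard(txs) => &txs[shard_id],
        };
        let _ = tx.try_send(AofMessage::Append(bytes));
    }
}

/// Mirrors the spawn-site expression from the commit:
/// `aof.as_ref().map(|tx| AofWriterPool::top_level(tx.clone()))`.
pub fn build_pool(aof: &Option<SyncSender<AofMessage>>) -> Option<Arc<AofWriterPool>> {
    aof.as_ref().map(|tx| AofWriterPool::top_level(tx.clone()))
}
```

Appends sent with shard id 0 and shard id 7 both arrive on the same receiver, which is the "identical for any N" property the commit relies on.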
Verification
cargo check + cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
All 379 persistence tests remain green.
What this does NOT do (in scope for 2d/2e/2f)
Step 2d — migrate handler_monoio + handler_single + blocking sites
from ctx.aof_tx to ctx.aof_pool.as_ref().map(|p|
p.try_send_append(ctx.shard_id, bytes))
Step 2e — migrate handler_sharded sites INCLUDING the line 1651
target-routing fix; remove the legacy aof_tx field;
update command/persistence BGREWRITEAOF helpers to use
try_send_rewrite (with PerShard rejection)
Step 2f — spawn sites (main.rs, listener.rs, embedded.rs) detect
manifest layout and spawn N per_shard_aof_writer_task
instances wrapped in AofWriterPool::per_shard()
Refs
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
Commit 3bb4790 (step 1 — manifest v2 format)
Commit 5a546ff (step 2a — AofWriterPool type)
Commit 3afe21f (step 2b — per-shard writer task body)
Commit cb254ce (review fix — layout-aware paths + migrate rollback)
author: Tin Dang
Fifth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Migrates the 7 `ctx.aof_tx` usages in
`server/conn/handler_monoio/mod.rs` to `ctx.aof_pool`. Includes a
cross-shard routing correctness fix at line 1937 that the compat-alias
plumbing in step 2c made discoverable before it could ship as a silent
data-loss bug.
Routing fix at handler_monoio/mod.rs:1937
-----------------------------------------
The reanalysis triggered by step 2c surfaced that this site is structurally
identical to handler_sharded/mod.rs:1651 — both are the bottom of a
cross-shard reply loop where AOF append must land in the **target**
shard's writer, NOT `ctx.shard_id`.
Before: `let _ = tx.try_send(AofMessage::Append(bytes));` — `tx` is the
single top-level writer, so under TopLevel layout this was correct. Under
PerShard layout (step 2f and beyond) it would have written every
cross-shard write into the connection's home shard AOF, leaving the
target shard's AOF without the record and breaking per-shard recovery.
After: `pool.try_send_append(target, bytes);` where `target` is captured
per-batch when the remote_groups entry is drained.
Plumbing required to expose `target` in scope:
1. `oneshot_futures` declaration at line 1840 gained a leading
`usize` element (the target shard) — the type-system anchor
making the rest of the change mechanical.
2. The push at line 1884 captures `target` from the drain loop.
3. The polling loop at line 1892 destructures `(target, meta, reply_rx)`.
4. The AOF send inside the response-zip at line 1937 uses `target`.
Verified by reading the surrounding scope: `target` is bound in
`for (target, entries) in remote_groups.drain()` at line 1844, where
remote_groups was populated by `remote_groups.entry(target).or_default()`
during command classification — so every entry's aof_bytes belongs to
that target shard's data.
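The routing rule above (append to the shard that owns the data, not the connection's home shard) can be sketched in isolation. This is a synchronous toy model under stated assumptions: `Meta`, `PerShardAof`, and `log_remote_writes` are hypothetical names, and the real handler awaits reply futures rather than iterating a plain vector.

```rust
use std::collections::HashMap;

/// Hypothetical per-command metadata: the bytes to append if the write succeeded.
pub struct Meta {
    pub aof_bytes: Vec<u8>,
}

/// Hypothetical per-shard AOF sink; the index is the shard id.
pub struct PerShardAof {
    pub files: Vec<Vec<Vec<u8>>>,
}

impl PerShardAof {
    pub fn new(shards: usize) -> Self {
        Self { files: vec![Vec::new(); shards] }
    }

    pub fn append(&mut self, owner: usize, bytes: Vec<u8>) {
        self.files[owner].push(bytes);
    }
}

/// Logs grouped remote writes, each to the AOF of its owner shard.
pub fn log_remote_writes(
    home_shard: usize,
    mut remote_groups: HashMap<usize, Vec<Meta>>,
    aof: &mut PerShardAof,
) {
    // Carry `target` with each batch so it is still in scope when replies arrive.
    let mut pending: Vec<(usize, Vec<Meta>)> = Vec::new();
    for (target, entries) in remote_groups.drain() {
        pending.push((target, entries));
    }
    for (target, metas) in pending {
        // Remote groups are assumed never to contain the home shard.
        debug_assert_ne!(target, home_shard);
        for meta in metas {
            // The fix: append to `target`, not `home_shard`.
            aof.append(target, meta.aof_bytes);
        }
    }
}
```

With writes grouped for shards 1 and 2 and a home shard of 0, shard 0's sink stays empty and each record lands in its owner's sink.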
Other migrated sites in this commit
-----------------------------------
| Site                    | Owner shard  | Pattern                                   |
|-------------------------|--------------|-------------------------------------------|
| mod.rs:1069 is_write    | n/a          | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1124 MOVE        | ctx.shard_id | `pool.try_send_append(ctx.shard_id, _)`   |
| mod.rs:1189 COPY        | ctx.shard_id | `pool.try_send_append(ctx.shard_id, _)`   |
| mod.rs:1538 local write | ctx.shard_id | `pool.try_send_append(ctx.shard_id, _)`   |
| mod.rs:1771 aof_bytes   | n/a          | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1937 x-shard     | **target**   | `pool.try_send_append(target, _)` ← fix   |
All four direct-append sites use `pool.try_send_append(owner, bytes)`,
which returns `()`. The call is fire-and-forget: dropping a record when
the channel is full is intentional in the AOF hot path, and the loss is
bounded by the channel capacity already chosen for the single writer.
The `let _ =` wrapper from the tx form is dropped, along with the
`AofMessage` import, which is no longer referenced at any call site in
this file.
What this does NOT do (deferred to 2e)
--------------------------------------
handler_monoio/dispatch.rs:981 — BGREWRITEAOF still calls
`bgrewriteaof_start_sharded(tx, ...)` because the helper itself
takes `&MpscSender<AofMessage>`. Step 2e migrates the helper to
`pool.try_send_rewrite(msg)` (with PerShard rejection) and updates
this call site in the same commit.
handler_monoio/mod.rs:486 — still passes `&ctx.aof_tx` into
`try_inline_dispatch_loop` in blocking.rs. Step 2e flips the
parameter type alongside the body migration in blocking.rs and
handler_single.rs.
Compat-alias progress
---------------------
After this commit, ctx.aof_pool is the sole AOF interface in
handler_monoio's main dispatch loop. ctx.aof_tx remains as a field
because:
- dispatch.rs:981 (BGREWRITEAOF) still reads it
- mod.rs:486 (inline path) still reads it
- handler_sharded and handler_single haven't migrated yet
Step 2e removes the field entirely after the remaining 11 sites move.
Verification
------------
cargo check + cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
Lib persistence tests:
tokio: 379 passed
monoio: 378 passed
(tokio/monoio diff is feature-gated; matches step 2c baseline.)
Integration tests (`tests/integration.rs`) fail to compile with
"missing field unsafe_multishard_aof" on 7 ServerConfig literals —
this is pre-existing (commit e0bb658 added the field but did not
update the test file), unrelated to step 2c/2d, and verified on the
branch tip without these changes via `git stash`.
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
Commit 5a546ff (step 2a — AofWriterPool type)
Commit 3afe21f (step 2b — per-shard writer task body)
Commit cb254ce (review fix — layout-aware paths + migrate rollback)
Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
author: Tin Dang
… 2e-α)
Sixth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Migrates the 5 direct `ctx.aof_tx` AOF-
append sites and 2 `is_some()` gates in `server/conn/handler_sharded/mod.rs`
to `ctx.aof_pool`. Includes the **canonical** cross-shard routing fix at
line 1651 that was the motivating P0 for this entire RFC.
Routing fix at handler_sharded/mod.rs:1651
------------------------------------------
This is the originally-discovered site (counterpart to the latent fix
shipped in step 2d for handler_monoio:1937). The cross-shard reply loop
already had `target` in scope at line 1646 — the loop variable from
`for (meta, target) in reply_futures` — so the change is mechanical:
Before: `if let Some(ref tx) = ctx.aof_tx { let _ = tx.try_send(AofMessage::Append(bytes)); }`
After: `if let Some(ref pool) = ctx.aof_pool { pool.try_send_append(target, bytes); }`
Why this matters: under TopLevel layout, a single writer absorbs every
append regardless of `target`, so the wrong-owner write was structurally
masked. Under PerShard (step 2f and beyond) each shard owns its own AOF
file, and a write that mutates target shard's data MUST land in target
shard's file — otherwise replay of target's AOF won't contain the
record and post-crash state diverges. This was the H1/H2 root cause in
P0-INVEST-01-multishard-aof-rootcause.md.
Other migrated sites in this commit
-----------------------------------
| Site                    | Owner shard  | Pattern                                   |
|-------------------------|--------------|-------------------------------------------|
| mod.rs:1122 is_write    | n/a          | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1123 aof_bytes   | n/a          | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1175 MOVE        | ctx.shard_id | `pool.try_send_append(ctx.shard_id, _)`   |
| mod.rs:1219 COPY        | ctx.shard_id | `pool.try_send_append(ctx.shard_id, _)`   |
| mod.rs:1430 local write | ctx.shard_id | `pool.try_send_append(ctx.shard_id, _)`   |
| mod.rs:1651 x-shard     | **target**   | `pool.try_send_append(target, _)` ← fix   |
The `AofMessage` import is no longer referenced at any call site in
this file and is removed.
Scope split (subdivision of step 2e)
------------------------------------
The original 2c plan listed 2e as one big commit. To keep each step
green-on-both-runtimes and bisectable, 2e is split into 4 atomic commits:
2e-α (this commit) — handler_sharded/mod.rs only (mirrors 2d shape).
2e-β — command/persistence.rs BGREWRITEAOF helpers swap to
`&AofWriterPool` (with PerShard rejection translated to a
user-facing RESP error); both handler_*/dispatch.rs BGREWRITEAOF
call sites flip together.
2e-γ — handler_single.rs (6 sites, parameter type swap),
blocking.rs (2 fn signatures + 1 use), handler_monoio/mod.rs:486
(call site for the migrated blocking helper), and the 12
test call sites in server/conn/tests.rs.
2e-δ — Remove `aof_tx` field from ConnectionContext and conn_state.rs;
drop the parameter from `ConnectionContext::new`; simplify the
4 spawn sites in shard/conn_accept.rs.
Each commit compiles + clippy clean + lib persistence tests green on
both `runtime-monoio` and `runtime-tokio,jemalloc`. The compat-alias
field (`ctx.aof_tx` alongside `ctx.aof_pool`) introduced in step 2c
lets each commit flip its slice of call sites without breaking the
other consumers.
Verification
------------
cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
Lib persistence tests:
tokio: 379 passed
monoio: 378 passed
(Diff is feature-gated; matches step 2c/2d baseline.)
Pre-existing tests/integration.rs breakage on
`unsafe_multishard_aof` ServerConfig field (commit e0bb658) remains
unrelated to this commit — verified via `git stash` in step 2d.
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 — the bug 1651 fixes)
Commit 5a546ff (step 2a — AofWriterPool type)
Commit 3afe21f (step 2b — per-shard writer task body)
Commit cb254ce (review fix — layout-aware paths + migrate rollback)
Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
author: Tin Dang
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/persistence/aof.rs`:
- Around line 858-930: The per-shard writer loop currently blocks on rx.recv()
and never checks cancel, so a cancellation-only shutdown can hang; modify the
loop around rx.recv() (the match handling AofMessage::{Append, Rewrite,
RewriteSharded, Shutdown}) to make cancellation reachable by either using a
non-blocking/timeout receive (e.g., try_recv/recv_timeout) or by selecting
between rx.recv() and checking the cancel flag (atomic or channel) before/after
the recv, breaking out when cancel is set; ensure you still perform the same
final flush/sync logic (the file.flush().and_then(|_| file.sync_data()) block
guarded by write_error) and preserve the metrics/fsync handling and warnings for
Rewrite/RewriteSharded.
- Around line 720-756: The per-shard AOF writer currently ignores errors from
writer.flush().await and writer.get_ref().sync_data().await in the EverySec,
Shutdown, and cancel branches (as well as the Always path already checked),
which can silently drop durability; update the branches handling interval.tick()
when fsync == FsyncPolicy::EverySec, the Ok(AofMessage::Shutdown) / Err(_)
shutdown branch, and the cancel.cancelled() branch to check the Result values
from flush() and sync_data(), log failures including shard_id and the error, and
surface a degraded state (e.g., return or set a tracer/metric) instead of
discarding errors—modify the calls around writer.flush().await and
writer.get_ref().sync_data().await and add error handling/logging similar to the
existing Append path handling.
In `@src/server/conn/handler_monoio/mod.rs`:
- Around line 1536-1541: The AOF is serializing the original client Frame
(`frame`) instead of the possibly workspace-rewritten command arguments
(`cmd_args`), causing persisted writes to use unprefixed keys; change the AOF
serialization for local writes to serialize the dispatched command (the same
representation used by `dispatch_frame`) by passing the rewritten command args
(or the dispatched command object) into `aof::serialize_command` before calling
`pool.try_send_append` so local writes persist the post-rewrite command; update
the block that checks `is_write`/`ctx.aof_pool` to use `cmd_args` (or the
dispatched command) rather than `frame`, keeping the
`pool.try_send_append(ctx.shard_id, ...)` call.
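The two `src/persistence/aof.rs` findings above (make cancellation reachable, and stop discarding flush errors) can be illustrated with a small synchronous sketch. It is not the project's writer task: the real code is async and also calls `sync_data()` on the file, while this example assumes a plain `std::sync::mpsc` receiver, an `AtomicBool` cancel flag, a generic `Write` sink, and a two-variant `AofMessage`.

```rust
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical, reduced message set for the sketch.
pub enum AofMessage {
    Append(Vec<u8>),
    Shutdown,
}

/// Runs until Shutdown, channel close, or `cancel` is set.
/// Returns the first write error, or the result of the final flush.
pub fn writer_loop<W: Write>(
    rx: &Receiver<AofMessage>,
    cancel: &AtomicBool,
    out: &mut W,
) -> io::Result<()> {
    let mut write_error: Option<io::Error> = None;
    loop {
        // Check the flag every iteration so a busy channel cannot starve it.
        if cancel.load(Ordering::Acquire) {
            break;
        }
        // Bounded wait instead of a blocking recv(), so a cancellation-only
        // shutdown is observed within one timeout period.
        match rx.recv_timeout(Duration::from_millis(10)) {
            Ok(AofMessage::Append(bytes)) => {
                if write_error.is_none() {
                    if let Err(e) = out.write_all(&bytes) {
                        write_error = Some(e);
                    }
                }
            }
            Ok(AofMessage::Shutdown) | Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {}
        }
    }
    // Surface failures to the caller instead of discarding them.
    match write_error {
        Some(e) => Err(e),
        None => out.flush(),
    }
}
```

The caller decides what a returned error means (log with the shard id, mark the writer degraded, and so on); the point of the sketch is only that the error reaches it.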
📒 Files selected for processing (6)
- src/persistence/aof.rs
- src/persistence/aof_manifest.rs
- src/server/conn/core.rs
- src/server/conn/handler_monoio/mod.rs
- src/server/conn_state.rs
- src/shard/conn_accept.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/persistence/aof_manifest.rs
  // AOF logging for successful local writes
  if !matches!(response, Frame::Error(_)) && is_write {
-     if let Some(ref tx) = ctx.aof_tx {
+     if let Some(ref pool) = ctx.aof_pool {
          let serialized = aof::serialize_command(&frame);
-         let _ = tx.try_send(AofMessage::Append(serialized));
+         pool.try_send_append(ctx.shard_id, serialized);
      }
Serialize the dispatched command, not the client frame, for local AOF writes.
Line 1539 still uses `frame`, but by this point `cmd_args` may already be workspace-rewritten. Remote writes handle this correctly with `dispatch_frame`; local writes persist the unprefixed command, which replays to the wrong key after restart.
Suggested fix
-     if let Some(ref pool) = ctx.aof_pool {
-         let serialized = aof::serialize_command(&frame);
+     if let Some(ref pool) = ctx.aof_pool {
+         let aof_frame = if rewritten.is_some() {
+             let mut parts = Vec::with_capacity(1 + cmd_args.len());
+             parts.push(Frame::BulkString(Bytes::copy_from_slice(cmd)));
+             parts.extend_from_slice(cmd_args);
+             Frame::Array(parts.into())
+         } else {
+             frame.clone()
+         };
+         let serialized = aof::serialize_command(&aof_frame);
          pool.try_send_append(ctx.shard_id, serialized);
      }
📝 Committable suggestion
// AOF logging for successful local writes
if !matches!(response, Frame::Error(_)) && is_write {
    if let Some(ref pool) = ctx.aof_pool {
        let aof_frame = if rewritten.is_some() {
            let mut parts = Vec::with_capacity(1 + cmd_args.len());
            parts.push(Frame::BulkString(Bytes::copy_from_slice(cmd)));
            parts.extend_from_slice(cmd_args);
            Frame::Array(parts.into())
        } else {
            frame.clone()
        };
        let serialized = aof::serialize_command(&aof_frame);
        pool.try_send_append(ctx.shard_id, serialized);
    }
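To make the reviewer's point concrete, here is a self-contained toy version of "persist the dispatched command, not the client frame". The two-variant `Frame`, the `ws:` key-prefix scheme in `rewrite_args`, and the `aof_frame` helper are assumptions for illustration; the project's actual workspace rewrite and `Frame` type are not shown on this page.

```rust
/// Hypothetical, minimal RESP frame; the real Frame type has more variants.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    BulkString(Vec<u8>),
    Array(Vec<Frame>),
}

/// Hypothetical workspace rewrite: prefixes the key (first argument) with
/// the workspace name and a colon. Returns None when no workspace is active.
pub fn rewrite_args(ws: Option<&str>, args: &[Frame]) -> Option<Vec<Frame>> {
    let ws = ws?;
    let mut out = args.to_vec();
    if let Some(Frame::BulkString(key)) = out.first_mut() {
        let mut prefixed = format!("{}:", ws).into_bytes();
        prefixed.extend_from_slice(key);
        *key = prefixed;
    }
    Some(out)
}

/// Builds the frame to persist. When the arguments were rewritten, the
/// command is rebuilt from them; otherwise the client frame is reused.
pub fn aof_frame(frame: &Frame, cmd: &[u8], rewritten: Option<&[Frame]>) -> Frame {
    match rewritten {
        Some(cmd_args) => {
            let mut parts = Vec::with_capacity(1 + cmd_args.len());
            parts.push(Frame::BulkString(cmd.to_vec()));
            parts.extend_from_slice(cmd_args);
            Frame::Array(parts)
        }
        None => frame.clone(),
    }
}
```

Replaying the persisted frame then targets the same prefixed key the live dispatch mutated, which is the property the review comment asks for.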
…tep 2e-β)
Seventh implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Swings `bgrewriteaof_start` and
`bgrewriteaof_start_sharded` over to `&AofWriterPool` and routes
through `pool.try_send_rewrite(...)`, which rejects under PerShard
layout with a stable user-facing RESP error. All three callers flip
together so the helpers stay strictly typed.
Why this matters
----------------
Step 2b shipped `per_shard_aof_writer_task` with PerShard rejection of
Rewrite/RewriteSharded messages (logged at `warn!`). Before this commit,
under PerShard layout BGREWRITEAOF would have:
1. Sent `AofMessage::RewriteSharded(...)` into shard-0's writer via
the legacy `tx.try_send(...)` path,
2. Received `Ok(())` (channel accepted the message),
3. Returned `+Background append only file rewriting started\r\n` to the
client,
4. The per-shard writer would warn and drop the message — no rewrite
happens.
That is a silent failure: the client thinks a rewrite is in progress
when nothing is actually happening, and the rewrite-in-progress flag is
stuck set. After this commit, `pool.try_send_rewrite(...)` returns
`RewriteUnsupportedInPerShard`, the helper clears the flag, and the
client receives an explicit error:
-ERR BGREWRITEAOF is not yet supported under per-shard AOF layout;
per-shard rewrite ships in step 6 of the per-shard AOF migration
(Under TopLevel layout — i.e. today — `try_send_rewrite` is a thin
pass-through, so behaviour is unchanged.)
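The flag-clearing behaviour described above can be sketched as a small state machine. Only `AOF_REWRITE_IN_PROGRESS`, `try_send_rewrite`, `rewrite_pool_error_frame`, `AofPoolSendError`, and `RewriteUnsupportedInPerShard` come from the commit text; the `ChannelSend` variant, the `channel_open` field, the argument-free signatures, and the wording of the "failed to start" and "already in progress" replies are placeholders.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Mirrors the rewrite-in-progress flag named in the commit.
pub static AOF_REWRITE_IN_PROGRESS: AtomicBool = AtomicBool::new(false);

#[derive(Debug, PartialEq)]
pub enum AofPoolSendError {
    RewriteUnsupportedInPerShard,
    ChannelSend, // hypothetical name for "channel send failed"
}

pub enum Layout {
    TopLevel,
    PerShard,
}

/// Hypothetical pool reduced to what the rewrite path needs.
pub struct AofWriterPool {
    pub layout: Layout,
    pub channel_open: bool,
}

impl AofWriterPool {
    pub fn try_send_rewrite(&self) -> Result<(), AofPoolSendError> {
        match self.layout {
            Layout::PerShard => Err(AofPoolSendError::RewriteUnsupportedInPerShard),
            Layout::TopLevel if self.channel_open => Ok(()),
            Layout::TopLevel => Err(AofPoolSendError::ChannelSend),
        }
    }
}

/// Translates pool failures into RESP error strings.
pub fn rewrite_pool_error_frame(err: AofPoolSendError) -> String {
    match err {
        AofPoolSendError::RewriteUnsupportedInPerShard => {
            "-ERR BGREWRITEAOF is not yet supported under per-shard AOF layout\r\n".to_string()
        }
        AofPoolSendError::ChannelSend => "-ERR failed to start AOF rewrite\r\n".to_string(),
    }
}

pub fn bgrewriteaof_start(pool: &AofWriterPool) -> String {
    if AOF_REWRITE_IN_PROGRESS.swap(true, Ordering::AcqRel) {
        return "-ERR AOF rewrite already in progress\r\n".to_string();
    }
    match pool.try_send_rewrite() {
        Ok(()) => "+Background append only file rewriting started\r\n".to_string(),
        Err(e) => {
            // Clear the flag on any send failure so it cannot stay stuck.
            AOF_REWRITE_IN_PROGRESS.store(false, Ordering::Release);
            rewrite_pool_error_frame(e)
        }
    }
}
```

Under `Layout::PerShard` the client gets the explicit error and the flag reads false afterwards; under `Layout::TopLevel` with an open channel the reply is unchanged and the flag stays set until the rewrite finishes (not modelled here).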
Changes
-------
command/persistence.rs
- Both `bgrewriteaof_start` and `bgrewriteaof_start_sharded` now
take `pool: &AofWriterPool` instead of `&channel::MpscSender<AofMessage>`.
- New `rewrite_pool_error_frame(err: AofPoolSendError)` translates
pool failures into RESP errors (PerShard rejection → user-facing
"not yet supported"; channel send fail → existing "failed to start").
- `AOF_REWRITE_IN_PROGRESS` is still cleared on any send failure,
matching prior behaviour.
- Removed now-unused `crate::runtime::channel` import.
- Existing gate test `test_bgrewriteaof_sharded_refuses_under_unsafe_config`
updated to wrap the local sender as a `TopLevel` pool before
invoking the helper.
server/conn/handler_monoio/dispatch.rs:980
server/conn/handler_sharded/dispatch.rs:355
- BGREWRITEAOF dispatch path uses `ctx.aof_pool` (the field plumbed
in step 2c) instead of `ctx.aof_tx`. Behaviour identical under
TopLevel; gains PerShard rejection in step 2f.
server/conn/handler_single.rs:610
- Wraps the local `aof_tx` parameter as a transient
`AofWriterPool::top_level(tx.clone())` before calling the helper.
handler_single is single-shard mode by definition, so the writer
is always TopLevel — the wrapper is purely a type adapter.
BGREWRITEAOF is a manual admin command, not a hot path; the
transient allocation is acceptable. Step 2e-γ swaps the function's
`aof_tx` parameter to `aof_pool` and removes this wrapper.
server/conn/core.rs (ConnectionContext.aof_tx)
- Doc comment expanded to track the staged removal.
- `#[cfg_attr(not(feature = "runtime-monoio"), allow(dead_code))]`
silences clippy under tokio (where the only remaining reader is
`handler_monoio/mod.rs:486`, which is `#[cfg(feature = "runtime-monoio")]`).
Future regressions on monoio still trip a real dead-code warning.
What this does NOT do (deferred to 2e-γ)
---------------------------------------
- handler_single's 5 remaining `aof_tx` sites (SWAPDB at 658, AOF
drain at 881, WAL records at 1513, is_write at 1531, AOF drain at
2235). All keep using the local `aof_tx` parameter.
- handler_single function-parameter rename (`aof_tx` → `aof_pool`).
- blocking.rs `try_inline_dispatch` / `try_inline_dispatch_loop`
signatures + the AOF send at line 1349.
- handler_monoio/mod.rs:486 call site for the migrated blocking
helper.
- server/conn/tests.rs (12 call sites — straightforward None/Some
swaps once blocking.rs's signature flips).
What this does NOT do (deferred to 2e-δ)
---------------------------------------
- Remove the `aof_tx` field from ConnectionContext and conn_state.rs.
- Drop the parameter from `ConnectionContext::new`.
- Simplify the 4 spawn sites in shard/conn_accept.rs.
Verification
------------
cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
Lib persistence tests:
tokio: 379 passed
monoio: 378 passed
Including the gate-refusal test that now exercises the pool path.
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
Commit 5a546ff (step 2a — AofWriterPool type + try_send_rewrite)
Commit 3afe21f (step 2b — per-shard writer task body that rejects
Rewrite/RewriteSharded with warn!)
Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
Commit eb90419 (step 2e-α — handler_sharded migration + canonical
routing fix at line 1651)
author: Tin Dang
…o aof_pool (Option B step 2e-γ)
Eighth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Drains the remaining `ctx.aof_tx` and
parameter-level `aof_tx` readers from the connection-handler layer:
- `blocking.rs::try_inline_dispatch` + `try_inline_dispatch_loop`:
parameter type changes from `&Option<MpscSender<AofMessage>>` to
`&Option<Arc<AofWriterPool>>`. The L1349 AOF append uses
`pool.try_send_append(shard_id, frozen)` — under PerShard layout this
routes to the shard that owns the data, fixing the same latent bug
class as 2d/2e-α (a TopLevel writer would absorb every shard's
inline SET regardless of routing).
- `handler_monoio/mod.rs:486`: flipped to pass `&ctx.aof_pool` into the
migrated blocking helper. After this commit no consumer reads
`ctx.aof_tx` under any feature combo.
- `handler_single.rs`: top of `handle_connection` constructs
`aof_pool: Option<Arc<AofWriterPool>>` from the inbound `aof_tx`
parameter via `AofWriterPool::top_level(tx.clone())`. All six
consumer sites (BGREWRITEAOF wrapper from 2e-β, SWAPDB WAL,
per-batch AOF drain at 905, per-batch AOF drain at 2260, GRAPH WAL
records at 1537, is_write/aof_bytes gate at 1556) now read
`aof_pool` instead of `aof_tx`. The `aof_tx` function parameter
survives as a placeholder for 2e-δ when listener.rs starts
constructing the pool itself.
- `server/conn/tests.rs`: 12 inline-dispatch test fixtures swap
`aof_tx: Option<MpscSender<AofMessage>>` for
`aof_pool: Option<Arc<AofWriterPool>>` and pass `&aof_pool` into
the migrated `try_inline_dispatch[_loop]`. The one Some-form
fixture (`test_inline_set_with_aof_falls_through_when_writes_disabled`)
wraps the local sender as a TopLevel pool.
Two send-style choices made deliberately
----------------------------------------
`AofWriterPool` exposes two send paths today: a fire-and-forget
`try_send_append(shard_id, bytes)` (returns `()`) and the lower-level
`sender(shard_id)` which returns the underlying `&MpscSender` for
callers that need the `Result` or want `send_async`. Most migrated
sites use `try_send_append`; the four exceptions are:
- SWAPDB at handler_single:677 keeps `sender(0).try_send(...).is_ok()`
because the swap MUST abort cleanly if the WAL enqueue fails (it
is the only durability hook before the in-memory swap). The
fire-and-forget helper silently drops; here we need the Result.
- The three `send_async(AofMessage::Append(...)).await` sites at
handler_single:909 / 1540 / 2266 keep `sender(0).send_async(...).await`
because their pre-pool code awaited capacity on a full channel
(back-pressure on the inbound write path). `try_send_append` would
drop instead. Preserving the semantics is more important than the
uniform call shape here — the per-shard pool exposes the same
sender under PerShard, so the semantics carry over in 2f.
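The difference between the two send styles is easiest to see on a full channel. The sketch below assumes a pool backed by `std::sync::mpsc::SyncSender` (the project uses its own `MpscSender` with `send_async`), and the `swapdb` function with its two-slot array is a hypothetical stand-in for the SWAPDB call site.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical, reduced message type.
pub enum AofMessage {
    Append(Vec<u8>),
}

/// Hypothetical pool holding one bounded sender per shard.
pub struct AofWriterPool {
    senders: Vec<SyncSender<AofMessage>>,
}

impl AofWriterPool {
    pub fn new(senders: Vec<SyncSender<AofMessage>>) -> Self {
        Self { senders }
    }

    /// Fire-and-forget: the result is discarded, so a full channel drops the record.
    pub fn try_send_append(&self, shard_id: usize, bytes: Vec<u8>) {
        let _ = self.senders[shard_id].try_send(AofMessage::Append(bytes));
    }

    /// Lower-level access for callers that need the Result or a waiting send.
    pub fn sender(&self, shard_id: usize) -> &SyncSender<AofMessage> {
        &self.senders[shard_id]
    }
}

/// SWAPDB-style caller: perform the in-memory swap only if the record was enqueued.
pub fn swapdb(
    pool: &AofWriterPool,
    dbs: &mut [u32; 2],
    record: Vec<u8>,
) -> Result<(), &'static str> {
    match pool.sender(0).try_send(AofMessage::Append(record)) {
        Ok(()) => {
            dbs.swap(0, 1);
            Ok(())
        }
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
            Err("ERR AOF enqueue failed; swap aborted")
        }
    }
}
```

The three awaiting sites correspond to a waiting send on `sender(shard_id)` rather than `try_send`; with the standard library that would be the blocking `send`, which waits for capacity instead of dropping.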
ConnectionContext.aof_tx
------------------------
After this commit the field has no readers under either runtime. The
doc comment is updated to reflect the staged removal, and the
`cfg_attr(not(...))` gate from 2e-β collapses to a plain
`#[allow(dead_code)]` (the field is write-only — populated by the
constructor — until 2e-δ drops both the constructor parameter and
the field itself).
What this does NOT do (deferred to 2e-δ)
---------------------------------------
- Remove the `aof_tx` field from ConnectionContext + conn_state.rs.
- Drop the constructor parameter `aof_tx: Option<MpscSender<...>>`
from `ConnectionContext::new`.
- Simplify the 4 spawn sites in shard/conn_accept.rs (they currently
clone `aof` only to pass it as the field; once the field is gone
the field-assignment can go too).
- Replace the `aof_tx` parameter on handler_single's
`handle_connection` with `aof_pool` (and update listener.rs to
construct the pool itself).
Verification
------------
cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
Lib persistence tests:
tokio: 379 passed
monoio: 378 passed
Inline dispatch tests (server::conn::tests): 11 passed
(covers GET hit/miss, multi-shard skip, SET inline, SET with AOF
fall-through, several malformed-input rejects).
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
Commit 5a546ff (step 2a — AofWriterPool type)
Commit 3afe21f (step 2b — per-shard writer task body)
Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
Commit eb90419 (step 2e-α — handler_sharded migration + canonical routing fix)
Commit 5735031 (step 2e-β — BGREWRITEAOF helpers via AofWriterPool)
author: Tin Dang
…step 2e-δ)
Ninth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). With handler_monoio (2d), handler_sharded
(2e-α), BGREWRITEAOF helpers (2e-β), and handler_single + blocking +
inline tests (2e-γ) all migrated to `AofWriterPool`, the compat-alias
`aof_tx` field on `ConnectionContext` has no remaining consumers. This
commit removes it, drops the parameter from `ConnectionContext::new`,
and simplifies the 4 spawn sites in `shard/conn_accept.rs` that no
longer need to clone `aof_tx` as an intermediate.
Changes
-------
src/server/conn/core.rs
- Remove `aof_tx: Option<MpscSender<AofMessage>>` field
(was `#[allow(dead_code)]` in step 2e-γ after the last reader left).
- Drop `aof_tx` parameter from `ConnectionContext::new`.
- Drop `aof_tx` from struct initializer.
- Doc-comment on `aof_pool` updated to reflect it as the sole AOF
interface (the "compat alias" framing from step 2c is now history).
- Remove unused `AofMessage` import.
src/server/conn_state.rs (definition-only placeholder twin)
- Mirror the same field removal + doc-comment update.
- Remove unused `AofMessage` import.
src/shard/conn_accept.rs (4 ConnectionContext::new spawn sites)
- Drop the intermediate `let aof = aof_tx.clone();` — the only
consumer was the constructor's removed parameter.
- Build the pool directly: `aof_pool = aof_tx.as_ref().map(...)`.
- Drop the `aof,` positional argument from each constructor call.
- Update the "2c compat alias" comment to point forward at the
layout-aware constructor in step 2f.
What this does NOT do (deferred to 2f)
-------------------------------------
- handler_single's `aof_tx` parameter on `handle_connection` — needs
listener.rs (the spawn site) to construct the pool itself first.
- Spawn-side AOF channel construction in main.rs, listener.rs, and
embedded.rs — they still build a single `MpscSender<AofMessage>`
and pass it through `aof_tx` chains. Step 2f introduces the
layout-aware `AofWriterPool::from_manifest(...)` that emits
`top_level(tx)` for TopLevel or `per_shard(senders)` for PerShard
and replaces the per-shard channel fanout in `shard/event_loop.rs`.
Verification
------------
cargo clippy clean on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc
(defaults: runtime-monoio,jemalloc,graph,text-index)
Lib persistence tests:
tokio: 379 passed
monoio: 378 passed
Inline-dispatch tests (server::conn::tests): 11 passed.
End-state of step 2 (handler-layer migration)
---------------------------------------------
After this commit `aof_pool` is the sole AOF interface across:
- ConnectionContext (struct + constructor)
- handler_sharded (mod.rs + dispatch.rs)
- handler_monoio (mod.rs + dispatch.rs)
- handler_single (all internal sites; parameter still receives
`aof_tx` but is only used to bootstrap the pool)
- blocking.rs (try_inline_dispatch + try_inline_dispatch_loop)
- command/persistence.rs (BGREWRITEAOF helpers, with PerShard
rejection)
- server/conn/tests.rs (12 inline-dispatch fixtures)
The remaining `aof_tx` references in the tree:
- src/main.rs, src/server/embedded.rs, src/server/listener.rs
(spawn-side channel construction — 2f scope)
- src/shard/event_loop.rs (passes `aof_tx` through to conn_accept;
2f flips to per-shard pool construction)
- src/shard/conn_accept.rs (still receives `aof_tx: &Option<MpscSender>`
as parameter; 2f changes to `aof_pool: &Option<Arc<AofWriterPool>>`)
- src/server/conn/handler_single.rs (function parameter only;
bootstrap site for the local pool — 2f rename)
- src/persistence/aof.rs (channel type definitions — stable)
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
Commit 5a546ff (step 2a — AofWriterPool type)
Commit 3afe21f (step 2b — per-shard writer task body)
Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool compat alias)
Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
Commit eb90419 (step 2e-α — handler_sharded migration + canonical routing fix)
Commit 5735031 (step 2e-β — BGREWRITEAOF helpers via AofWriterPool)
Commit ceac655 (step 2e-γ — handler_single + blocking + inline tests)
author: Tin Dang
…ig literals
Commit e0bb658 added `unsafe_multishard_aof: bool` to `ServerConfig`
(the P0 gate against multi-shard AOF data loss until per-shard replay
lands) but did not update the 17 `ServerConfig { .. }` literals
scattered across the integration-test suite. The tests have been
failing to compile since then on both feature combinations.
This commit backfills `unsafe_multishard_aof: false,` in all affected
literals, preserving the production default (refuse the unsafe config
at startup unless explicitly overridden). No test semantics change:
the tests that exercise multi-shard configs already use single-shard
storage layouts or `appendonly = "no"`, so the gate doesn't fire for
them.
Files touched (17 literals across 10 files)
-------------------------------------------
tests/ft_search_multi_shard_as_of.rs
tests/ft_search_temporal_parity.rs
tests/integration.rs (7 sites)
tests/kill_snapshot.rs
tests/mq_integration.rs
tests/replication_test.rs
tests/txn_ft_search_snapshot.rs
tests/txn_kv_wiring.rs
tests/vacuum_commands.rs
tests/workspace_integration.rs (2 sites)
Verification
------------
cargo check --tests
cargo check --tests --no-default-features --features runtime-tokio,jemalloc
Both clean. Unblocks integration-test runs for the per-shard AOF
migration commits (2a..2e-δ on origin) and any future PRs landing on
this branch.
Refs
----
Commit e0bb658 (origin of the unbackfilled field)
Commit 6e49050 (docs noting the multi-shard AOF safety gate)
tmp/rfc-per-shard-aof-v02.md (per-shard AOF migration scope)
author: Tin Dang
…tep 2f-α)
Tenth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Closes out the handler-layer migration
sequence by lifting `AofWriterPool` construction to the three spawn
sites (`main.rs`, `server/listener.rs`, `server/embedded.rs`) and
retyping the connection-accept fan-out (`shard/event_loop.rs`,
`shard/conn_accept.rs`, `server/conn/handler_single.rs`) to thread
`Option<Arc<AofWriterPool>>` end-to-end. The compat-alias inline
construction that steps 2c through 2e-δ relied on (`let aof_pool = aof_tx
.as_ref().map(|tx| AofWriterPool::top_level(tx.clone()))`) is deleted
from every site.
After this commit, `aof_tx` no longer exists anywhere in `src/`. Grep
confirms zero matches under any feature combo.
Scope split: 2f-α vs 2f-β
-------------------------
This commit is strictly **type plumbing** — every writer pool is still
`AofLayout::TopLevel` wrapping a single sender. The layout-aware
constructor that reads `AofManifest` and emits PerShard pools (with
fan-out to N writer threads) lands as a follow-up commit (2f-β). The
RFC's "Step 2f" originally bundled both; separating them keeps the
diff bisectable and preserves the property that today's runtime
behavior is byte-identical to step 2e-δ.
Changes
-------
src/main.rs
- Import `AofWriterPool` alongside `AofMessage` + `FsyncPolicy`.
- Replace `let aof_tx: Option<MpscSender<AofMessage>>` with
`let aof_pool: Option<Arc<AofWriterPool>>`. Wrap the writer
sender via `AofWriterPool::top_level(tx)`.
- Rename per-shard clone `shard_aof_tx` → `shard_aof_pool` and the
matching positional argument in `Shard::run(...)`.
- Shutdown path: `tx.send(AofMessage::Shutdown)` →
`pool.broadcast_shutdown()`. Under TopLevel this is one try_send;
under PerShard (2f-β) it fans to every per-shard writer.
src/server/listener.rs
- Same pattern. `aof_tx` → `aof_pool: Option<Arc<AofWriterPool>>`,
wrapped at the construction site.
- Accept-loop captures `aof_pool_conn = aof_pool.clone()` (Arc
bump) and passes it as the `aof_pool` parameter of
`connection::handle_connection` (handler_single).
- Cancel-path shutdown switches to `pool.broadcast_shutdown()`
(note: `try_send`-based, not async — listener already drains
on the same runtime).
src/server/embedded.rs
- Mirror change: outer tuple now `(Option<Arc<AofWriterPool>>,
Option<JoinHandle>)`.
- Shutdown-ordering comment updated to reflect the pool-Drop
semantics — dropping the last `Arc` drops the pool, which drops
the underlying `Vec<MpscSender>`, which closes the channel. The
writer's `recv_async()` returns `Err(_)` and the task drains +
fsyncs + exits cleanly. This preserves Qodo bug #5's fix:
shards drop their clones before the outer pool, so the writer
never terminates while shards still have pending appends.
src/shard/event_loop.rs
- `Shard::run` signature: `aof_tx: Option<MpscSender<AofMessage>>`
→ `aof_pool: Option<Arc<AofWriterPool>>`.
- 9 internal pass-through sites (`&aof_tx` → `&aof_pool`) updated.
src/shard/conn_accept.rs
- 4 function signatures (`spawn_tokio_connection`,
`spawn_monoio_connection`, `spawn_monoio_tls_connection`,
`spawn_migrated_monoio_connection`): parameter
`aof_tx: &Option<MpscSender<AofMessage>>` →
`aof_pool: &Option<Arc<AofWriterPool>>`.
- 4 inline pool-construction blocks deleted (the compat-alias
`let aof_pool = aof_tx.as_ref().map(|tx| top_level(tx.clone()))`
pattern from step 2c). Replaced by a one-line Arc bump:
`let pool_for_ctx = aof_pool.as_ref().map(Arc::clone);`
passed positionally into `ConnectionContext::new(.., pool_for_ctx, ..)`.
src/server/conn/handler_single.rs
- Parameter `aof_tx: Option<MpscSender<AofMessage>>` →
`aof_pool: Option<Arc<AofWriterPool>>`.
- **DELETED** the step-2e-γ bootstrap block that wrapped the
inbound `aof_tx` as a TopLevel pool. The parameter IS the pool
now; the bootstrap was always a placeholder for this commit.
- Doc comment on `handle_connection` updated to reflect the
pool semantics (single-shard ⇒ always TopLevel).
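The pool-Drop ordering described for `server/embedded.rs` above can be illustrated with a small stand-alone sketch. It is not the project's code: `Msg`, `Pool`, `try_send_append` and `drain_until_closed` are hypothetical stand-ins, and `std::sync::mpsc` replaces the project's own channel type. The sketch only shows the general property the comment relies on: the receiver observes "closed" once the last `Arc` clone of the pool is gone, after every buffered message has been delivered.

```rust
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::Arc;

/// Stand-in for the real message type (assumed; the real enum has more variants).
#[derive(Debug, PartialEq)]
pub enum Msg {
    Append(Vec<u8>),
    Shutdown,
}

/// Hypothetical pool: one sender for TopLevel, N senders for PerShard.
pub struct Pool {
    senders: Vec<SyncSender<Msg>>,
}

impl Pool {
    pub fn top_level(tx: SyncSender<Msg>) -> Arc<Self> {
        Arc::new(Pool { senders: vec![tx] })
    }

    /// Non-blocking append; returns false if the queue is full or closed.
    pub fn try_send_append(&self, shard_id: usize, bytes: Vec<u8>) -> bool {
        // A single-sender pool routes every shard to sender 0.
        let idx = if self.senders.len() == 1 { 0 } else { shard_id };
        self.senders[idx].try_send(Msg::Append(bytes)).is_ok()
    }

    /// Non-blocking shutdown fan-out; returns how many writers accepted it.
    pub fn broadcast_shutdown(&self) -> usize {
        self.senders
            .iter()
            .filter(|tx| tx.try_send(Msg::Shutdown).is_ok())
            .count()
    }
}

/// Writer side: drains until every sender (every pool clone) is dropped.
pub fn drain_until_closed(rx: Receiver<Msg>) -> Vec<Msg> {
    let mut seen = Vec::new();
    // `recv` only returns Err after all senders are gone and the queue is empty.
    while let Ok(msg) = rx.recv() {
        seen.push(msg);
    }
    seen
}
```

In this sketch a shard's clone can be dropped first and the outer pool last; the writer still sees every pending append before the channel reports closure.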
What this does NOT do (deferred to 2f-β)
----------------------------------------
- Read `AofManifest` from disk in `main.rs`/`embedded.rs` to choose
between `top_level(...)` and `per_shard(senders)`.
- Spawn N writer threads when the on-disk manifest is `AofLayout::PerShard`.
- Add a manifest mismatch warning (manifest says PerShard but
constructed as TopLevel, or vice versa).
- Wire `per_shard_aof_writer_task` (already defined in step 2b)
into the spawn flow.
Today's runtime behavior is byte-identical to step 2e-δ. The only
observable change is: every site speaks the `AofWriterPool` API
instead of `MpscSender<AofMessage>`, which is a precondition for
2f-β shipping the PerShard fan-out without touching call sites again.
Verification
------------
cargo check on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc clean
(defaults: runtime-monoio,jemalloc,graph,text-index) clean
cargo clippy -- -D warnings on both feature combinations: clean.
Lib persistence tests (full set, including the 5 pool_tests added
in step 2a):
tokio: 379 passed (baseline match)
monoio: 378 passed (baseline match)
cargo test --lib (full lib suite):
tokio: 2751 passed
monoio: pre-existing stack overflow in
`graph::cypher::parser::tests::test_nesting_depth_exceeded`
(verified on origin/HEAD without these changes — unrelated to
AOF migration).
Integration-test compile: clean on both combos after the parallel
test-fix commit `4fdd50f` (unsafe_multishard_aof backfill).
Net `aof_tx` references in src/
-------------------------------
Before this commit: 37 across 6 files.
After this commit: 0.
The full per-shard AOF refactor (steps 2a–2f-α) is now complete on
the handler + spawn layer. Step 2f-β (layout-aware fan-out) and step
3+ (LSN tagging, per-shard replay, cross-shard ordering, AppendSync,
crash matrix) are unblocked.
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
Commit 5a546ff (step 2a — AofWriterPool type)
Commit 3afe21f (step 2b — per-shard writer task body)
Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool compat alias)
Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
Commit eb90419 (step 2e-α — handler_sharded migration + canonical routing fix)
Commit 5735031 (step 2e-β — BGREWRITEAOF helpers via AofWriterPool)
Commit ceac655 (step 2e-γ — handler_single + blocking + inline tests)
Commit d9a3651 (step 2e-δ — drop ConnectionContext.aof_tx field)
Commit 4fdd50f (test backfill — unsafe_multishard_aof field)
author: Tin Dang
…step 2f-β)
Eleventh implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Replaces the unconditional TopLevel
construction at `main.rs:312` (left in place by step 2f-α) with a
read-only manifest peek + layout-aware spawn. When an on-disk manifest
declares `layout == PerShard` AND `--shards >= 2`, main.rs now spawns
one `per_shard_aof_writer_task` per shard and returns
`AofWriterPool::per_shard(senders)` instead of the single-writer
TopLevel pool.
Scope: main.rs only
-------------------
`embedded.rs` and `server/listener.rs` are deliberately untouched. Both
run the tokio single-file legacy AOF path (`aof_writer_task` opens
`<dir>/<appendfilename>`) and never engage the manifest by design; see
the comment block at `embedded.rs:222-235`. Adding a PerShard branch in
either would risk Qodo bug #3 (incr-only replay on the next boot
silently dropping data). `listener.rs` is the tokio single-shard path:
per-shard fan-out has no meaning with one shard, so it inherits
TopLevel from `AofWriterPool::top_level(tx)` at the construction site.
The new branching logic
-----------------------
src/main.rs (L308-419)
1. If `appendonly == "yes"`: AofManifest::load(&base_dir)
   - Ok(Some(m)) → continue with existing manifest
   - Ok(None) → no manifest yet (fresh install)
   - Err(_) → **fatal exit (2)** with the same "refusing to start to
     avoid data loss" message used by the replay block at L514-526.
     Mirroring this is load-bearing: silently falling back to TopLevel
     on a corrupt manifest would let the next write create a fresh
     manifest that overwrites the reference to the real base RDB,
     losing data.
2. If a manifest was loaded: `verify_shard_count(num_shards as u16)`.
   Mismatch is fatal (exit 2) with the verbatim RFC § 3 error ("ERR
   shard count changed (manifest=N, config=M); refusing to start to
   avoid data loss. See docs/runbooks/shard-count-change.md").
3. Spawn decision:
     use_per_shard = manifest.is_some()
                     && manifest.layout == PerShard
                     && num_shards >= 2
4. If `use_per_shard`:
   - for sid in 0..num_shards:
       (tx, rx) = channel::mpsc_bounded::<AofMessage>(10_000)
       thread `aof-writer-{sid}` running
         `per_shard_aof_writer_task(rx, base_dir, sid as u16, fsync, cancel)`
       push tx to senders
   - return `Some(AofWriterPool::per_shard(senders))`
   Else (existing TopLevel path):
   - single `aof-writer` thread running `aof_writer_task` against
     `<dir>/<appendfilename>`
   - return `Some(AofWriterPool::top_level(tx))`
What this does NOT do (deferred)
--------------------------------
- **Fresh-install PerShard creation.** `AofManifest::initialize()`
  still hardcodes TopLevel; nothing in main.rs constructs a PerShard
  manifest from scratch. The PerShard branch is therefore reachable
  only by:
    a) hand-crafting a v2 manifest (the smoke test below)
    b) future migration logic (RFC step 5/9 territory)
  Until then, runtime behavior under default configurations is
  byte-identical to step 2f-α.
- **Multi-part AOF replay for multi-shard.** The replay block at
  `main.rs:528` still gates on `num_shards == 1`. Step 4 of the RFC
  closes this. A PerShard manifest with `num_shards >= 2` will spawn
  the writers correctly (smoke verified) and the writers will tail the
  existing incr files, but boot-time replay still warns "Multi-part AOF
  skipped in multi-shard mode".
- **TopLevel→PerShard auto-migration.** `migrate_top_level_to_per_shard`
  exists in `aof_manifest.rs` (step 1) but is not wired into boot.
- **AppendSync rendezvous, LSN tagging, cross-shard merge, CRASH-01
  matrix.** Steps 3, 5, 7, 8 of the RFC.
- **Lifting the `--unsafe-multishard-aof` gate.** Step 9. The L280
  refusal still fires whenever `num_shards >= 2 && appendonly == "yes"`
  unless the operator explicitly opts in.
Manual smoke verification
-------------------------
Built `target/debug/moon` and ran four hand-crafted scenarios from
`/tmp/moon-smoke-*` directories (cleaned up post-run):
1. **PerShard happy path.** Hand-wrote
     version 2 seq 1 shards 2 shard 0 max_lsn 0 shard 1 max_lsn 0
   at `appendonlydir/moon.aof.manifest`, created shard-0/ and shard-1/
   dirs. Started with
     moon --port 16399 --shards 2 --unsafe-multishard-aof --appendonly yes --dir <smoke> --appendfsync everysec
   Log output:
     "AOF enabled (PerShard, 2 writers, fsync: EverySec)"
     "AOF writer shard 0: seq 1, incr=<smoke>/appendonlydir/shard-0/moon.aof.1.incr.aof"
     "AOF writer shard 1: seq 1, incr=<smoke>/appendonlydir/shard-1/moon.aof.1.incr.aof"
   Both per-shard writer tasks reached their per-shard incr files.
2. **Shard-count mismatch.** Same manifest, started with `--shards 4`.
   Process exited 2 with verbatim:
     "REFUSING TO START: ERR shard count changed (manifest=2, config=4); refusing to start to avoid data loss. See docs/runbooks/shard-count-change.md"
3. **Corrupt manifest.** Wrote garbage at the manifest path, started
   with `--shards 1`. Process exited 2 with:
     "REFUSING TO START: AOF manifest at <dir>/appendonlydir/ is corrupt: AOF manifest at .../moon.aof.manifest has no valid sequence number. Inspect manually before deleting; overwriting silently loses data."
4. **TopLevel regression.** Fresh empty `--dir`, `--shards 1
   --appendonly yes`. Log: "AOF enabled (TopLevel, fsync: EverySec)".
   `initialize()` wrote v1 manifest + seq 1 base/incr. Behavior
   identical to step 2f-α.
Verification
------------
cargo check on both feature combinations:
--no-default-features --features runtime-tokio,jemalloc clean
(defaults: runtime-monoio,jemalloc,graph,text-index) clean
cargo clippy -- -D warnings on both combinations: clean.
Lib persistence tests:
tokio: 379 passed (baseline match)
monoio: 378 passed (baseline match)
Refs
----
tmp/rfc-per-shard-aof-v02.md (RFC § 3 + § 4)
Commit 5a546ff (step 2a: AofWriterPool type)
Commit 3afe21f (step 2b: per_shard_aof_writer_task body)
Commit 6a758f4 (step 2c: type plumbing aof_tx → aof_pool)
Commit a05f3d8 (step 2d: handler_monoio migration)
Commit eb90419 (step 2e-α: handler_sharded migration)
Commit 5735031 (step 2e-β: BGREWRITEAOF helpers)
Commit ceac655 (step 2e-γ: handler_single + blocking + inline)
Commit d9a3651 (step 2e-δ: drop ConnectionContext.aof_tx)
Commit 4fdd50f (test backfill: unsafe_multishard_aof)
Commit 8fd769c (step 2f-α: spawn-site type plumbing)
author: Tin Dang
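The spawn decision in the step 2f-β commit message above (shard-count check, then `use_per_shard`) can be sketched as a pure function. This is an illustration under assumptions, not the code in `main.rs`: `Manifest`, `SpawnPlan` and `plan_writers` are hypothetical names, and the sketch assumes every manifest records a shard count. Loading the manifest and spawning threads are left out.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AofLayout {
    TopLevel,
    PerShard,
}

/// Hypothetical, trimmed-down view of an on-disk manifest.
pub struct Manifest {
    pub layout: AofLayout,
    pub shards: u16,
}

/// What the boot path should spawn (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum SpawnPlan {
    TopLevel,
    PerShard { writers: u16 },
}

pub fn plan_writers(manifest: Option<&Manifest>, num_shards: u16) -> Result<SpawnPlan, String> {
    // A shard-count mismatch is fatal before any writer is spawned.
    if let Some(m) = manifest {
        if m.shards != num_shards {
            return Err(format!(
                "ERR shard count changed (manifest={}, config={}); \
                 refusing to start to avoid data loss. \
                 See docs/runbooks/shard-count-change.md",
                m.shards, num_shards
            ));
        }
    }
    // PerShard only when a manifest exists, declares PerShard, and shards >= 2.
    let use_per_shard = match manifest {
        Some(m) => m.layout == AofLayout::PerShard && num_shards >= 2,
        None => false,
    };
    if use_per_shard {
        Ok(SpawnPlan::PerShard { writers: num_shards })
    } else {
        Ok(SpawnPlan::TopLevel)
    }
}
```

Keeping the decision free of I/O makes the three smoke scenarios (happy path, mismatch, fresh install) checkable without starting a server.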
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/command/persistence.rs (1)
34-47: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Keep the unsafe-gate message aligned with the actual gate.
The new docs/error text still says this refusal only applies when `--disk-offload enable` is set, but the PR objective describes the startup/runtime gate as `--shards >= 2` plus `--appendonly yes`. If `MULTI_SHARD_AOF_REWRITE_UNSAFE` is set under that broader condition, this response tells operators to try the wrong workaround (`--disk-offload disable`).
Also applies to: 284-287
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/command/persistence.rs` around lines 34 - 47, The refusal message/documentation around MULTI_SHARD_AOF_REWRITE_UNSAFE is misleading: update the error/runbook text and any related doc comments (including the instance at the later lines ~284-287) to reflect the actual gate condition (shards >= 2 AND --appendonly yes) rather than suggesting it only applies when --disk-offload enable; search for and edit the strings emitted by bgrewriteaof_start_sharded and the public comment for MULTI_SHARD_AOF_REWRITE_UNSAFE to mention the correct combination (shards >= 2 + appendonly yes) and adjust suggested operator workarounds/runbook pointers accordingly.
src/server/embedded.rs (1)
123-139: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Refuse multi-shard AOF in `run_embedded` too.
Lines 123-139 still start the legacy top-level AOF writer for any `appendonly == "yes"`, but this path never enforces the new `shards >= 2 && !unsafe_multishard_aof` gate. That leaves embedded deployments on the known write-loss path this PR is supposed to block. Please reuse the same startup validation here before spawning the writer.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/server/embedded.rs` around lines 123 - 139, The code unconditionally starts the top-level AOF writer when config.appendonly == "yes" without checking multi-shard safety; update the block that spawns the AOF writer in run_embedded to reuse the same startup validation used elsewhere: check the shard count (shards) and the unsafe_multishard_aof flag (or config.unsafe_multishard_aof) and refuse/return an error if shards >= 2 and unsafe_multishard_aof is false, before creating the channel, token, and calling AofWriterPool::top_level or spawning aof::aof_writer_task; ensure you mirror the exact error message/flow used in the other validation path so embedded deployments cannot start the legacy top-level writer in unsafe multi-shard configurations.
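One way to share the startup validation between `main.rs` and `run_embedded`, as the comment above suggests, is a small pure check that both call before spawning a writer. The following is a hedged sketch only: `GateInput`, `check_multishard_aof_gate` and the error wording are hypothetical and do not reproduce the project's actual function or message.

```rust
/// Hypothetical subset of the server config relevant to the gate.
pub struct GateInput<'a> {
    pub shards: usize,
    pub appendonly: &'a str,
    pub unsafe_multishard_aof: bool,
}

/// Returns Err when the known write-loss combination is requested
/// without the explicit opt-in flag.
pub fn check_multishard_aof_gate(cfg: &GateInput) -> Result<(), String> {
    let aof_enabled = cfg.appendonly == "yes";
    if cfg.shards >= 2 && aof_enabled && !cfg.unsafe_multishard_aof {
        // Illustrative wording only; the real message lives in the project.
        return Err(format!(
            "refusing to start: --shards {} with --appendonly yes requires --unsafe-multishard-aof",
            cfg.shards
        ));
    }
    Ok(())
}
```

A caller would run the check before creating the AOF channel and map `Err` to its own refusal path (exit code in `main.rs`, an error return in the embedded entry point).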
📒 Files selected for processing (25)
- src/command/persistence.rs
- src/main.rs
- src/server/conn/blocking.rs
- src/server/conn/core.rs
- src/server/conn/handler_monoio/dispatch.rs
- src/server/conn/handler_monoio/mod.rs
- src/server/conn/handler_sharded/dispatch.rs
- src/server/conn/handler_sharded/mod.rs
- src/server/conn/handler_single.rs
- src/server/conn/tests.rs
- src/server/conn_state.rs
- src/server/embedded.rs
- src/server/listener.rs
- src/shard/conn_accept.rs
- src/shard/event_loop.rs
- tests/ft_search_multi_shard_as_of.rs
- tests/ft_search_temporal_parity.rs
- tests/integration.rs
- tests/kill_snapshot.rs
- tests/mq_integration.rs
- tests/replication_test.rs
- tests/txn_ft_search_snapshot.rs
- tests/txn_kv_wiring.rs
- tests/vacuum_commands.rs
- tests/workspace_integration.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- src/server/conn_state.rs
- src/main.rs
- src/server/conn/handler_monoio/mod.rs
- src/shard/conn_accept.rs
let is_write = if ctx.aof_pool.is_some() || conn.tracking_state.enabled { metadata::is_write(cmd) } else { false };
let aof_bytes = if is_write && ctx.aof_pool.is_some() { Some(aof::serialize_command(&frame)) } else { None };
Serialize the rewritten command into AOF.
`aof_bytes` is captured from `frame` before workspace prefix injection, but both the local path (`cmd_args`) and the remote path (`dispatch_frame`) can execute the `{ws_id}`-prefixed form. In a workspace session this persists the client-visible key instead of the physical stored key, so AOF replay diverges from live state and can even route the write to the wrong shard.
Suggested fix
- let is_write = if ctx.aof_pool.is_some() || conn.tracking_state.enabled { metadata::is_write(cmd) } else { false };
- let aof_bytes = if is_write && ctx.aof_pool.is_some() { Some(aof::serialize_command(&frame)) } else { None };
+ let is_write = if ctx.aof_pool.is_some() || conn.tracking_state.enabled {
+ metadata::is_write(cmd)
+ } else {
+ false
+ };
+ let aof_bytes = if is_write && ctx.aof_pool.is_some() {
+ Some(match rewritten.as_deref() {
+ Some(rewritten_args) => {
+ let mut parts = Vec::with_capacity(1 + rewritten_args.len());
+ parts.push(Frame::BulkString(Bytes::copy_from_slice(cmd)));
+ parts.extend_from_slice(rewritten_args);
+ aof::serialize_command(&Frame::Array(parts.into()))
+ }
+ None => aof::serialize_command(&frame),
+ })
+ } else {
+ None
+ };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/server/conn/handler_sharded/mod.rs` around lines 1122 - 1123, The current
AOF serialization captures aof_bytes from the original frame (variable frame)
before workspace prefix injection, which records client-visible keys and can
replay to wrong shards; change the logic to compute aof_bytes from the
post-rewrite command used for execution—i.e., serialize the actual dispatched
command (use dispatch_frame or the rewritten cmd_args that include the {ws_id}
injection) when is_write is true and ctx.aof_pool.is_some(); keep the same
is_write calculation (metadata::is_write(cmd)) but ensure aof::serialize_command
is called on the final command used for dispatch/execution instead of the
pre-injection frame so AOF reflects the physical stored keys.
…tep 3)
Threads a real `lsn: u64` through every AOF append site and prefixes each
PerShard on-disk entry with `[u64 lsn LE][u32 len LE]` ahead of the
RESP-encoded command, matching RFC § 2 Rule 1 wire format. TopLevel
writers continue to emit plain RESP — the framing change is gated on
layout, so legacy single-file deployments and the embedded/listener
tokio paths are unaffected.
LSN sourcing: a new `ReplicationState::issue_lsn(shard_id, delta)`
helper atomically advances both `shard_offsets[shard_id]` and
`master_repl_offset`, returning the master offset *before* the bump.
Existing `increment_shard_offset` delegates through it so call sites
that previously used the legacy helper are unchanged. AOF write sites
go through a new associated function
`AofWriterPool::issue_append_lsn(repl_state, shard_id, delta)` that
issues an LSN when replication state is configured and returns 0
otherwise — keeping standalone (no-replication) and replica startup
paths working without a behavioural change.
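The LSN-sourcing contract described above (advance both counters, return the master offset from before the bump, fall back to 0 without replication state) can be sketched with plain atomics. This is a simplified stand-in, not the project's `ReplicationState`: `ReplState` and its accessors are hypothetical, the free function takes `Option<&ReplState>` instead of the real `Option<Arc<RwLock<...>>>`, and the memory orderings are an assumption. Each counter is updated atomically on its own; the pair is not updated as one indivisible step in this sketch.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical, minimal replication state for illustration.
pub struct ReplState {
    master_repl_offset: AtomicU64,
    shard_offsets: Vec<AtomicU64>,
}

impl ReplState {
    pub fn new(num_shards: usize) -> Self {
        ReplState {
            master_repl_offset: AtomicU64::new(0),
            shard_offsets: (0..num_shards).map(|_| AtomicU64::new(0)).collect(),
        }
    }

    /// Advances the shard offset and the master offset by `delta`,
    /// returning the master offset as it was before the bump.
    pub fn issue_lsn(&self, shard_id: usize, delta: u64) -> u64 {
        self.shard_offsets[shard_id].fetch_add(delta, Ordering::SeqCst);
        // `fetch_add` returns the previous value, which becomes the LSN.
        self.master_repl_offset.fetch_add(delta, Ordering::SeqCst)
    }

    pub fn shard_offset(&self, shard_id: usize) -> u64 {
        self.shard_offsets[shard_id].load(Ordering::SeqCst)
    }

    pub fn master_offset(&self) -> u64 {
        self.master_repl_offset.load(Ordering::SeqCst)
    }
}

/// Issues an LSN when replication state is configured, 0 otherwise.
pub fn issue_append_lsn(state: Option<&ReplState>, shard_id: usize, delta: u64) -> u64 {
    match state {
        Some(s) => s.issue_lsn(shard_id, delta),
        None => 0,
    }
}
```

Because the returned value is the pre-bump master offset, LSNs seen by one shard are strictly increasing even when other shards interleave their own writes.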
Wire-level changes:
- `AofMessage::Append(Bytes)` → `AofMessage::Append { lsn: u64, bytes: Bytes }`
- `AofWriterPool::try_send_append(shard_id, lsn, bytes)` (new lsn arg)
- TopLevel writer (tokio + monoio): destructures `{ bytes, lsn: _ }` —
ignores LSN, writes plain RESP exactly as before.
- PerShard writer: writes the 12-byte header then bytes; verified on
disk via `xxd` — shard 0 entries carry monotonically advancing LSNs
(0 → 0x69), shard 1 carries its own per-shard sequence (0x46).
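The PerShard entry layout `[u64 lsn LE][u32 len LE]` followed by the RESP bytes can be sketched as a small encoder. `encode_framed` and `HEADER_LEN` are hypothetical names for this illustration; the real writer may assemble the header differently (for example by writing header and payload separately).

```rust
/// 8 bytes of LSN plus 4 bytes of payload length.
pub const HEADER_LEN: usize = 12;

/// Builds one framed entry: little-endian LSN, little-endian length, payload.
pub fn encode_framed(lsn: u64, resp: &[u8]) -> Vec<u8> {
    // The length field is a u32, so larger payloads cannot be represented.
    assert!(resp.len() <= u32::MAX as usize, "payload too large for u32 length");
    let mut out = Vec::with_capacity(HEADER_LEN + resp.len());
    out.extend_from_slice(&lsn.to_le_bytes());
    out.extend_from_slice(&(resp.len() as u32).to_le_bytes());
    out.extend_from_slice(resp);
    out
}
```

For an entry with LSN 0x69, a hex dump would start with `69 00 00 00 00 00 00 00`, followed by the four length bytes and then the RESP text.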
Call-site fan-out (every place that constructs or dispatches
`AofMessage::Append`):
- `handler_monoio`, `handler_sharded`: 4 sites each, use
`AofWriterPool::issue_append_lsn`.
- `handler_single`, `blocking::try_inline_dispatch{,_loop}`: now take
`&Option<Arc<RwLock<ReplicationState>>>` so the inline AOF path can
source an LSN; 11 test sites updated to pass `&None` (Rust infers the
Option type from the slot).
- `drain_pending_appends` (rewrite path): keeps the lsn field, threads
it through the per-message destructure but never reads it because
rewrite output is the TopLevel base.rdb/incr.aof file.
Tests:
- 4 existing pool tests updated to the new signature.
- New `per_shard_pool_threads_lsn_field_to_each_writer` test verifies
the LSN survives the channel hop unmodified for each shard.
- Persistence tests: 379 pass under tokio, 379 under monoio (+1 each).
- Replication tests: 31 pass.
- Full lib tests (tokio): 2752 pass.
- Smoke test on a 2-shard server: PerShard manifest spawns 2 writers,
framed format verified on disk for both shards; TopLevel regression
smoke confirms plain RESP at offset 0 with no header bytes.
Rule 3 (single LSN issuance point) limitation — call out explicitly:
Step 3 ships the per-entry framing and monotonic per-shard LSN tagging
that step 4 (per-shard replay) requires. Strict Rule 3 alignment —
making the AOF LSN equal the per-shard replication backlog byte
position for the same write — is NOT achieved by this commit.
SPSC-routed writes hit both `master_repl_offset.fetch_add` at
`spsc_handler.rs:3017` (existing) and at the new AOF write site
(`AofWriterPool::issue_append_lsn`), so master advances twice per such
write. Fix is a single-LSN-issuance-point refactor in v0.2 replication
state; out of step 3 scope. Step 4 only depends on per-shard
monotonicity, which this commit provides and the smoke test confirms.
Refs: tmp/rfc-per-shard-aof-v02.md § 2, § 3
author: Tin Dang
…ap (Option B step 4)
Replaces the `warn!("Multi-part AOF skipped in multi-shard mode")` branch
in `main.rs` with a real per-shard replay path. With this, a `--shards
N`/`--appendonly yes` deployment that crashed and was restarted now
recovers all on-disk state instead of dropping it on the floor —
closing the P0 lying behind the `--unsafe-multishard-aof` gate.
Implementation:
- `aof_manifest::replay_incr_framed(databases, data, engine)` parses
the step-3 wire format `[u64 lsn LE][u32 len LE][RESP]` and returns
`(commands_replayed, max_lsn)`. Truncated headers and truncated
payloads are treated as crash-time EOF (parity with
`replay_incr_resp`); a header that fully declares a payload which
then fails to parse is escalated as corruption.
- `aof_manifest::replay_per_shard(per_shard_databases, manifest, engine)`
walks `manifest.shards` and for each shard loads `shard_base_path`
into that shard's `&mut [Database]` slice, then replays
`shard_incr_path` through `replay_incr_framed`. Per-shard work is
sequential for step 4 (cold-path correctness over throughput); a
parallel implementation is a future optimization once CRASH-01-LITE
soaks the sequential path.
- `ReplicationState::seed_master_offset(lsn)` uses `fetch_max` to bring
`master_repl_offset` up to the global AOF max-LSN before client
traffic is accepted. This is required by RFC § 2 Rule 3: without it,
the next write would reissue an LSN already present on disk and break
the backlog merge. Per-shard offsets are intentionally NOT seeded
(issue_lsn advances them on the first write; pre-seeding would
double-count).
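The `fetch_max` seeding in the last bullet can be sketched on a bare atomic. This is an illustration, not the project's method: the free function `seed_master_offset` below takes the counter directly and the `SeqCst` ordering is an assumption.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Raises `master` to at least `recovered_max_lsn` and returns the
/// resulting value. The counter is never lowered.
pub fn seed_master_offset(master: &AtomicU64, recovered_max_lsn: u64) -> u64 {
    // `fetch_max` stores max(current, recovered_max_lsn) atomically.
    master.fetch_max(recovered_max_lsn, Ordering::SeqCst);
    master.load(Ordering::SeqCst)
}
```

Using a max rather than a plain store means a second seed with a smaller value, or a seed that races with an already-advanced counter, cannot move the offset backwards.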
`main.rs` integration:
- The existing `if num_shards == 1` branch is unchanged (TopLevel and
single-shard PerShard both keep routing through `replay_multi_part`).
- New `else if manifest.layout == PerShard` branch clears each shard's
databases (same wipe-then-replay invariant as the single-shard arm),
walks `shards.split_first_mut()` to build a `Vec<&mut [Database]>`
without aliasing, calls `replay_per_shard`, seeds `repl_state` via
`seed_master_offset`, then retires any stray legacy
`appendonly.aof` so v2 recovery on next boot does not double-replay.
- A multi-shard config that finds a TopLevel manifest (operator did
not run `migrate-aof`) gets a loud warn — no silent skip, no replay,
unchanged from the previous skip behaviour but with an actionable
hint.
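As an illustration of handing each shard's databases to the replay function without aliasing, the sketch below builds the `Vec<&mut [Database]>` with `iter_mut()`. This is a swapped-in technique: the commit says it walks `shards.split_first_mut()`, and the `Shard` and `Database` types here are hypothetical stand-ins, not the project's definitions.

```rust
/// Hypothetical stand-in for a shard-local database.
#[derive(Debug, Default)]
pub struct Database {
    pub keys: Vec<String>,
}

/// Hypothetical stand-in for a shard that owns its databases.
#[derive(Debug, Default)]
pub struct Shard {
    pub databases: Vec<Database>,
}

/// Borrow every shard's databases mutably at the same time.
/// `iter_mut` yields disjoint `&mut Shard` references, so the borrow
/// checker accepts one `&mut [Database]` per shard in a single Vec.
pub fn per_shard_slices(shards: &mut [Shard]) -> Vec<&mut [Database]> {
    shards
        .iter_mut()
        .map(|shard| shard.databases.as_mut_slice())
        .collect()
}
```

The resulting vector has one slice per shard, in shard order, which is the shape a function like `replay_per_shard` would need to compare against `manifest.shards.len()`.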
Tests (all under `tests_v2`, single-threaded due to a pre-existing
`temp_dir()` race in earlier tests in this module — flake is unrelated):
- `replay_incr_framed_decodes_lsn_and_resp` — two framed PING/DBSIZE
entries decode in order and return the correct max LSN.
- `replay_incr_framed_truncated_header_is_crash_eof` — partial header
trailing one good entry returns Ok(1, lsn).
- `replay_incr_framed_truncated_payload_is_crash_eof` — declared
payload longer than file returns Ok(0, 0).
- `replay_incr_framed_complete_but_corrupt_payload_errors` — full
payload that fails RESP parse escalates as an error.
- `replay_per_shard_round_trips_two_shards` — initialize_multi(2),
hand-write framed SETs per shard, replay through
`DispatchReplayEngine`, verify keys landed in their own DBs and
global_max_lsn == max(per-shard maxes).
- `replay_per_shard_rejects_shard_count_mismatch` — slice count
≠ manifest.shards.len() returns the verbatim error path.
Verification:
- `cargo check` (default monoio): clean.
- `cargo check --no-default-features --features runtime-tokio,jemalloc`: clean.
- `cargo clippy -- -D warnings` (both feature combos): zero warnings.
- `cargo test -p moon --lib persistence:: -- --test-threads=1`: 377 pass.
- New tests on tokio: 2/2 pass (`replay_per_shard_*`).
Out of scope (deferred to later steps per RFC § 8):
- Cross-shard ordering merge for TXN + SCRIPT (step 5).
- Two-phase rendezvous `AppendSync { bytes, ack }` for
`appendfsync=always` (step 7).
- CRASH-01-LITE end-to-end soak (step 8).
- Lifting the `--unsafe-multishard-aof` gate itself (step 9 — gated on
step 8 green).
Refs: tmp/rfc-per-shard-aof-v02.md § 2, § 4
author: Tin Dang
…B step 5)
Ships the framing + recovery infrastructure that lets a future
cross-shard TXN or replicated SCRIPT command be replayed atomically
across shards, per RFC § 2 Rule 2.
Wire-level encoding (zero impact on existing entries):
- `ORDERED_LSN_FLAG = 1 << 63` reserved as the per-entry OrderedAcrossShards
marker. Practical LSN ceiling even at 10 M writes/s for a century is
near 2^58, so reserving bit 63 has no observable effect on normal
writes — every entry produced by `try_send_append` keeps it clear.
- `AofWriterPool::try_send_append_ordered(shard_id, lsn, bytes)` is the
new producer entry point. It debug-asserts `lsn & FLAG == 0` and ORs
the flag into the LSN before queueing. Today's call sites: none in
production code; only `cfg(test)` exercises this path so the
round-trip is verified end-to-end before a real consumer wires in.
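The flag handling described above amounts to setting and clearing one bit. A minimal sketch follows; the helper names `mark_ordered` and `split_wire_lsn` are hypothetical, and where the commit uses a debug assertion this sketch returns `None` instead so the behaviour is testable.

```rust
/// Bit 63 marks an entry as OrderedAcrossShards (value from the commit text).
pub const ORDERED_LSN_FLAG: u64 = 1 << 63;

/// Producer side: OR the flag into a true LSN.
/// Returns None if the LSN already occupies the reserved bit.
pub fn mark_ordered(lsn: u64) -> Option<u64> {
    if lsn & ORDERED_LSN_FLAG != 0 {
        return None;
    }
    Some(lsn | ORDERED_LSN_FLAG)
}

/// Recovery side: split an on-disk LSN into (true LSN, is_ordered).
pub fn split_wire_lsn(wire: u64) -> (u64, bool) {
    (wire & !ORDERED_LSN_FLAG, wire & ORDERED_LSN_FLAG != 0)
}
```

Masking before any comparison matters: an unmasked ordered LSN would always win a `max_lsn` comparison because its top bit is set.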
Recovery:
- `persistence::aof_manifest::OrderedEntry { shard_id, lsn, bytes }` is
the buffered representation a `replay_incr_framed` decode produces
when it sees the flag.
- `replay_incr_framed` gains `(shard_id, ordered_buf)` parameters. The
high bit is masked off before the LSN is stored in the buffer or
compared against `max_lsn`, so the buffer carries true LSNs. Inline
(non-ordered) entries continue to be dispatched immediately as
before.
- `replay_per_shard` now returns `(total, global_max_lsn,
Vec<OrderedEntry>)`. Ordered entries are deliberately NOT replayed
inline (per-shard ordering alone does not preserve cross-shard
atomicity).
- `replay_ordered_merge(per_shard_databases, entries, engine)` sorts
entries by LSN globally then dispatches each one to its origin
shard's databases. It also audits per-LSN cardinality and emits a
`warn!` when an LSN is unevenly represented across shards — the
forensic signal of a torn cross-shard commit. Detecting and rolling
back torn commits is out of scope for step 5 (no production emitter
yet to define those semantics).
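The global merge can be pictured as a sort followed by a dispatch loop. The sketch below is an assumption-laden illustration: `merge_ordered` is a hypothetical name, the dispatch target is a caller-supplied closure rather than a replay engine, ties are broken by shard id (the commit does not say how ties are ordered), and the cardinality audit is reduced to returning a per-LSN entry count for the caller to inspect.

```rust
use std::collections::BTreeMap;

/// Buffered ordered entry, mirroring the fields named in the commit.
#[derive(Debug, Clone, PartialEq)]
pub struct OrderedEntry {
    pub shard_id: usize,
    pub lsn: u64,
    pub bytes: Vec<u8>,
}

/// Sort entries by LSN across all shards, dispatch each in that order,
/// and return how many entries carried each LSN.
pub fn merge_ordered(
    mut entries: Vec<OrderedEntry>,
    mut dispatch: impl FnMut(&OrderedEntry),
) -> BTreeMap<u64, usize> {
    // Tie-break on shard_id so the replay order is deterministic.
    entries.sort_by_key(|e| (e.lsn, e.shard_id));
    let mut per_lsn = BTreeMap::new();
    for entry in &entries {
        *per_lsn.entry(entry.lsn).or_insert(0usize) += 1;
        dispatch(entry);
    }
    per_lsn
}
```

What counts as an "uneven" LSN is left to the caller here, since the commit defers torn-commit semantics until a production emitter exists.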
main.rs integration:
- After per-shard replay finishes, the boot path calls
`replay_ordered_merge` if `ordered_entries` is non-empty. The
`DispatchReplayEngine` is reused so behaviour matches the inline
path. Empty buffer is the common case today (no emitter), so the
cost is one length check on the hot recovery path.
Tests (under `tests_v2`, single-threaded due to pre-existing temp_dir
race in earlier tests):
- `replay_incr_framed_buffers_ordered_entries` — mixed inline+ordered
stream: inline entries dispatch via the engine, ordered entries land
in the buffer with the high bit stripped, `max_lsn` reflects both.
- `replay_ordered_merge_sorts_by_lsn_across_shards` — three entries
spanning two shards, wire-order ≠ LSN-order: merge sorts then
dispatches to the correct shard databases.
- `replay_ordered_merge_empty_returns_zero` — empty buffer is Ok(0).
- `ordered_entry_lsn_flag_set_via_try_send_append_ordered` —
end-to-end round trip from `try_send_append_ordered` through the
channel back to a consumer observes the flag set and the low bits
preserved.
The four pre-existing step-4 tests were updated for the new
`replay_incr_framed` (shard_id + ordered_buf) and `replay_per_shard`
(3-tuple) signatures; their assertions are unchanged.
Verification:
- `cargo check` both feature combos: clean.
- `cargo clippy -- -D warnings` both feature combos: zero warnings.
- `cargo test persistence:: -- --test-threads=1`: 381 pass (was 377,
+4 new tests).
- `cargo test persistence::aof_manifest::tests_v2 --no-default-features
--features runtime-tokio,jemalloc -- --test-threads=1`: 22 pass.
Out of scope (deferred per RFC § 8):
- A real production emitter for ordered entries (gated on a future
cross-shard TXN command landing).
- Torn-commit rollback semantics (need the emitter's contract first).
- Two-phase rendezvous `AppendSync { bytes, ack }` (step 7).
- CRASH-01-LITE end-to-end soak (step 8).
- Lifting `--unsafe-multishard-aof` (step 9 — gated on step 8 green).
Refs: tmp/rfc-per-shard-aof-v02.md § 2 (Rule 2)
author: Tin Dang
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/persistence/aof_manifest.rs`:
- Around line 1198-1239: The parser may return a single RESP frame while leaving
trailing bytes in buf, causing extra garbage to be silently dropped; after
calling parse::parse(&mut buf, &config) in the block that builds buf from
data[payload_start..payload_end], check that buf.is_empty() (or that the number
of bytes consumed equals payload_end - payload_start) and if not return a
crate::error::MoonError::from(crate::error::AofError::RewriteFailed { detail:
... }) indicating "framed payload contains extra bytes" (include offset and
lsn), before calling engine.replay_command(databases, cmd, cmd_args, &mut
selected_db); this ensures any trailing garbage or extra RESP commands in the
payload are treated as corrupt rather than silently dropped.
- Around line 1179-1180: The code in the replay path uses try_into().expect() to
build fixed-size arrays for lsn and len; replace the expect-based conversions in
this block (the two lines that produce lsn and len) with stack-allocated
fixed-size buffers and copy_from_slice to avoid panics: create a [u8;8] buffer,
copy data[offset..offset+8] into it and call u64::from_le_bytes on that buffer
to produce lsn, and likewise create a [u8;4] buffer, copy
data[offset+8..offset+12] into it and call u32::from_le_bytes to produce len,
thereby preserving bounds-checked behavior without using expect()/unwrap().
In `@src/persistence/aof.rs`:
- Around line 819-823: The Append branch that builds the 12-byte header
currently casts data.len() to u32 which will truncate payloads >4GiB; add an
explicit check in the AofMessage::Append handling (the block that prepares
header and calls writer.write_all) to validate that data.len() <= u32::MAX and
return or propagate an error if it exceeds that limit (or alternatively
implement chunking before framing), update the header construction to only run
after the check, and apply the same guard to the other occurrence noted (the
similar block around lines 972–975) so no payload is ever narrowed silently.
- Around line 819-829: The writer loop that handles AofMessage::Append must not
just continue on writer.write_all failures because partial writes corrupt framed
entries; in the block handling AofMessage::Append (referencing
AofMessage::Append, writer.write_all, and shard_id) replace the `continue`
behavior with logic that stops the writer loop and surfaces or logs the error
(e.g., break/return from the task or close the writer and propagate the error)
so the shard stops producing further writes after a partial header or payload
write, preventing replay_incr_framed from encountering a truncated/partial
entry.
- Around line 192-204: The function issue_append_lsn currently takes
Option<Arc<std::sync::RwLock<ReplicationState>>> and uses .read().ok() to handle
std::sync::RwLock poisoning; change the signature to
Option<Arc<parking_lot::RwLock<ReplicationState>>> and update all call sites to
pass the parking_lot RwLock (modify where ReplicationState is wrapped in
src/replication/master.rs and src/replication/replica.rs). Replace the
.read().ok().map(...) chain with a direct .read().map(...)
(parking_lot::RwLock::read does not return a Result), and remove the
unwrap_or(0) poison-handling branch so the function simply maps through the
guard to call ReplicationState::issue_lsn(shard_id, delta as u64) or returns 0
when the Option is None as before.
In `@src/replication/state.rs`:
- Around line 119-166: Recovery is seeding master_repl_offset with the observed
start LSN, but issue_lsn() returns the pre-increment value so you must seed with
the next free offset to avoid reissuing an LSN; change the recovery code to
compute next_free = observed_start_lsn + payload_len (or the entry's end LSN)
for each recovered AOF entry and call seed_master_offset(next_free)
(seed_master_offset should keep using master_repl_offset.fetch_max(next_free,
Ordering::Relaxed)); ensure any place that currently passes the start LSN is
updated to pass the computed next_free so the master offset is advanced past
on-disk entries.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d996bceb-e894-4303-8aaa-5fb3f47f02e6
📒 Files selected for processing (9)
- src/main.rs
- src/persistence/aof.rs
- src/persistence/aof_manifest.rs
- src/replication/state.rs
- src/server/conn/blocking.rs
- src/server/conn/handler_monoio/mod.rs
- src/server/conn/handler_sharded/mod.rs
- src/server/conn/handler_single.rs
- src/server/conn/tests.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- src/server/conn/handler_sharded/mod.rs
- src/server/conn/handler_single.rs
- src/server/conn/tests.rs
- src/server/conn/handler_monoio/mod.rs

    let lsn = u64::from_le_bytes(data[offset..offset + 8].try_into().expect("8 bytes"));
    let len = u32::from_le_bytes(data[offset + 8..offset + 12].try_into().expect("4 bytes"))

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Avoid expect() in library replay code.
These two expect()s are on a hot library path and the bounds have already been checked, so this can stay panic-free with fixed-size buffers plus copy_from_slice().
As per coding guidelines, src/**/*.rs: No unwrap() or expect() in library code outside tests. Use pattern matching, if let, or let-else.

    let mut buf = BytesMut::from(&data[payload_start..payload_end]);
    match parse::parse(&mut buf, &config) {
        Ok(Some(frame)) => {
            let (cmd, cmd_args) = match &frame {
                Frame::Array(arr) if !arr.is_empty() => {
                    let name = match &arr[0] {
                        Frame::BulkString(s) => s.as_ref(),
                        Frame::SimpleString(s) => s.as_ref(),
                        other => {
                            return Err(crate::error::MoonError::from(
                                crate::error::AofError::RewriteFailed {
                                    detail: format!(
                                        "AOF incr framed command at offset {} (lsn {}) has non-string name frame: {:?}",
                                        offset,
                                        lsn,
                                        std::mem::discriminant(other)
                                    ),
                                },
                            ));
                        }
                    };
                    (name as &[u8], &arr[1..])
                }
                other => {
                    return Err(crate::error::MoonError::from(
                        crate::error::AofError::RewriteFailed {
                            detail: format!(
                                "AOF incr framed non-array frame at offset {} (lsn {}): {:?}",
                                offset,
                                lsn,
                                std::mem::discriminant(other)
                            ),
                        },
                    ));
                }
            };
            engine.replay_command(databases, cmd, cmd_args, &mut selected_db);
            count += 1;
            if lsn > max_lsn {
                max_lsn = lsn;
            }
        }

Reject extra bytes inside a framed payload.
After parse() returns one frame, buf can still contain trailing garbage or a second RESP command. This code still advances to payload_end, so replay silently drops those bytes instead of treating the entry as corrupt.

    pub fn issue_append_lsn(
        repl_state: &Option<Arc<std::sync::RwLock<crate::replication::state::ReplicationState>>>,
        shard_id: usize,
        delta: usize,
    ) -> u64 {
        repl_state
            .as_ref()
            .and_then(|rs| {
                rs.read()
                    .ok()
                    .map(|g| g.issue_lsn(shard_id, delta as u64))
            })
            .unwrap_or(0)

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show the current implementation around issue_append_lsn
sed -n '160,240p' src/persistence/aof.rs | cat -n
# Locate ReplicationState definition and inspect its lock type usage
rg -n "struct ReplicationState|type ReplicationState|enum ReplicationState" -S src/replication || true
rg -n "ReplicationState" src/replication -S || true
# Find the issue_lsn method signature on ReplicationState
rg -n "fn issue_lsn" -S src/replication src || true
# Confirm whether std::sync::RwLock appears anywhere in these files
rg -n "std::sync::RwLock|std::sync::Mutex" -S src/persistence/aof.rs src/replication || true
Repository: pilotspace/moon
Length of output: 7319
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect imports / lock type usage in aof.rs
sed -n '1,120p' src/persistence/aof.rs | cat -n
# Show the per-shard writer implementation and error handling region
rg -n "PerShard|per[- ]shard writer|try_send_append|Append|write.*header|write.*body|framed|codec" src/persistence/aof.rs
# Pull a focused window around the per-shard writer loop (adjust based on matches)
# First, locate the most relevant loop by matching writer/task creation
rg -n "spawn|writer|task::|monoio::spawn|tokio::spawn|for .*in .*shards" src/persistence/aof.rs
# Also capture the exact area around issue_append_lsn callers (to see what lock type they pass)
rg -n "issue_append_lsn\\(" -S src/persistence/aof.rs
# Finally, check if any other std::sync locks exist in this file
rg -n "std::sync::(RwLock|Mutex)" src/persistence/aof.rs
Repository: pilotspace/moon
Length of output: 19592
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the per-shard writer tokio error handling around header/body writes
sed -n '780,880p' src/persistence/aof.rs | cat -n
# Inspect the per-shard writer monoio error handling around header/body writes
sed -n '940,1045p' src/persistence/aof.rs | cat -n
# See how ReplicationState is guarded in replication/master.rs / imports (parking_lot vs std::sync)
rg -n "use .*RwLock" src/replication/master.rs src/replication/replica.rs src/replication/state.rs
sed -n '1,120p' src/replication/master.rs | cat -n
sed -n '1,120p' src/replication/replica.rs | cat -n
sed -n '1,220p' src/replication/state.rs | cat -n
# Check if std::sync::RwLock is used anywhere in replication module
rg -n "std::sync::RwLock|std::sync::Mutex" -S src/replication || true
# Check if parking_lot::RwLock is used in replication module
rg -n "parking_lot::RwLock|parking_lot::Mutex" -S src/replication || true
Repository: pilotspace/moon
Length of output: 32022
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find replay function / parsing logic for per-shard framed format
rg -n "replay_aof|parse.*u64 lsn|\\[u64 lsn|OrderedAcrossShards|AofMessage::Append|PerShard.*replay|seed_master_offset|corrupt" src/persistence -S
# Inspect replay implementation around the per-shard framing parsing and error handling
# (use narrower windows by first locating a likely function name)
rg -n "fn replay_|pub async fn replay_|replay_aof" src/persistence -S
# Show the most relevant region once we know the function line numbers
# We'll print a few candidate locations with line numbers.
# Candidate: any file under src/persistence/replay*.rs or src/persistence/*.rs with per-shard parse
fd -t f "replay" src/persistence --max-depth 2 || true
fd -t f "aof" src/persistence --max-depth 2 || true
# Use rg on likely file(s)
rg -n "OrderedAcrossShards|merge.*shard|lsn" src/persistence -S
# Finally, inspect the aof replay module/file(s) directly if found
# We'll pick the top few files reported by fd/replay searches.
Repository: pilotspace/moon
Length of output: 50371
Replace `std::sync::RwLock` in `issue_append_lsn` with `parking_lot::RwLock`
- `issue_append_lsn` hardcodes `Option<Arc<std::sync::RwLock<ReplicationState>>>` (`src/persistence/aof.rs`), extending std lock + poisoning semantics into the AOF append hot path.
- Switch the signature and all call sites to `Option<Arc<parking_lot::RwLock<ReplicationState>>>` (where `ReplicationState` is currently wrapped with `std::sync::RwLock` in `src/replication/master.rs` and `src/replication/replica.rs`), and remove the `.read().ok()` poison handling (parking_lot doesn't poison).

    Ok(AofMessage::Append { lsn, bytes: data }) => {
        let mut header = [0u8; 12];
        header[..8].copy_from_slice(&lsn.to_le_bytes());
        header[8..].copy_from_slice(&(data.len() as u32).to_le_bytes());
        if let Err(e) = writer.write_all(&header).await {

Validate the framed payload length before narrowing to u32.
data.len() as u32 truncates above 4 GiB. If that ever happens, the stored header advertises a shorter payload than was actually written, and replay will misframe the rest of the AOF stream.
Also applies to: 972-975
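A checked conversion is one way to apply the guard this comment asks for. The sketch below is an assumption, not the PR's fix: `build_header` is a hypothetical helper and the error is a plain `String` rather than the project's error type.

```rust
/// Build the 12-byte `[u64 lsn LE][u32 len LE]` header, refusing any
/// payload whose length does not fit in a u32 instead of truncating it.
pub fn build_header(lsn: u64, payload_len: usize) -> Result<[u8; 12], String> {
    let len = u32::try_from(payload_len)
        .map_err(|_| format!("AOF payload of {payload_len} bytes exceeds u32::MAX"))?;
    let mut header = [0u8; 12];
    header[..8].copy_from_slice(&lsn.to_le_bytes());
    header[8..].copy_from_slice(&len.to_le_bytes());
    Ok(header)
}
```

Because the header is only produced after the check succeeds, a caller cannot write a header that advertises a shorter payload than it then writes.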

    Ok(AofMessage::Append { lsn, bytes: data }) => {
        let mut header = [0u8; 12];
        header[..8].copy_from_slice(&lsn.to_le_bytes());
        header[8..].copy_from_slice(&(data.len() as u32).to_le_bytes());
        if let Err(e) = writer.write_all(&header).await {
            error!("AOF header write error shard {}: {}", shard_id, e);
            continue;
        }
        if let Err(e) = writer.write_all(&data).await {
            error!("AOF write error shard {}: {}", shard_id, e);
            continue;

Stop this writer after a framed write failure.
write_all() can fail after writing part of the header or payload. If the loop just continues, the next append can start in the middle of that broken entry, and replay_incr_framed() will drop the rest of the file as a truncated tail.
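The failure mode and the suggested remedy can be reproduced with a synchronous writer. This is a sketch under stated assumptions: it uses `std::io::Write` rather than the async writers in the PR, and `FailAfter`, `write_one`, and `run_writer` are hypothetical names introduced for the illustration.

```rust
use std::io::{self, Write};

/// Test double: accepts `budget` bytes in total, then fails every write.
pub struct FailAfter {
    pub budget: usize,
    pub out: Vec<u8>,
}

impl Write for FailAfter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(io::Error::new(io::ErrorKind::Other, "disk full"));
        }
        let n = buf.len().min(self.budget);
        self.out.extend_from_slice(&buf[..n]);
        self.budget -= n;
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Write one framed entry: 12-byte header, then the payload.
fn write_one<W: Write>(w: &mut W, lsn: u64, payload: &[u8]) -> io::Result<()> {
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "payload exceeds u32::MAX"))?;
    let mut header = [0u8; 12];
    header[..8].copy_from_slice(&lsn.to_le_bytes());
    header[8..].copy_from_slice(&len.to_le_bytes());
    w.write_all(&header)?;
    w.write_all(payload)
}

/// Stop at the first failed entry instead of continuing, so no later
/// entry is appended behind a partially written header or payload.
/// On failure, returns the count of complete entries plus the error.
pub fn run_writer<W: Write>(
    w: &mut W,
    entries: &[(u64, &[u8])],
) -> Result<usize, (usize, io::Error)> {
    let mut complete = 0;
    for &(lsn, payload) in entries {
        if let Err(e) = write_one(w, lsn, payload) {
            return Err((complete, e));
        }
        complete += 1;
    }
    Ok(complete)
}
```

With a 22-byte budget and 4-byte payloads, the first entry (16 bytes) lands whole and the second stops 6 bytes into its header; the third is never attempted, so the file ends in a single truncated tail rather than a misaligned stream.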

    /// Atomically issue an LSN for a write and advance per-shard +
    /// master replication offsets by `delta`.
    ///
    /// Returns the LSN that uniquely identifies this write — equal to the
    /// value of `master_repl_offset` BEFORE the increment, mirroring Redis's
    /// `+ delta - delta` semantics. The same LSN MUST tag the corresponding
    /// `AofMessage::Append` entry and the replication backlog entry for that
    /// write so per-shard AOF replay can rebuild a globally consistent log
    /// (per-shard AOF RFC § 2 Rule 3).
    ///
    /// Atomicity caveat: the per-shard offset advance and the master offset
    /// advance are TWO separate `fetch_add`s, not one composite op. Concurrent
    /// callers across shards observe a brief window where the master sum
    /// disagrees with the sum of shard offsets. Acceptable today because the
    /// only `total_offset()` consumer is INFO replication, which tolerates
    /// transient skew. Do not promote to a hard invariant without redesign.
    ///
    /// Returns 0 if `shard_id` is out of range (defensive; production callers
    /// must pass a valid id).
    pub fn issue_lsn(&self, shard_id: usize, delta: u64) -> u64 {
        if shard_id >= self.shard_offsets.len() {
            return 0;
        }
        self.shard_offsets[shard_id].fetch_add(delta, Ordering::Relaxed);
        self.master_repl_offset.fetch_add(delta, Ordering::Relaxed)
    }

    /// Returns sum of all per-shard offsets.
    pub fn total_offset(&self) -> u64 {
        self.master_repl_offset.load(Ordering::Relaxed)
    }

    /// Seed `master_repl_offset` to at least `lsn` after AOF recovery.
    ///
    /// Per-shard AOF RFC § 2 Rule 3: after recovery reads the per-shard AOFs,
    /// `master_repl_offset` MUST be at least the max LSN observed across all
    /// shards before the server accepts client traffic. Otherwise the next
    /// write would issue an LSN already present on disk, breaking the
    /// `lsn → entry` uniqueness invariant the backlog merge depends on.
    ///
    /// Uses `fetch_max` so a concurrent in-flight increment (extremely
    /// unlikely at boot, but free to guard against) cannot regress the value.
    /// Per-shard offsets are intentionally NOT touched here — at boot they
    /// are still 0, and seeding shard offsets to the per-shard AOF max would
    /// double-count once the first write advances them via `issue_lsn`.
    pub fn seed_master_offset(&self, lsn: u64) {
        self.master_repl_offset.fetch_max(lsn, Ordering::Relaxed);
    }

LSN recovery is seeding the wrong cursor.
issue_lsn() returns the pre-increment master offset, so 0 is a valid first on-disk LSN. But the recovery path only carries forward the max observed start LSN, which means the first post-restart append can reissue an existing LSN and break the lsn -> entry uniqueness invariant this API documents. Recovery needs the next free offset (for example max(lsn + payload_len)), not the max starting offset or a sentinel 0.
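The collision this comment describes is easy to reproduce with a single atomic counter. The sketch below is a simplified model, not the project's `ReplicationState`: `Offsets` and `next_free_offset` are hypothetical names, per-shard offsets are omitted, and it assumes (as the review suggests) that each entry advanced the offset by its payload length.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Simplified stand-in for the master offset only.
pub struct Offsets {
    master: AtomicU64,
}

impl Offsets {
    pub fn new() -> Self {
        Offsets { master: AtomicU64::new(0) }
    }

    /// Returns the offset BEFORE the increment, as in the reviewed code.
    pub fn issue_lsn(&self, delta: u64) -> u64 {
        self.master.fetch_add(delta, Ordering::Relaxed)
    }

    /// Raise the master offset to at least `lsn`.
    pub fn seed_master_offset(&self, lsn: u64) {
        self.master.fetch_max(lsn, Ordering::Relaxed);
    }
}

/// Next free offset after a set of recovered (start_lsn, payload_len)
/// entries: the largest end offset, or 0 when nothing was recovered.
pub fn next_free_offset(entries: &[(u64, u64)]) -> u64 {
    entries
        .iter()
        .map(|&(lsn, len)| lsn.saturating_add(len))
        .max()
        .unwrap_or(0)
}
```

With entries starting at 0 and 31, each 31 bytes long, seeding with the maximum start LSN (31) makes the next `issue_lsn` return 31 again, while seeding with `next_free_offset` makes it return 62.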
…tep 7)
Ships the H1 fix from the investigation report: the mechanism for
`appendfsync=always` to honour its durability contract end-to-end, so
the client `+OK` does not race the disk-side fsync.
API:
- New `AofMessage::AppendSync { lsn, bytes, ack }` variant carries a
`OneshotSender<AofAck>` alongside the same `(lsn, bytes)` payload as
the existing `Append`. The writer ALWAYS fsyncs and acks via this
variant, regardless of the configured `FsyncPolicy` — the caller has
signed the durability contract by choosing AppendSync over Append.
- `AofAck { Synced, WriteFailed, FsyncFailed }` reports the outcome.
`Synced` means `sync_data()` returned successfully and the entry is
on durable storage. The two failure variants are emitted from the
precise syscall that failed so callers can map back to a specific
client error.
- `AofWriterPool::try_send_append_sync(shard_id, lsn, bytes) ->
OneshotReceiver<AofAck>` is the caller entry point. The handler
awaits the receiver before responding to the client; if the
receiver resolves with `Err(RecvError)` (channel disconnect /
writer dead), the caller treats that as a hard failure too.
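The rendezvous described above can be modelled with standard-library channels. This is a sketch under several assumptions: `std::sync::mpsc::sync_channel(1)` stands in for the oneshot channel the commit names, a thread stands in for the writer task, the `persist` closure is a hypothetical hook replacing the real write, flush, and `sync_data()` calls, and the free function `try_send_append_sync` only mirrors the shape of the pool method.

```rust
use std::sync::mpsc;

/// Outcome variants as listed in the commit message.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AofAck {
    Synced,
    WriteFailed,
    FsyncFailed,
}

pub enum AofMessage {
    AppendSync {
        lsn: u64,
        bytes: Vec<u8>,
        ack: mpsc::SyncSender<AofAck>,
    },
}

/// Caller side: queue the entry and get a receiver to wait on.
/// Returns None if the writer's queue is already closed.
pub fn try_send_append_sync(
    tx: &mpsc::Sender<AofMessage>,
    lsn: u64,
    bytes: Vec<u8>,
) -> Option<mpsc::Receiver<AofAck>> {
    let (ack, rx) = mpsc::sync_channel(1);
    tx.send(AofMessage::AppendSync { lsn, bytes, ack }).ok()?;
    Some(rx)
}

/// Writer side: persist each entry, then ack with the outcome.
/// If `persist` returns None the ack sender is dropped unsent, which
/// the caller observes as a receive error (the writer-died case).
pub fn writer_loop(
    rx: mpsc::Receiver<AofMessage>,
    mut persist: impl FnMut(u64, &[u8]) -> Option<AofAck>,
) {
    for msg in rx {
        match msg {
            AofMessage::AppendSync { lsn, bytes, ack } => {
                if let Some(outcome) = persist(lsn, &bytes) {
                    let _ = ack.send(outcome);
                }
            }
        }
    }
}
```

A handler built on this shape would reply `+OK` only after receiving `AofAck::Synced`, and would treat both failure variants and a receive error as a hard failure.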
Writer-task integration (4 sites + 1 helper):
- TopLevel monoio (`aof_writer_task`): write → flush → sync_data →
ack. `write_error` sticky flag still gates subsequent writes; the
ack reports `WriteFailed` for both first-failure and follow-on.
- TopLevel tokio (`aof_writer_task`): same shape, async syscalls.
- PerShard tokio (`per_shard_aof_writer_task`): framed
`[u64 lsn LE][u32 len LE][RESP]` header + payload + fsync + ack.
- PerShard monoio (`per_shard_aof_writer_task`): same framed format,
blocking syscalls.
- `drain_pending_appends` (BGREWRITEAOF rewrite drain): bytes are
written and counted; the post-drain fsync at the rewrite boundary
covers durability, so the ack is `Synced`. On write error the `?`
bubbles up and the ack is dropped — caller observes `RecvError`.
Production call sites: NONE in step 7. The per-handler integration
(when to use AppendSync vs Append based on `FsyncPolicy::Always`) is
wired in step 9 prep before lifting the `--unsafe-multishard-aof`
gate. Step 7 ships only the mechanism + adversarial tests so step 8
(CRASH-01-LITE) and step 9 can build on a stable foundation.
Tests (under `pool_tests`):
- `try_send_append_sync_queues_appendsync_with_ack` — caller-side
`try_send_append_sync` queues an `AppendSync` with the correct lsn
and bytes; mocked writer acks `Synced`; receiver resolves with
`Synced`.
- `append_sync_writer_dropped_resolves_recv_error` — if the writer
drops the ack sender (death / disconnect / channel close), the
receiver resolves with `Err(RecvError)` rather than hanging.
- `append_sync_writer_reports_write_failed` — writer ack of
`WriteFailed` is propagated to the caller verbatim.
- `append_sync_writer_reports_fsync_failed` — same for `FsyncFailed`.
Verification:
- `cargo check` both feature combos: clean.
- `cargo clippy -- -D warnings` both feature combos: zero warnings.
- `cargo test persistence:: -- --test-threads=1`: 385 pass (was 381,
+4 new tests).
- `cargo test persistence::aof::pool_tests`: 10 pass.
Out of scope (per RFC § 8 dependency chain):
- Per-handler wiring of `try_send_append_sync` for `appendfsync=always`
(step 9 prep).
- CRASH-01-LITE end-to-end test exercising the rendezvous under SIGKILL
(step 8).
- Lifting `--unsafe-multishard-aof` (step 9 — gated on step 8 green).
Refs: tmp/rfc-per-shard-aof-v02.md § 4 (Fsync semantics)
author: Tin Dang
…oot PerShard spawn fix (Option B step 8)

Ships the end-to-end crash-recovery validation per RFC § 7 and closes a P0 bug that step 8's red-green TDD uncovered: PerShard writers were NOT spawned on first boot, so a brand-new `--shards 2 --appendonly yes` deployment silently wrote plain RESP into shard-0's directory and lost all data on restart.

Two changes in one commit because the test is the only thing that catches the spawn bug.

## Bug fix (main.rs spawn-site gate)

Before: spawn decision keyed on `existing_manifest.layout == PerShard`. With no manifest on disk yet (first boot), `existing_manifest = None` so the TopLevel writer was chosen, even when `num_shards >= 2`. The TopLevel writer wrote plain RESP into whatever path `manifest.incr_path()` resolved to AFTER `initialize_multi` ran later in the boot sequence, which under PerShard always routes to shard-0.

Result: all writes for all shards landed in `appendonlydir/shard-0/moon.aof.1.incr.aof` in plain RESP (no LSN header), shard-1's incr file was 0 bytes, and on restart the framed replay parser saw garbage LSN bytes and treated the whole file as truncated EOF → 0 keys recovered out of 200.

Fix: `use_per_shard = num_shards >= 2 && (existing PerShard manifest OR no manifest yet)`. The "no manifest yet" branch covers first-boot and lines up with the existing `initialize_multi(num_shards)` call in the recovery block (added in step 8 a few hunks below, also new in this commit).

Caught locally on commit b59ae4d before pushing CRASH-01-LITE.

## CRASH-01-LITE test (tests/crash_matrix_per_shard_aof.rs)

Subset of the RFC § 7 matrix; "LITE" defers cross-shard TXN and BGREWRITEAOF interleaving to step 9 + future work.

Scenario: `--shards 2 --appendonly yes --appendfsync everysec --unsafe-multishard-aof`, write 200 keys (alternating `{a}` / `{b}` hash tags so both shards populate), wait > 1s for the everysec fsync window, SIGKILL the process via `libc::kill(pid, SIGKILL)`, restart with same args, verify all 200 keys recovered with correct values.

The `#[ignore]` gate keeps the test out of `cargo test` default runs: it needs a built `./target/release/moon` and `redis-cli` on PATH. Mirror of `scan_fanout_multishard.rs` conventions. Run explicitly with:

    cargo build --release --features runtime-monoio,jemalloc
    cargo test --release --test crash_matrix_per_shard_aof -- --ignored

Stdout/stderr go to log files in the test dir (NEVER `Stdio::null()`) so a CI flake produces real diagnostics; see [[feedback_silenced_child_stdio_flake]].

## Verification

- `cargo check` both feature combos: clean.
- `cargo clippy -- -D warnings` (library, both feature combos): zero warnings. Pre-existing warnings in unrelated test files (clippy --tests) are not introduced by this commit.
- `cargo test persistence:: -- --test-threads=1`: 385 pass.
- `cargo test --release --test crash_matrix_per_shard_aof -- --ignored`: **1 pass** (CRASH-01-LITE: 200/200 keys recovered after SIGKILL).
- Manual disk inspection (`xxd appendonlydir/shard-N/moon.aof.1.incr.aof`): framed format `[u64 lsn LE][u32 len LE][RESP]` on both shards; shard-0 LSN=0x3E for k0, shard-1 LSN=0x1F for k1.

## Out of scope (per RFC § 8)

- Per-handler integration of `try_send_append_sync` for `appendfsync=always` (step 9 prep).
- Lifting `--unsafe-multishard-aof` (step 9, gated on step 8 green, which it now is).
- Adding `--appendfsync always` row to the matrix once step 9 wires the handler integration.
- BGREWRITEAOF interleaving row (RFC § 6, out of step 8 scope).

Refs: tmp/rfc-per-shard-aof-v02.md § 7
author: Tin Dang
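The first-boot spawn decision from the bug fix above can be sketched as a small pure function. This is an illustration only: the `AofLayout` enum here is a stand-in for the manifest's layout discriminator, and the function signature is hypothetical rather than the actual code in `main.rs`.

```rust
/// Stand-in for the manifest layout discriminator (assumed shape).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AofLayout {
    TopLevel,
    PerShard,
}

/// Decide whether per-shard writers should be spawned.
/// `existing_layout` is `None` when no manifest is on disk yet (first boot).
fn use_per_shard(num_shards: usize, existing_layout: Option<AofLayout>) -> bool {
    num_shards >= 2 && matches!(existing_layout, None | Some(AofLayout::PerShard))
}

fn main() {
    // First boot with two shards: the old gate fell through to TopLevel here.
    assert!(use_per_shard(2, None));
    // An existing PerShard manifest keeps per-shard writers.
    assert!(use_per_shard(2, Some(AofLayout::PerShard)));
    // An existing TopLevel manifest keeps the top-level writer.
    assert!(!use_per_shard(2, Some(AofLayout::TopLevel)));
    // A single shard never takes the per-shard path under this rule.
    assert!(!use_per_shard(1, None));
    println!("spawn gate sketch ok");
}
```

Keeping the decision in a pure function like this makes the first-boot branch unit-testable without starting a server.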
… — closes P0)

The per-shard AOF pipeline (RFC steps 1-8, commits 5004f4e → b59ae4d) makes `--shards >= 2 + --appendonly yes` crash-safe. CRASH-01-LITE confirms 200/200 keys recover after SIGKILL on a 2-shard `everysec` deployment, with framed `[u64 lsn LE][u32 len LE][RESP]` entries on disk across both shards. The startup refusal that PR #129 introduced is no longer needed and is hereby lifted.

## Changes

- `main.rs`: the P0-FIX-01b refusal block now only emits a one-line info notice if `--unsafe-multishard-aof` is set explicitly. The exit-2 path is gone. Multi-shard + appendonly deployments are permitted by default.
- `--unsafe-multishard-aof` flag is preserved as a no-op so existing operator runbooks and CI command lines do not break. Removing it entirely is a future cleanup PR once dependents are audited.
- `tests/crash_matrix_per_shard_aof.rs`: the test launches without the flag, exercising the default crash-safe path end-to-end. Still green: 200/200 recover after SIGKILL.

## Risk register (carried forward from RFC § 8)

- **Rule 3 strict alignment** is NOT achieved (called out in step 3 commit body `e46dc4e`): SPSC-routed writes hit `master_repl_offset.fetch_add` twice: once at `spsc_handler.rs:3017` (existing replication path) and once at the new AOF write site (`AofWriterPool::issue_append_lsn`). Per-shard monotonicity holds and CRASH-01-LITE passes, but the master replication offset advances 2x per such write. Fix is a single-LSN-issuance-point refactor in v0.2 replication state. Lifting the gate does not regress this; the refusal block never enforced Rule 3.
- **`appendfsync=always` handler integration**: step 7 shipped the `AppendSync` mechanism but no production call site uses it yet. With `appendfsync=always`, durability still depends on the everysec-style tick at the writer task. End-to-end fsync-before-ack on the always policy requires per-handler wiring; tracked as a v0.1.13 follow-up. CRASH-01-LITE deliberately uses `everysec` so this isn't a regression versus the pre-Per-Shard state.
- **Cross-shard TXN / SCRIPT replay** is the empty-buffer case today (step 5 ships the scaffold; no production emitter). Lifting the gate does not introduce cross-shard atomicity; moon's TXN/SCRIPT remain single-shard local operations.
- **BGREWRITEAOF in PerShard layout** is still gated (separately) by `MULTI_SHARD_AOF_REWRITE_UNSAFE` in `main.rs:430`. That's RFC step 6 scope (deferred when the original 9-step plan dropped step 6) and is orthogonal to this lift. Disabling `--disk-offload` re-enables the legacy rewrite path.

## Verification

- `cargo check` (default monoio + tokio + jemalloc): clean.
- `cargo clippy -- -D warnings` (both feature combos): zero warnings.
- `cargo test persistence:: -- --test-threads=1`: 385 pass.
- `cargo build --release` + `cargo test --release --test crash_matrix_per_shard_aof -- --ignored`: 1 pass, all 200 keys recovered, no `--unsafe-multishard-aof` in launch command.
- Manual: `xxd appendonlydir/shard-N/moon.aof.1.incr.aof` confirms framed `[u64 lsn LE][u32 len LE][RESP]` on both shards after a default-config run.

## RFC closure

This closes the Option B plan from `tmp/rfc-per-shard-aof-v02.md`:

| Step | Commit | Status |
|------|--------|--------|
| 1: AofManifest PerShard layout | (pre-existing) | done |
| 2: Per-shard AofWriter task | (pre-existing) | done |
| 2b: Writer task body | (pre-existing) | done |
| 2c: aof_tx → aof_pool plumbing | d9a3651 | done |
| 2d: handler_monoio sites | (pre-existing) | done |
| 2e: handler_sharded/single/blk | ceac655 | done |
| 2f: layout-aware spawn | 5004f4e | done |
| 3: per-entry LSN framing | e46dc4e | done |
| 4: per-shard replay | b59ae4d | done |
| 5: OrderedAcrossShards scaffold | adf151d | done |
| 6: migrate-aof tool | - | deferred (not needed; first-boot path covered) |
| 7: AppendSync rendezvous | (this batch) | mechanism done; integration v0.1.13 |
| 8: CRASH-01-LITE | (this batch) | green |
| 9: lift gate | (this commit) | done |

Refs: tmp/rfc-per-shard-aof-v02.md § 8
author: Tin Dang
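The on-disk framing `[u64 lsn LE][u32 len LE][RESP]` referenced in the verification notes can be illustrated with a minimal encoder and decoder. This is a sketch under assumptions: the function names are hypothetical, and stopping at the first incomplete frame is one plausible way to treat a torn tail, not necessarily what moon's replay parser does.

```rust
/// Encode one entry as [u64 lsn LE][u32 len LE][payload].
fn encode_frame(lsn: u64, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(12 + payload.len());
    out.extend_from_slice(&lsn.to_le_bytes());
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(payload);
    out
}

/// Decode every complete frame; stop at the first truncated one.
fn decode_frames(mut buf: &[u8]) -> Vec<(u64, Vec<u8>)> {
    let mut entries = Vec::new();
    while buf.len() >= 12 {
        let mut lsn_bytes = [0u8; 8];
        lsn_bytes.copy_from_slice(&buf[..8]);
        let mut len_bytes = [0u8; 4];
        len_bytes.copy_from_slice(&buf[8..12]);
        let lsn = u64::from_le_bytes(lsn_bytes);
        let len = u32::from_le_bytes(len_bytes) as usize;
        if buf.len() - 12 < len {
            break; // incomplete payload: treat as a torn tail
        }
        entries.push((lsn, buf[12..12 + len].to_vec()));
        buf = &buf[12 + len..];
    }
    entries
}

fn main() {
    let set_k0 = b"*3\r\n$3\r\nSET\r\n$2\r\nk0\r\n$2\r\nv0\r\n";
    let set_k1 = b"*3\r\n$3\r\nSET\r\n$2\r\nk1\r\n$2\r\nv1\r\n";
    let mut file = encode_frame(1, set_k0);
    file.extend_from_slice(&encode_frame(2, set_k1));

    // Both frames decode from an intact file.
    assert_eq!(decode_frames(&file).len(), 2);

    // Chop three bytes off the end: only the first frame survives.
    let torn = &file[..file.len() - 3];
    let recovered = decode_frames(torn);
    assert_eq!(recovered.len(), 1);
    assert_eq!(recovered[0].0, 1);
    println!("frame sketch ok");
}
```

The fixed 12-byte header is also why plain RESP written into a framed file is unreadable: its first bytes get interpreted as an LSN and a length.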
Close P0 H1 (in-flight loss under appendfsync=always). The mechanism
landed in step 7 (`AofMessage::AppendSync` +
`AofWriterPool::try_send_append_sync`), but the production write paths
still used the fire-and-forget `try_send_append`, so `+OK` returned
before the per-shard writer fsynced. A SIGKILL between accept and the
everysec tick lost in-flight entries, the exact symptom reported in
tmp/P0-INVEST-01-multishard-aof-rootcause.md.
This patch threads the durable-send through every production write call
site and validates the closure with a SIGKILL crash matrix.
Changes
-------
src/persistence/aof.rs
- `AofWriterPool` gains a `fsync_policy: FsyncPolicy` field and the
`fsync_policy()` accessor.
- New `try_send_append_durable(shard_id, lsn, bytes)` async helper:
* Always → routes via `try_send_append_sync` and awaits the
writer's ack; returns `Err(AofAck)` on failure.
* EverySec → fire-and-forget via `try_send_append`; returns `Ok`.
* No → same as EverySec.
- Construction now goes through `top_level_with_policy` /
`per_shard_with_policy`; the old constructors are retained as thin
wrappers that default to `EverySec` for crate-internal tests.
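The policy dispatch described for `try_send_append_durable` can be sketched with standard-library channels. This is a blocking stand-in, not the async helper itself: the `Pool` type, the message field layout, and the error strings are all assumptions made for illustration, and the "fsync" is simulated by the writer thread acknowledging.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FsyncPolicy {
    Always,
    EverySec,
    No,
}

// Hypothetical message shapes; the real `AofMessage` fields may differ.
enum AofMessage {
    Append { lsn: u64, bytes: Vec<u8> },
    AppendSync { lsn: u64, bytes: Vec<u8>, ack: Sender<Result<(), String>> },
}

struct Pool {
    tx: Sender<AofMessage>,
    policy: FsyncPolicy,
}

impl Pool {
    /// Blocking stand-in for the durable send.
    fn send_durable(&self, lsn: u64, bytes: Vec<u8>) -> Result<(), String> {
        match self.policy {
            FsyncPolicy::Always => {
                let (ack_tx, ack_rx) = channel();
                self.tx
                    .send(AofMessage::AppendSync { lsn, bytes, ack: ack_tx })
                    .map_err(|_| "writer gone".to_string())?;
                // Rendezvous: wait until the writer reports the fsync result.
                ack_rx.recv().map_err(|_| "ack dropped".to_string())?
            }
            FsyncPolicy::EverySec | FsyncPolicy::No => {
                // Fire-and-forget: durability follows the writer's own schedule.
                let _ = self.tx.send(AofMessage::Append { lsn, bytes });
                Ok(())
            }
        }
    }
}

/// Sends one write under `policy`; returns the caller-visible result
/// and the number of entries the writer thread received.
fn demo(policy: FsyncPolicy) -> (Result<(), String>, usize) {
    let (tx, rx) = channel::<AofMessage>();
    let writer = thread::spawn(move || {
        let mut log: Vec<(u64, Vec<u8>)> = Vec::new();
        for msg in rx {
            match msg {
                AofMessage::Append { lsn, bytes } => log.push((lsn, bytes)),
                AofMessage::AppendSync { lsn, bytes, ack } => {
                    log.push((lsn, bytes));
                    // A real writer would write and fsync before acking.
                    let _ = ack.send(Ok(()));
                }
            }
        }
        log.len()
    });
    let pool = Pool { tx, policy };
    let result = pool.send_durable(1, b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n".to_vec());
    drop(pool); // closes the channel so the writer loop ends
    (result, writer.join().unwrap())
}

fn main() {
    for policy in [FsyncPolicy::Always, FsyncPolicy::EverySec, FsyncPolicy::No] {
        let (result, seen) = demo(policy);
        println!("{:?}: result={:?}, entries seen by writer={}", policy, result, seen);
    }
}
```

The point of the rendezvous is that under `Always` the caller cannot observe success until the writer has acknowledged, whereas the other two policies return immediately.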
src/server/conn/handler_monoio/mod.rs
src/server/conn/handler_sharded/mod.rs
- All 4 write call sites per handler (MOVE, COPY..DB, general write
path, cross-shard dispatch) replace
`pool.try_send_append(shard_id, lsn, bytes)`
with
`pool.try_send_append_durable(target, lsn, bytes).await`.
- On `Err` the response is replaced with
`-ERR AOF fsync failed; write not durable`
so the client never sees `+OK` for a non-durable write.
- The local-shard response binding becomes `mut response` to allow the
override on AOF failure.
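The response override on a failed durable send can be pictured as below. The helper name and the trailing `\r\n` terminator are assumptions (the commit only gives the error text); the real handlers mutate a `mut response` binding in place rather than calling a function like this.

```rust
/// Error reply used when the AOF write could not be made durable.
/// The text comes from the commit message; the CRLF terminator is assumed.
const AOF_FSYNC_ERR: &[u8] = b"-ERR AOF fsync failed; write not durable\r\n";

/// Hypothetical helper: keep the command's reply only if the write is durable.
fn finalize_response(response: Vec<u8>, durable: Result<(), String>) -> Vec<u8> {
    match durable {
        Ok(()) => response,
        Err(_) => AOF_FSYNC_ERR.to_vec(),
    }
}

fn main() {
    let ok_reply = b"+OK\r\n".to_vec();

    // Durable write: the original reply goes out unchanged.
    assert_eq!(finalize_response(ok_reply.clone(), Ok(())), ok_reply);

    // Failed fsync: the client must not see +OK.
    let replaced = finalize_response(ok_reply, Err("fsync failed".to_string()));
    assert_eq!(replaced, AOF_FSYNC_ERR.to_vec());
    println!("response override sketch ok");
}
```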
src/server/conn/blocking.rs
- `try_inline_dispatch` is synchronous and cannot await the writer's
ack. Under `appendfsync=always` it now bails out for `*3` (SET-shape)
frames, forcing the write through the async dispatch path which IS
H1-integrated. GETs continue to inline. Single-effect cost: ~20 ns
of policy-load on every SET, paid only when Always is configured.
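A rough sketch of the inline bail-out rule follows. The function name, the exact `*3\r\n` prefix check, and the idea that this is the only condition are assumptions; the real `try_inline_dispatch` likely checks more than the array header.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FsyncPolicy {
    Always,
    EverySec,
    No,
}

/// Hypothetical predicate: may this frame be served on the synchronous
/// inline path? Under `Always`, three-element (SET-shape) frames are
/// pushed to the async path, which can await the writer's ack.
fn can_inline(frame: &[u8], policy: FsyncPolicy) -> bool {
    let set_shape = frame.starts_with(b"*3\r\n");
    !(policy == FsyncPolicy::Always && set_shape)
}

fn main() {
    let get = b"*2\r\n$3\r\nGET\r\n$1\r\nk\r\n";
    let set = b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n";

    // GETs keep inlining under every policy.
    assert!(can_inline(get, FsyncPolicy::Always));
    // SET-shape frames bail out only when Always is configured.
    assert!(!can_inline(set, FsyncPolicy::Always));
    assert!(can_inline(set, FsyncPolicy::EverySec));
    assert!(can_inline(set, FsyncPolicy::No));
    println!("inline gate sketch ok");
}
```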
src/server/conn/handler_single.rs
- The three `send_async(AofMessage::Append { ... })` sites (batch
subscribe flush, GRAPH.* WAL records, main batch flush) now call
`try_send_append_durable(0, lsn, bytes).await`.
- NOTE: single-shard handler still flushes client responses BEFORE
the AOF batch (pre-existing ordering bug). Always semantics in this
path are partial — multi-shard handlers (handler_monoio /
handler_sharded) DO enforce pre-response durability. Tracked as a
follow-up; out of scope for the multi-shard PR.
src/server/embedded.rs
src/server/listener.rs
src/main.rs
- Updated spawn sites to use `top_level_with_policy(tx, fsync)` and
`per_shard_with_policy(senders, fsync)` so the pool's policy field
reflects the configured `appendfsync`.
tests/crash_matrix_per_shard_aof.rs
- Refactored `start_moon` to delegate to `start_moon_with_fsync(port,
dir, fsync)`. The existing everysec test is unchanged.
- New `crash_01_lite_always_per_shard_aof_recovers_after_sigkill`:
* `--shards 2 --appendonly yes --appendfsync always`
* 200 SET commands (hash-tagged across both shards)
* SIGKILL with NO quiescing sleep
* restart → assert 100% recovery (every +OK observed implies fsync)
Verification
------------
  cargo clippy --lib -- -D warnings                          # clean
  cargo clippy --lib --no-default-features \
    --features runtime-tokio,jemalloc -- -D warnings         # clean
  cargo test --lib persistence:: -- --test-threads=1         # 385/385
  cargo build --release --features runtime-monoio,jemalloc   # ok
  cargo test --release --features runtime-monoio,jemalloc \
    --test crash_matrix_per_shard_aof \
    -- --ignored --test-threads=1                            # 2/2 pass
    ├── crash_01_lite_per_shard_aof_recovers_after_sigkill (everysec)
    └── crash_01_lite_always_per_shard_aof_recovers_after_sigkill (always)
Closes the multi-shard AOF PR scope. H2 (skipped multi-part replay for
num_shards >= 2) was closed structurally in step 4 + main.rs replay
wiring; H1 (fire-and-forget ack) is now closed by this commit's handler
integration plus the validating crash matrix row.
author: Tin Dang
Summary
Three commits on this branch:
- `BGREWRITEAOF` refusal + startup refusal for `--shards >= 2 + --appendonly yes`, with `--unsafe-multishard-aof` escape hatch. Empirical loss matrix below.
- `tmp/rfc-per-shard-aof-v02.md` that eventually lifts the gate in step 9.
Why the gate (commit 1)
Empirical re-verification on `HEAD 6e49050` (2026-05-26) found the durability bug is in the multi-shard AOF path itself, not the rewrite path the older bug memory blamed:
| Configuration | Recovered |
|---------------|-----------|
| `--shards 1 --appendonly yes --appendfsync always` (control) | 5000 / 5000 |
| `--shards 1 --disk-offload enable --appendonly yes` (control) | 12714 / 12714 |
| `--shards 2 --disk-offload enable --appendonly yes` (BGREWRITEAOF + SIGKILL) | 7892 / 12662 |
| `--shards 2 --disk-offload enable --appendonly yes` (plain SIGKILL) | 7888 / 12655 |
| `--shards 2 --disk-offload enable --appendonly yes --appendfsync always` | 2474 / 5000 |
| `--shards 2 --disk-offload disable --appendonly yes --appendfsync always` | 2453 / 5000 |
Root cause investigation in `tmp/P0-INVEST-01-multishard-aof-rootcause.md` identified two complementary bugs:
- `src/main.rs:562-563` literally skips multi-part AOF replay for `num_shards >= 2`. Closed by step 4.
- `try_send(AofMessage::Append)` is fire-and-forget; `+OK` returns before the writer thread fsyncs; channel buffer is lost on SIGKILL. Closed by step 7.
Why step 1 lands here (commit 3)
The decision tree:
- `AofManifest` has no place to describe per-shard segments. Step 1 introduces that structure additively.
Strictly additive at the file-system level:
- `AofLayout::TopLevel` single-shard (`shard_id=0`).
- `--unsafe-multishard-aof` gate from commit 1 remains the load-bearing safety net until step 9.
What step 1 adds
- `AofLayout { TopLevel, PerShard }` discriminator.
- `ShardManifest { shard_id: u16, max_lsn: u64 }`; `max_lsn` semantics deferred to step 3.
- `AofManifest.layout` + `AofManifest.shards: Vec<ShardManifest>`.
- `initialize_multi(dir, num_shards)`: v2 PerShard constructor.
- `shard_dir`, `shard_base_path[_seq]`, `shard_incr_path[_seq]`: per-shard path helpers.
- `verify_shard_count(expected)`: returns the verbatim RFC § 3 error.
- `is_legacy_top_level_layout(dir)`: pure detection (no side effects).
- `migrate_top_level_to_per_shard()`: explicit in-place rename for RFC § 5 case 1; idempotent.
- `global_max_lsn()`: computed accessor, not stored (avoids drift with the per-shard records).
Manifest text format
v1 (unchanged, full backcompat):
v2 (new):
Paths are derived from `shard_id + seq` rather than stored explicitly: the layout is canonical, so a stored path could drift from the computed location.
Test plan
- `test_bgrewriteaof_sharded_refuses_under_unsafe_config` covers gate-on + gate-off paths.
- `--shards 1 + AOF` starts cleanly
- `--shards 2 + AOF + --unsafe-multishard-aof` starts
- `--shards 2 + --appendonly no` starts
- `--shards 2 + AOF` without escape hatch (exit code 2)
- `aof_manifest.rs` `tests_v2` module:
  - `v1_manifest_loads_as_top_level_single_shard`
  - `v2_manifest_round_trips`
  - `verify_shard_count_emits_rfc_error_verbatim`
  - `migrate_top_level_to_per_shard_moves_files_and_rewrites_manifest`
  - `global_max_lsn_returns_max_across_shards`
  - `is_legacy_top_level_layout_detects_v1_files`
  - `is_legacy_top_level_layout_returns_false_for_v2`
  - `parse_v2_rejects_shard_count_mismatch_in_file`
  - `parse_v2_rejects_non_contiguous_shard_ids`
- `persistence::aof` tests still green; library `cargo check --no-default-features --features runtime-tokio,jemalloc` clean.
- `#[ignore]` tests (step 8 of the RFC).
Operator impact
- `--shards >= 2 + --appendonly yes` deployments fail to start after upgrade. Error message is actionable: pick `--shards 1`, `--appendonly no`, or `--unsafe-multishard-aof`. Runbook walks each option.
- `--appendonly no` (any shard count) unaffected.
Next steps on this branch
Per the RFC implementation table (16 days + 1 wk soak):
- `AofWriter` task; `aof_tx: Vec<Sender>`
- `AofMessage::Append`
- `Multi-part AOF skipped` skip branch (closes H2)
- `moon migrate-aof` subcommand (RFC § 5 case 2)
- `AppendSync { bytes, ack }` rendezvous (closes H1)
- `tests/crash_matrix.rs`
- `--unsafe-multishard-aof` gate