Skip to content

[Feature] Add Distributed Posting Router for SPANN#448

Open
TerrenceZhangX wants to merge 27 commits into
users/qiazh/pre-merge-tikv-bugfixfrom
users/zhangt/merge-distributed-to-tikv
Open

[Feature] Add Distributed Posting Router for SPANN#448
TerrenceZhangX wants to merge 27 commits into
users/qiazh/pre-merge-tikv-bugfixfrom
users/zhangt/merge-distributed-to-tikv

Conversation

@TerrenceZhangX
Copy link
Copy Markdown

@TerrenceZhangX TerrenceZhangX commented May 7, 2026

Limitations

  1. Using buildOnly flags to mitigate no ditributed build limitation. Current flow is using buildOnly to let single node to build and distribute to other nodes. Then using disable buildOnly to let multiple nodes load same head index and run benchmarks.

Scale results

Dataset: SIFT1B bigann_base.u8bin, 128d UInt8, L2. SPANN 2-layer index,
4 search threads, 4 insert threads, top-K=5, 200 queries.

1M base + 1M insert

Metric 1node 2node Scale
Build time (s) 74.2 91.3 0.81×
Pre-insert QPS 429.3 696.3 1.62×
Pre-insert mean latency (ms) 9.26 8.63
Pre-insert p99 (ms) 27.31 29.18
Post-insert QPS 425.0 708.1 1.67×
Insert throughput (vec/s) ~900 1793
Recall@5 (pre-insert) 0.984 0.978
Recall@5 (post-insert) 0.983 0.984

Notes:

  • Build is slower on 2node at 1M scale: the cross-node coordination
    overhead (head-sync RPCs, control plane) dominates when there are only
    ~40k head vectors to build.
  • Search scales well already at 1M; recall is unchanged.

100M base + 1M insert

Metric 1node 2node Scale
Build time (s) 15292 16264 0.94×
Pre-insert QPS 183.3 360.5 1.97×
Pre-insert mean latency (ms) 21.56 21.92
Pre-insert p99 (ms) 32.81 39.55
Post-insert QPS (round 2) 183.2 337.2 1.84×
Insert throughput (vec/s) 738 1285 1.74×
Recall@5 (pre-insert) 0.912 0.904
Recall@5 (post-insert) 0.912 0.904

Notes:

  • Search scales near-linearly (1.97×) at the target scale — the per-node
    query partition + remote KV reads are well-balanced.
  • Insert scales sublinearly (1.74×), expected: every insert that
    promotes/splits a head triggers head-index sync across nodes, which is the
    current bottleneck.
  • Build is essentially flat (0.94×): build is dominated by per-node local
    graph construction; 2node has additional head-sync but no work split, so
    near-no scaling here is expected for the current single-builder design.
  • Recall is stable across configurations (within 0.01).

@TerrenceZhangX TerrenceZhangX force-pushed the users/zhangt/merge-distributed-to-tikv branch from dfc4c89 to a158014 Compare May 7, 2026 13:55
@TerrenceZhangX TerrenceZhangX marked this pull request as ready for review May 14, 2026 10:54
Comment thread AnnService/inc/Core/Common/FineGrainedLock.h Outdated
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
@TerrenceZhangX TerrenceZhangX requested a review from zqxjjj May 18, 2026 06:59
Branch users/zhangt/merge-onto-qiazh ports our shared remote/local pool +
per-layer routing changes from users/zhangt/merge-distributed-to-tikv on
top of qianxi's TiKV bugfix branch (lock ordering, splitAsync, version
check, etc.). Avoids the 21-block ExtraDynamicSearcher.h merge conflict
on the merged_spfresh side by replaying instead of merging.

Pragmatic approach for heavy files (ExtraDynamicSearcher.h, SPFreshTest.cpp):
take our HEAD versions wholesale (which already contain our distributed +
MultiChunk logic), and patch only the compile-breaking deltas caused by
qianxi's refactors:
  - PostingCountCache moved from ExtraDynamicSearcher.h to ExtraTiKVController.h
  - KeyValueIO grew MultiMerge + LogAsyncWaitStatsAndReset virtuals
    (qianxi version kept; our MultiPut/MultiDelete virtuals re-added on top)
  - Options/ParameterDefinitionList: kept qianxi version (adds m_globalIDPath)
  - ThreadPool: kept our add_high + added addfront alias for qianxi callers

Index.h / IExtraSearcher.h / SPANNIndex.cpp: applied small additive hooks
on top of qianxi (forward-decl WorkerNode, SetWorker/GetSharedSplitPool
accessors, BuildIndexInternalLayer + AddIndex worker loop). qianxi
bugfixes preserved in those files.

Build system:
  - CMakeLists updated for absl_cord + cordz family (kvproto 25.3 uses
    absl 2308, anaconda's grpc bundles 2111; explicit linkage avoids
    DSO-missing-from-command-line)
  - cmake invoked with gRPC_DIR/Protobuf_DIR/absl_DIR pointing at
    /usr/local so generated kvproto + libabsl 2308 versions align

Verified: SPTAGTest links cleanly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@TerrenceZhangX TerrenceZhangX force-pushed the users/zhangt/merge-distributed-to-tikv branch from 90dbbae to 8716007 Compare May 20, 2026 06:52
TerrenceZhangX and others added 11 commits May 20, 2026 07:21
Strip the SPFRESH_SHARD_STRIDE opt-in code path (4 helpers + plumbing
through LoadAndInsertBatch/RunBenchmark/RunWorker). No active config
sets the env var; we always use the contiguous slice partition.

Test/CMakeLists.txt: explicitly link ${TiKV_LIBRARIES} into SPTAGTest
so a clean build (no .o cache) resolves gpr_/grpc_ symbols pulled in
by the kvproto generated stubs.

ThirdParty/kvproto/.gitignore: stop tracking regenerated stubs going
forward — they are environment-specific (must match the protoc/grpc
in the build env); regenerate locally via generate_cpp.sh.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The previous if/else duplicated the thread launch+join. Restructure to
a single launch with an optional search-during-insert thread:
  - launch insertThreadCount workers
  - if benchmarking, launch one search thread in parallel
  - join all, then compute stats (only when search ran)

Also log a clear note when the bulk router path is used: the user-
supplied InsertThreadNum is unused there (driver runs one launcher
thread and parallelism comes from [BuildSSDIndex] AppendThreadNum
inside ExtraDynamicSearcher's append/split pool).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
8716007 removed the (m_layers+1) multiplier in the SPDK BlockController
queue-depth formula. The change was based on an incorrect assumption
that the distributed port collapses all per-layer SPDK pools into the
single shared layer-0 pool. In practice only layer 0 + the RPC receiver
share a pool; every inner layer (m_layer >= 1) still creates its own
SPDKThreadPool in both BuildIndex and LoadIndex.

With Layers=2 (current active configs) we therefore have ~2 independent
pools each running insert + reassign + append worker threads, so the
peak concurrent IO-submitter count remains the qianxi-original
(layers+1)*(insert+reassign+append) plus search threads. Under-sizing
the BlockController queue could stall IO submission under heavy
split/reassign + search load; over-sizing is harmless. Restore the
multiplier to match qianxi behaviour.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
All distributed runs override VersionCacheMaxChunks=0 (set by
run_distributed.sh in build/run/nocache phases), so the LRU cache is
effectively disabled. Using ReadChunkCached inside SetVersionBatch
adds bookkeeping noise (cache hit/miss path, refresh-mutex acquire)
that produces no benefit. Switch to direct ReadChunk; the dirty-byte
gating still saves the WriteChunk RPC when no version byte actually
changes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The distributed port introduced a separate m_highJobs queue + add_high
in ThreadPool plus 'urgent' parameters on AppendAsync/ReassignAsync.
Receiver dispatch already discovered high-priority starved Split jobs
and switched to high=false. The remaining urgent=true callers were:

  - AppendAsync in CollectReAssign's non-TiKV branch (dead under
    Storage::TIKVIO which is the only storage we use)
  - ReassignAsync on head-miss in Append/BatchAppend (same starvation
    risk against Split that motivated the receiver-side revert)

Restore ThreadPool.h to the upstream deque+addfront shape (no semantic
change vs. original) and drop the urgent parameter from AppendAsync/
ReassignAsync, the high flag from JobSubmitter, and the high path from
WireJobSubmitterIfReady.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
run_distributed.sh:
- Remove wait_workers_ready() — dead since the driver-listens-on-30001
  handshake replaced log-grep readiness detection.
- Drop the stale 'Binary already pushed; nothing else to do here' comment
  that sat immediately after the actual binary-push rsync block.

README.md:
- Correct the TiKV deployment model: the cluster is SHARED (all PDs in
  one raft group, all TiKVs registered as stores, max-replicas=1) — not
  one isolated PD+TiKV per node as the old text claimed. Architecture
  diagram, port table, and pre-split helper updated accordingly (one PD
  endpoint, not a per-node loop).
- Fix Step 1 cluster-config path: configs/cluster_2node.conf (an actual
  shipped file), not the non-existent cluster.conf.example.
- Update port defaults to match cluster_2node.conf (23791/23801/20171)
  and call out that the driver's router_port must not collide with the
  dispatcher port 30001 (cluster_2node.conf uses 30011 for this reason).
- List all shipped configs (10m, 100m, insert_dominant, tikv.toml,
  cluster_*.conf) in the file table.
- Document setup-bins subcommand alongside deploy.
- Flag the Build / Distribute / Run split as a workaround for the
  missing distributed SelectHead/BuildHead implementation, so readers
  don't mistake it for the steady-state design.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The previous wording made it sound like the driver was a stateless
coordinator and workers only talked back to it. Reality: node 0 runs as
worker 0 (owns its hash shard like every other worker) and additionally
hosts the dispatcher; workers talk to each other directly through
PostingRouter for remote append, head sync, and merge hints — no
driver-mediated forwarding. Diagram and 'What run does' steps updated.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
We never actually ran the pre-split/scatter helper in our benchmark
runs. Keeping it in the doc gives the false impression that it's part
of the recommended setup. Remove the whole section.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
// If headID is owned by a remote node, queue the append for that
// node and return true; otherwise return false (caller continues
// with local write logic).
bool TryRouteRemoteAppend(SizeType headID,
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we find somewhere to just identify once?

Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h Outdated
if (!m_headIndex->ContainSample(queryResult->VID, m_layer + 1)) continue;

if ((ret=db->Get(DBKey(queryResult->VID), &nextPostingList, MaxTimeout, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success) {
if (isRemoteCandidate) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this correct?

Comment thread AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
m_splitThreadPool = std::make_shared<SPDKThreadPool>();
m_splitThreadPool->initSPDK(m_opt->m_appendThreadNum, this);
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "SPFresh: finish initialization, zeroReplicaCount:%zu\n", zeroReplicaCount);
if (m_layer == 0 && m_headIndex) m_headIndex->SetSharedSplitPool(m_splitThreadPool);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every layer has its split pool.

TerrenceZhangX and others added 4 commits May 21, 2026 09:11
Three Split/Merge/Append code paths duplicated the same check:
  m_worker && m_worker->IsEnabled() &&
each with their own (or missing) m_layer != 0 gate.  Split() at L878
and MergePostings() at L1336 were missing the layer gate entirely, so
on a hypothetical multi-layer cluster they would have skipped local
inner-layer ops (which never use owner-ring routing).

Unify on a single predicate IsRemoteOwnedHead(headID, &nodeIndex) and
gate every callsite on it:
  - TryRouteRemoteAppend  (routing — populates nodeIndex)
  - Split                 (drop remote splits early)
  - MergePostings         (defense-in-depth net)
  - SplitAsync / MergeAsync (don't burn a pool slot for jobs we'll drop)

Addresses PR #448 L553 review comment 'Can we find somewhere to just
identify once'.  Also folds the L1336 'if refine is not there, do we
still need the filter' question — the filter at MergePostings is now
only a safety net behind the MergeAsync enqueue-time gate, so future
RefineIndex removal won't change anything.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Design specifies that when the local VersionMap lags behind a posting
written by a remote peer, the lagging node catches up via
  AddBatch(capacity * numWorkers)

This works because the global VID space is striped across worker
nodes (VID % numWorkers == nodeID), so the peer's maxVID can be at
most ~ localCount * numWorkers ahead of us.  Extending in this large
chunk amortizes many remote inserts into one capacity bump and keeps
growth conflict-free.

The previous EnsureVersionMapCoversPosting did AddBatch(maxVid+1-localCount),
which is correct but causes thrashing — every remote append where
maxVid happens to be slightly past localCount triggers a small extend.

Floor at the exact-gap need so single-node builds (numWorkers <= 1)
behave identically to before.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The four magic constants buried in the RPC layer
  kChunkSize         = 3000   (RemotePostingOps.h)
  attempt < 3        = retry  (RemotePostingOps.h)
  wait_for(180s)     = timeout (RemotePostingOps.h)
  kMaxInflightPerNode = 4     (WorkerNode.h)
are now exposed as SPANN INI parameters under [SSDIndex]:
  RemoteAppendChunkSize       (default 3000)
  RemoteAppendRetry           (default 3)
  RemoteAppendTimeoutSec      (default 180)
  RemoteAppendMaxInflight     (default 4)

Defaults preserve current behavior.  Plumbing:
- Options.h / ParameterDefinitionList.h: declare/register parameters
- RemotePostingOps: hold values in atomics, expose Set/Get* setters
- WorkerNode: forward setters; m_maxInflightPerNode is now atomic
- ExtraDynamicSearcher::SetWorker: push m_opt->m_remoteAppend* once

This unblocks per-deployment RPC tuning (e.g. larger chunks on low-
latency clusters, shorter timeouts in CI) without recompiling, and
removes the long historical comments documenting why the chunk size
was changed 5 times during benchmarking.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Design says async Split/MergeAsync jobs must be safe-to-retry from
any compute node (Section: Async Job Fault Tolerance).  Previous code
recorded a non-Success ret into m_asyncStatus and silently dropped
the job — a transient failure (TiKV blip, remote-lock timeout, etc.)
permanently lost the split/merge.

Both MergeAsyncJob and SplitAsyncJob now carry an attempts counter.
On non-Success, if attempts+1 < m_asyncJobMaxRetry (new SPANN option,
default 3), the job re-adds itself to m_splitThreadPool without
touching the in-flight counter, so the outer drain loop still
accounts for it. After MaxRetry exhaustion the failure surfaces via
m_asyncStatus as before, plus a clear LL_Error log identifying the
head and attempt count.

Idempotency requirements for safe retry are already in place:
- Owner check (IsRemoteOwnedHead) drops remote heads immediately
- ContainSample liveness gate inside Split/MergePostings
- Re-locking the per-head RWLock on each entry
- Read-deduplicate during the next split attempt for partial writes

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
TerrenceZhangX and others added 11 commits May 21, 2026 09:36
Previously the dispatch result only signalled Success/Failed via a 1-byte
enum, so any worker-side failure (TiKV unavailable, KeyNotFound during
search, append rejection, etc.) collapsed into a generic 'Failed' that
the driver couldn't distinguish or react to differently.

Bump DispatchResult MirrorVersion 1 -> 2 and add m_errorCode (int32).
Read/Write gated on mirror >= 2 so older peers stay compatible (they
leave the field at 0).  Driver-side HandleDispatchResult now logs the
errorCode at LL_Error on failed paths, and the existing log line for
every result echoes the code so post-mortem traces show exactly what
each worker reported.

Sample wiring: SPFreshTest's worker dispatch callback sets m_errorCode
on its Unknown-command fallback.  Other code paths (Search/Insert)
already only fail through exceptions in the helpers, which the driver
treats as crash-class events; the field is ready for future failure
propagation work in those paths.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Design's receive-side flow specifies a HandleRaceCondition step before
the local Append callback runs: 'check whether the target HeadID is
currently being split or merged on this node; if so, the append waits
for the structural operation to commit before proceeding.'  Without
this, the existing wasMissing branch (which re-materializes a missing
head from the sender's headVec) can resurrect a head that local Merge
just deleted.  The race is real but small — the per-head RWLock used
by Append/Split/Merge already serializes RMW, but the head-index
ContainSample check + AddHeadIndex resurrection happens outside that
lock.

Add ExtraDynamicSearcher::HandleRaceCondition(headID) that:
  1. Peeks m_splitList / m_mergeList for the head.
  2. If present, briefly acquires the per-head RWLock to wait for the
     structural op to commit.
  3. Returns; the callback continues with a stable view, and the
     normal Append re-acquires the RWLock for the actual RMW.

When the head is genuinely gone after the wait, the sender's later
retry will see the updated head index (via HeadSync) and re-route to
the new owner — exactly the path the design's Append-vs-Merge race
section describes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the per-bucket atomic<bool> remote-lock cache with a dedicated
RemoteLeaseTable that tracks per-bucket expiry timestamps.  This lets
the owner auto-reclaim a remote lock when the holder crashes or stalls
beyond RemoteLockTtlMs (default 30s) instead of blocking Split/Merge
forever.

New file: AnnService/inc/Core/SPANN/Distributed/RemoteLeaseTable.h.

Fencing tokens deferred — they require a protocol-mirror bump on
RemoteLock{Request,Response} and a callback signature change; will be
added when the watchdog/resend path lands.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
QueueRemoteAppend's auto-flush is fire-and-forget: when the receiver
is briefly unreachable the batch was previously dropped after a single
log line.  This breaks the distributed design's at-least-once async
job contract.

Add AsyncJobWatchdog (new file under Distributed/) that owns timeout-
driven, bounded exponential-backoff resends in a single background
thread.  Wire WorkerNode's auto-flush failure path to hand the batch
to the watchdog instead of dropping it.  RemoteAppend is idempotent on
the receive side (per-posting RMW), so at-least-once is safe.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds two TiKV-backed durability primitives matching the distributed
design's HeadSync Job Fault Tolerance and Split Path WAL sections:

  * HeadSyncLog (new file Distributed/HeadSyncLog.h)
      Per-shard monotonically-versioned log of HeadSyncEntry, keyed by
      'hs/e/<shard>/<verBE>', with 'hs/v/<shard>' as the published
      tip and 'hs/c/<node>/<shard>' as each node's applied cursor.
      Exposes Append/ReadSince/LoadCursor/StoreCursor and an optional
      background reconciler thread.  Raw KV (no txn) per design
      guidance; producer-side per-shard mutex serializes version
      bumps and the next reader catches up via cursor replay.

  * SplitWAL (new file Distributed/SplitWAL.h)
      Stage-tracked record under 'wal/split/<headID>/<jobID>' so that
      a cross-owner split can be GC'd after partial failure (one side
      written, the other not).

Wire-in: ExtraDynamicSearcher's BroadcastHeadSync now persists entries
to HeadSyncLog before issuing the in-memory broadcast.  Broadcast
remains the latency path; TiKV is the source of truth so lost or
duplicated broadcasts no longer threaten correctness.

SplitWAL Begin/Commit hooks at the split site, and reconciler thread
activation, are scaffolded behind the new members but not yet wired
into the split flow; they are sequential follow-ups that require
distributed integration testing.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Per the design's Async Job Fault Tolerance section, lease-based locks
need an accompanying fencing token so that a zombie holder which
resumes after its lease expired cannot mutate state now protected by
a newer holder.

Protocol bumps (backwards compatible via mirror-version gates):
  * RemoteAppendRequest  mirror 1 -> 2: m_fencingToken (uint64).
                         Token 0 = unfenced (normal owner-ring route).
  * RemoteLockRequest    mirror 1 -> 2: m_token (uint64).
                         Lock sends 0; Unlock sends issued token.
  * RemoteLockResponse   mirror 0 -> 1: m_token (uint64).
                         Owner returns issued fencing token on Lock.

API changes:
  * RemoteLeaseTable: TryAcquire returns uint64_t token (0=denied);
    Release(bucket, token) only succeeds if token matches; Validate
    used by receiver-side fence check.
  * RemoteLockCallback: bool -> uint64_t signature carrying the token.
  * SendRemoteLock returns uint64_t (issued token on Lock).
  * New FenceValidator callback + RemotePostingOps fence-check on
    inbound RemoteAppend; rejected if token stale.
  * New WorkerNode::SendFencedRemoteAppend synchronous helper for
    split's cross-owner write path (unblocks split-atomicity).

The ExtraDynamicSearcher lock callback now plumbs tokens end-to-end
through RemoteLeaseTable.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the two TryRouteRemoteAppend call sites in Split (existing-head
merge path and new-head create path) with the synchronous
TryWriteRemoteSplitChildFenced helper when the new head is remote-owned.
The helper performs try-lock-both, writes a SplitWAL Begin record,
sends a fenced RemoteAppend with the lock's monotonic token, then
releases the lock and clears the WAL on success.

On fenced-write failure (lock contention or RPC error), fall back to
the legacy async TryRouteRemoteAppend so the posting is not stranded;
the WAL + watchdog converge eventually.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…PC chunks, remote stats

* Drop the m_layer != 0 short-circuit in IsRemoteOwnedHead.  Both layers
  store postings in the same TiKV cluster (DBKey = m_maxID*m_layer+postingID)
  and need owner-ring routing, fencing, and SplitWAL just like layer 0.
  HeadSync broadcast stays layer-0-only since layer-1 centroids are derived
  from layer-0 splits and reach peers via that broadcast.
  SplitWAL keys now carry the layer to avoid collisions across layers.

* Fix MergeAsyncJob / SplitAsyncJob retry use-after-free: the SPDKThreadPool
  worker unconditionally deletes the Job after exec() returns, so the prior
  'add this; return;' retry path freed the Job while it was still queued.
  Enqueue a fresh Job carrying the bumped attempt count instead.

* Bump RemoteAppendChunkSize 3000->10000 and RemoteAppendMaxInflight 4->8.
  Per-chunk grpc framing was dominating, and with replica fan-out =8 the
  outbound queue at 1M+1M scale ships ~8M items; larger chunks amortize
  send overhead ~3x.

* Add remote queue stats to layer progress + ALL DONE logs and gate the
  ALL DONE boundary on the outbound queue draining.  Previously ALL DONE
  fired as soon as local SPDK pool was empty, even though the network
  pump was still shipping millions of fan-out items, making runs look
  stuck for tens of minutes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…Option A)

When a worker receives a BatchRemoteAppendRequest from a peer, instead of
holding the connection open until every item has been applied (which made
big chunks block long enough to trigger sender timeouts and full-chunk
retries), it now:

  1. Serializes the batch and Put()s it to TiKV under
       wal/rappend/<receiverNode>/<batchID>
  2. Immediately ACKs the sender as 'Accepted'.
  3. Submits the per-item Append jobs onto the per-layer searcher pool.
  4. On last-item completion, deletes the WAL key (best-effort).

On startup, layer-0's SetWorker scans the WAL prefix and re-submits any
batches durably accepted before a previous crash. The Append callback is
already idempotent (versionMap dedup), so duplicate replays are safe.

Implementation:
- New BatchAppendWAL helper (mirrors SplitWAL's style).
- New KeyValueIO::ScanPrefix(prefix, out, max) virtual; TiKVIO implements
  it via paged RawScan with logical-key stripping. Default is no-op so
  non-TiKV backends keep compiling.
- RemotePostingOps::HandleBatchAppendRequest now WAL-then-ACK-then-submit,
  with a graceful fallback to the legacy synchronous-ACK path if the WAL
  Put fails. Shared item-dispatch logic is factored out into
  SubmitBatchItems for reuse by RecoverPendingBatches.
- BatchAppendItemJob takes sendResponse/batchID flags so the same job
  serves both the WAL-backed path (delete WAL on last completion) and the
  legacy path (ACK on last completion).
- ExtraDynamicSearcher::SetWorker constructs the WAL once (layer 0 only,
  scoped by receiver node) and triggers recovery after callbacks are
  wired.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
After enabling the WAL-then-ACK fast path, an aggressive sender could
ACK 1M items in seconds while the receiver's apply pool was still
working through the first 10k — pending queue grew unbounded (1M+),
splits starved because pool workers were all blocked on appends.

Add admission control: RemotePostingOps counts items currently queued
for async apply via m_walPendingItems. When admitting a new batch would
push that above m_walPendingItemsCap (default 50000) we DELIBERATELY
fall back to the synchronous-ACK path, which re-engages the sender's
MaxInflight gate as a natural backpressure mechanism.

Also surface m_walPendingItems in the per-layer progress log
('walPendingItems:N') so operators can see when admission control is
actively engaged.

Verified 2-node insert_dominant 1M+1M: insert throughput 710→770/s
(+8.5%), recall@5 0.976→0.984, post-insert qps 401→438. Pending queue
stays bounded at ~80-130k under load; splits make steady progress.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…rigin pending gauge

Bug fix
-------
SendBatchRemoteAppend moves items[i] into per-chunk std::vector and
calls SendBatchRemoteAppendChunk. If a chunk failed (e.g. timeout) the
function returned without restoring the moved-out items, so the
caller's vector ended up with the leading chunks moved-from (headID +
appendNum scalars still valid, but m_headVec / m_appendPosting empty).
WorkerNode::QueueRemoteAppend's auto-flush path then copied the whole
vector into a watchdog retry queue, which re-sent valid headIDs with
empty postings. The receiver hit the TiKVIO::Merge empty-value gate,
logged 'TiKVIO::Merge: empty append posting!' and 'Merge failed for
HEAD! Posting Size:0' for every such phantom item — in a 2-node
insert_dominant run we observed 390k+ such errors on the driver and
60k on the worker.

Fix:
- SendBatchRemoteAppend now (a) restores moved-out items from the
  still-populated chunk on failure, then erases the already-sent
  prefix so the caller-side retry only sees unsent payload, and
  (b) clears the input vector on full success so any spurious retry
  becomes a no-op instead of resurrecting phantom items.
- Append() drops empty/zero-count payloads with a single warning
  rather than letting them reach the storage layer (defensive guard;
  receiver should never see these once the sender bug above is
  fixed).

Observability
-------------
Added a per-layer counter m_remoteOriginPending in RemotePostingOps,
incremented in SubmitBatchItems and decremented in BatchAppendItemJob.
Exposed via WorkerNode::GetRemoteOriginPendingItems(layer) and a
whole-node aggregate. The progress log in ExtraDynamicSearcher now
prints 'pending queue:N (local:X remote:Y)' so operators can tell
whether the local pool is bottlenecked on its own RMWs/splits or on
serving peer BatchAppend items. Both progress log call sites
(AllFinished's periodic line and GetDBStats's on-demand line) updated
to the same format with the remote out queueDepth / inflightChunks /
walPendingItems context.

Verified on 2-node insert_dominant: 0 empty-posting / merge-failed
errors (was 450k), throughput 758-797/s (within noise of 710 baseline
and 770 WAL run), recall 0.984-0.990.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants