Skip to content

[rust] Add KvBatchScanner for full PK-table bucket scans#633

Open
gnuhpc wants to merge 6 commits into
apache:mainfrom
gnuhpc:pr/6-kv-batch-scanner
Open

[rust] Add KvBatchScanner for full PK-table bucket scans#633
gnuhpc wants to merge 6 commits into
apache:mainfrom
gnuhpc:pr/6-kv-batch-scanner

Conversation

@gnuhpc

@gnuhpc gnuhpc commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

What

Introduces a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first next_batch() opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close.

  • client/table/kv_batch_scanner.rs: KvBatchScanner with a per-bucket state machine (Pending → Active → Done), best-effort close on Drop, and retry-with-backoff on any retriable server error (leader-election races on a freshly created bucket, transient TooManyScanners, …) — not just TooManyScanners.
  • client/table/scanner.rs: TableScan::create_kv_batch_scanner() with PK / bucket-range validation.
  • config.rs: scanner_kv_fetch_max_bytes (default 4 MB, matching Java CLIENT_SCANNER_KV_FETCH_MAX_BYTES).
  • rpc/fluss_api_error.rs: new error codes 66–69 (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) with correct retriable classification.
  • client/table/batch_scanner.rs: expose the KV decode helpers (pub(super)) for reuse by the new scanner.
  • tests/integration/batch_scanner.rs: 4 KV integration tests; they tolerate UnsupportedVersion so they also pass (as no-ops) against 0.9.x servers that lack ScanKv.

Verification

Built + cargo clippy clean; cargo fmt clean. Integration suite against a real Fluss 1.x cluster: 70/72 pass. The only 2 failures are the SASL negative auth tests (test_sasl_connect_with_wrong_password, test_sasl_connect_with_unknown_user) — these fail because the test docker image does not enforce SASL auth (verified: valid/wrong/unknown credentials all connect). The Rust client's SASL handshake is correct (sends AuthenticateRequest, retries RetriableAuthenticateException, propagates non-retriable); Fluss's own reference test SaslAuthenticationITCase with identical config does reject bad credentials. So this is a test-image/server packaging issue, not an SDK defect.

Stack

Part 6/6 (final), stacked on #632 → … → #628. All target main.

🤖 Generated with Claude Code

warmbupt and others added 6 commits June 18, 2026 11:44
- Update fluss_api.proto with all 1.x message types (ACLs, KV snapshots,
  producer offsets, cluster config, rebalance, server tags, etc.)
- Add optional fields: rack, remote_data_dir, leader_epoch, agg_mode, etc.
- Register 24 new ApiKey variants (1023-1064) in api_key.rs
- Update build.rs prost bytes config for new proto fields
- Add None defaults in convert.rs and partition.rs
- Update pre-existing message wrappers that reference renamed proto fields /
  ApiKey variants so the crate still builds:
    * create_partition.rs: ignore_if_exists -> ignore_if_not_exists
    * get_latest_lake_snapshot.rs: ApiKey::GetLatestLakeSnapshot -> GetLakeSnapshot
    * list_databases.rs: populate new include_summary field
    * lookup.rs: PbLookupReqForBucket.key -> keys; new LookupRequest fields

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add 9 RPC message wrapper types:
- alter_database, alter_table (DDL operations)
- get_table_stats (table statistics)
- list_database_summaries (database listing with summaries)
- create_acls, list_acls, drop_acls (ACL management)
- describe_cluster_configs, alter_cluster_configs (cluster configuration)

Each wrapper follows the standard pattern: a request struct wrapping the
proto-generated type, implementing RequestBody (tying to ApiKey and ResponseBody),
WriteType, and ReadType.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add message wrappers for the remaining 1.x RPC APIs:
- KV snapshot lifecycle: acquire/release/drop lease, list, metadata,
  latest snapshots, lake snapshot
- Server management: add/remove server tag, rebalance + progress +
  cancel, get cluster health, list remote log manifests
- Producer offsets: register/get/delete
- ScanKv (API 1061): full KV-table bucket scan request/response
Add 27 new admin methods to FlussAdmin:
- Database/table extensions: list_database_summaries, alter_database,
  alter_table, get_table_stats
- KV snapshot operations: get_latest_kv_snapshots,
  get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot
- ACL management: create_acls, list_acls, drop_acls
- Cluster configuration: describe_cluster_configs, alter_cluster_configs
- Server management: add_server_tag, remove_server_tag, rebalance,
  list_rebalance_progress, cancel_rebalance
- Producer offsets: register_producer_offsets, get_producer_offsets,
  delete_producer_offsets
- Monitoring: get_cluster_health, list_remote_log_manifests
- KV snapshots: list_kv_snapshots, release_kv_snapshot_lease,
  drop_kv_snapshot_lease
Introduce a stateful, unbounded KV-table scanner using the ScanKv API
(1061). The first next_batch() opens the server-side cursor; subsequent
calls iterate; dropping the scanner sends a best-effort close.

- client/table/kv_batch_scanner.rs: KvBatchScanner with per-bucket state
  machine (Pending -> Active -> Done), best-effort close on Drop, and
  retry-with-backoff on retriable server errors (leader-election races,
  TooManyScanners, etc.) — not just TooManyScanners.
- client/table/scanner.rs: TableScan::create_kv_batch_scanner() with
  PK/bucket validation.
- config.rs: scanner_kv_fetch_max_bytes (default 4MB, matching Java).
- rpc/fluss_api_error.rs: new error codes 66-69
  (ScannerExpiredException, UnknownScannerIdException,
  InvalidScanRequestException, TooManyScanners) with correct retriable
  classification.
- batch_scanner.rs: expose KV decode helpers (pub(super)) for reuse.
- tests: 4 KV integration tests; tolerate UnsupportedVersion so they
  also pass (no-op) against 0.9.x servers that lack ScanKv.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants