[rust] Add KvBatchScanner for full PK-table bucket scans#633
Open
gnuhpc wants to merge 6 commits into
Open
Conversation
- Update fluss_api.proto with all 1.x message types (ACLs, KV snapshots,
producer offsets, cluster config, rebalance, server tags, etc.)
- Add optional fields: rack, remote_data_dir, leader_epoch, agg_mode, etc.
- Register 24 new ApiKey variants (1023-1064) in api_key.rs
- Update build.rs prost bytes config for new proto fields
- Add None defaults in convert.rs and partition.rs
- Update pre-existing message wrappers that reference renamed proto fields /
ApiKey variants so the crate still builds:
* create_partition.rs: ignore_if_exists -> ignore_if_not_exists
* get_latest_lake_snapshot.rs: ApiKey::GetLatestLakeSnapshot -> GetLakeSnapshot
* list_databases.rs: populate new include_summary field
* lookup.rs: PbLookupReqForBucket.key -> keys; new LookupRequest fields
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add 9 RPC message wrapper types: - alter_database, alter_table (DDL operations) - get_table_stats (table statistics) - list_database_summaries (database listing with summaries) - create_acls, list_acls, drop_acls (ACL management) - describe_cluster_configs, alter_cluster_configs (cluster configuration) Each wrapper follows the standard pattern: a request struct wrapping the proto-generated type, implementing RequestBody (tying to ApiKey and ResponseBody), WriteType, and ReadType. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add message wrappers for the remaining 1.x RPC APIs: - KV snapshot lifecycle: acquire/release/drop lease, list, metadata, latest snapshots, lake snapshot - Server management: add/remove server tag, rebalance + progress + cancel, get cluster health, list remote log manifests - Producer offsets: register/get/delete - ScanKv (API 1061): full KV-table bucket scan request/response
Add 27 new admin methods to FlussAdmin: - Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats - KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot - ACL management: create_acls, list_acls, drop_acls - Cluster configuration: describe_cluster_configs, alter_cluster_configs - Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance - Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets - Monitoring: get_cluster_health, list_remote_log_manifests - KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease
Introduce a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first next_batch() opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close. - client/table/kv_batch_scanner.rs: KvBatchScanner with per-bucket state machine (Pending -> Active -> Done), best-effort close on Drop, and retry-with-backoff on retriable server errors (leader-election races, TooManyScanners, etc.) — not just TooManyScanners. - client/table/scanner.rs: TableScan::create_kv_batch_scanner() with PK/bucket validation. - config.rs: scanner_kv_fetch_max_bytes (default 4MB, matching Java). - rpc/fluss_api_error.rs: new error codes 66-69 (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) with correct retriable classification. - batch_scanner.rs: expose KV decode helpers (pub(super)) for reuse. - tests: 4 KV integration tests; tolerate UnsupportedVersion so they also pass (no-op) against 0.9.x servers that lack ScanKv.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Introduces a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first
next_batch()opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close.client/table/kv_batch_scanner.rs:KvBatchScannerwith a per-bucket state machine (Pending → Active → Done), best-effort close onDrop, and retry-with-backoff on any retriable server error (leader-election races on a freshly created bucket, transientTooManyScanners, …) — not justTooManyScanners.client/table/scanner.rs:TableScan::create_kv_batch_scanner()with PK / bucket-range validation.config.rs:scanner_kv_fetch_max_bytes(default 4 MB, matching JavaCLIENT_SCANNER_KV_FETCH_MAX_BYTES).rpc/fluss_api_error.rs: new error codes 66–69 (ScannerExpiredException,UnknownScannerIdException,InvalidScanRequestException,TooManyScanners) with correct retriable classification.client/table/batch_scanner.rs: expose the KV decode helpers (pub(super)) for reuse by the new scanner.tests/integration/batch_scanner.rs: 4 KV integration tests; they tolerateUnsupportedVersionso they also pass (as no-ops) against 0.9.x servers that lack ScanKv.Verification
Built +
cargo clippyclean;cargo fmtclean. Integration suite against a real Fluss 1.x cluster: 70/72 pass. The only 2 failures are the SASL negative auth tests (test_sasl_connect_with_wrong_password,test_sasl_connect_with_unknown_user) — these fail because the test docker image does not enforce SASL auth (verified: valid/wrong/unknown credentials all connect). The Rust client's SASL handshake is correct (sendsAuthenticateRequest, retriesRetriableAuthenticateException, propagates non-retriable); Fluss's own reference testSaslAuthenticationITCasewith identical config does reject bad credentials. So this is a test-image/server packaging issue, not an SDK defect.Stack
Part 6/6 (final), stacked on #632 → … → #628. All target
main.🤖 Generated with Claude Code