From 7c0f61f0fffb47288c97bc5683193d188ac68432 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Thu, 18 Jun 2026 11:40:16 +0800 Subject: [PATCH 1/5] [rust] Sync Fluss 1.x proto definitions and ApiKey registry - Update fluss_api.proto with all 1.x message types (ACLs, KV snapshots, producer offsets, cluster config, rebalance, server tags, etc.) - Add optional fields: rack, remote_data_dir, leader_epoch, agg_mode, etc. - Register 24 new ApiKey variants (1023-1064) in api_key.rs - Update build.rs prost bytes config for new proto fields - Add None defaults in convert.rs and partition.rs - Update pre-existing message wrappers that reference renamed proto fields / ApiKey variants so the crate still builds: * create_partition.rs: ignore_if_exists -> ignore_if_not_exists * get_latest_lake_snapshot.rs: ApiKey::GetLatestLakeSnapshot -> GetLakeSnapshot * list_databases.rs: populate new include_summary field * lookup.rs: PbLookupReqForBucket.key -> keys; new LookupRequest fields Co-Authored-By: Claude Opus 4.8 --- crates/fluss/build.rs | 3 +- crates/fluss/src/client/admin.rs | 1 + crates/fluss/src/client/table/scanner.rs | 5 + crates/fluss/src/metadata/partition.rs | 1 + crates/fluss/src/proto/fluss_api.proto | 464 +++++++++++++++++- crates/fluss/src/rpc/api_key.rs | 137 +++++- crates/fluss/src/rpc/convert.rs | 2 + .../fluss/src/rpc/message/create_partition.rs | 2 +- .../rpc/message/get_latest_lake_snapshot.rs | 2 +- .../fluss/src/rpc/message/list_databases.rs | 4 +- crates/fluss/src/rpc/message/lookup.rs | 5 +- 11 files changed, 612 insertions(+), 14 deletions(-) diff --git a/crates/fluss/build.rs b/crates/fluss/build.rs index 040ee521..f9248c36 100644 --- a/crates/fluss/build.rs +++ b/crates/fluss/build.rs @@ -22,8 +22,9 @@ fn main() -> Result<()> { config.bytes([ ".proto.PbProduceLogReqForBucket.records", ".proto.PbPutKvReqForBucket.records", - ".proto.PbLookupReqForBucket.key", + ".proto.PbLookupReqForBucket.keys", ".proto.PbPrefixLookupReqForBucket.keys", + ".proto.ScanKvResponse.records", ]); config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?; Ok(()) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 0828b83b..d3d5a5e3 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -150,6 +150,7 @@ impl FlussAdmin { table_json, created_time, modified_time, + .. } = response; let v: &[u8] = &table_json[..]; let table_descriptor = diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 8578daa4..b36b0e42 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -2058,6 +2058,8 @@ impl LogFetcher { projection_pushdown_enabled: projection_enabled, projected_fields: projected_fields.clone(), buckets_req: feq_for_buckets, + filter_predicate: None, + filter_schema_id: None, }; let fetch_log_request = FetchLogRequest { @@ -2408,6 +2410,7 @@ mod tests { log_start_offset: None, remote_log_fetch_info: None, records: None, + filtered_end_offset: None, }], }], }; @@ -2464,6 +2467,7 @@ mod tests { log_start_offset: None, remote_log_fetch_info: None, records: None, + filtered_end_offset: None, }], }], }; @@ -2801,6 +2805,7 @@ mod tests { log_start_offset: Some(0), remote_log_fetch_info: None, records: None, + filtered_end_offset: None, }], }], }; diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 18402354..c63fe296 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -300,6 +300,7 @@ impl PartitionInfo { PbPartitionInfo { partition_id: self.partition_id, partition_spec: self.partition_spec.to_pb(), + remote_data_dir: None, } } diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto index 2add80d7..7d7fd89e 100644 --- a/crates/fluss/src/proto/fluss_api.proto +++ b/crates/fluss/src/proto/fluss_api.proto @@ -94,6 +94,7 @@ message PbServerNode { required string host = 2; required int32 port = 3; optional string listeners = 4; + optional string rack = 5; } message PbTableMetadata { @@ -104,6 +105,7 @@ message PbTableMetadata { repeated PbBucketMetadata bucket_metadata = 5; required int64 created_time = 6; required int64 modified_time = 7; + optional string remote_data_dir = 8; } message PbPartitionMetadata { @@ -119,7 +121,7 @@ message PbBucketMetadata { // optional as some time the leader may not elected yet optional int32 leader_id = 2; repeated int32 replica_id = 3 [packed = true]; - // TODO: Add isr here. + optional int32 leader_epoch = 4; } message PbProduceLogReqForBucket { @@ -145,6 +147,7 @@ message PutKvRequest { // if empty, means write all columns repeated int32 target_columns = 4 [packed = true]; repeated PbPutKvReqForBucket buckets_req = 5; + optional int32 agg_mode = 6; } message PutKvResponse { @@ -162,6 +165,7 @@ message PbPutKvRespForBucket { required int32 bucket_id = 2; optional int32 error_code = 3; optional string error_message = 4; + optional int64 log_end_offset = 5; } message CreateTableRequest { @@ -199,6 +203,7 @@ message GetTableInfoResponse { required bytes table_json = 3; required int64 created_time = 4; required int64 modified_time = 5; + optional string remote_data_dir = 6; } // get table schema request and response. Mirrors the Java RPC at api key 1011. @@ -258,10 +263,12 @@ message DatabaseExistsResponse { } message ListDatabasesRequest { + optional bool include_summary = 1; } message ListDatabasesResponse { repeated string database_name = 1; + repeated PbDatabaseSummary database_summary = 2; } // list offsets request and response @@ -291,11 +298,46 @@ message FetchLogResponse { repeated PbFetchLogRespForTable tables_resp = 1; } +message PbPredicate { + required int32 type = 1; + optional PbLeafPredicate leaf = 2; + optional PbCompoundPredicate compound = 3; +} + +message PbLeafPredicate { + required int32 function = 1; + required int32 field_id = 2; + repeated PbLiteralValue literals = 3; +} + +message PbCompoundPredicate { + required int32 function = 1; + repeated PbPredicate children = 2; +} + +message PbLiteralValue { + required int32 literal_type = 1; + required bool is_null = 2; + optional bool boolean_value = 3; + optional int32 int_value = 4; + optional int64 bigint_value = 5; + optional float float_value = 6; + optional double double_value = 7; + optional string string_value = 8; + optional bytes binary_value = 9; + optional int64 decimal_value = 10; + optional int64 timestamp_millis_value = 11; + optional int32 timestamp_nano_of_millis_value = 12; + optional bytes decimal_bytes = 13; +} + message PbFetchLogReqForTable { required int64 table_id = 1; required bool projection_pushdown_enabled = 2; repeated int32 projected_fields = 3 [packed = true]; repeated PbFetchLogReqForBucket buckets_req = 4; + optional PbPredicate filter_predicate = 5; + optional int32 filter_schema_id = 6; } @@ -321,6 +363,7 @@ message PbFetchLogRespForBucket { optional int64 log_start_offset = 6; // TODO now we don't introduce log start offset, but remain it in protobuf optional PbRemoteLogFetchInfo remote_log_fetch_info = 7; optional bytes records = 8; + optional int64 filtered_end_offset = 9; } message PbRemoteLogFetchInfo { @@ -360,6 +403,7 @@ message PbLakeSnapshotForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; optional int64 log_offset = 3; + optional string partition_name = 4; } message PbKeyValue { @@ -381,6 +425,9 @@ message GetFileSystemSecurityTokenResponse { message LookupRequest { required int64 table_id = 1; repeated PbLookupReqForBucket buckets_req = 2; + optional bool insert_if_not_exists = 3; + optional int32 acks = 4; + optional int32 timeout_ms = 5; } message LookupResponse { @@ -390,7 +437,7 @@ message LookupResponse { message PbLookupReqForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; - repeated bytes key = 3; + repeated bytes keys = 3; } message PbLookupRespForBucket { @@ -440,6 +487,7 @@ message PbPartitionSpec { message PbPartitionInfo { required int64 partition_id = 1; required PbPartitionSpec partition_spec = 2; + optional string remote_data_dir = 3; } message ListPartitionInfosRequest { @@ -454,7 +502,7 @@ message ListPartitionInfosResponse { message CreatePartitionRequest { required PbTablePath table_path = 1; required PbPartitionSpec partition_spec = 2; - required bool ignore_if_exists = 3; + required bool ignore_if_not_exists = 3; } message CreatePartitionResponse {} @@ -501,3 +549,413 @@ message InitWriterRequest { message InitWriterResponse { required int64 writer_id = 1; } + +message PbDatabaseSummary { + required string database_name = 1; + required int64 created_time = 2; + required int32 table_count = 3; +} + +message AlterDatabaseRequest { + required string database_name = 1; + required bool ignore_if_not_exists = 2; + repeated PbAlterConfig config_changes = 3; + optional string comment = 4; +} + +message AlterDatabaseResponse { +} + +message AlterTableRequest { + required PbTablePath table_path = 1; + required bool ignore_if_not_exists = 2; + repeated PbAlterConfig config_changes = 3; + repeated PbAddColumn add_columns = 4; + repeated PbDropColumn drop_columns = 5; + repeated PbRenameColumn rename_columns = 6; + repeated PbModifyColumn modify_columns = 7; +} + +message AlterTableResponse { +} + +message PbAlterConfig { + required string config_key = 1; + optional string config_value = 2; + required int32 op_type = 3; +} + +message PbAddColumn { + required string column_name = 1; + required bytes data_type_json = 2; + optional string comment = 3; + required int32 column_position_type = 4; +} + +message PbDropColumn { + required string column_name = 1; +} + +message PbRenameColumn { + required string old_column_name = 1; + required string new_column_name = 2; +} + +message PbModifyColumn { + required string column_name = 1; + optional bytes data_type_json = 2; + optional string comment = 3; + optional int32 column_position_type = 4; +} + +message GetTableStatsRequest { + required int64 table_id = 1; + repeated PbTableStatsReqForBucket buckets_req = 2; + repeated int32 target_columns = 3 [packed = true]; +} + +message GetTableStatsResponse { + repeated PbTableStatsRespForBucket buckets_resp = 1; +} + +message PbTableStatsReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; +} + +message PbTableStatsRespForBucket { + optional int32 error_code = 1; + optional string error_message = 2; + optional int64 partition_id = 3; + required int32 bucket_id = 4; + optional int64 row_count = 5; +} + +message GetLatestKvSnapshotsRequest { + required PbTablePath table_path = 1; + optional string partition_name = 2; +} + +message GetLatestKvSnapshotsResponse { + required int64 table_id = 1; + optional int64 partition_id = 2; + repeated PbKvSnapshot latest_snapshots = 3; +} + +message PbKvSnapshot { + required int32 bucket_id = 1; + optional int64 snapshot_id = 2; + optional int64 log_offset = 3; +} + +message GetKvSnapshotMetadataRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + required int64 snapshot_id = 4; +} + +message GetKvSnapshotMetadataResponse { + required int64 log_offset = 1; + repeated PbRemotePathAndLocalFile snapshot_files = 2; +} + +message PbRemotePathAndLocalFile { + required string remote_path = 1; + required string local_file_name = 2; +} + +message AcquireKvSnapshotLeaseRequest { + required string lease_id = 1; + required int64 lease_duration_ms = 2; + repeated PbKvSnapshotLeaseForTable snapshots_to_lease = 3; +} + +message AcquireKvSnapshotLeaseResponse { + repeated PbKvSnapshotLeaseForTable unavailable_snapshots = 1; +} + +message PbKvSnapshotLeaseForTable { + required int64 table_id = 1; + repeated PbKvSnapshotLeaseForBucket bucket_snapshots = 2; +} + +message PbKvSnapshotLeaseForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + required int64 snapshot_id = 3; +} + +message GetLakeSnapshotRequest { + required PbTablePath table_path = 1; + optional int64 snapshot_id = 2; + optional bool readable = 3; +} + +message GetLakeSnapshotResponse { + required int64 table_id = 1; + required int64 snapshotId = 2; + repeated PbLakeSnapshotForBucket bucket_snapshots = 3; +} + +message PbAclInfo { + required string resource_name = 1; + required int32 resource_type = 2; + required string principal_name = 3; + required string principal_type = 4; + required string host = 5; + required int32 operation_type = 6; + required int32 permission_type = 7; +} + +message PbAclFilter { + optional string resource_name = 1; + required int32 resource_type = 2; + optional string principal_name = 3; + optional string principal_type = 4; + optional string host = 5; + required int32 operation_type = 6; + required int32 permission_type = 7; +} + +message PbCreateAclRespInfo { + required PbAclInfo acl = 1; + optional int32 error_code = 2; + optional string error_message = 3; +} + +message PbDropAclsFilterResult { + repeated PbDropAclsMatchingAcl matching_acls = 1; + optional int32 error_code = 2; + optional string error_message = 3; +} + +message PbDropAclsMatchingAcl { + required PbAclInfo acl = 1; + optional int32 error_code = 2; + optional string error_message = 3; +} + +message ListAclsRequest { + required PbAclFilter acl_filter = 1; +} + +message ListAclsResponse { + repeated PbAclInfo acl = 1; +} + +message CreateAclsRequest { + repeated PbAclInfo acl = 1; +} + +message CreateAclsResponse { + repeated PbCreateAclRespInfo aclRes = 1; +} + +message DropAclsRequest { + repeated PbAclFilter acl_filter = 1; +} + +message DropAclsResponse { + repeated PbDropAclsFilterResult filter_results = 1; +} + +message PbDescribeConfig { + required string config_key = 1; + optional string config_value = 2; + required string config_source = 3; +} + +message DescribeClusterConfigsRequest { +} + +message DescribeClusterConfigsResponse { + repeated PbDescribeConfig configs = 1; +} + +message AlterClusterConfigsRequest { + repeated PbAlterConfig alter_configs = 1; +} + +message AlterClusterConfigsResponse { +} + +message AddServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message AddServerTagResponse { +} + +message RemoveServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message RemoveServerTagResponse { +} + +message RebalanceRequest { + repeated int32 goals = 1 [packed = true]; +} + +message RebalanceResponse { + required string rebalance_id = 1; +} + +message ListRebalanceProgressRequest { + optional string rebalance_id = 1; +} + +message ListRebalanceProgressResponse { + optional string rebalance_id = 1; + optional int32 rebalance_status = 2; + repeated PbRebalanceProgressForTable table_progress = 3; +} + +message PbRebalanceProgressForTable { + required int64 table_id = 1; + repeated PbRebalanceProgressForBucket buckets_progress = 2; +} + +message PbRebalanceProgressForBucket { + required PbRebalancePlanForBucket rebalance_plan = 1; + required int32 rebalance_status = 2; +} + +message PbRebalancePlanForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int32 original_leader = 3; + optional int32 new_leader = 4; + repeated int32 original_replicas = 5 [packed = true]; + repeated int32 new_replicas = 6 [packed = true]; +} + +message CancelRebalanceRequest { + optional string rebalance_id = 1; +} + +message CancelRebalanceResponse { +} + +message PbBucketOffset { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int64 log_end_offset = 4; +} + +message PbProducerTableOffsets { + required int64 table_id = 1; + repeated PbBucketOffset bucket_offsets = 2; +} + +message RegisterProducerOffsetsRequest { + required string producer_id = 1; + repeated PbProducerTableOffsets table_offsets = 2; + optional int64 ttl_ms = 3; +} + +message RegisterProducerOffsetsResponse { + optional int32 result = 1; +} + +message GetProducerOffsetsRequest { + required string producer_id = 1; +} + +message GetProducerOffsetsResponse { + optional string producer_id = 1; + optional int64 expiration_time = 2; + repeated PbProducerTableOffsets table_offsets = 3; +} + +message DeleteProducerOffsetsRequest { + required string producer_id = 1; +} + +message DeleteProducerOffsetsResponse { +} + +message PbTableBucket { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; +} + +message ReleaseKvSnapshotLeaseRequest { + required string lease_id = 1; + repeated PbTableBucket buckets_to_release = 2; +} + +message ReleaseKvSnapshotLeaseResponse { +} + +message DropKvSnapshotLeaseRequest { + required string lease_id = 1; +} + +message DropKvSnapshotLeaseResponse { +} + +message PbScanReqForBucket { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + optional int64 limit = 4; +} + +message ScanKvRequest { + optional bytes scanner_id = 1; + optional PbScanReqForBucket bucket_scan_req = 2; + optional int32 call_seq_id = 3; + optional int32 batch_size_bytes = 4; + optional bool close_scanner = 5; +} + +message ScanKvResponse { + optional int32 error_code = 1; + optional string error_message = 2; + optional bytes scanner_id = 3; + optional bool has_more_results = 4; + optional bytes records = 5; + optional int64 log_offset = 6; +} + +message GetClusterHealthRequest { +} + +message GetClusterHealthResponse { + required int32 num_replicas = 1; + required int32 in_sync_replicas = 2; + required int32 num_leader_replicas = 3; + required int32 active_leader_replicas = 4; + required int32 status = 5; +} + +message PbRemoteLogManifestEntry { + required PbTableBucket table_bucket = 1; + required string remote_log_manifest_path = 2; + required int64 remote_log_end_offset = 3; +} + +message ListRemoteLogManifestsRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; +} + +message ListRemoteLogManifestsResponse { + repeated PbRemoteLogManifestEntry manifests = 1; +} + +message ListKvSnapshotsRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; +} + +message ListKvSnapshotsResponse { + required int64 table_id = 1; + optional int64 partition_id = 2; + repeated PbKvSnapshot active_snapshots = 3; +} diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs index aaeca70d..cab034d2 100644 --- a/crates/fluss/src/rpc/api_key.rs +++ b/crates/fluss/src/rpc/api_key.rs @@ -38,15 +38,40 @@ pub enum ApiKey { PutKv, // 1016 Lookup, // 1017 ListOffsets, // 1021 + GetLatestKvSnapshots, // 1023 + GetKvSnapshotMetadata, // 1024 GetFileSystemSecurityToken, // 1025 InitWriter, // 1026 - GetLatestLakeSnapshot, // 1032 + GetLakeSnapshot, // 1032 LimitScan, // 1033 PrefixLookup, // 1034 GetDatabaseInfo, // 1035 CreatePartition, // 1036 DropPartition, // 1037 Authenticate, // 1038 + CreateAcls, // 1039 + ListAcls, // 1040 + DropAcls, // 1041 + AlterTable, // 1044 + DescribeClusterConfigs, // 1045 + AlterClusterConfigs, // 1046 + AddServerTag, // 1047 + RemoveServerTag, // 1048 + Rebalance, // 1049 + ListRebalanceProgress, // 1050 + CancelRebalance, // 1051 + RegisterProducerOffsets, // 1053 + GetProducerOffsets, // 1054 + DeleteProducerOffsets, // 1055 + AcquireKvSnapshotLease, // 1056 + ReleaseKvSnapshotLease, // 1057 + DropKvSnapshotLease, // 1058 + GetTableStats, // 1059 + AlterDatabase, // 1060 + ScanKv, // 1061 + GetClusterHealth, // 1062 + ListRemoteLogManifests, // 1063 + ListKvSnapshots, // 1064 Unknown(i16), } @@ -71,14 +96,39 @@ impl ApiKey { | ApiKey::ProduceLog | ApiKey::FetchLog | ApiKey::ListOffsets + | ApiKey::GetLatestKvSnapshots + | ApiKey::GetKvSnapshotMetadata | ApiKey::GetFileSystemSecurityToken | ApiKey::InitWriter - | ApiKey::GetLatestLakeSnapshot + | ApiKey::GetLakeSnapshot | ApiKey::LimitScan | ApiKey::GetDatabaseInfo | ApiKey::CreatePartition | ApiKey::DropPartition - | ApiKey::Authenticate => Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(0))), + | ApiKey::Authenticate + | ApiKey::CreateAcls + | ApiKey::ListAcls + | ApiKey::DropAcls + | ApiKey::AlterTable + | ApiKey::DescribeClusterConfigs + | ApiKey::AlterClusterConfigs + | ApiKey::AddServerTag + | ApiKey::RemoveServerTag + | ApiKey::Rebalance + | ApiKey::ListRebalanceProgress + | ApiKey::CancelRebalance + | ApiKey::RegisterProducerOffsets + | ApiKey::GetProducerOffsets + | ApiKey::DeleteProducerOffsets + | ApiKey::AcquireKvSnapshotLease + | ApiKey::ReleaseKvSnapshotLease + | ApiKey::DropKvSnapshotLease + | ApiKey::GetTableStats + | ApiKey::AlterDatabase + | ApiKey::ScanKv + | ApiKey::GetClusterHealth + | ApiKey::ListRemoteLogManifests + | ApiKey::ListKvSnapshots => Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(0))), // PutKv / Lookup / PrefixLookup support v0 (legacy key encoding) // and v1 (Paimon BinaryRow key encoding for kv_format_version=2 // non-default bucket keys). The Rust client encodes both. @@ -111,15 +161,40 @@ impl From for ApiKey { 1016 => ApiKey::PutKv, 1017 => ApiKey::Lookup, 1021 => ApiKey::ListOffsets, + 1023 => ApiKey::GetLatestKvSnapshots, + 1024 => ApiKey::GetKvSnapshotMetadata, 1025 => ApiKey::GetFileSystemSecurityToken, 1026 => ApiKey::InitWriter, - 1032 => ApiKey::GetLatestLakeSnapshot, + 1032 => ApiKey::GetLakeSnapshot, 1033 => ApiKey::LimitScan, 1034 => ApiKey::PrefixLookup, 1035 => ApiKey::GetDatabaseInfo, 1036 => ApiKey::CreatePartition, 1037 => ApiKey::DropPartition, 1038 => ApiKey::Authenticate, + 1039 => ApiKey::CreateAcls, + 1040 => ApiKey::ListAcls, + 1041 => ApiKey::DropAcls, + 1044 => ApiKey::AlterTable, + 1045 => ApiKey::DescribeClusterConfigs, + 1046 => ApiKey::AlterClusterConfigs, + 1047 => ApiKey::AddServerTag, + 1048 => ApiKey::RemoveServerTag, + 1049 => ApiKey::Rebalance, + 1050 => ApiKey::ListRebalanceProgress, + 1051 => ApiKey::CancelRebalance, + 1053 => ApiKey::RegisterProducerOffsets, + 1054 => ApiKey::GetProducerOffsets, + 1055 => ApiKey::DeleteProducerOffsets, + 1056 => ApiKey::AcquireKvSnapshotLease, + 1057 => ApiKey::ReleaseKvSnapshotLease, + 1058 => ApiKey::DropKvSnapshotLease, + 1059 => ApiKey::GetTableStats, + 1060 => ApiKey::AlterDatabase, + 1061 => ApiKey::ScanKv, + 1062 => ApiKey::GetClusterHealth, + 1063 => ApiKey::ListRemoteLogManifests, + 1064 => ApiKey::ListKvSnapshots, _ => Unknown(key), } @@ -147,15 +222,40 @@ impl From for i16 { ApiKey::PutKv => 1016, ApiKey::Lookup => 1017, ApiKey::ListOffsets => 1021, + ApiKey::GetLatestKvSnapshots => 1023, + ApiKey::GetKvSnapshotMetadata => 1024, ApiKey::GetFileSystemSecurityToken => 1025, ApiKey::InitWriter => 1026, - ApiKey::GetLatestLakeSnapshot => 1032, + ApiKey::GetLakeSnapshot => 1032, ApiKey::LimitScan => 1033, ApiKey::PrefixLookup => 1034, ApiKey::GetDatabaseInfo => 1035, ApiKey::CreatePartition => 1036, ApiKey::DropPartition => 1037, ApiKey::Authenticate => 1038, + ApiKey::CreateAcls => 1039, + ApiKey::ListAcls => 1040, + ApiKey::DropAcls => 1041, + ApiKey::AlterTable => 1044, + ApiKey::DescribeClusterConfigs => 1045, + ApiKey::AlterClusterConfigs => 1046, + ApiKey::AddServerTag => 1047, + ApiKey::RemoveServerTag => 1048, + ApiKey::Rebalance => 1049, + ApiKey::ListRebalanceProgress => 1050, + ApiKey::CancelRebalance => 1051, + ApiKey::RegisterProducerOffsets => 1053, + ApiKey::GetProducerOffsets => 1054, + ApiKey::DeleteProducerOffsets => 1055, + ApiKey::AcquireKvSnapshotLease => 1056, + ApiKey::ReleaseKvSnapshotLease => 1057, + ApiKey::DropKvSnapshotLease => 1058, + ApiKey::GetTableStats => 1059, + ApiKey::AlterDatabase => 1060, + ApiKey::ScanKv => 1061, + ApiKey::GetClusterHealth => 1062, + ApiKey::ListRemoteLogManifests => 1063, + ApiKey::ListKvSnapshots => 1064, Unknown(x) => x, } } @@ -186,15 +286,40 @@ mod tests { (1016, ApiKey::PutKv), (1017, ApiKey::Lookup), (1021, ApiKey::ListOffsets), + (1023, ApiKey::GetLatestKvSnapshots), + (1024, ApiKey::GetKvSnapshotMetadata), (1025, ApiKey::GetFileSystemSecurityToken), (1026, ApiKey::InitWriter), - (1032, ApiKey::GetLatestLakeSnapshot), + (1032, ApiKey::GetLakeSnapshot), (1033, ApiKey::LimitScan), (1034, ApiKey::PrefixLookup), (1035, ApiKey::GetDatabaseInfo), (1036, ApiKey::CreatePartition), (1037, ApiKey::DropPartition), (1038, ApiKey::Authenticate), + (1039, ApiKey::CreateAcls), + (1040, ApiKey::ListAcls), + (1041, ApiKey::DropAcls), + (1044, ApiKey::AlterTable), + (1045, ApiKey::DescribeClusterConfigs), + (1046, ApiKey::AlterClusterConfigs), + (1047, ApiKey::AddServerTag), + (1048, ApiKey::RemoveServerTag), + (1049, ApiKey::Rebalance), + (1050, ApiKey::ListRebalanceProgress), + (1051, ApiKey::CancelRebalance), + (1053, ApiKey::RegisterProducerOffsets), + (1054, ApiKey::GetProducerOffsets), + (1055, ApiKey::DeleteProducerOffsets), + (1056, ApiKey::AcquireKvSnapshotLease), + (1057, ApiKey::ReleaseKvSnapshotLease), + (1058, ApiKey::DropKvSnapshotLease), + (1059, ApiKey::GetTableStats), + (1060, ApiKey::AlterDatabase), + (1061, ApiKey::ScanKv), + (1062, ApiKey::GetClusterHealth), + (1063, ApiKey::ListRemoteLogManifests), + (1064, ApiKey::ListKvSnapshots), ]; for (raw, key) in cases { diff --git a/crates/fluss/src/rpc/convert.rs b/crates/fluss/src/rpc/convert.rs index 1862589b..441645c2 100644 --- a/crates/fluss/src/rpc/convert.rs +++ b/crates/fluss/src/rpc/convert.rs @@ -73,6 +73,7 @@ mod tests { host: "127.0.0.1".to_string(), port: 9092, listeners: None, + rack: None, }; let node = from_pb_server_node(pb, ServerType::TabletServer); assert_eq!(node.id(), 7); @@ -84,6 +85,7 @@ mod tests { host: "localhost".to_string(), port: 8123, listeners: None, + rack: None, }; let node = from_pb_server_node(pb, ServerType::CoordinatorServer); assert_eq!(node.uid(), "cs-3"); diff --git a/crates/fluss/src/rpc/message/create_partition.rs b/crates/fluss/src/rpc/message/create_partition.rs index ad633655..1646f333 100644 --- a/crates/fluss/src/rpc/message/create_partition.rs +++ b/crates/fluss/src/rpc/message/create_partition.rs @@ -40,7 +40,7 @@ impl CreatePartitionRequest { inner_request: proto::CreatePartitionRequest { table_path: to_table_path(table_path), partition_spec: partition_spec.to_pb(), - ignore_if_exists, + ignore_if_not_exists: ignore_if_exists, }, } } diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs index 0b59384d..a23a985d 100644 --- a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs +++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs @@ -48,7 +48,7 @@ impl GetLatestLakeSnapshotRequest { impl RequestBody for GetLatestLakeSnapshotRequest { type ResponseBody = proto::GetLatestLakeSnapshotResponse; - const API_KEY: ApiKey = ApiKey::GetLatestLakeSnapshot; + const API_KEY: ApiKey = ApiKey::GetLakeSnapshot; } impl_write_type!(GetLatestLakeSnapshotRequest); diff --git a/crates/fluss/src/rpc/message/list_databases.rs b/crates/fluss/src/rpc/message/list_databases.rs index 21e16400..74ca4944 100644 --- a/crates/fluss/src/rpc/message/list_databases.rs +++ b/crates/fluss/src/rpc/message/list_databases.rs @@ -32,7 +32,9 @@ pub struct ListDatabasesRequest { impl ListDatabasesRequest { pub fn new() -> Self { ListDatabasesRequest { - inner_request: proto::ListDatabasesRequest {}, + inner_request: proto::ListDatabasesRequest { + include_summary: None, + }, } } } diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs index 200d4bc8..e205fa6b 100644 --- a/crates/fluss/src/rpc/message/lookup.rs +++ b/crates/fluss/src/rpc/message/lookup.rs @@ -42,7 +42,7 @@ impl LookupRequest { |(bucket_id, partition_id, keys)| proto::PbLookupReqForBucket { partition_id, bucket_id, - key: keys, + keys, }, ) .collect(); @@ -50,6 +50,9 @@ impl LookupRequest { let request = proto::LookupRequest { table_id, buckets_req, + insert_if_not_exists: None, + acks: None, + timeout_ms: None, }; Self { From 784f39db6d241af1a3f482e6966c27a4c5a836d1 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Thu, 18 Jun 2026 10:11:46 +0800 Subject: [PATCH 2/5] [rust] Add RPC message wrappers for core DB/table operations Add 9 RPC message wrapper types: - alter_database, alter_table (DDL operations) - get_table_stats (table statistics) - list_database_summaries (database listing with summaries) - create_acls, list_acls, drop_acls (ACL management) - describe_cluster_configs, alter_cluster_configs (cluster configuration) Each wrapper follows the standard pattern: a request struct wrapping the proto-generated type, implementing RequestBody (tying to ApiKey and ResponseBody), WriteType, and ReadType. Co-Authored-By: Claude Opus 4.8 --- .../src/rpc/message/alter_cluster_configs.rs | 44 +++++++++++++ .../fluss/src/rpc/message/alter_database.rs | 53 ++++++++++++++++ crates/fluss/src/rpc/message/alter_table.rs | 62 +++++++++++++++++++ crates/fluss/src/rpc/message/create_acls.rs | 44 +++++++++++++ .../rpc/message/describe_cluster_configs.rs | 44 +++++++++++++ crates/fluss/src/rpc/message/drop_acls.rs | 44 +++++++++++++ .../fluss/src/rpc/message/get_table_stats.rs | 48 ++++++++++++++ crates/fluss/src/rpc/message/list_acls.rs | 44 +++++++++++++ .../rpc/message/list_database_summaries.rs | 45 ++++++++++++++ crates/fluss/src/rpc/message/mod.rs | 18 ++++++ 10 files changed, 446 insertions(+) create mode 100644 crates/fluss/src/rpc/message/alter_cluster_configs.rs create mode 100644 crates/fluss/src/rpc/message/alter_database.rs create mode 100644 crates/fluss/src/rpc/message/alter_table.rs create mode 100644 crates/fluss/src/rpc/message/create_acls.rs create mode 100644 crates/fluss/src/rpc/message/describe_cluster_configs.rs create mode 100644 crates/fluss/src/rpc/message/drop_acls.rs create mode 100644 crates/fluss/src/rpc/message/get_table_stats.rs create mode 100644 crates/fluss/src/rpc/message/list_acls.rs create mode 100644 crates/fluss/src/rpc/message/list_database_summaries.rs diff --git a/crates/fluss/src/rpc/message/alter_cluster_configs.rs b/crates/fluss/src/rpc/message/alter_cluster_configs.rs new file mode 100644 index 00000000..48bf4e8b --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_cluster_configs.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct AlterClusterConfigsRequest { + pub inner_request: proto::AlterClusterConfigsRequest, +} + +impl AlterClusterConfigsRequest { + pub fn new(alter_configs: Vec) -> Self { + AlterClusterConfigsRequest { + inner_request: proto::AlterClusterConfigsRequest { alter_configs }, + } + } +} + +impl RequestBody for AlterClusterConfigsRequest { + type ResponseBody = proto::AlterClusterConfigsResponse; + const API_KEY: ApiKey = ApiKey::AlterClusterConfigs; +} + +impl_write_type!(AlterClusterConfigsRequest); +impl_read_type!(proto::AlterClusterConfigsResponse); diff --git a/crates/fluss/src/rpc/message/alter_database.rs b/crates/fluss/src/rpc/message/alter_database.rs new file mode 100644 index 00000000..e580afd7 --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_database.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct AlterDatabaseRequest { + pub inner_request: proto::AlterDatabaseRequest, +} + +impl AlterDatabaseRequest { + pub fn new( + database_name: &str, + ignore_if_not_exists: bool, + config_changes: Vec, + ) -> Self { + AlterDatabaseRequest { + inner_request: proto::AlterDatabaseRequest { + database_name: database_name.to_string(), + ignore_if_not_exists, + config_changes, + comment: None, + }, + } + } +} + +impl RequestBody for AlterDatabaseRequest { + type ResponseBody = proto::AlterDatabaseResponse; + const API_KEY: ApiKey = ApiKey::AlterDatabase; +} + +impl_write_type!(AlterDatabaseRequest); +impl_read_type!(proto::AlterDatabaseResponse); diff --git a/crates/fluss/src/rpc/message/alter_table.rs b/crates/fluss/src/rpc/message/alter_table.rs new file mode 100644 index 00000000..5d77c522 --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_table.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct AlterTableRequest { + pub inner_request: proto::AlterTableRequest, +} + +impl AlterTableRequest { + pub fn new( + table_path: &TablePath, + ignore_if_not_exists: bool, + config_changes: Vec, + add_columns: Vec, + drop_columns: Vec, + rename_columns: Vec, + modify_columns: Vec, + ) -> Self { + AlterTableRequest { + inner_request: proto::AlterTableRequest { + table_path: to_table_path(table_path), + ignore_if_not_exists, + config_changes, + add_columns, + drop_columns, + rename_columns, + modify_columns, + }, + } + } +} + +impl RequestBody for AlterTableRequest { + type ResponseBody = proto::AlterTableResponse; + const API_KEY: ApiKey = ApiKey::AlterTable; +} + +impl_write_type!(AlterTableRequest); +impl_read_type!(proto::AlterTableResponse); diff --git a/crates/fluss/src/rpc/message/create_acls.rs b/crates/fluss/src/rpc/message/create_acls.rs new file mode 100644 index 00000000..a84e0ec3 --- /dev/null +++ b/crates/fluss/src/rpc/message/create_acls.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct CreateAclsRequest { + pub inner_request: proto::CreateAclsRequest, +} + +impl CreateAclsRequest { + pub fn new(acl: Vec) -> Self { + CreateAclsRequest { + inner_request: proto::CreateAclsRequest { acl }, + } + } +} + +impl RequestBody for CreateAclsRequest { + type ResponseBody = proto::CreateAclsResponse; + const API_KEY: ApiKey = ApiKey::CreateAcls; +} + +impl_write_type!(CreateAclsRequest); +impl_read_type!(proto::CreateAclsResponse); diff --git a/crates/fluss/src/rpc/message/describe_cluster_configs.rs b/crates/fluss/src/rpc/message/describe_cluster_configs.rs new file mode 100644 index 00000000..d085b3e6 --- /dev/null +++ b/crates/fluss/src/rpc/message/describe_cluster_configs.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct DescribeClusterConfigsRequest { + pub inner_request: proto::DescribeClusterConfigsRequest, +} + +impl DescribeClusterConfigsRequest { + pub fn new() -> Self { + DescribeClusterConfigsRequest { + inner_request: proto::DescribeClusterConfigsRequest {}, + } + } +} + +impl RequestBody for DescribeClusterConfigsRequest { + type ResponseBody = proto::DescribeClusterConfigsResponse; + const API_KEY: ApiKey = ApiKey::DescribeClusterConfigs; +} + +impl_write_type!(DescribeClusterConfigsRequest); +impl_read_type!(proto::DescribeClusterConfigsResponse); diff --git a/crates/fluss/src/rpc/message/drop_acls.rs b/crates/fluss/src/rpc/message/drop_acls.rs new file mode 100644 index 00000000..afbf303c --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_acls.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct DropAclsRequest { + pub inner_request: proto::DropAclsRequest, +} + +impl DropAclsRequest { + pub fn new(acl_filter: Vec) -> Self { + DropAclsRequest { + inner_request: proto::DropAclsRequest { acl_filter }, + } + } +} + +impl RequestBody for DropAclsRequest { + type ResponseBody = proto::DropAclsResponse; + const API_KEY: ApiKey = ApiKey::DropAcls; +} + +impl_write_type!(DropAclsRequest); +impl_read_type!(proto::DropAclsResponse); diff --git a/crates/fluss/src/rpc/message/get_table_stats.rs b/crates/fluss/src/rpc/message/get_table_stats.rs new file mode 100644 index 00000000..4155cbf0 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_table_stats.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetTableStatsRequest { + pub inner_request: proto::GetTableStatsRequest, +} + +impl GetTableStatsRequest { + pub fn new(table_id: i64, buckets_req: Vec) -> Self { + GetTableStatsRequest { + inner_request: proto::GetTableStatsRequest { + table_id, + buckets_req, + target_columns: vec![], + }, + } + } +} + +impl RequestBody for GetTableStatsRequest { + type ResponseBody = proto::GetTableStatsResponse; + const API_KEY: ApiKey = ApiKey::GetTableStats; +} + +impl_write_type!(GetTableStatsRequest); +impl_read_type!(proto::GetTableStatsResponse); diff --git a/crates/fluss/src/rpc/message/list_acls.rs b/crates/fluss/src/rpc/message/list_acls.rs new file mode 100644 index 00000000..b93ceb0b --- /dev/null +++ b/crates/fluss/src/rpc/message/list_acls.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListAclsRequest { + pub inner_request: proto::ListAclsRequest, +} + +impl ListAclsRequest { + pub fn new(acl_filter: proto::PbAclFilter) -> Self { + ListAclsRequest { + inner_request: proto::ListAclsRequest { acl_filter }, + } + } +} + +impl RequestBody for ListAclsRequest { + type ResponseBody = proto::ListAclsResponse; + const API_KEY: ApiKey = ApiKey::ListAcls; +} + +impl_write_type!(ListAclsRequest); +impl_read_type!(proto::ListAclsResponse); diff --git a/crates/fluss/src/rpc/message/list_database_summaries.rs b/crates/fluss/src/rpc/message/list_database_summaries.rs new file mode 100644 index 00000000..406233fc --- /dev/null +++ b/crates/fluss/src/rpc/message/list_database_summaries.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::WriteError; +use crate::rpc::message::{RequestBody, WriteType}; +use crate::{impl_write_type, proto}; +use bytes::BufMut; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListDatabaseSummariesRequest { + pub inner_request: proto::ListDatabasesRequest, +} + +impl ListDatabaseSummariesRequest { + pub fn new() -> Self { + ListDatabaseSummariesRequest { + inner_request: proto::ListDatabasesRequest { + include_summary: Some(true), + }, + } + } +} + +impl RequestBody for ListDatabaseSummariesRequest { + type ResponseBody = proto::ListDatabasesResponse; + const API_KEY: ApiKey = ApiKey::ListDatabases; +} + +impl_write_type!(ListDatabaseSummariesRequest); diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index 096066ed..f126d214 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -19,12 +19,18 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use bytes::{Buf, BufMut}; +mod alter_cluster_configs; +mod alter_database; +mod alter_table; mod api_versions; mod authenticate; +mod create_acls; mod create_database; mod create_partition; mod create_table; mod database_exists; +mod describe_cluster_configs; +mod drop_acls; mod drop_database; mod drop_partition; mod drop_table; @@ -34,9 +40,12 @@ mod get_latest_lake_snapshot; mod get_security_token; mod get_table; mod get_table_schema; +mod get_table_stats; mod header; mod init_writer; mod limit_scan; +mod list_acls; +mod list_database_summaries; mod list_databases; mod list_offsets; mod list_partition_infos; @@ -49,12 +58,18 @@ mod table_exists; mod update_metadata; pub use crate::rpc::RpcError; +pub use alter_cluster_configs::*; +pub use alter_database::*; +pub use alter_table::*; pub use api_versions::*; pub use authenticate::*; +pub use create_acls::*; pub use create_database::*; pub use create_partition::*; pub use create_table::*; pub use database_exists::*; +pub use describe_cluster_configs::*; +pub use drop_acls::*; pub use drop_database::*; pub use drop_partition::*; pub use drop_table::*; @@ -64,9 +79,12 @@ pub use get_latest_lake_snapshot::*; pub use get_security_token::*; pub use get_table::*; pub use get_table_schema::*; +pub use get_table_stats::*; pub use header::*; pub use init_writer::*; pub use limit_scan::*; +pub use list_acls::*; +pub use list_database_summaries::*; pub use list_databases::*; pub use list_offsets::*; pub use list_partition_infos::*; From ea7c1da862aca311945d0aca5f44c169ace34ef2 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:00:00 +0800 Subject: [PATCH 3/5] [rust] Add RPC message wrappers for extended operations Add message wrappers for the remaining 1.x RPC APIs: - KV snapshot lifecycle: acquire/release/drop lease, list, metadata, latest snapshots, lake snapshot - Server management: add/remove server tag, rebalance + progress + cancel, get cluster health, list remote log manifests - Producer offsets: register/get/delete - ScanKv (API 1061): full KV-table bucket scan request/response --- .../rpc/message/acquire_kv_snapshot_lease.rs | 52 +++++++++++++++++ .../fluss/src/rpc/message/add_server_tag.rs | 47 ++++++++++++++++ .../fluss/src/rpc/message/cancel_rebalance.rs | 46 +++++++++++++++ .../rpc/message/delete_producer_offsets.rs | 46 +++++++++++++++ .../src/rpc/message/drop_kv_snapshot_lease.rs | 46 +++++++++++++++ .../src/rpc/message/get_cluster_health.rs | 44 +++++++++++++++ .../rpc/message/get_kv_snapshot_metadata.rs | 49 ++++++++++++++++ .../src/rpc/message/get_lake_snapshot.rs | 50 +++++++++++++++++ .../rpc/message/get_latest_kv_snapshots.rs | 49 ++++++++++++++++ .../src/rpc/message/get_producer_offsets.rs | 46 +++++++++++++++ .../src/rpc/message/list_kv_snapshots.rs | 47 ++++++++++++++++ .../rpc/message/list_rebalance_progress.rs | 46 +++++++++++++++ .../rpc/message/list_remote_log_manifests.rs | 47 ++++++++++++++++ crates/fluss/src/rpc/message/mod.rs | 36 ++++++++++++ crates/fluss/src/rpc/message/rebalance.rs | 44 +++++++++++++++ .../rpc/message/register_producer_offsets.rs | 52 +++++++++++++++++ .../rpc/message/release_kv_snapshot_lease.rs | 47 ++++++++++++++++ .../src/rpc/message/remove_server_tag.rs | 47 ++++++++++++++++ crates/fluss/src/rpc/message/scan_kv.rs | 56 +++++++++++++++++++ 19 files changed, 897 insertions(+) create mode 100644 crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs create mode 100644 crates/fluss/src/rpc/message/add_server_tag.rs create mode 100644 crates/fluss/src/rpc/message/cancel_rebalance.rs create mode 100644 crates/fluss/src/rpc/message/delete_producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs create mode 100644 crates/fluss/src/rpc/message/get_cluster_health.rs create mode 100644 crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs create mode 100644 crates/fluss/src/rpc/message/get_lake_snapshot.rs create mode 100644 crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs create mode 100644 crates/fluss/src/rpc/message/get_producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/list_kv_snapshots.rs create mode 100644 crates/fluss/src/rpc/message/list_rebalance_progress.rs create mode 100644 crates/fluss/src/rpc/message/list_remote_log_manifests.rs create mode 100644 crates/fluss/src/rpc/message/rebalance.rs create mode 100644 crates/fluss/src/rpc/message/register_producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs create mode 100644 crates/fluss/src/rpc/message/remove_server_tag.rs create mode 100644 crates/fluss/src/rpc/message/scan_kv.rs diff --git a/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs new file mode 100644 index 00000000..f25adf72 --- /dev/null +++ b/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct AcquireKvSnapshotLeaseRequest { + pub inner_request: proto::AcquireKvSnapshotLeaseRequest, +} + +impl AcquireKvSnapshotLeaseRequest { + pub fn new( + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Self { + AcquireKvSnapshotLeaseRequest { + inner_request: proto::AcquireKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + lease_duration_ms, + snapshots_to_lease, + }, + } + } +} + +impl RequestBody for AcquireKvSnapshotLeaseRequest { + type ResponseBody = proto::AcquireKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::AcquireKvSnapshotLease; +} + +impl_write_type!(AcquireKvSnapshotLeaseRequest); +impl_read_type!(proto::AcquireKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/add_server_tag.rs b/crates/fluss/src/rpc/message/add_server_tag.rs new file mode 100644 index 00000000..f497aa9b --- /dev/null +++ b/crates/fluss/src/rpc/message/add_server_tag.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct AddServerTagRequest { + pub inner_request: proto::AddServerTagRequest, +} + +impl AddServerTagRequest { + pub fn new(server_ids: Vec, server_tag: i32) -> Self { + AddServerTagRequest { + inner_request: proto::AddServerTagRequest { + server_ids, + server_tag, + }, + } + } +} + +impl RequestBody for AddServerTagRequest { + type ResponseBody = proto::AddServerTagResponse; + const API_KEY: ApiKey = ApiKey::AddServerTag; +} + +impl_write_type!(AddServerTagRequest); +impl_read_type!(proto::AddServerTagResponse); diff --git a/crates/fluss/src/rpc/message/cancel_rebalance.rs b/crates/fluss/src/rpc/message/cancel_rebalance.rs new file mode 100644 index 00000000..d443d355 --- /dev/null +++ b/crates/fluss/src/rpc/message/cancel_rebalance.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct CancelRebalanceRequest { + pub inner_request: proto::CancelRebalanceRequest, +} + +impl CancelRebalanceRequest { + pub fn new(rebalance_id: Option<&str>) -> Self { + CancelRebalanceRequest { + inner_request: proto::CancelRebalanceRequest { + rebalance_id: rebalance_id.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for CancelRebalanceRequest { + type ResponseBody = proto::CancelRebalanceResponse; + const API_KEY: ApiKey = ApiKey::CancelRebalance; +} + +impl_write_type!(CancelRebalanceRequest); +impl_read_type!(proto::CancelRebalanceResponse); diff --git a/crates/fluss/src/rpc/message/delete_producer_offsets.rs b/crates/fluss/src/rpc/message/delete_producer_offsets.rs new file mode 100644 index 00000000..2cb8f8bd --- /dev/null +++ b/crates/fluss/src/rpc/message/delete_producer_offsets.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct DeleteProducerOffsetsRequest { + pub inner_request: proto::DeleteProducerOffsetsRequest, +} + +impl DeleteProducerOffsetsRequest { + pub fn new(producer_id: &str) -> Self { + DeleteProducerOffsetsRequest { + inner_request: proto::DeleteProducerOffsetsRequest { + producer_id: producer_id.to_string(), + }, + } + } +} + +impl RequestBody for DeleteProducerOffsetsRequest { + type ResponseBody = proto::DeleteProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::DeleteProducerOffsets; +} + +impl_write_type!(DeleteProducerOffsetsRequest); +impl_read_type!(proto::DeleteProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs new file mode 100644 index 00000000..51b0d597 --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct DropKvSnapshotLeaseRequest { + pub inner_request: proto::DropKvSnapshotLeaseRequest, +} + +impl DropKvSnapshotLeaseRequest { + pub fn new(lease_id: &str) -> Self { + DropKvSnapshotLeaseRequest { + inner_request: proto::DropKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + }, + } + } +} + +impl RequestBody for DropKvSnapshotLeaseRequest { + type ResponseBody = proto::DropKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::DropKvSnapshotLease; +} + +impl_write_type!(DropKvSnapshotLeaseRequest); +impl_read_type!(proto::DropKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/get_cluster_health.rs b/crates/fluss/src/rpc/message/get_cluster_health.rs new file mode 100644 index 00000000..1971460c --- /dev/null +++ b/crates/fluss/src/rpc/message/get_cluster_health.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetClusterHealthRequest { + pub inner_request: proto::GetClusterHealthRequest, +} + +impl GetClusterHealthRequest { + pub fn new() -> Self { + GetClusterHealthRequest { + inner_request: proto::GetClusterHealthRequest {}, + } + } +} + +impl RequestBody for GetClusterHealthRequest { + type ResponseBody = proto::GetClusterHealthResponse; + const API_KEY: ApiKey = ApiKey::GetClusterHealth; +} + +impl_write_type!(GetClusterHealthRequest); +impl_read_type!(proto::GetClusterHealthResponse); diff --git a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs new file mode 100644 index 00000000..4a05ed5f --- /dev/null +++ b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetKvSnapshotMetadataRequest { + pub inner_request: proto::GetKvSnapshotMetadataRequest, +} + +impl GetKvSnapshotMetadataRequest { + pub fn new(table_id: i64, partition_id: Option, bucket_id: i32, snapshot_id: i64) -> Self { + GetKvSnapshotMetadataRequest { + inner_request: proto::GetKvSnapshotMetadataRequest { + table_id, + partition_id, + bucket_id, + snapshot_id, + }, + } + } +} + +impl RequestBody for GetKvSnapshotMetadataRequest { + type ResponseBody = proto::GetKvSnapshotMetadataResponse; + const API_KEY: ApiKey = ApiKey::GetKvSnapshotMetadata; +} + +impl_write_type!(GetKvSnapshotMetadataRequest); +impl_read_type!(proto::GetKvSnapshotMetadataResponse); diff --git a/crates/fluss/src/rpc/message/get_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_lake_snapshot.rs new file mode 100644 index 00000000..90ee04d6 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_lake_snapshot.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetLakeSnapshotRequest { + pub inner_request: proto::GetLakeSnapshotRequest, +} + +impl GetLakeSnapshotRequest { + pub fn new(table_path: &TablePath, snapshot_id: Option, readable: Option) -> Self { + GetLakeSnapshotRequest { + inner_request: proto::GetLakeSnapshotRequest { + table_path: to_table_path(table_path), + snapshot_id, + readable, + }, + } + } +} + +impl RequestBody for GetLakeSnapshotRequest { + type ResponseBody = proto::GetLakeSnapshotResponse; + const API_KEY: ApiKey = ApiKey::GetLakeSnapshot; +} + +impl_write_type!(GetLakeSnapshotRequest); +impl_read_type!(proto::GetLakeSnapshotResponse); diff --git a/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs b/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs new file mode 100644 index 00000000..4a02e0d6 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetLatestKvSnapshotsRequest { + pub inner_request: proto::GetLatestKvSnapshotsRequest, +} + +impl GetLatestKvSnapshotsRequest { + pub fn new(table_path: &TablePath, partition_name: Option<&str>) -> Self { + GetLatestKvSnapshotsRequest { + inner_request: proto::GetLatestKvSnapshotsRequest { + table_path: to_table_path(table_path), + partition_name: partition_name.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for GetLatestKvSnapshotsRequest { + type ResponseBody = proto::GetLatestKvSnapshotsResponse; + const API_KEY: ApiKey = ApiKey::GetLatestKvSnapshots; +} + +impl_write_type!(GetLatestKvSnapshotsRequest); +impl_read_type!(proto::GetLatestKvSnapshotsResponse); diff --git a/crates/fluss/src/rpc/message/get_producer_offsets.rs b/crates/fluss/src/rpc/message/get_producer_offsets.rs new file mode 100644 index 00000000..46e4f839 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_producer_offsets.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetProducerOffsetsRequest { + pub inner_request: proto::GetProducerOffsetsRequest, +} + +impl GetProducerOffsetsRequest { + pub fn new(producer_id: &str) -> Self { + GetProducerOffsetsRequest { + inner_request: proto::GetProducerOffsetsRequest { + producer_id: producer_id.to_string(), + }, + } + } +} + +impl RequestBody for GetProducerOffsetsRequest { + type ResponseBody = proto::GetProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::GetProducerOffsets; +} + +impl_write_type!(GetProducerOffsetsRequest); +impl_read_type!(proto::GetProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/list_kv_snapshots.rs b/crates/fluss/src/rpc/message/list_kv_snapshots.rs new file mode 100644 index 00000000..236113cd --- /dev/null +++ b/crates/fluss/src/rpc/message/list_kv_snapshots.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListKvSnapshotsRequest { + pub inner_request: proto::ListKvSnapshotsRequest, +} + +impl ListKvSnapshotsRequest { + pub fn new(table_id: i64, partition_id: Option) -> Self { + ListKvSnapshotsRequest { + inner_request: proto::ListKvSnapshotsRequest { + table_id, + partition_id, + }, + } + } +} + +impl RequestBody for ListKvSnapshotsRequest { + type ResponseBody = proto::ListKvSnapshotsResponse; + const API_KEY: ApiKey = ApiKey::ListKvSnapshots; +} + +impl_write_type!(ListKvSnapshotsRequest); +impl_read_type!(proto::ListKvSnapshotsResponse); diff --git a/crates/fluss/src/rpc/message/list_rebalance_progress.rs b/crates/fluss/src/rpc/message/list_rebalance_progress.rs new file mode 100644 index 00000000..be8aa515 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_rebalance_progress.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListRebalanceProgressRequest { + pub inner_request: proto::ListRebalanceProgressRequest, +} + +impl ListRebalanceProgressRequest { + pub fn new(rebalance_id: Option<&str>) -> Self { + ListRebalanceProgressRequest { + inner_request: proto::ListRebalanceProgressRequest { + rebalance_id: rebalance_id.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for ListRebalanceProgressRequest { + type ResponseBody = proto::ListRebalanceProgressResponse; + const API_KEY: ApiKey = ApiKey::ListRebalanceProgress; +} + +impl_write_type!(ListRebalanceProgressRequest); +impl_read_type!(proto::ListRebalanceProgressResponse); diff --git a/crates/fluss/src/rpc/message/list_remote_log_manifests.rs b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs new file mode 100644 index 00000000..7c294ae3 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListRemoteLogManifestsRequest { + pub inner_request: proto::ListRemoteLogManifestsRequest, +} + +impl ListRemoteLogManifestsRequest { + pub fn new(table_id: i64, partition_id: Option) -> Self { + ListRemoteLogManifestsRequest { + inner_request: proto::ListRemoteLogManifestsRequest { + table_id, + partition_id, + }, + } + } +} + +impl RequestBody for ListRemoteLogManifestsRequest { + type ResponseBody = proto::ListRemoteLogManifestsResponse; + const API_KEY: ApiKey = ApiKey::ListRemoteLogManifests; +} + +impl_write_type!(ListRemoteLogManifestsRequest); +impl_read_type!(proto::ListRemoteLogManifestsResponse); diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index f126d214..aba690a5 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -19,24 +19,34 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use bytes::{Buf, BufMut}; +mod acquire_kv_snapshot_lease; +mod add_server_tag; mod alter_cluster_configs; mod alter_database; mod alter_table; mod api_versions; mod authenticate; +mod cancel_rebalance; mod create_acls; mod create_database; mod create_partition; mod create_table; mod database_exists; +mod delete_producer_offsets; mod describe_cluster_configs; mod drop_acls; mod drop_database; +mod drop_kv_snapshot_lease; mod drop_partition; mod drop_table; mod fetch; +mod get_cluster_health; mod get_database_info; +mod get_kv_snapshot_metadata; +mod get_lake_snapshot; +mod get_latest_kv_snapshots; mod get_latest_lake_snapshot; +mod get_producer_offsets; mod get_security_token; mod get_table; mod get_table_schema; @@ -47,35 +57,53 @@ mod limit_scan; mod list_acls; mod list_database_summaries; mod list_databases; +mod list_kv_snapshots; mod list_offsets; mod list_partition_infos; +mod list_rebalance_progress; +mod list_remote_log_manifests; mod list_tables; mod lookup; mod prefix_lookup; mod produce_log; mod put_kv; +mod rebalance; +mod register_producer_offsets; +mod release_kv_snapshot_lease; +mod remove_server_tag; +mod scan_kv; mod table_exists; mod update_metadata; pub use crate::rpc::RpcError; +pub use acquire_kv_snapshot_lease::*; +pub use add_server_tag::*; pub use alter_cluster_configs::*; pub use alter_database::*; pub use alter_table::*; pub use api_versions::*; pub use authenticate::*; +pub use cancel_rebalance::*; pub use create_acls::*; pub use create_database::*; pub use create_partition::*; pub use create_table::*; pub use database_exists::*; +pub use delete_producer_offsets::*; pub use describe_cluster_configs::*; pub use drop_acls::*; pub use drop_database::*; +pub use drop_kv_snapshot_lease::*; pub use drop_partition::*; pub use drop_table::*; pub use fetch::*; +pub use get_cluster_health::*; pub use get_database_info::*; +pub use get_kv_snapshot_metadata::*; +pub use get_lake_snapshot::*; +pub use get_latest_kv_snapshots::*; pub use get_latest_lake_snapshot::*; +pub use get_producer_offsets::*; pub use get_security_token::*; pub use get_table::*; pub use get_table_schema::*; @@ -86,13 +114,21 @@ pub use limit_scan::*; pub use list_acls::*; pub use list_database_summaries::*; pub use list_databases::*; +pub use list_kv_snapshots::*; pub use list_offsets::*; pub use list_partition_infos::*; +pub use list_rebalance_progress::*; +pub use list_remote_log_manifests::*; pub use list_tables::*; pub use lookup::*; pub use prefix_lookup::*; pub use produce_log::*; pub use put_kv::*; +pub use rebalance::*; +pub use register_producer_offsets::*; +pub use release_kv_snapshot_lease::*; +pub use remove_server_tag::*; +pub use scan_kv::*; pub use table_exists::*; pub use update_metadata::*; diff --git a/crates/fluss/src/rpc/message/rebalance.rs b/crates/fluss/src/rpc/message/rebalance.rs new file mode 100644 index 00000000..062f8f49 --- /dev/null +++ b/crates/fluss/src/rpc/message/rebalance.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct RebalanceRequest { + pub inner_request: proto::RebalanceRequest, +} + +impl RebalanceRequest { + pub fn new(goals: Vec) -> Self { + RebalanceRequest { + inner_request: proto::RebalanceRequest { goals }, + } + } +} + +impl RequestBody for RebalanceRequest { + type ResponseBody = proto::RebalanceResponse; + const API_KEY: ApiKey = ApiKey::Rebalance; +} + +impl_write_type!(RebalanceRequest); +impl_read_type!(proto::RebalanceResponse); diff --git a/crates/fluss/src/rpc/message/register_producer_offsets.rs b/crates/fluss/src/rpc/message/register_producer_offsets.rs new file mode 100644 index 00000000..8dfc13bb --- /dev/null +++ b/crates/fluss/src/rpc/message/register_producer_offsets.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct RegisterProducerOffsetsRequest { + pub inner_request: proto::RegisterProducerOffsetsRequest, +} + +impl RegisterProducerOffsetsRequest { + pub fn new( + producer_id: &str, + table_offsets: Vec, + ttl_ms: Option, + ) -> Self { + RegisterProducerOffsetsRequest { + inner_request: proto::RegisterProducerOffsetsRequest { + producer_id: producer_id.to_string(), + table_offsets, + ttl_ms, + }, + } + } +} + +impl RequestBody for RegisterProducerOffsetsRequest { + type ResponseBody = proto::RegisterProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::RegisterProducerOffsets; +} + +impl_write_type!(RegisterProducerOffsetsRequest); +impl_read_type!(proto::RegisterProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs new file mode 100644 index 00000000..38a8fdfa --- /dev/null +++ b/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ReleaseKvSnapshotLeaseRequest { + pub inner_request: proto::ReleaseKvSnapshotLeaseRequest, +} + +impl ReleaseKvSnapshotLeaseRequest { + pub fn new(lease_id: &str, buckets_to_release: Vec) -> Self { + ReleaseKvSnapshotLeaseRequest { + inner_request: proto::ReleaseKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + buckets_to_release, + }, + } + } +} + +impl RequestBody for ReleaseKvSnapshotLeaseRequest { + type ResponseBody = proto::ReleaseKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::ReleaseKvSnapshotLease; +} + +impl_write_type!(ReleaseKvSnapshotLeaseRequest); +impl_read_type!(proto::ReleaseKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/remove_server_tag.rs b/crates/fluss/src/rpc/message/remove_server_tag.rs new file mode 100644 index 00000000..25cbeb4f --- /dev/null +++ b/crates/fluss/src/rpc/message/remove_server_tag.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct RemoveServerTagRequest { + pub inner_request: proto::RemoveServerTagRequest, +} + +impl RemoveServerTagRequest { + pub fn new(server_ids: Vec, server_tag: i32) -> Self { + RemoveServerTagRequest { + inner_request: proto::RemoveServerTagRequest { + server_ids, + server_tag, + }, + } + } +} + +impl RequestBody for RemoveServerTagRequest { + type ResponseBody = proto::RemoveServerTagResponse; + const API_KEY: ApiKey = ApiKey::RemoveServerTag; +} + +impl_write_type!(RemoveServerTagRequest); +impl_read_type!(proto::RemoveServerTagResponse); diff --git a/crates/fluss/src/rpc/message/scan_kv.rs b/crates/fluss/src/rpc/message/scan_kv.rs new file mode 100644 index 00000000..4294584f --- /dev/null +++ b/crates/fluss/src/rpc/message/scan_kv.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ScanKvRequest { + pub inner_request: proto::ScanKvRequest, +} + +impl ScanKvRequest { + pub fn new( + scanner_id: Option>, + bucket_scan_req: Option, + call_seq_id: Option, + batch_size_bytes: Option, + close_scanner: Option, + ) -> Self { + ScanKvRequest { + inner_request: proto::ScanKvRequest { + scanner_id, + bucket_scan_req, + call_seq_id, + batch_size_bytes, + close_scanner, + }, + } + } +} + +impl RequestBody for ScanKvRequest { + type ResponseBody = proto::ScanKvResponse; + const API_KEY: ApiKey = ApiKey::ScanKv; +} + +impl_write_type!(ScanKvRequest); +impl_read_type!(proto::ScanKvResponse); From e4a4715a06d1167a21bbe74c9871a570e072a165 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:00:37 +0800 Subject: [PATCH 4/5] [rust] Add Fluss 1.x protocol support to the admin client Add 27 new admin methods to FlussAdmin: - Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats - KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot - ACL management: create_acls, list_acls, drop_acls - Cluster configuration: describe_cluster_configs, alter_cluster_configs - Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance - Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets - Monitoring: get_cluster_health, list_remote_log_manifests - KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease --- crates/fluss/src/client/admin.rs | 338 ++++++++++++++++++++++++++++++- 1 file changed, 335 insertions(+), 3 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index d3d5a5e3..65b4f594 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -22,15 +22,24 @@ use crate::metadata::{ PhysicalTablePath, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, }; use crate::rpc::message::{ + AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, + AlterDatabaseRequest, AlterTableRequest, CancelRebalanceRequest, CreateAclsRequest, CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, - DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, - GetLatestLakeSnapshotRequest, GetTableRequest, GetTableSchemaRequestMsg, ListDatabasesRequest, - ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest, + DeleteProducerOffsetsRequest, DescribeClusterConfigsRequest, DropAclsRequest, + DropDatabaseRequest, DropKvSnapshotLeaseRequest, DropPartitionRequest, DropTableRequest, + GetClusterHealthRequest, GetDatabaseInfoRequest, GetKvSnapshotMetadataRequest, + GetLakeSnapshotRequest, GetLatestKvSnapshotsRequest, GetLatestLakeSnapshotRequest, + GetProducerOffsetsRequest, GetTableRequest, GetTableSchemaRequestMsg, GetTableStatsRequest, + ListAclsRequest, ListDatabaseSummariesRequest, ListDatabasesRequest, ListKvSnapshotsRequest, + ListPartitionInfosRequest, ListRebalanceProgressRequest, ListRemoteLogManifestsRequest, + ListTablesRequest, RebalanceRequest, RegisterProducerOffsetsRequest, + ReleaseKvSnapshotLeaseRequest, RemoveServerTagRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; use crate::error::{Error, Result}; +use crate::proto; use crate::proto::GetTableInfoResponse; use crate::{BucketId, PartitionId, TableId}; use std::collections::{HashMap, HashSet}; @@ -483,4 +492,327 @@ impl FlussAdmin { } Ok(tasks) } + + /// List database summaries (name, created_time, table_count). + pub async fn list_database_summaries(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListDatabaseSummariesRequest::new()) + .await?; + Ok(response.database_summary) + } + + /// Alter a database's configuration. + pub async fn alter_database( + &self, + name: &str, + config_changes: Vec, + ignore_if_not_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterDatabaseRequest::new( + name, + ignore_if_not_exists, + config_changes, + )) + .await?; + Ok(()) + } + + /// Alter a table (config changes, add columns). + pub async fn alter_table( + &self, + table_path: &TablePath, + config_changes: Vec, + add_columns: Vec, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterTableRequest::new( + table_path, + false, + config_changes, + add_columns, + vec![], + vec![], + vec![], + )) + .await?; + Ok(()) + } + + /// Get table statistics for buckets. + pub async fn get_table_stats( + &self, + table_id: i64, + buckets_req: Vec, + ) -> Result { + self.admin_gateway() + .await? + .request(GetTableStatsRequest::new(table_id, buckets_req)) + .await + } + + /// Get the latest KV snapshots for a table. + pub async fn get_latest_kv_snapshots( + &self, + table_path: &TablePath, + partition_name: Option<&str>, + ) -> Result { + self.admin_gateway() + .await? + .request(GetLatestKvSnapshotsRequest::new(table_path, partition_name)) + .await + } + + /// Get KV snapshot metadata. + pub async fn get_kv_snapshot_metadata( + &self, + table_id: i64, + partition_id: Option, + bucket_id: i32, + snapshot_id: i64, + ) -> Result { + self.admin_gateway() + .await? + .request(GetKvSnapshotMetadataRequest::new( + table_id, + partition_id, + bucket_id, + snapshot_id, + )) + .await + } + + /// Acquire a KV snapshot lease. + pub async fn create_kv_snapshot_lease( + &self, + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Result { + self.admin_gateway() + .await? + .request(AcquireKvSnapshotLeaseRequest::new( + lease_id, + lease_duration_ms, + snapshots_to_lease, + )) + .await + } + + /// Get a lake snapshot for a table. + pub async fn get_lake_snapshot( + &self, + table_path: &TablePath, + snapshot_id: Option, + ) -> Result { + self.admin_gateway() + .await? + .request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None)) + .await + } + + /// Create ACLs. + pub async fn create_acls( + &self, + acl: Vec, + ) -> Result { + self.admin_gateway() + .await? + .request(CreateAclsRequest::new(acl)) + .await + } + + /// List ACLs matching a filter. + pub async fn list_acls( + &self, + acl_filter: proto::PbAclFilter, + ) -> Result { + self.admin_gateway() + .await? + .request(ListAclsRequest::new(acl_filter)) + .await + } + + /// Drop ACLs matching filters. + pub async fn drop_acls( + &self, + acl_filter: Vec, + ) -> Result { + self.admin_gateway() + .await? + .request(DropAclsRequest::new(acl_filter)) + .await + } + + /// Describe cluster configuration. + pub async fn describe_cluster_configs(&self) -> Result { + self.admin_gateway() + .await? + .request(DescribeClusterConfigsRequest::new()) + .await + } + + /// Alter cluster configuration. + pub async fn alter_cluster_configs( + &self, + alter_configs: Vec, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterClusterConfigsRequest::new(alter_configs)) + .await?; + Ok(()) + } + + /// Add a tag to servers. + pub async fn add_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AddServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Remove a tag from servers. + pub async fn remove_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(RemoveServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Trigger a rebalance. + pub async fn rebalance(&self, goals: Vec) -> Result { + self.admin_gateway() + .await? + .request(RebalanceRequest::new(goals)) + .await + } + + /// List rebalance progress. + pub async fn list_rebalance_progress( + &self, + rebalance_id: Option<&str>, + ) -> Result { + self.admin_gateway() + .await? + .request(ListRebalanceProgressRequest::new(rebalance_id)) + .await + } + + /// Cancel a rebalance. + pub async fn cancel_rebalance(&self, rebalance_id: Option<&str>) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(CancelRebalanceRequest::new(rebalance_id)) + .await?; + Ok(()) + } + + /// Register producer offsets. + pub async fn register_producer_offsets( + &self, + producer_id: &str, + table_offsets: Vec, + ) -> Result { + self.admin_gateway() + .await? + .request(RegisterProducerOffsetsRequest::new( + producer_id, + table_offsets, + None, + )) + .await + } + + /// Get producer offsets. + pub async fn get_producer_offsets( + &self, + producer_id: &str, + ) -> Result { + self.admin_gateway() + .await? + .request(GetProducerOffsetsRequest::new(producer_id)) + .await + } + + /// Delete producer offsets. + pub async fn delete_producer_offsets(&self, producer_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DeleteProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(()) + } + + /// Get cluster health status. + pub async fn get_cluster_health(&self) -> Result { + self.admin_gateway() + .await? + .request(GetClusterHealthRequest::new()) + .await + } + + /// List remote log manifests for a table. + pub async fn list_remote_log_manifests( + &self, + table_id: i64, + partition_id: Option, + ) -> Result { + self.admin_gateway() + .await? + .request(ListRemoteLogManifestsRequest::new(table_id, partition_id)) + .await + } + + /// List active KV snapshots for a table. + pub async fn list_kv_snapshots( + &self, + table_id: i64, + partition_id: Option, + ) -> Result { + self.admin_gateway() + .await? + .request(ListKvSnapshotsRequest::new(table_id, partition_id)) + .await + } + + /// Release specific bucket snapshots from a KV snapshot lease. + pub async fn release_kv_snapshot_lease( + &self, + lease_id: &str, + buckets_to_release: Vec, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(ReleaseKvSnapshotLeaseRequest::new( + lease_id, + buckets_to_release, + )) + .await?; + Ok(()) + } + + /// Drop an entire KV snapshot lease. + pub async fn drop_kv_snapshot_lease(&self, lease_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DropKvSnapshotLeaseRequest::new(lease_id)) + .await?; + Ok(()) + } } From 5b2ce66b0e5ef1f460be5fbb9a176c2503b6c40f Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:00:44 +0800 Subject: [PATCH 5/5] [rust] Add version-aware integration test harness (0.9.x / 1.x) --- crates/fluss/Cargo.toml | 3 + .../fluss/tests/integration/admin_extended.rs | 783 ++++++++++++++++++ crates/fluss/tests/integration/admin_v1.rs | 211 +++++ crates/fluss/tests/test_fluss.rs | 3 + 4 files changed, 1000 insertions(+) create mode 100644 crates/fluss/tests/integration/admin_extended.rs create mode 100644 crates/fluss/tests/integration/admin_v1.rs diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 821ee52e..18a37728 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -41,6 +41,9 @@ storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] storage-oss = ["opendal/services-oss"] integration_tests = [] +# Gates tests that exercise APIs only available on Fluss 1.x servers. +# Enable alongside `integration_tests` when running against a 1.x server image. +fluss_v1 = [] [dependencies] arrow = { workspace = true } diff --git a/crates/fluss/tests/integration/admin_extended.rs b/crates/fluss/tests/integration/admin_extended.rs new file mode 100644 index 00000000..d1850510 --- /dev/null +++ b/crates/fluss/tests/integration/admin_extended.rs @@ -0,0 +1,783 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Extended admin integration tests covering APIs available on both Fluss +//! 0.9.x and 1.x servers but not exercised by `admin.rs`. +//! +//! These tests run against the shared cluster gated behind `integration_tests`. +//! Some admin operations have semantics that depend on optional cluster +//! configuration (lake storage, authorization). For those the tests assert the +//! request/response roundtrip succeeds and tolerate a *structured* server-side +//! error (a decoded [`FlussError`]) while still failing on transport/decoding +//! errors — which is what a Rust-client integration test must guard. + +#[cfg(test)] +mod admin_extended_test { + use crate::integration::utils::get_shared_cluster; + use fluss::error::FlussError; + use fluss::metadata::{ + DataTypes, DatabaseDescriptorBuilder, JsonSerde, KvFormat, LogFormat, Schema, + TableDescriptor, TablePath, + }; + use fluss::proto; + + // ACL enum codes (mirrors the Java security model). + const RESOURCE_TYPE_DATABASE: i32 = 3; + const OPERATION_TYPE_READ: i32 = 3; + const PERMISSION_TYPE_ALLOW: i32 = 2; + + // AlterConfigOpType codes. + const OP_TYPE_SET: i32 = 0; + + // ServerTag codes. + const SERVER_TAG_TEMPORARY_OFFLINE: i32 = 1; + + /// Asserts an error decoded into one of the `allowed` Fluss API errors. + /// Panics (failing the test) on a transport/decoding error or an unexpected + /// API error code. + fn assert_expected_api_error(error: fluss::error::Error, allowed: &[FlussError]) { + match error.api_error() { + Some(code) if allowed.contains(&code) => {} + other => { + panic!("Expected one of {allowed:?} but got {other:?} (full error: {error:?})") + } + } + } + + /// Builds a simple non-partitioned log table descriptor (id INT, name STRING). + fn simple_log_table() -> TableDescriptor { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .build() + .expect("build schema"); + TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec![]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .build() + .expect("build table descriptor") + } + + /// Builds a simple primary-key/KV table descriptor (id INT PK, name STRING). + fn simple_kv_table() -> TableDescriptor { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("build schema"); + TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec!["id".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("build table descriptor") + } + + // --------------------------------------------------------------------- + // Group A: Database listing & summaries + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_list_databases() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_a = "test_list_databases_a"; + let db_b = "test_list_databases_b"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + + admin + .create_database(db_a, Some(&descriptor), true) + .await + .unwrap(); + admin + .create_database(db_b, Some(&descriptor), true) + .await + .unwrap(); + + let databases = admin.list_databases().await.expect("should list databases"); + assert!( + databases.iter().any(|d| d == db_a), + "Expected {db_a} in {databases:?}" + ); + assert!( + databases.iter().any(|d| d == db_b), + "Expected {db_b} in {databases:?}" + ); + + admin.drop_database(db_a, true, true).await.unwrap(); + admin.drop_database(db_b, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_list_database_summaries() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_list_db_summaries"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_descriptor = simple_log_table(); + for table in ["summary_t1", "summary_t2"] { + admin + .create_table(&TablePath::new(db_name, table), &table_descriptor, true) + .await + .unwrap(); + } + + let summaries = admin + .list_database_summaries() + .await + .expect("should list database summaries"); + + let summary = summaries + .iter() + .find(|s| s.database_name == db_name) + .unwrap_or_else(|| panic!("Expected summary for {db_name} in {summaries:?}")); + + assert_eq!( + summary.table_count, 2, + "Database {db_name} should report 2 tables" + ); + assert!( + summary.created_time > 0, + "created_time should be positive, got {}", + summary.created_time + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group B: Schema operations + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_table_schema() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_get_table_schema_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "schema_table"); + let table_descriptor = simple_kv_table(); + admin + .create_table(&table_path, &table_descriptor, true) + .await + .unwrap(); + + // Request the latest schema (schema_id = None). + let schema_info = admin + .get_table_schema(&table_path, None) + .await + .expect("should get latest table schema"); + assert!( + schema_info.schema_id() > 0, + "schema_id should be positive, got {}", + schema_info.schema_id() + ); + assert_eq!( + schema_info.schema().columns().len(), + 2, + "schema should have 2 columns" + ); + + // Request the same schema by explicit id. + let by_id = admin + .get_table_schema(&table_path, Some(schema_info.schema_id())) + .await + .expect("should get table schema by id"); + assert_eq!(by_id.schema_id(), schema_info.schema_id()); + assert_eq!(by_id.schema().columns().len(), 2); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group C: Alter operations + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_alter_table_add_column() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_alter_table_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "alter_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + + // Add a nullable "email" column at the end of the schema. + let data_type_json = serde_json::to_vec( + &DataTypes::string() + .serialize_json() + .expect("serialize STRING type"), + ) + .expect("encode data_type_json"); + let add_column = proto::PbAddColumn { + column_name: "email".to_string(), + data_type_json, + comment: Some("user email".to_string()), + column_position_type: 0, // LAST (the only position the server supports) + }; + + admin + .alter_table(&table_path, vec![], vec![add_column]) + .await + .expect("should add column"); + + let schema_info = admin + .get_table_schema(&table_path, None) + .await + .expect("should get schema after alter"); + assert_eq!( + schema_info.schema().columns().len(), + 3, + "schema should have 3 columns after adding email" + ); + assert!( + schema_info + .schema() + .columns() + .iter() + .any(|c| c.name() == "email"), + "schema should contain the new 'email' column" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_alter_database() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_alter_database_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let config_change = proto::PbAlterConfig { + config_key: "custom.key".to_string(), + config_value: Some("custom-value".to_string()), + op_type: OP_TYPE_SET, + }; + // AlterDatabase is not implemented on every server build (e.g. 0.9.x). + // Accept the server's "unsupported" signal but never a transport failure. + match admin + .alter_database(db_name, vec![config_change], false) + .await + { + Ok(()) => { + // Altering a non-existent database with ignore_if_not_exists = true is a no-op. + admin + .alter_database("no_such_db_for_alter", vec![], true) + .await + .expect("altering missing db with ignore flag should succeed"); + } + Err(fluss::error::Error::UnsupportedVersion { .. }) => {} + Err(error) => panic!("unexpected error from alter_database: {error:?}"), + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group D: Cluster configuration + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_describe_cluster_configs() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let response = admin + .describe_cluster_configs() + .await + .expect("should describe cluster configs"); + + for config in &response.configs { + assert!( + !config.config_key.is_empty(), + "config_key should not be empty" + ); + assert!( + !config.config_source.is_empty(), + "config_source should not be empty for {}", + config.config_key + ); + } + } + + #[tokio::test] + async fn test_alter_cluster_configs() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // Read an existing config that has a value, then SET it to the same value. + // This exercises the write path without changing cluster behaviour. + let described = admin + .describe_cluster_configs() + .await + .expect("should describe cluster configs"); + + let Some(existing) = described.configs.iter().find(|c| c.config_value.is_some()) else { + // No config with a value to round-trip; nothing to assert. + return; + }; + + let alter = proto::PbAlterConfig { + config_key: existing.config_key.clone(), + config_value: existing.config_value.clone(), + op_type: OP_TYPE_SET, + }; + + // Many keys are not dynamically alterable; the server rejects those with + // either InvalidConfigException or a generic "not allowed to be changed + // dynamically" error. Either way the request/response roundtrip worked. + if let Err(error) = admin.alter_cluster_configs(vec![alter]).await { + assert_expected_api_error( + error, + &[ + FlussError::InvalidConfigException, + FlussError::UnknownServerError, + ], + ); + } + } + + // --------------------------------------------------------------------- + // Group E: Table statistics + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_table_stats() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_get_table_stats_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "stats_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let buckets_req = vec![proto::PbTableStatsReqForBucket { + partition_id: None, + bucket_id: 0, + }]; + // GetTableStats is not implemented on every server build. Accept either a + // decoded response or the server's "unsupported" signal, but never a + // transport/decoding failure. + match admin.get_table_stats(table_id, buckets_req).await { + Ok(response) => { + for bucket in &response.buckets_resp { + // Per-bucket entries echo the requested bucket id. + assert_eq!( + bucket.bucket_id, 0, + "unexpected bucket id in stats response" + ); + } + } + Err(fluss::error::Error::UnsupportedVersion { .. }) => {} + Err(error) => panic!("unexpected error from get_table_stats: {error:?}"), + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group F: KV snapshot operations (0.9.x) + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_latest_kv_snapshots() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_latest_kv_snapshots_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "latest_kv_snapshots_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let response = admin + .get_latest_kv_snapshots(&table_path, None) + .await + .expect("should get latest kv snapshots"); + assert_eq!( + response.table_id, table_id, + "response table_id should match the requested table" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_kv_snapshot_lease_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_kv_snapshot_lease_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "lease_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + // A fresh table has no snapshot to lease, so the requested snapshot is + // reported back as unavailable. The RPC itself must still succeed. + let lease = proto::PbKvSnapshotLeaseForTable { + table_id, + bucket_snapshots: vec![proto::PbKvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 0, + }], + }; + let response = admin + .create_kv_snapshot_lease("test-lease-id", 60_000, vec![lease]) + .await + .expect("should acquire kv snapshot lease"); + + // The fresh-table snapshot is unavailable; just confirm the response decodes. + let _ = response.unavailable_snapshots; + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group G: Server tags + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_server_tag_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let nodes = admin.get_server_nodes().await.expect("should get nodes"); + let tablet_id = nodes + .iter() + .find(|n| *n.server_type() == fluss::ServerType::TabletServer) + .map(|n| n.id()) + .expect("expected a tablet server node"); + + // Add then immediately remove a TEMPORARY_OFFLINE tag so cluster state + // is restored. Both RPCs must complete without a transport error. + admin + .add_server_tag(vec![tablet_id], SERVER_TAG_TEMPORARY_OFFLINE) + .await + .expect("should add server tag"); + admin + .remove_server_tag(vec![tablet_id], SERVER_TAG_TEMPORARY_OFFLINE) + .await + .expect("should remove server tag"); + } + + // --------------------------------------------------------------------- + // Group H: Rebalance + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_rebalance_operations() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // No rebalance is running; listing progress and cancelling are read/no-op + // paths. Tolerate a structured server error (e.g. nothing to cancel). + if let Err(error) = admin.list_rebalance_progress(None).await { + assert_expected_api_error( + error, + &[ + FlussError::UnknownServerError, + FlussError::InvalidCoordinatorException, + ], + ); + } + + if let Err(error) = admin.cancel_rebalance(None).await { + assert_expected_api_error( + error, + &[ + FlussError::UnknownServerError, + FlussError::InvalidCoordinatorException, + ], + ); + } + } + + // --------------------------------------------------------------------- + // Group I: Producer offsets + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_producer_offsets_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_producer_offsets_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "producer_offsets_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let producer_id = "test-producer"; + let table_offsets = vec![proto::PbProducerTableOffsets { + table_id, + bucket_offsets: vec![proto::PbBucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: Some(42), + }], + }]; + + admin + .register_producer_offsets(producer_id, table_offsets) + .await + .expect("should register producer offsets"); + + let fetched = admin + .get_producer_offsets(producer_id) + .await + .expect("should get producer offsets"); + let registered = fetched + .table_offsets + .iter() + .find(|t| t.table_id == table_id) + .unwrap_or_else(|| panic!("expected offsets for table {table_id}")); + assert_eq!( + registered + .bucket_offsets + .first() + .and_then(|b| b.log_end_offset), + Some(42), + "registered log_end_offset should be 42" + ); + + admin + .delete_producer_offsets(producer_id) + .await + .expect("should delete producer offsets"); + + let after_delete = admin + .get_producer_offsets(producer_id) + .await + .expect("should get producer offsets after delete"); + assert!( + after_delete + .table_offsets + .iter() + .all(|t| t.table_id != table_id), + "offsets for table {table_id} should be gone after delete" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group J: ACL management + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_acl_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_acl_lifecycle_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let acl = proto::PbAclInfo { + resource_name: db_name.to_string(), + resource_type: RESOURCE_TYPE_DATABASE, + principal_name: "alice".to_string(), + principal_type: "User".to_string(), + host: "*".to_string(), + operation_type: OPERATION_TYPE_READ, + permission_type: PERMISSION_TYPE_ALLOW, + }; + + // Authorization may be disabled on the shared cluster. In that case the + // server rejects the request with SecurityDisabledException; otherwise + // the full create/list/drop lifecycle should round-trip. + match admin.create_acls(vec![acl.clone()]).await { + Err(error) => { + assert_expected_api_error( + error, + &[ + FlussError::SecurityDisabledException, + FlussError::AuthorizationException, + ], + ); + } + Ok(_) => { + let filter = proto::PbAclFilter { + resource_name: Some(db_name.to_string()), + resource_type: RESOURCE_TYPE_DATABASE, + principal_name: Some("alice".to_string()), + principal_type: Some("User".to_string()), + host: Some("*".to_string()), + operation_type: OPERATION_TYPE_READ, + permission_type: PERMISSION_TYPE_ALLOW, + }; + let listed = admin + .list_acls(filter.clone()) + .await + .expect("should list acls"); + assert!( + listed.acl.iter().any(|a| a.resource_name == db_name), + "created ACL should appear in list: {listed:?}" + ); + + admin + .drop_acls(vec![filter]) + .await + .expect("should drop acls"); + } + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group K: Lake snapshots (0.9.x) + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_lake_snapshot_operations() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_lake_snapshot_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "lake_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + + // Lake storage is typically not configured for the test cluster, so both + // calls are expected to fail with a lake-related API error rather than a + // transport error. + let lake_errors = [ + FlussError::LakeStorageNotConfiguredException, + FlussError::LakeSnapshotNotExist, + ]; + + if let Err(error) = admin.get_latest_lake_snapshot(&table_path).await { + assert_expected_api_error(error, &lake_errors); + } + if let Err(error) = admin.get_lake_snapshot(&table_path, None).await { + assert_expected_api_error(error, &lake_errors); + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } +} diff --git a/crates/fluss/tests/integration/admin_v1.rs b/crates/fluss/tests/integration/admin_v1.rs new file mode 100644 index 00000000..bbf947e2 --- /dev/null +++ b/crates/fluss/tests/integration/admin_v1.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for admin APIs that are only available on Fluss 1.x +//! servers (API keys 1057-1058, 1061-1064). The whole module is gated behind +//! the `fluss_v1` feature so it is skipped when running against a 0.9.x server. + +#[cfg(test)] +mod admin_v1_test { + use crate::integration::utils::get_shared_cluster; + use fluss::metadata::DataTypes; + use fluss::metadata::{ + DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor, TablePath, + }; + + /// `get_cluster_health` (API key 1062) reports replica/leader counts and a + /// status code for the cluster. + #[tokio::test] + async fn test_get_cluster_health() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let response = admin + .get_cluster_health() + .await + .expect("should get cluster health"); + + assert!( + response.status >= 0, + "Cluster health status should be non-negative, got: {}", + response.status + ); + } + + /// `list_kv_snapshots` (API key 1064) returns the active snapshots for a KV + /// table. A freshly created table has none, but the response must echo the + /// requested table id. + #[tokio::test] + async fn test_list_kv_snapshots() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let test_db_name = "test_list_kv_snapshots_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_list_kv_snapshots") + .build(); + + admin + .create_database(test_db_name, Some(&db_descriptor), true) + .await + .expect("Failed to create test database"); + + let test_table_name = "kv_snapshot_table"; + let table_path = TablePath::new(test_db_name, test_table_name); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(1), vec!["id".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create table"); + + let table_info = admin + .get_table_info(&table_path) + .await + .expect("should get table info"); + let table_id = table_info.get_table_id(); + + let response = admin + .list_kv_snapshots(table_id, None) + .await + .expect("should list kv snapshots"); + + assert_eq!( + response.table_id, table_id, + "Response table_id should match request" + ); + + // Cleanup + admin.drop_table(&table_path, true).await.unwrap(); + admin.drop_database(test_db_name, true, true).await.unwrap(); + } + + /// `list_remote_log_manifests` (API key 1063) lists tiered log segments. A + /// newly created table has had no remote log activity yet. + #[tokio::test] + async fn test_list_remote_log_manifests() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let test_db_name = "test_list_remote_log_manifests_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_list_remote_log_manifests") + .build(); + + admin + .create_database(test_db_name, Some(&db_descriptor), true) + .await + .expect("Failed to create test database"); + + let test_table_name = "remote_log_table"; + let table_path = TablePath::new(test_db_name, test_table_name); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("data", DataTypes::string()) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(1), vec![]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create table"); + + let table_info = admin + .get_table_info(&table_path) + .await + .expect("should get table info"); + let table_id = table_info.get_table_id(); + + let response = admin + .list_remote_log_manifests(table_id, None) + .await + .expect("should list remote log manifests"); + + // A newly created table with no remote log activity should return empty manifests. + assert!( + response.manifests.is_empty(), + "Newly created table should have no remote log manifests" + ); + + // Cleanup + admin.drop_table(&table_path, true).await.unwrap(); + admin.drop_database(test_db_name, true, true).await.unwrap(); + } + + /// `drop_kv_snapshot_lease` (API key 1058) removes an entire lease. Dropping + /// a lease that never existed is a server-side no-op. + #[tokio::test] + async fn test_drop_kv_snapshot_lease() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // Dropping a non-existent lease should succeed (no-op on server) + let result = admin.drop_kv_snapshot_lease("non-existent-lease-id").await; + assert!( + result.is_ok(), + "Dropping non-existent lease should succeed, got: {:?}", + result + ); + } + + /// `release_kv_snapshot_lease` (API key 1057) releases specific buckets from + /// a lease. Releasing an empty bucket set against an unknown lease is a + /// no-op and must not error. + #[tokio::test] + async fn test_release_kv_snapshot_lease() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let result = admin + .release_kv_snapshot_lease("non-existent-lease-id", vec![]) + .await; + assert!( + result.is_ok(), + "Releasing an empty bucket set should succeed, got: {:?}", + result + ); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 2d2bd152..73ed0651 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -21,6 +21,9 @@ extern crate fluss; #[cfg(feature = "integration_tests")] mod integration { mod admin; + mod admin_extended; + #[cfg(feature = "fluss_v1")] + mod admin_v1; mod batch_scanner; mod fluss_cluster; mod kv_table;