diff --git a/crates/fluss/build.rs b/crates/fluss/build.rs index 040ee521..f9248c36 100644 --- a/crates/fluss/build.rs +++ b/crates/fluss/build.rs @@ -22,8 +22,9 @@ fn main() -> Result<()> { config.bytes([ ".proto.PbProduceLogReqForBucket.records", ".proto.PbPutKvReqForBucket.records", - ".proto.PbLookupReqForBucket.key", + ".proto.PbLookupReqForBucket.keys", ".proto.PbPrefixLookupReqForBucket.keys", + ".proto.ScanKvResponse.records", ]); config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?; Ok(()) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 0828b83b..d3d5a5e3 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -150,6 +150,7 @@ impl FlussAdmin { table_json, created_time, modified_time, + .. } = response; let v: &[u8] = &table_json[..]; let table_descriptor = diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 8578daa4..b36b0e42 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -2058,6 +2058,8 @@ impl LogFetcher { projection_pushdown_enabled: projection_enabled, projected_fields: projected_fields.clone(), buckets_req: feq_for_buckets, + filter_predicate: None, + filter_schema_id: None, }; let fetch_log_request = FetchLogRequest { @@ -2408,6 +2410,7 @@ mod tests { log_start_offset: None, remote_log_fetch_info: None, records: None, + filtered_end_offset: None, }], }], }; @@ -2464,6 +2467,7 @@ mod tests { log_start_offset: None, remote_log_fetch_info: None, records: None, + filtered_end_offset: None, }], }], }; @@ -2801,6 +2805,7 @@ mod tests { log_start_offset: Some(0), remote_log_fetch_info: None, records: None, + filtered_end_offset: None, }], }], }; diff --git a/crates/fluss/src/metadata/acl.rs b/crates/fluss/src/metadata/acl.rs new file mode 100644 index 00000000..84ec97af --- /dev/null +++ b/crates/fluss/src/metadata/acl.rs @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::{PbAclFilter, PbAclInfo}; + +/// Mirrors Java `org.apache.fluss.security.acl.ResourceType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResourceType { + Any, + Cluster, + Database, + Table, +} + +impl ResourceType { + pub fn to_i32(self) -> i32 { + match self { + Self::Any => 1, + Self::Cluster => 2, + Self::Database => 3, + Self::Table => 4, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 1 => Ok(Self::Any), + 2 => Ok(Self::Cluster), + 3 => Ok(Self::Database), + 4 => Ok(Self::Table), + _ => Err(Error::IllegalArgument { + message: format!("Unknown resource type code: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.OperationType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OperationType { + Any, + All, + Read, + Write, + Create, + Drop, + Alter, + Describe, +} + +impl OperationType { + pub fn to_i32(self) -> i32 { + match self { + Self::Any => 1, + Self::All => 2, + Self::Read => 3, + Self::Write => 4, + Self::Create => 5, + Self::Drop => 6, + Self::Alter => 7, + Self::Describe => 8, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 1 => Ok(Self::Any), + 2 => Ok(Self::All), + 3 => Ok(Self::Read), + 4 => Ok(Self::Write), + 5 => Ok(Self::Create), + 6 => Ok(Self::Drop), + 7 => Ok(Self::Alter), + 8 => Ok(Self::Describe), + _ => Err(Error::IllegalArgument { + message: format!("Unknown operation type code: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.PermissionType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PermissionType { + Any, + Allow, +} + +impl PermissionType { + pub fn to_i32(self) -> i32 { + match self { + Self::Any => 1, + Self::Allow => 2, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 1 => Ok(Self::Any), + 2 => Ok(Self::Allow), + _ => Err(Error::IllegalArgument { + message: format!("Unknown permission type: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.AclBinding` (concrete ACL entry). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclInfo { + pub resource_name: String, + pub resource_type: ResourceType, + pub principal_name: String, + pub principal_type: String, + pub host: String, + pub operation_type: OperationType, + pub permission_type: PermissionType, +} + +impl AclInfo { + pub fn to_pb(&self) -> PbAclInfo { + PbAclInfo { + resource_name: self.resource_name.clone(), + resource_type: self.resource_type.to_i32(), + principal_name: self.principal_name.clone(), + principal_type: self.principal_type.clone(), + host: self.host.clone(), + operation_type: self.operation_type.to_i32(), + permission_type: self.permission_type.to_i32(), + } + } + + pub fn from_pb(pb: &PbAclInfo) -> Result { + Ok(Self { + resource_name: pb.resource_name.clone(), + resource_type: ResourceType::try_from_i32(pb.resource_type)?, + principal_name: pb.principal_name.clone(), + principal_type: pb.principal_type.clone(), + host: pb.host.clone(), + operation_type: OperationType::try_from_i32(pb.operation_type)?, + permission_type: PermissionType::try_from_i32(pb.permission_type)?, + }) + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.AclBindingFilter` — like `AclInfo` but with +/// optional name/principal/host fields for partial matching. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclFilter { + pub resource_name: Option, + pub resource_type: ResourceType, + pub principal_name: Option, + pub principal_type: Option, + pub host: Option, + pub operation_type: OperationType, + pub permission_type: PermissionType, +} + +impl AclFilter { + pub fn to_pb(&self) -> PbAclFilter { + PbAclFilter { + resource_name: self.resource_name.clone(), + resource_type: self.resource_type.to_i32(), + principal_name: self.principal_name.clone(), + principal_type: self.principal_type.clone(), + host: self.host.clone(), + operation_type: self.operation_type.to_i32(), + permission_type: self.permission_type.to_i32(), + } + } + + pub fn from_pb(pb: &PbAclFilter) -> Result { + Ok(Self { + resource_name: pb.resource_name.clone(), + resource_type: ResourceType::try_from_i32(pb.resource_type)?, + principal_name: pb.principal_name.clone(), + principal_type: pb.principal_type.clone(), + host: pb.host.clone(), + operation_type: OperationType::try_from_i32(pb.operation_type)?, + permission_type: PermissionType::try_from_i32(pb.permission_type)?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resource_type_roundtrip() { + for v in [ + ResourceType::Any, + ResourceType::Cluster, + ResourceType::Database, + ResourceType::Table, + ] { + assert_eq!(ResourceType::try_from_i32(v.to_i32()).unwrap(), v); + } + assert!(ResourceType::try_from_i32(0).is_err()); + } + + #[test] + fn test_operation_type_roundtrip() { + for v in [ + OperationType::Any, + OperationType::All, + OperationType::Read, + OperationType::Write, + OperationType::Create, + OperationType::Drop, + OperationType::Alter, + OperationType::Describe, + ] { + assert_eq!(OperationType::try_from_i32(v.to_i32()).unwrap(), v); + } + assert!(OperationType::try_from_i32(0).is_err()); + } + + #[test] + fn test_permission_type_roundtrip() { + for v in [PermissionType::Any, PermissionType::Allow] { + assert_eq!(PermissionType::try_from_i32(v.to_i32()).unwrap(), v); + } + assert!(PermissionType::try_from_i32(0).is_err()); + } + + #[test] + fn test_acl_info_pb_roundtrip() { + let original = AclInfo { + resource_name: "topic-a".to_string(), + resource_type: ResourceType::Table, + principal_name: "alice".to_string(), + principal_type: "User".to_string(), + host: "*".to_string(), + operation_type: OperationType::Read, + permission_type: PermissionType::Allow, + }; + let pb = original.to_pb(); + assert_eq!(AclInfo::from_pb(&pb).unwrap(), original); + } + + #[test] + fn test_acl_filter_pb_roundtrip() { + let original = AclFilter { + resource_name: None, + resource_type: ResourceType::Any, + principal_name: Some("alice".to_string()), + principal_type: None, + host: Some("*".to_string()), + operation_type: OperationType::Any, + permission_type: PermissionType::Any, + }; + let pb = original.to_pb(); + assert_eq!(AclFilter::from_pb(&pb).unwrap(), original); + } +} diff --git a/crates/fluss/src/metadata/config.rs b/crates/fluss/src/metadata/config.rs new file mode 100644 index 00000000..4e045c45 --- /dev/null +++ b/crates/fluss/src/metadata/config.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::PbAlterConfig; + +/// Mirrors Java `org.apache.fluss.config.cluster.AlterConfigOpType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AlterConfigOpType { + Set, + Delete, + Append, + Subtract, +} + +impl AlterConfigOpType { + pub fn to_i32(self) -> i32 { + match self { + Self::Set => 0, + Self::Delete => 1, + Self::Append => 2, + Self::Subtract => 3, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Set), + 1 => Ok(Self::Delete), + 2 => Ok(Self::Append), + 3 => Ok(Self::Subtract), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported AlterConfigOpType: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.config.cluster.AlterConfig`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AlterConfig { + pub config_key: String, + pub config_value: Option, + pub op_type: AlterConfigOpType, +} + +impl AlterConfig { + pub fn new>( + config_key: K, + config_value: Option, + op_type: AlterConfigOpType, + ) -> Self { + Self { + config_key: config_key.into(), + config_value, + op_type, + } + } + + pub fn to_pb(&self) -> PbAlterConfig { + PbAlterConfig { + config_key: self.config_key.clone(), + config_value: self.config_value.clone(), + op_type: self.op_type.to_i32(), + } + } + + pub fn from_pb(pb: &PbAlterConfig) -> Result { + Ok(Self { + config_key: pb.config_key.clone(), + config_value: pb.config_value.clone(), + op_type: AlterConfigOpType::try_from_i32(pb.op_type)?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_alter_config_op_type_roundtrip() { + for op in [ + AlterConfigOpType::Set, + AlterConfigOpType::Delete, + AlterConfigOpType::Append, + AlterConfigOpType::Subtract, + ] { + assert_eq!(AlterConfigOpType::try_from_i32(op.to_i32()).unwrap(), op); + } + } + + #[test] + fn test_alter_config_op_type_unknown() { + assert!(AlterConfigOpType::try_from_i32(99).is_err()); + } + + #[test] + fn test_alter_config_pb_roundtrip() { + let cases = [ + AlterConfig::new("foo", Some("bar".to_string()), AlterConfigOpType::Set), + AlterConfig::new("baz", None, AlterConfigOpType::Delete), + ]; + for original in cases { + let pb = original.to_pb(); + let restored = AlterConfig::from_pb(&pb).unwrap(); + assert_eq!(original, restored); + } + } +} diff --git a/crates/fluss/src/metadata/kv_snapshot_lease.rs b/crates/fluss/src/metadata/kv_snapshot_lease.rs new file mode 100644 index 00000000..22679136 --- /dev/null +++ b/crates/fluss/src/metadata/kv_snapshot_lease.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable}; + +/// One bucket's slot in a KV-snapshot lease request. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotLeaseForBucket { + pub partition_id: Option, + pub bucket_id: i32, + pub snapshot_id: i64, +} + +impl KvSnapshotLeaseForBucket { + pub fn to_pb(&self) -> PbKvSnapshotLeaseForBucket { + PbKvSnapshotLeaseForBucket { + partition_id: self.partition_id, + bucket_id: self.bucket_id, + snapshot_id: self.snapshot_id, + } + } + + pub fn from_pb(pb: &PbKvSnapshotLeaseForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + snapshot_id: pb.snapshot_id, + } + } +} + +/// All the buckets of a single table that should be leased together. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotLeaseForTable { + pub table_id: i64, + pub bucket_snapshots: Vec, +} + +impl KvSnapshotLeaseForTable { + pub fn to_pb(&self) -> PbKvSnapshotLeaseForTable { + PbKvSnapshotLeaseForTable { + table_id: self.table_id, + bucket_snapshots: self + .bucket_snapshots + .iter() + .map(KvSnapshotLeaseForBucket::to_pb) + .collect(), + } + } + + pub fn from_pb(pb: &PbKvSnapshotLeaseForTable) -> Self { + Self { + table_id: pb.table_id, + bucket_snapshots: pb + .bucket_snapshots + .iter() + .map(KvSnapshotLeaseForBucket::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kv_snapshot_lease_for_bucket_roundtrip() { + for b in [ + KvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 10, + }, + KvSnapshotLeaseForBucket { + partition_id: Some(42), + bucket_id: 3, + snapshot_id: 99, + }, + ] { + assert_eq!(KvSnapshotLeaseForBucket::from_pb(&b.to_pb()), b); + } + } + + #[test] + fn test_kv_snapshot_lease_for_table_roundtrip() { + let t = KvSnapshotLeaseForTable { + table_id: 7, + bucket_snapshots: vec![ + KvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 10, + }, + KvSnapshotLeaseForBucket { + partition_id: Some(42), + bucket_id: 1, + snapshot_id: 11, + }, + ], + }; + assert_eq!(KvSnapshotLeaseForTable::from_pb(&t.to_pb()), t); + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index c1d1b72c..ee5e0c41 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -15,18 +15,30 @@ // specific language governing permissions and limitations // under the License. +mod acl; +mod config; mod data_lake_format; mod database; mod datatype; mod json_serde; +mod kv_snapshot_lease; mod partition; +mod producer_offsets; mod schema_util; mod table; +mod table_change; +mod table_stats; +pub use acl::*; +pub use config::*; pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use json_serde::*; +pub use kv_snapshot_lease::*; pub use partition::*; +pub use producer_offsets::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use table::*; +pub use table_change::*; +pub use table_stats::*; diff --git a/crates/fluss/src/metadata/partition.rs b/crates/fluss/src/metadata/partition.rs index 18402354..c63fe296 100644 --- a/crates/fluss/src/metadata/partition.rs +++ b/crates/fluss/src/metadata/partition.rs @@ -300,6 +300,7 @@ impl PartitionInfo { PbPartitionInfo { partition_id: self.partition_id, partition_spec: self.partition_spec.to_pb(), + remote_data_dir: None, } } diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs new file mode 100644 index 00000000..f0daddbc --- /dev/null +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{PbBucketOffset, PbProducerTableOffsets}; + +/// Per-bucket producer log-end offset. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketOffset { + pub partition_id: Option, + pub bucket_id: i32, + pub log_end_offset: Option, +} + +impl BucketOffset { + pub fn to_pb(&self) -> PbBucketOffset { + PbBucketOffset { + partition_id: self.partition_id, + bucket_id: self.bucket_id, + log_end_offset: self.log_end_offset, + } + } + + pub fn from_pb(pb: &PbBucketOffset) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + log_end_offset: pb.log_end_offset, + } + } +} + +/// All bucket offsets of a single table belonging to one producer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProducerTableOffsets { + pub table_id: i64, + pub bucket_offsets: Vec, +} + +impl ProducerTableOffsets { + pub fn to_pb(&self) -> PbProducerTableOffsets { + PbProducerTableOffsets { + table_id: self.table_id, + bucket_offsets: self + .bucket_offsets + .iter() + .map(BucketOffset::to_pb) + .collect(), + } + } + + pub fn from_pb(pb: &PbProducerTableOffsets) -> Self { + Self { + table_id: pb.table_id, + bucket_offsets: pb + .bucket_offsets + .iter() + .map(BucketOffset::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_offset_roundtrip() { + for b in [ + BucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: None, + }, + BucketOffset { + partition_id: Some(42), + bucket_id: 3, + log_end_offset: Some(1234), + }, + ] { + assert_eq!(BucketOffset::from_pb(&b.to_pb()), b); + } + } + + #[test] + fn test_producer_table_offsets_roundtrip() { + let t = ProducerTableOffsets { + table_id: 5, + bucket_offsets: vec![ + BucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: Some(100), + }, + BucketOffset { + partition_id: Some(7), + bucket_id: 1, + log_end_offset: None, + }, + ], + }; + assert_eq!(ProducerTableOffsets::from_pb(&t.to_pb()), t); + } +} diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 35d251d7..a796ed98 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1463,6 +1463,22 @@ impl TableBucket { pub fn partition_id(&self) -> Option { self.partition_id } + + pub fn to_pb(&self) -> crate::proto::PbTableBucket { + crate::proto::PbTableBucket { + table_id: self.table_id, + partition_id: self.partition_id, + bucket_id: self.bucket, + } + } + + pub fn from_pb(pb: &crate::proto::PbTableBucket) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + bucket: pb.bucket_id, + } + } } impl Display for TableBucket { diff --git a/crates/fluss/src/metadata/table_change.rs b/crates/fluss/src/metadata/table_change.rs new file mode 100644 index 00000000..2ecf6cfa --- /dev/null +++ b/crates/fluss/src/metadata/table_change.rs @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::{PbAddColumn, PbDropColumn, PbModifyColumn, PbRenameColumn}; + +/// Mirrors Java `org.apache.fluss.config.cluster.ColumnPositionType`. +/// +/// The Java server today only handles `Last`; `First` and `After` are reserved by +/// the proto schema (`column_position_type = 0=LAST, 1=FIRST, 3=AFTER`) for future +/// use. Sending other variants will be rejected by the server. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ColumnPositionType { + Last, +} + +impl ColumnPositionType { + pub fn to_i32(self) -> i32 { + match self { + Self::Last => 0, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Last), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported ColumnPositionType: {value}"), + }), + } + } +} + +/// Add a column. Mirrors the `AddColumn` variant of Java `TableChange`. +/// +/// `data_type_json` carries the column's `DataType` already serialized to its JSON +/// representation (matching the Java client, which transports `DataType` as JSON over +/// the wire). Callers can produce it via `DataType::serialize_json` + `serde_json::to_vec`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AddColumn { + pub column_name: String, + pub data_type_json: Vec, + pub comment: Option, + pub position: ColumnPositionType, +} + +impl AddColumn { + pub fn to_pb(&self) -> PbAddColumn { + PbAddColumn { + column_name: self.column_name.clone(), + data_type_json: self.data_type_json.clone(), + comment: self.comment.clone(), + column_position_type: self.position.to_i32(), + } + } + + pub fn from_pb(pb: &PbAddColumn) -> Result { + Ok(Self { + column_name: pb.column_name.clone(), + data_type_json: pb.data_type_json.clone(), + comment: pb.comment.clone(), + position: ColumnPositionType::try_from_i32(pb.column_position_type)?, + }) + } +} + +/// Drop a column. Mirrors the `DropColumn` variant of Java `TableChange`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropColumn { + pub column_name: String, +} + +impl DropColumn { + pub fn to_pb(&self) -> PbDropColumn { + PbDropColumn { + column_name: self.column_name.clone(), + } + } + + pub fn from_pb(pb: &PbDropColumn) -> Self { + Self { + column_name: pb.column_name.clone(), + } + } +} + +/// Rename a column. Mirrors the `RenameColumn` variant of Java `TableChange`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RenameColumn { + pub old_column_name: String, + pub new_column_name: String, +} + +impl RenameColumn { + pub fn to_pb(&self) -> PbRenameColumn { + PbRenameColumn { + old_column_name: self.old_column_name.clone(), + new_column_name: self.new_column_name.clone(), + } + } + + pub fn from_pb(pb: &PbRenameColumn) -> Self { + Self { + old_column_name: pb.old_column_name.clone(), + new_column_name: pb.new_column_name.clone(), + } + } +} + +/// Modify a column's type/comment/position. Mirrors the `ModifyColumn` variant of +/// Java `TableChange`. All fields except `column_name` are optional — only the +/// non-`None` ones are applied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ModifyColumn { + pub column_name: String, + pub data_type_json: Option>, + pub comment: Option, + pub position: Option, +} + +impl ModifyColumn { + pub fn to_pb(&self) -> PbModifyColumn { + PbModifyColumn { + column_name: self.column_name.clone(), + data_type_json: self.data_type_json.clone(), + comment: self.comment.clone(), + column_position_type: self.position.map(|p| p.to_i32()), + } + } + + pub fn from_pb(pb: &PbModifyColumn) -> Result { + Ok(Self { + column_name: pb.column_name.clone(), + data_type_json: pb.data_type_json.clone(), + comment: pb.comment.clone(), + position: pb + .column_position_type + .map(ColumnPositionType::try_from_i32) + .transpose()?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_column_position_type_roundtrip() { + assert_eq!( + ColumnPositionType::try_from_i32(ColumnPositionType::Last.to_i32()).unwrap(), + ColumnPositionType::Last + ); + assert!(ColumnPositionType::try_from_i32(1).is_err()); + assert!(ColumnPositionType::try_from_i32(3).is_err()); + } + + #[test] + fn test_add_column_pb_roundtrip() { + let original = AddColumn { + column_name: "amount".to_string(), + data_type_json: b"{\"type\":\"BIGINT\"}".to_vec(), + comment: Some("the amount".to_string()), + position: ColumnPositionType::Last, + }; + let pb = original.to_pb(); + assert_eq!(AddColumn::from_pb(&pb).unwrap(), original); + } + + #[test] + fn test_drop_column_pb_roundtrip() { + let original = DropColumn { + column_name: "obsolete".to_string(), + }; + let pb = original.to_pb(); + assert_eq!(DropColumn::from_pb(&pb), original); + } + + #[test] + fn test_rename_column_pb_roundtrip() { + let original = RenameColumn { + old_column_name: "old_name".to_string(), + new_column_name: "new_name".to_string(), + }; + let pb = original.to_pb(); + assert_eq!(RenameColumn::from_pb(&pb), original); + } + + #[test] + fn test_modify_column_pb_roundtrip_full() { + let original = ModifyColumn { + column_name: "x".to_string(), + data_type_json: Some(b"{\"type\":\"DOUBLE\"}".to_vec()), + comment: Some("changed".to_string()), + position: Some(ColumnPositionType::Last), + }; + let pb = original.to_pb(); + assert_eq!(ModifyColumn::from_pb(&pb).unwrap(), original); + } + + #[test] + fn test_modify_column_pb_roundtrip_empty() { + let original = ModifyColumn { + column_name: "x".to_string(), + data_type_json: None, + comment: None, + position: None, + }; + let pb = original.to_pb(); + assert_eq!(ModifyColumn::from_pb(&pb).unwrap(), original); + } +} diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs new file mode 100644 index 00000000..ad2d1fc2 --- /dev/null +++ b/crates/fluss/src/metadata/table_stats.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::PbTableStatsReqForBucket; + +/// Per-bucket request item for `GetTableStats`. +/// Mirrors the bucket-stats request shape used by the Java client. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStatsRequest { + pub partition_id: Option, + pub bucket_id: i32, +} + +impl BucketStatsRequest { + pub fn new(partition_id: Option, bucket_id: i32) -> Self { + Self { + partition_id, + bucket_id, + } + } + + pub fn to_pb(&self) -> PbTableStatsReqForBucket { + PbTableStatsReqForBucket { + partition_id: self.partition_id, + bucket_id: self.bucket_id, + } + } + + pub fn from_pb(pb: &PbTableStatsReqForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_stats_request_pb_roundtrip() { + for req in [ + BucketStatsRequest::new(None, 0), + BucketStatsRequest::new(Some(42), 7), + ] { + let pb = req.to_pb(); + assert_eq!(BucketStatsRequest::from_pb(&pb), req); + } + } +} diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto index 2add80d7..7d7fd89e 100644 --- a/crates/fluss/src/proto/fluss_api.proto +++ b/crates/fluss/src/proto/fluss_api.proto @@ -94,6 +94,7 @@ message PbServerNode { required string host = 2; required int32 port = 3; optional string listeners = 4; + optional string rack = 5; } message PbTableMetadata { @@ -104,6 +105,7 @@ message PbTableMetadata { repeated PbBucketMetadata bucket_metadata = 5; required int64 created_time = 6; required int64 modified_time = 7; + optional string remote_data_dir = 8; } message PbPartitionMetadata { @@ -119,7 +121,7 @@ message PbBucketMetadata { // optional as some time the leader may not elected yet optional int32 leader_id = 2; repeated int32 replica_id = 3 [packed = true]; - // TODO: Add isr here. + optional int32 leader_epoch = 4; } message PbProduceLogReqForBucket { @@ -145,6 +147,7 @@ message PutKvRequest { // if empty, means write all columns repeated int32 target_columns = 4 [packed = true]; repeated PbPutKvReqForBucket buckets_req = 5; + optional int32 agg_mode = 6; } message PutKvResponse { @@ -162,6 +165,7 @@ message PbPutKvRespForBucket { required int32 bucket_id = 2; optional int32 error_code = 3; optional string error_message = 4; + optional int64 log_end_offset = 5; } message CreateTableRequest { @@ -199,6 +203,7 @@ message GetTableInfoResponse { required bytes table_json = 3; required int64 created_time = 4; required int64 modified_time = 5; + optional string remote_data_dir = 6; } // get table schema request and response. Mirrors the Java RPC at api key 1011. @@ -258,10 +263,12 @@ message DatabaseExistsResponse { } message ListDatabasesRequest { + optional bool include_summary = 1; } message ListDatabasesResponse { repeated string database_name = 1; + repeated PbDatabaseSummary database_summary = 2; } // list offsets request and response @@ -291,11 +298,46 @@ message FetchLogResponse { repeated PbFetchLogRespForTable tables_resp = 1; } +message PbPredicate { + required int32 type = 1; + optional PbLeafPredicate leaf = 2; + optional PbCompoundPredicate compound = 3; +} + +message PbLeafPredicate { + required int32 function = 1; + required int32 field_id = 2; + repeated PbLiteralValue literals = 3; +} + +message PbCompoundPredicate { + required int32 function = 1; + repeated PbPredicate children = 2; +} + +message PbLiteralValue { + required int32 literal_type = 1; + required bool is_null = 2; + optional bool boolean_value = 3; + optional int32 int_value = 4; + optional int64 bigint_value = 5; + optional float float_value = 6; + optional double double_value = 7; + optional string string_value = 8; + optional bytes binary_value = 9; + optional int64 decimal_value = 10; + optional int64 timestamp_millis_value = 11; + optional int32 timestamp_nano_of_millis_value = 12; + optional bytes decimal_bytes = 13; +} + message PbFetchLogReqForTable { required int64 table_id = 1; required bool projection_pushdown_enabled = 2; repeated int32 projected_fields = 3 [packed = true]; repeated PbFetchLogReqForBucket buckets_req = 4; + optional PbPredicate filter_predicate = 5; + optional int32 filter_schema_id = 6; } @@ -321,6 +363,7 @@ message PbFetchLogRespForBucket { optional int64 log_start_offset = 6; // TODO now we don't introduce log start offset, but remain it in protobuf optional PbRemoteLogFetchInfo remote_log_fetch_info = 7; optional bytes records = 8; + optional int64 filtered_end_offset = 9; } message PbRemoteLogFetchInfo { @@ -360,6 +403,7 @@ message PbLakeSnapshotForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; optional int64 log_offset = 3; + optional string partition_name = 4; } message PbKeyValue { @@ -381,6 +425,9 @@ message GetFileSystemSecurityTokenResponse { message LookupRequest { required int64 table_id = 1; repeated PbLookupReqForBucket buckets_req = 2; + optional bool insert_if_not_exists = 3; + optional int32 acks = 4; + optional int32 timeout_ms = 5; } message LookupResponse { @@ -390,7 +437,7 @@ message LookupResponse { message PbLookupReqForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; - repeated bytes key = 3; + repeated bytes keys = 3; } message PbLookupRespForBucket { @@ -440,6 +487,7 @@ message PbPartitionSpec { message PbPartitionInfo { required int64 partition_id = 1; required PbPartitionSpec partition_spec = 2; + optional string remote_data_dir = 3; } message ListPartitionInfosRequest { @@ -454,7 +502,7 @@ message ListPartitionInfosResponse { message CreatePartitionRequest { required PbTablePath table_path = 1; required PbPartitionSpec partition_spec = 2; - required bool ignore_if_exists = 3; + required bool ignore_if_not_exists = 3; } message CreatePartitionResponse {} @@ -501,3 +549,413 @@ message InitWriterRequest { message InitWriterResponse { required int64 writer_id = 1; } + +message PbDatabaseSummary { + required string database_name = 1; + required int64 created_time = 2; + required int32 table_count = 3; +} + +message AlterDatabaseRequest { + required string database_name = 1; + required bool ignore_if_not_exists = 2; + repeated PbAlterConfig config_changes = 3; + optional string comment = 4; +} + +message AlterDatabaseResponse { +} + +message AlterTableRequest { + required PbTablePath table_path = 1; + required bool ignore_if_not_exists = 2; + repeated PbAlterConfig config_changes = 3; + repeated PbAddColumn add_columns = 4; + repeated PbDropColumn drop_columns = 5; + repeated PbRenameColumn rename_columns = 6; + repeated PbModifyColumn modify_columns = 7; +} + +message AlterTableResponse { +} + +message PbAlterConfig { + required string config_key = 1; + optional string config_value = 2; + required int32 op_type = 3; +} + +message PbAddColumn { + required string column_name = 1; + required bytes data_type_json = 2; + optional string comment = 3; + required int32 column_position_type = 4; +} + +message PbDropColumn { + required string column_name = 1; +} + +message PbRenameColumn { + required string old_column_name = 1; + required string new_column_name = 2; +} + +message PbModifyColumn { + required string column_name = 1; + optional bytes data_type_json = 2; + optional string comment = 3; + optional int32 column_position_type = 4; +} + +message GetTableStatsRequest { + required int64 table_id = 1; + repeated PbTableStatsReqForBucket buckets_req = 2; + repeated int32 target_columns = 3 [packed = true]; +} + +message GetTableStatsResponse { + repeated PbTableStatsRespForBucket buckets_resp = 1; +} + +message PbTableStatsReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; +} + +message PbTableStatsRespForBucket { + optional int32 error_code = 1; + optional string error_message = 2; + optional int64 partition_id = 3; + required int32 bucket_id = 4; + optional int64 row_count = 5; +} + +message GetLatestKvSnapshotsRequest { + required PbTablePath table_path = 1; + optional string partition_name = 2; +} + +message GetLatestKvSnapshotsResponse { + required int64 table_id = 1; + optional int64 partition_id = 2; + repeated PbKvSnapshot latest_snapshots = 3; +} + +message PbKvSnapshot { + required int32 bucket_id = 1; + optional int64 snapshot_id = 2; + optional int64 log_offset = 3; +} + +message GetKvSnapshotMetadataRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + required int64 snapshot_id = 4; +} + +message GetKvSnapshotMetadataResponse { + required int64 log_offset = 1; + repeated PbRemotePathAndLocalFile snapshot_files = 2; +} + +message PbRemotePathAndLocalFile { + required string remote_path = 1; + required string local_file_name = 2; +} + +message AcquireKvSnapshotLeaseRequest { + required string lease_id = 1; + required int64 lease_duration_ms = 2; + repeated PbKvSnapshotLeaseForTable snapshots_to_lease = 3; +} + +message AcquireKvSnapshotLeaseResponse { + repeated PbKvSnapshotLeaseForTable unavailable_snapshots = 1; +} + +message PbKvSnapshotLeaseForTable { + required int64 table_id = 1; + repeated PbKvSnapshotLeaseForBucket bucket_snapshots = 2; +} + +message PbKvSnapshotLeaseForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + required int64 snapshot_id = 3; +} + +message GetLakeSnapshotRequest { + required PbTablePath table_path = 1; + optional int64 snapshot_id = 2; + optional bool readable = 3; +} + +message GetLakeSnapshotResponse { + required int64 table_id = 1; + required int64 snapshotId = 2; + repeated PbLakeSnapshotForBucket bucket_snapshots = 3; +} + +message PbAclInfo { + required string resource_name = 1; + required int32 resource_type = 2; + required string principal_name = 3; + required string principal_type = 4; + required string host = 5; + required int32 operation_type = 6; + required int32 permission_type = 7; +} + +message PbAclFilter { + optional string resource_name = 1; + required int32 resource_type = 2; + optional string principal_name = 3; + optional string principal_type = 4; + optional string host = 5; + required int32 operation_type = 6; + required int32 permission_type = 7; +} + +message PbCreateAclRespInfo { + required PbAclInfo acl = 1; + optional int32 error_code = 2; + optional string error_message = 3; +} + +message PbDropAclsFilterResult { + repeated PbDropAclsMatchingAcl matching_acls = 1; + optional int32 error_code = 2; + optional string error_message = 3; +} + +message PbDropAclsMatchingAcl { + required PbAclInfo acl = 1; + optional int32 error_code = 2; + optional string error_message = 3; +} + +message ListAclsRequest { + required PbAclFilter acl_filter = 1; +} + +message ListAclsResponse { + repeated PbAclInfo acl = 1; +} + +message CreateAclsRequest { + repeated PbAclInfo acl = 1; +} + +message CreateAclsResponse { + repeated PbCreateAclRespInfo aclRes = 1; +} + +message DropAclsRequest { + repeated PbAclFilter acl_filter = 1; +} + +message DropAclsResponse { + repeated PbDropAclsFilterResult filter_results = 1; +} + +message PbDescribeConfig { + required string config_key = 1; + optional string config_value = 2; + required string config_source = 3; +} + +message DescribeClusterConfigsRequest { +} + +message DescribeClusterConfigsResponse { + repeated PbDescribeConfig configs = 1; +} + +message AlterClusterConfigsRequest { + repeated PbAlterConfig alter_configs = 1; +} + +message AlterClusterConfigsResponse { +} + +message AddServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message AddServerTagResponse { +} + +message RemoveServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message RemoveServerTagResponse { +} + +message RebalanceRequest { + repeated int32 goals = 1 [packed = true]; +} + +message RebalanceResponse { + required string rebalance_id = 1; +} + +message ListRebalanceProgressRequest { + optional string rebalance_id = 1; +} + +message ListRebalanceProgressResponse { + optional string rebalance_id = 1; + optional int32 rebalance_status = 2; + repeated PbRebalanceProgressForTable table_progress = 3; +} + +message PbRebalanceProgressForTable { + required int64 table_id = 1; + repeated PbRebalanceProgressForBucket buckets_progress = 2; +} + +message PbRebalanceProgressForBucket { + required PbRebalancePlanForBucket rebalance_plan = 1; + required int32 rebalance_status = 2; +} + +message PbRebalancePlanForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int32 original_leader = 3; + optional int32 new_leader = 4; + repeated int32 original_replicas = 5 [packed = true]; + repeated int32 new_replicas = 6 [packed = true]; +} + +message CancelRebalanceRequest { + optional string rebalance_id = 1; +} + +message CancelRebalanceResponse { +} + +message PbBucketOffset { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int64 log_end_offset = 4; +} + +message PbProducerTableOffsets { + required int64 table_id = 1; + repeated PbBucketOffset bucket_offsets = 2; +} + +message RegisterProducerOffsetsRequest { + required string producer_id = 1; + repeated PbProducerTableOffsets table_offsets = 2; + optional int64 ttl_ms = 3; +} + +message RegisterProducerOffsetsResponse { + optional int32 result = 1; +} + +message GetProducerOffsetsRequest { + required string producer_id = 1; +} + +message GetProducerOffsetsResponse { + optional string producer_id = 1; + optional int64 expiration_time = 2; + repeated PbProducerTableOffsets table_offsets = 3; +} + +message DeleteProducerOffsetsRequest { + required string producer_id = 1; +} + +message DeleteProducerOffsetsResponse { +} + +message PbTableBucket { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; +} + +message ReleaseKvSnapshotLeaseRequest { + required string lease_id = 1; + repeated PbTableBucket buckets_to_release = 2; +} + +message ReleaseKvSnapshotLeaseResponse { +} + +message DropKvSnapshotLeaseRequest { + required string lease_id = 1; +} + +message DropKvSnapshotLeaseResponse { +} + +message PbScanReqForBucket { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + optional int64 limit = 4; +} + +message ScanKvRequest { + optional bytes scanner_id = 1; + optional PbScanReqForBucket bucket_scan_req = 2; + optional int32 call_seq_id = 3; + optional int32 batch_size_bytes = 4; + optional bool close_scanner = 5; +} + +message ScanKvResponse { + optional int32 error_code = 1; + optional string error_message = 2; + optional bytes scanner_id = 3; + optional bool has_more_results = 4; + optional bytes records = 5; + optional int64 log_offset = 6; +} + +message GetClusterHealthRequest { +} + +message GetClusterHealthResponse { + required int32 num_replicas = 1; + required int32 in_sync_replicas = 2; + required int32 num_leader_replicas = 3; + required int32 active_leader_replicas = 4; + required int32 status = 5; +} + +message PbRemoteLogManifestEntry { + required PbTableBucket table_bucket = 1; + required string remote_log_manifest_path = 2; + required int64 remote_log_end_offset = 3; +} + +message ListRemoteLogManifestsRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; +} + +message ListRemoteLogManifestsResponse { + repeated PbRemoteLogManifestEntry manifests = 1; +} + +message ListKvSnapshotsRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; +} + +message ListKvSnapshotsResponse { + required int64 table_id = 1; + optional int64 partition_id = 2; + repeated PbKvSnapshot active_snapshots = 3; +} diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs index aaeca70d..cab034d2 100644 --- a/crates/fluss/src/rpc/api_key.rs +++ b/crates/fluss/src/rpc/api_key.rs @@ -38,15 +38,40 @@ pub enum ApiKey { PutKv, // 1016 Lookup, // 1017 ListOffsets, // 1021 + GetLatestKvSnapshots, // 1023 + GetKvSnapshotMetadata, // 1024 GetFileSystemSecurityToken, // 1025 InitWriter, // 1026 - GetLatestLakeSnapshot, // 1032 + GetLakeSnapshot, // 1032 LimitScan, // 1033 PrefixLookup, // 1034 GetDatabaseInfo, // 1035 CreatePartition, // 1036 DropPartition, // 1037 Authenticate, // 1038 + CreateAcls, // 1039 + ListAcls, // 1040 + DropAcls, // 1041 + AlterTable, // 1044 + DescribeClusterConfigs, // 1045 + AlterClusterConfigs, // 1046 + AddServerTag, // 1047 + RemoveServerTag, // 1048 + Rebalance, // 1049 + ListRebalanceProgress, // 1050 + CancelRebalance, // 1051 + RegisterProducerOffsets, // 1053 + GetProducerOffsets, // 1054 + DeleteProducerOffsets, // 1055 + AcquireKvSnapshotLease, // 1056 + ReleaseKvSnapshotLease, // 1057 + DropKvSnapshotLease, // 1058 + GetTableStats, // 1059 + AlterDatabase, // 1060 + ScanKv, // 1061 + GetClusterHealth, // 1062 + ListRemoteLogManifests, // 1063 + ListKvSnapshots, // 1064 Unknown(i16), } @@ -71,14 +96,39 @@ impl ApiKey { | ApiKey::ProduceLog | ApiKey::FetchLog | ApiKey::ListOffsets + | ApiKey::GetLatestKvSnapshots + | ApiKey::GetKvSnapshotMetadata | ApiKey::GetFileSystemSecurityToken | ApiKey::InitWriter - | ApiKey::GetLatestLakeSnapshot + | ApiKey::GetLakeSnapshot | ApiKey::LimitScan | ApiKey::GetDatabaseInfo | ApiKey::CreatePartition | ApiKey::DropPartition - | ApiKey::Authenticate => Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(0))), + | ApiKey::Authenticate + | ApiKey::CreateAcls + | ApiKey::ListAcls + | ApiKey::DropAcls + | ApiKey::AlterTable + | ApiKey::DescribeClusterConfigs + | ApiKey::AlterClusterConfigs + | ApiKey::AddServerTag + | ApiKey::RemoveServerTag + | ApiKey::Rebalance + | ApiKey::ListRebalanceProgress + | ApiKey::CancelRebalance + | ApiKey::RegisterProducerOffsets + | ApiKey::GetProducerOffsets + | ApiKey::DeleteProducerOffsets + | ApiKey::AcquireKvSnapshotLease + | ApiKey::ReleaseKvSnapshotLease + | ApiKey::DropKvSnapshotLease + | ApiKey::GetTableStats + | ApiKey::AlterDatabase + | ApiKey::ScanKv + | ApiKey::GetClusterHealth + | ApiKey::ListRemoteLogManifests + | ApiKey::ListKvSnapshots => Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(0))), // PutKv / Lookup / PrefixLookup support v0 (legacy key encoding) // and v1 (Paimon BinaryRow key encoding for kv_format_version=2 // non-default bucket keys). The Rust client encodes both. @@ -111,15 +161,40 @@ impl From for ApiKey { 1016 => ApiKey::PutKv, 1017 => ApiKey::Lookup, 1021 => ApiKey::ListOffsets, + 1023 => ApiKey::GetLatestKvSnapshots, + 1024 => ApiKey::GetKvSnapshotMetadata, 1025 => ApiKey::GetFileSystemSecurityToken, 1026 => ApiKey::InitWriter, - 1032 => ApiKey::GetLatestLakeSnapshot, + 1032 => ApiKey::GetLakeSnapshot, 1033 => ApiKey::LimitScan, 1034 => ApiKey::PrefixLookup, 1035 => ApiKey::GetDatabaseInfo, 1036 => ApiKey::CreatePartition, 1037 => ApiKey::DropPartition, 1038 => ApiKey::Authenticate, + 1039 => ApiKey::CreateAcls, + 1040 => ApiKey::ListAcls, + 1041 => ApiKey::DropAcls, + 1044 => ApiKey::AlterTable, + 1045 => ApiKey::DescribeClusterConfigs, + 1046 => ApiKey::AlterClusterConfigs, + 1047 => ApiKey::AddServerTag, + 1048 => ApiKey::RemoveServerTag, + 1049 => ApiKey::Rebalance, + 1050 => ApiKey::ListRebalanceProgress, + 1051 => ApiKey::CancelRebalance, + 1053 => ApiKey::RegisterProducerOffsets, + 1054 => ApiKey::GetProducerOffsets, + 1055 => ApiKey::DeleteProducerOffsets, + 1056 => ApiKey::AcquireKvSnapshotLease, + 1057 => ApiKey::ReleaseKvSnapshotLease, + 1058 => ApiKey::DropKvSnapshotLease, + 1059 => ApiKey::GetTableStats, + 1060 => ApiKey::AlterDatabase, + 1061 => ApiKey::ScanKv, + 1062 => ApiKey::GetClusterHealth, + 1063 => ApiKey::ListRemoteLogManifests, + 1064 => ApiKey::ListKvSnapshots, _ => Unknown(key), } @@ -147,15 +222,40 @@ impl From for i16 { ApiKey::PutKv => 1016, ApiKey::Lookup => 1017, ApiKey::ListOffsets => 1021, + ApiKey::GetLatestKvSnapshots => 1023, + ApiKey::GetKvSnapshotMetadata => 1024, ApiKey::GetFileSystemSecurityToken => 1025, ApiKey::InitWriter => 1026, - ApiKey::GetLatestLakeSnapshot => 1032, + ApiKey::GetLakeSnapshot => 1032, ApiKey::LimitScan => 1033, ApiKey::PrefixLookup => 1034, ApiKey::GetDatabaseInfo => 1035, ApiKey::CreatePartition => 1036, ApiKey::DropPartition => 1037, ApiKey::Authenticate => 1038, + ApiKey::CreateAcls => 1039, + ApiKey::ListAcls => 1040, + ApiKey::DropAcls => 1041, + ApiKey::AlterTable => 1044, + ApiKey::DescribeClusterConfigs => 1045, + ApiKey::AlterClusterConfigs => 1046, + ApiKey::AddServerTag => 1047, + ApiKey::RemoveServerTag => 1048, + ApiKey::Rebalance => 1049, + ApiKey::ListRebalanceProgress => 1050, + ApiKey::CancelRebalance => 1051, + ApiKey::RegisterProducerOffsets => 1053, + ApiKey::GetProducerOffsets => 1054, + ApiKey::DeleteProducerOffsets => 1055, + ApiKey::AcquireKvSnapshotLease => 1056, + ApiKey::ReleaseKvSnapshotLease => 1057, + ApiKey::DropKvSnapshotLease => 1058, + ApiKey::GetTableStats => 1059, + ApiKey::AlterDatabase => 1060, + ApiKey::ScanKv => 1061, + ApiKey::GetClusterHealth => 1062, + ApiKey::ListRemoteLogManifests => 1063, + ApiKey::ListKvSnapshots => 1064, Unknown(x) => x, } } @@ -186,15 +286,40 @@ mod tests { (1016, ApiKey::PutKv), (1017, ApiKey::Lookup), (1021, ApiKey::ListOffsets), + (1023, ApiKey::GetLatestKvSnapshots), + (1024, ApiKey::GetKvSnapshotMetadata), (1025, ApiKey::GetFileSystemSecurityToken), (1026, ApiKey::InitWriter), - (1032, ApiKey::GetLatestLakeSnapshot), + (1032, ApiKey::GetLakeSnapshot), (1033, ApiKey::LimitScan), (1034, ApiKey::PrefixLookup), (1035, ApiKey::GetDatabaseInfo), (1036, ApiKey::CreatePartition), (1037, ApiKey::DropPartition), (1038, ApiKey::Authenticate), + (1039, ApiKey::CreateAcls), + (1040, ApiKey::ListAcls), + (1041, ApiKey::DropAcls), + (1044, ApiKey::AlterTable), + (1045, ApiKey::DescribeClusterConfigs), + (1046, ApiKey::AlterClusterConfigs), + (1047, ApiKey::AddServerTag), + (1048, ApiKey::RemoveServerTag), + (1049, ApiKey::Rebalance), + (1050, ApiKey::ListRebalanceProgress), + (1051, ApiKey::CancelRebalance), + (1053, ApiKey::RegisterProducerOffsets), + (1054, ApiKey::GetProducerOffsets), + (1055, ApiKey::DeleteProducerOffsets), + (1056, ApiKey::AcquireKvSnapshotLease), + (1057, ApiKey::ReleaseKvSnapshotLease), + (1058, ApiKey::DropKvSnapshotLease), + (1059, ApiKey::GetTableStats), + (1060, ApiKey::AlterDatabase), + (1061, ApiKey::ScanKv), + (1062, ApiKey::GetClusterHealth), + (1063, ApiKey::ListRemoteLogManifests), + (1064, ApiKey::ListKvSnapshots), ]; for (raw, key) in cases { diff --git a/crates/fluss/src/rpc/convert.rs b/crates/fluss/src/rpc/convert.rs index 1862589b..441645c2 100644 --- a/crates/fluss/src/rpc/convert.rs +++ b/crates/fluss/src/rpc/convert.rs @@ -73,6 +73,7 @@ mod tests { host: "127.0.0.1".to_string(), port: 9092, listeners: None, + rack: None, }; let node = from_pb_server_node(pb, ServerType::TabletServer); assert_eq!(node.id(), 7); @@ -84,6 +85,7 @@ mod tests { host: "localhost".to_string(), port: 8123, listeners: None, + rack: None, }; let node = from_pb_server_node(pb, ServerType::CoordinatorServer); assert_eq!(node.uid(), "cs-3"); diff --git a/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs new file mode 100644 index 00000000..391cfcfd --- /dev/null +++ b/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::KvSnapshotLeaseForTable; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AcquireKvSnapshotLeaseRequest { + pub inner_request: proto::AcquireKvSnapshotLeaseRequest, +} + +impl AcquireKvSnapshotLeaseRequest { + pub fn new( + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Self { + AcquireKvSnapshotLeaseRequest { + inner_request: proto::AcquireKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + lease_duration_ms, + snapshots_to_lease: snapshots_to_lease + .iter() + .map(KvSnapshotLeaseForTable::to_pb) + .collect(), + }, + } + } +} + +impl RequestBody for AcquireKvSnapshotLeaseRequest { + type ResponseBody = proto::AcquireKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::AcquireKvSnapshotLease; +} + +impl_write_type!(AcquireKvSnapshotLeaseRequest); +impl_read_type!(proto::AcquireKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/add_server_tag.rs b/crates/fluss/src/rpc/message/add_server_tag.rs new file mode 100644 index 00000000..ee1caeee --- /dev/null +++ b/crates/fluss/src/rpc/message/add_server_tag.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AddServerTagRequest { + pub inner_request: proto::AddServerTagRequest, +} + +impl AddServerTagRequest { + pub fn new(server_ids: Vec, server_tag: i32) -> Self { + AddServerTagRequest { + inner_request: proto::AddServerTagRequest { + server_ids, + server_tag, + }, + } + } +} + +impl RequestBody for AddServerTagRequest { + type ResponseBody = proto::AddServerTagResponse; + const API_KEY: ApiKey = ApiKey::AddServerTag; +} + +impl_write_type!(AddServerTagRequest); +impl_read_type!(proto::AddServerTagResponse); diff --git a/crates/fluss/src/rpc/message/alter_cluster_configs.rs b/crates/fluss/src/rpc/message/alter_cluster_configs.rs new file mode 100644 index 00000000..fb8ced2d --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_cluster_configs.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AlterConfig; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AlterClusterConfigsRequest { + pub inner_request: proto::AlterClusterConfigsRequest, +} + +impl AlterClusterConfigsRequest { + pub fn new(alter_configs: Vec) -> Self { + AlterClusterConfigsRequest { + inner_request: proto::AlterClusterConfigsRequest { + alter_configs: alter_configs.iter().map(AlterConfig::to_pb).collect(), + }, + } + } +} + +impl RequestBody for AlterClusterConfigsRequest { + type ResponseBody = proto::AlterClusterConfigsResponse; + const API_KEY: ApiKey = ApiKey::AlterClusterConfigs; +} + +impl_write_type!(AlterClusterConfigsRequest); +impl_read_type!(proto::AlterClusterConfigsResponse); diff --git a/crates/fluss/src/rpc/message/alter_database.rs b/crates/fluss/src/rpc/message/alter_database.rs new file mode 100644 index 00000000..dc05f6ed --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_database.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AlterConfig; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AlterDatabaseRequest { + pub inner_request: proto::AlterDatabaseRequest, +} + +impl AlterDatabaseRequest { + pub fn new( + database_name: &str, + ignore_if_not_exists: bool, + config_changes: Vec, + ) -> Self { + AlterDatabaseRequest { + inner_request: proto::AlterDatabaseRequest { + database_name: database_name.to_string(), + ignore_if_not_exists, + config_changes: config_changes.iter().map(AlterConfig::to_pb).collect(), + comment: None, + }, + } + } +} + +impl RequestBody for AlterDatabaseRequest { + type ResponseBody = proto::AlterDatabaseResponse; + const API_KEY: ApiKey = ApiKey::AlterDatabase; +} + +impl_write_type!(AlterDatabaseRequest); +impl_read_type!(proto::AlterDatabaseResponse); diff --git a/crates/fluss/src/rpc/message/alter_table.rs b/crates/fluss/src/rpc/message/alter_table.rs new file mode 100644 index 00000000..7cd59ae0 --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_table.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{AddColumn, AlterConfig, DropColumn, ModifyColumn, RenameColumn, TablePath}; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AlterTableRequest { + pub inner_request: proto::AlterTableRequest, +} + +impl AlterTableRequest { + pub fn new( + table_path: &TablePath, + ignore_if_not_exists: bool, + config_changes: Vec, + add_columns: Vec, + drop_columns: Vec, + rename_columns: Vec, + modify_columns: Vec, + ) -> Self { + AlterTableRequest { + inner_request: proto::AlterTableRequest { + table_path: to_table_path(table_path), + ignore_if_not_exists, + config_changes: config_changes.iter().map(AlterConfig::to_pb).collect(), + add_columns: add_columns.iter().map(AddColumn::to_pb).collect(), + drop_columns: drop_columns.iter().map(DropColumn::to_pb).collect(), + rename_columns: rename_columns.iter().map(RenameColumn::to_pb).collect(), + modify_columns: modify_columns.iter().map(ModifyColumn::to_pb).collect(), + }, + } + } +} + +impl RequestBody for AlterTableRequest { + type ResponseBody = proto::AlterTableResponse; + const API_KEY: ApiKey = ApiKey::AlterTable; +} + +impl_write_type!(AlterTableRequest); +impl_read_type!(proto::AlterTableResponse); diff --git a/crates/fluss/src/rpc/message/cancel_rebalance.rs b/crates/fluss/src/rpc/message/cancel_rebalance.rs new file mode 100644 index 00000000..9dc4ee1b --- /dev/null +++ b/crates/fluss/src/rpc/message/cancel_rebalance.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct CancelRebalanceRequest { + pub inner_request: proto::CancelRebalanceRequest, +} + +impl CancelRebalanceRequest { + pub fn new(rebalance_id: Option<&str>) -> Self { + CancelRebalanceRequest { + inner_request: proto::CancelRebalanceRequest { + rebalance_id: rebalance_id.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for CancelRebalanceRequest { + type ResponseBody = proto::CancelRebalanceResponse; + const API_KEY: ApiKey = ApiKey::CancelRebalance; +} + +impl_write_type!(CancelRebalanceRequest); +impl_read_type!(proto::CancelRebalanceResponse); diff --git a/crates/fluss/src/rpc/message/create_acls.rs b/crates/fluss/src/rpc/message/create_acls.rs new file mode 100644 index 00000000..658aa44d --- /dev/null +++ b/crates/fluss/src/rpc/message/create_acls.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AclInfo; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct CreateAclsRequest { + pub inner_request: proto::CreateAclsRequest, +} + +impl CreateAclsRequest { + pub fn new(acl: Vec) -> Self { + CreateAclsRequest { + inner_request: proto::CreateAclsRequest { + acl: acl.iter().map(AclInfo::to_pb).collect(), + }, + } + } +} + +impl RequestBody for CreateAclsRequest { + type ResponseBody = proto::CreateAclsResponse; + const API_KEY: ApiKey = ApiKey::CreateAcls; +} + +impl_write_type!(CreateAclsRequest); +impl_read_type!(proto::CreateAclsResponse); diff --git a/crates/fluss/src/rpc/message/create_partition.rs b/crates/fluss/src/rpc/message/create_partition.rs index ad633655..1646f333 100644 --- a/crates/fluss/src/rpc/message/create_partition.rs +++ b/crates/fluss/src/rpc/message/create_partition.rs @@ -40,7 +40,7 @@ impl CreatePartitionRequest { inner_request: proto::CreatePartitionRequest { table_path: to_table_path(table_path), partition_spec: partition_spec.to_pb(), - ignore_if_exists, + ignore_if_not_exists: ignore_if_exists, }, } } diff --git a/crates/fluss/src/rpc/message/delete_producer_offsets.rs b/crates/fluss/src/rpc/message/delete_producer_offsets.rs new file mode 100644 index 00000000..b677b48f --- /dev/null +++ b/crates/fluss/src/rpc/message/delete_producer_offsets.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DeleteProducerOffsetsRequest { + pub inner_request: proto::DeleteProducerOffsetsRequest, +} + +impl DeleteProducerOffsetsRequest { + pub fn new(producer_id: &str) -> Self { + DeleteProducerOffsetsRequest { + inner_request: proto::DeleteProducerOffsetsRequest { + producer_id: producer_id.to_string(), + }, + } + } +} + +impl RequestBody for DeleteProducerOffsetsRequest { + type ResponseBody = proto::DeleteProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::DeleteProducerOffsets; +} + +impl_write_type!(DeleteProducerOffsetsRequest); +impl_read_type!(proto::DeleteProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/describe_cluster_configs.rs b/crates/fluss/src/rpc/message/describe_cluster_configs.rs new file mode 100644 index 00000000..d085b3e6 --- /dev/null +++ b/crates/fluss/src/rpc/message/describe_cluster_configs.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct DescribeClusterConfigsRequest { + pub inner_request: proto::DescribeClusterConfigsRequest, +} + +impl DescribeClusterConfigsRequest { + pub fn new() -> Self { + DescribeClusterConfigsRequest { + inner_request: proto::DescribeClusterConfigsRequest {}, + } + } +} + +impl RequestBody for DescribeClusterConfigsRequest { + type ResponseBody = proto::DescribeClusterConfigsResponse; + const API_KEY: ApiKey = ApiKey::DescribeClusterConfigs; +} + +impl_write_type!(DescribeClusterConfigsRequest); +impl_read_type!(proto::DescribeClusterConfigsResponse); diff --git a/crates/fluss/src/rpc/message/drop_acls.rs b/crates/fluss/src/rpc/message/drop_acls.rs new file mode 100644 index 00000000..cad27e7b --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_acls.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AclFilter; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DropAclsRequest { + pub inner_request: proto::DropAclsRequest, +} + +impl DropAclsRequest { + pub fn new(acl_filter: Vec) -> Self { + DropAclsRequest { + inner_request: proto::DropAclsRequest { + acl_filter: acl_filter.iter().map(AclFilter::to_pb).collect(), + }, + } + } +} + +impl RequestBody for DropAclsRequest { + type ResponseBody = proto::DropAclsResponse; + const API_KEY: ApiKey = ApiKey::DropAcls; +} + +impl_write_type!(DropAclsRequest); +impl_read_type!(proto::DropAclsResponse); diff --git a/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs new file mode 100644 index 00000000..13dfbf78 --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DropKvSnapshotLeaseRequest { + pub inner_request: proto::DropKvSnapshotLeaseRequest, +} + +impl DropKvSnapshotLeaseRequest { + pub fn new(lease_id: &str) -> Self { + DropKvSnapshotLeaseRequest { + inner_request: proto::DropKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + }, + } + } +} + +impl RequestBody for DropKvSnapshotLeaseRequest { + type ResponseBody = proto::DropKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::DropKvSnapshotLease; +} + +impl_write_type!(DropKvSnapshotLeaseRequest); +impl_read_type!(proto::DropKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/get_cluster_health.rs b/crates/fluss/src/rpc/message/get_cluster_health.rs new file mode 100644 index 00000000..1971460c --- /dev/null +++ b/crates/fluss/src/rpc/message/get_cluster_health.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetClusterHealthRequest { + pub inner_request: proto::GetClusterHealthRequest, +} + +impl GetClusterHealthRequest { + pub fn new() -> Self { + GetClusterHealthRequest { + inner_request: proto::GetClusterHealthRequest {}, + } + } +} + +impl RequestBody for GetClusterHealthRequest { + type ResponseBody = proto::GetClusterHealthResponse; + const API_KEY: ApiKey = ApiKey::GetClusterHealth; +} + +impl_write_type!(GetClusterHealthRequest); +impl_read_type!(proto::GetClusterHealthResponse); diff --git a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs new file mode 100644 index 00000000..6fb8d5a3 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetKvSnapshotMetadataRequest { + pub inner_request: proto::GetKvSnapshotMetadataRequest, +} + +impl GetKvSnapshotMetadataRequest { + pub fn new(table_id: i64, partition_id: Option, bucket_id: i32, snapshot_id: i64) -> Self { + GetKvSnapshotMetadataRequest { + inner_request: proto::GetKvSnapshotMetadataRequest { + table_id, + partition_id, + bucket_id, + snapshot_id, + }, + } + } +} + +impl RequestBody for GetKvSnapshotMetadataRequest { + type ResponseBody = proto::GetKvSnapshotMetadataResponse; + const API_KEY: ApiKey = ApiKey::GetKvSnapshotMetadata; +} + +impl_write_type!(GetKvSnapshotMetadataRequest); +impl_read_type!(proto::GetKvSnapshotMetadataResponse); diff --git a/crates/fluss/src/rpc/message/get_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_lake_snapshot.rs new file mode 100644 index 00000000..8d24df4c --- /dev/null +++ b/crates/fluss/src/rpc/message/get_lake_snapshot.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetLakeSnapshotRequest { + pub inner_request: proto::GetLakeSnapshotRequest, +} + +impl GetLakeSnapshotRequest { + pub fn new(table_path: &TablePath, snapshot_id: Option, readable: Option) -> Self { + GetLakeSnapshotRequest { + inner_request: proto::GetLakeSnapshotRequest { + table_path: to_table_path(table_path), + snapshot_id, + readable, + }, + } + } +} + +impl RequestBody for GetLakeSnapshotRequest { + type ResponseBody = proto::GetLakeSnapshotResponse; + const API_KEY: ApiKey = ApiKey::GetLakeSnapshot; +} + +impl_write_type!(GetLakeSnapshotRequest); +impl_read_type!(proto::GetLakeSnapshotResponse); diff --git a/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs b/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs new file mode 100644 index 00000000..82efb172 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetLatestKvSnapshotsRequest { + pub inner_request: proto::GetLatestKvSnapshotsRequest, +} + +impl GetLatestKvSnapshotsRequest { + pub fn new(table_path: &TablePath, partition_name: Option<&str>) -> Self { + GetLatestKvSnapshotsRequest { + inner_request: proto::GetLatestKvSnapshotsRequest { + table_path: to_table_path(table_path), + partition_name: partition_name.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for GetLatestKvSnapshotsRequest { + type ResponseBody = proto::GetLatestKvSnapshotsResponse; + const API_KEY: ApiKey = ApiKey::GetLatestKvSnapshots; +} + +impl_write_type!(GetLatestKvSnapshotsRequest); +impl_read_type!(proto::GetLatestKvSnapshotsResponse); diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs index 0b59384d..a23a985d 100644 --- a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs +++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs @@ -48,7 +48,7 @@ impl GetLatestLakeSnapshotRequest { impl RequestBody for GetLatestLakeSnapshotRequest { type ResponseBody = proto::GetLatestLakeSnapshotResponse; - const API_KEY: ApiKey = ApiKey::GetLatestLakeSnapshot; + const API_KEY: ApiKey = ApiKey::GetLakeSnapshot; } impl_write_type!(GetLatestLakeSnapshotRequest); diff --git a/crates/fluss/src/rpc/message/get_producer_offsets.rs b/crates/fluss/src/rpc/message/get_producer_offsets.rs new file mode 100644 index 00000000..d2cd921d --- /dev/null +++ b/crates/fluss/src/rpc/message/get_producer_offsets.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetProducerOffsetsRequest { + pub inner_request: proto::GetProducerOffsetsRequest, +} + +impl GetProducerOffsetsRequest { + pub fn new(producer_id: &str) -> Self { + GetProducerOffsetsRequest { + inner_request: proto::GetProducerOffsetsRequest { + producer_id: producer_id.to_string(), + }, + } + } +} + +impl RequestBody for GetProducerOffsetsRequest { + type ResponseBody = proto::GetProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::GetProducerOffsets; +} + +impl_write_type!(GetProducerOffsetsRequest); +impl_read_type!(proto::GetProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/get_table_stats.rs b/crates/fluss/src/rpc/message/get_table_stats.rs new file mode 100644 index 00000000..e8ea1165 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_table_stats.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::BucketStatsRequest; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetTableStatsRequest { + pub inner_request: proto::GetTableStatsRequest, +} + +impl GetTableStatsRequest { + pub fn new( + table_id: i64, + buckets_req: Vec, + target_columns: Vec, + ) -> Self { + GetTableStatsRequest { + inner_request: proto::GetTableStatsRequest { + table_id, + buckets_req: buckets_req.iter().map(BucketStatsRequest::to_pb).collect(), + target_columns, + }, + } + } +} + +impl RequestBody for GetTableStatsRequest { + type ResponseBody = proto::GetTableStatsResponse; + const API_KEY: ApiKey = ApiKey::GetTableStats; +} + +impl_write_type!(GetTableStatsRequest); +impl_read_type!(proto::GetTableStatsResponse); diff --git a/crates/fluss/src/rpc/message/list_acls.rs b/crates/fluss/src/rpc/message/list_acls.rs new file mode 100644 index 00000000..68c1177c --- /dev/null +++ b/crates/fluss/src/rpc/message/list_acls.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AclFilter; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListAclsRequest { + pub inner_request: proto::ListAclsRequest, +} + +impl ListAclsRequest { + pub fn new(acl_filter: AclFilter) -> Self { + ListAclsRequest { + inner_request: proto::ListAclsRequest { + acl_filter: acl_filter.to_pb(), + }, + } + } +} + +impl RequestBody for ListAclsRequest { + type ResponseBody = proto::ListAclsResponse; + const API_KEY: ApiKey = ApiKey::ListAcls; +} + +impl_write_type!(ListAclsRequest); +impl_read_type!(proto::ListAclsResponse); diff --git a/crates/fluss/src/rpc/message/list_database_summaries.rs b/crates/fluss/src/rpc/message/list_database_summaries.rs new file mode 100644 index 00000000..406233fc --- /dev/null +++ b/crates/fluss/src/rpc/message/list_database_summaries.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::WriteError; +use crate::rpc::message::{RequestBody, WriteType}; +use crate::{impl_write_type, proto}; +use bytes::BufMut; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListDatabaseSummariesRequest { + pub inner_request: proto::ListDatabasesRequest, +} + +impl ListDatabaseSummariesRequest { + pub fn new() -> Self { + ListDatabaseSummariesRequest { + inner_request: proto::ListDatabasesRequest { + include_summary: Some(true), + }, + } + } +} + +impl RequestBody for ListDatabaseSummariesRequest { + type ResponseBody = proto::ListDatabasesResponse; + const API_KEY: ApiKey = ApiKey::ListDatabases; +} + +impl_write_type!(ListDatabaseSummariesRequest); diff --git a/crates/fluss/src/rpc/message/list_databases.rs b/crates/fluss/src/rpc/message/list_databases.rs index 21e16400..74ca4944 100644 --- a/crates/fluss/src/rpc/message/list_databases.rs +++ b/crates/fluss/src/rpc/message/list_databases.rs @@ -32,7 +32,9 @@ pub struct ListDatabasesRequest { impl ListDatabasesRequest { pub fn new() -> Self { ListDatabasesRequest { - inner_request: proto::ListDatabasesRequest {}, + inner_request: proto::ListDatabasesRequest { + include_summary: None, + }, } } } diff --git a/crates/fluss/src/rpc/message/list_kv_snapshots.rs b/crates/fluss/src/rpc/message/list_kv_snapshots.rs new file mode 100644 index 00000000..ad603353 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_kv_snapshots.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListKvSnapshotsRequest { + pub inner_request: proto::ListKvSnapshotsRequest, +} + +impl ListKvSnapshotsRequest { + pub fn new(table_id: i64, partition_id: Option) -> Self { + ListKvSnapshotsRequest { + inner_request: proto::ListKvSnapshotsRequest { + table_id, + partition_id, + }, + } + } +} + +impl RequestBody for ListKvSnapshotsRequest { + type ResponseBody = proto::ListKvSnapshotsResponse; + const API_KEY: ApiKey = ApiKey::ListKvSnapshots; +} + +impl_write_type!(ListKvSnapshotsRequest); +impl_read_type!(proto::ListKvSnapshotsResponse); diff --git a/crates/fluss/src/rpc/message/list_rebalance_progress.rs b/crates/fluss/src/rpc/message/list_rebalance_progress.rs new file mode 100644 index 00000000..937ba207 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_rebalance_progress.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListRebalanceProgressRequest { + pub inner_request: proto::ListRebalanceProgressRequest, +} + +impl ListRebalanceProgressRequest { + pub fn new(rebalance_id: Option<&str>) -> Self { + ListRebalanceProgressRequest { + inner_request: proto::ListRebalanceProgressRequest { + rebalance_id: rebalance_id.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for ListRebalanceProgressRequest { + type ResponseBody = proto::ListRebalanceProgressResponse; + const API_KEY: ApiKey = ApiKey::ListRebalanceProgress; +} + +impl_write_type!(ListRebalanceProgressRequest); +impl_read_type!(proto::ListRebalanceProgressResponse); diff --git a/crates/fluss/src/rpc/message/list_remote_log_manifests.rs b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs new file mode 100644 index 00000000..047a0700 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListRemoteLogManifestsRequest { + pub inner_request: proto::ListRemoteLogManifestsRequest, +} + +impl ListRemoteLogManifestsRequest { + pub fn new(table_id: i64, partition_id: Option) -> Self { + ListRemoteLogManifestsRequest { + inner_request: proto::ListRemoteLogManifestsRequest { + table_id, + partition_id, + }, + } + } +} + +impl RequestBody for ListRemoteLogManifestsRequest { + type ResponseBody = proto::ListRemoteLogManifestsResponse; + const API_KEY: ApiKey = ApiKey::ListRemoteLogManifests; +} + +impl_write_type!(ListRemoteLogManifestsRequest); +impl_read_type!(proto::ListRemoteLogManifestsResponse); diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs index 200d4bc8..e205fa6b 100644 --- a/crates/fluss/src/rpc/message/lookup.rs +++ b/crates/fluss/src/rpc/message/lookup.rs @@ -42,7 +42,7 @@ impl LookupRequest { |(bucket_id, partition_id, keys)| proto::PbLookupReqForBucket { partition_id, bucket_id, - key: keys, + keys, }, ) .collect(); @@ -50,6 +50,9 @@ impl LookupRequest { let request = proto::LookupRequest { table_id, buckets_req, + insert_if_not_exists: None, + acks: None, + timeout_ms: None, }; Self { diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index 096066ed..aba690a5 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -19,62 +19,116 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use bytes::{Buf, BufMut}; +mod acquire_kv_snapshot_lease; +mod add_server_tag; +mod alter_cluster_configs; +mod alter_database; +mod alter_table; mod api_versions; mod authenticate; +mod cancel_rebalance; +mod create_acls; mod create_database; mod create_partition; mod create_table; mod database_exists; +mod delete_producer_offsets; +mod describe_cluster_configs; +mod drop_acls; mod drop_database; +mod drop_kv_snapshot_lease; mod drop_partition; mod drop_table; mod fetch; +mod get_cluster_health; mod get_database_info; +mod get_kv_snapshot_metadata; +mod get_lake_snapshot; +mod get_latest_kv_snapshots; mod get_latest_lake_snapshot; +mod get_producer_offsets; mod get_security_token; mod get_table; mod get_table_schema; +mod get_table_stats; mod header; mod init_writer; mod limit_scan; +mod list_acls; +mod list_database_summaries; mod list_databases; +mod list_kv_snapshots; mod list_offsets; mod list_partition_infos; +mod list_rebalance_progress; +mod list_remote_log_manifests; mod list_tables; mod lookup; mod prefix_lookup; mod produce_log; mod put_kv; +mod rebalance; +mod register_producer_offsets; +mod release_kv_snapshot_lease; +mod remove_server_tag; +mod scan_kv; mod table_exists; mod update_metadata; pub use crate::rpc::RpcError; +pub use acquire_kv_snapshot_lease::*; +pub use add_server_tag::*; +pub use alter_cluster_configs::*; +pub use alter_database::*; +pub use alter_table::*; pub use api_versions::*; pub use authenticate::*; +pub use cancel_rebalance::*; +pub use create_acls::*; pub use create_database::*; pub use create_partition::*; pub use create_table::*; pub use database_exists::*; +pub use delete_producer_offsets::*; +pub use describe_cluster_configs::*; +pub use drop_acls::*; pub use drop_database::*; +pub use drop_kv_snapshot_lease::*; pub use drop_partition::*; pub use drop_table::*; pub use fetch::*; +pub use get_cluster_health::*; pub use get_database_info::*; +pub use get_kv_snapshot_metadata::*; +pub use get_lake_snapshot::*; +pub use get_latest_kv_snapshots::*; pub use get_latest_lake_snapshot::*; +pub use get_producer_offsets::*; pub use get_security_token::*; pub use get_table::*; pub use get_table_schema::*; +pub use get_table_stats::*; pub use header::*; pub use init_writer::*; pub use limit_scan::*; +pub use list_acls::*; +pub use list_database_summaries::*; pub use list_databases::*; +pub use list_kv_snapshots::*; pub use list_offsets::*; pub use list_partition_infos::*; +pub use list_rebalance_progress::*; +pub use list_remote_log_manifests::*; pub use list_tables::*; pub use lookup::*; pub use prefix_lookup::*; pub use produce_log::*; pub use put_kv::*; +pub use rebalance::*; +pub use register_producer_offsets::*; +pub use release_kv_snapshot_lease::*; +pub use remove_server_tag::*; +pub use scan_kv::*; pub use table_exists::*; pub use update_metadata::*; diff --git a/crates/fluss/src/rpc/message/rebalance.rs b/crates/fluss/src/rpc/message/rebalance.rs new file mode 100644 index 00000000..7b96265b --- /dev/null +++ b/crates/fluss/src/rpc/message/rebalance.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct RebalanceRequest { + pub inner_request: proto::RebalanceRequest, +} + +impl RebalanceRequest { + pub fn new(goals: Vec) -> Self { + RebalanceRequest { + inner_request: proto::RebalanceRequest { goals }, + } + } +} + +impl RequestBody for RebalanceRequest { + type ResponseBody = proto::RebalanceResponse; + const API_KEY: ApiKey = ApiKey::Rebalance; +} + +impl_write_type!(RebalanceRequest); +impl_read_type!(proto::RebalanceResponse); diff --git a/crates/fluss/src/rpc/message/register_producer_offsets.rs b/crates/fluss/src/rpc/message/register_producer_offsets.rs new file mode 100644 index 00000000..93e71ac3 --- /dev/null +++ b/crates/fluss/src/rpc/message/register_producer_offsets.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::ProducerTableOffsets; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct RegisterProducerOffsetsRequest { + pub inner_request: proto::RegisterProducerOffsetsRequest, +} + +impl RegisterProducerOffsetsRequest { + pub fn new( + producer_id: &str, + table_offsets: Vec, + ttl_ms: Option, + ) -> Self { + RegisterProducerOffsetsRequest { + inner_request: proto::RegisterProducerOffsetsRequest { + producer_id: producer_id.to_string(), + table_offsets: table_offsets + .iter() + .map(ProducerTableOffsets::to_pb) + .collect(), + ttl_ms, + }, + } + } +} + +impl RequestBody for RegisterProducerOffsetsRequest { + type ResponseBody = proto::RegisterProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::RegisterProducerOffsets; +} + +impl_write_type!(RegisterProducerOffsetsRequest); +impl_read_type!(proto::RegisterProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs new file mode 100644 index 00000000..c9246232 --- /dev/null +++ b/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TableBucket; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ReleaseKvSnapshotLeaseRequest { + pub inner_request: proto::ReleaseKvSnapshotLeaseRequest, +} + +impl ReleaseKvSnapshotLeaseRequest { + pub fn new(lease_id: &str, buckets_to_release: Vec) -> Self { + ReleaseKvSnapshotLeaseRequest { + inner_request: proto::ReleaseKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + buckets_to_release: buckets_to_release.iter().map(TableBucket::to_pb).collect(), + }, + } + } +} + +impl RequestBody for ReleaseKvSnapshotLeaseRequest { + type ResponseBody = proto::ReleaseKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::ReleaseKvSnapshotLease; +} + +impl_write_type!(ReleaseKvSnapshotLeaseRequest); +impl_read_type!(proto::ReleaseKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/remove_server_tag.rs b/crates/fluss/src/rpc/message/remove_server_tag.rs new file mode 100644 index 00000000..db7b4c63 --- /dev/null +++ b/crates/fluss/src/rpc/message/remove_server_tag.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct RemoveServerTagRequest { + pub inner_request: proto::RemoveServerTagRequest, +} + +impl RemoveServerTagRequest { + pub fn new(server_ids: Vec, server_tag: i32) -> Self { + RemoveServerTagRequest { + inner_request: proto::RemoveServerTagRequest { + server_ids, + server_tag, + }, + } + } +} + +impl RequestBody for RemoveServerTagRequest { + type ResponseBody = proto::RemoveServerTagResponse; + const API_KEY: ApiKey = ApiKey::RemoveServerTag; +} + +impl_write_type!(RemoveServerTagRequest); +impl_read_type!(proto::RemoveServerTagResponse); diff --git a/crates/fluss/src/rpc/message/scan_kv.rs b/crates/fluss/src/rpc/message/scan_kv.rs new file mode 100644 index 00000000..7e9a05eb --- /dev/null +++ b/crates/fluss/src/rpc/message/scan_kv.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ScanKvRequest { + pub inner_request: proto::ScanKvRequest, +} + +impl ScanKvRequest { + pub fn new( + scanner_id: Option>, + bucket_scan_req: Option, + call_seq_id: Option, + batch_size_bytes: Option, + close_scanner: Option, + ) -> Self { + ScanKvRequest { + inner_request: proto::ScanKvRequest { + scanner_id, + bucket_scan_req, + call_seq_id, + batch_size_bytes, + close_scanner, + }, + } + } +} + +impl RequestBody for ScanKvRequest { + type ResponseBody = proto::ScanKvResponse; + const API_KEY: ApiKey = ApiKey::ScanKv; +} + +impl_write_type!(ScanKvRequest); +impl_read_type!(proto::ScanKvResponse);