diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index e27392f1..fe74f3b6 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -239,7 +239,7 @@ impl Metadata { Ok(self.get_cluster().get_partition_id(physical_table_path)) } - pub async fn get_connection(&self, server_node: &ServerNode) -> Result { + pub(crate) async fn get_connection(&self, server_node: &ServerNode) -> Result { let result = self.connections.get_connection(server_node).await?; Ok(result) } diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index fab0d277..fbad7bd9 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -146,6 +146,6 @@ pub type TableId = i64; pub type PartitionId = i64; pub type BucketId = i32; -pub mod proto { +pub(crate) mod proto { include!(concat!(env!("OUT_DIR"), "/proto.rs")); } diff --git a/crates/fluss/src/metadata/acl.rs b/crates/fluss/src/metadata/acl.rs new file mode 100644 index 00000000..84ec97af --- /dev/null +++ b/crates/fluss/src/metadata/acl.rs @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::{PbAclFilter, PbAclInfo}; + +/// Mirrors Java `org.apache.fluss.security.acl.ResourceType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResourceType { + Any, + Cluster, + Database, + Table, +} + +impl ResourceType { + pub fn to_i32(self) -> i32 { + match self { + Self::Any => 1, + Self::Cluster => 2, + Self::Database => 3, + Self::Table => 4, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 1 => Ok(Self::Any), + 2 => Ok(Self::Cluster), + 3 => Ok(Self::Database), + 4 => Ok(Self::Table), + _ => Err(Error::IllegalArgument { + message: format!("Unknown resource type code: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.OperationType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OperationType { + Any, + All, + Read, + Write, + Create, + Drop, + Alter, + Describe, +} + +impl OperationType { + pub fn to_i32(self) -> i32 { + match self { + Self::Any => 1, + Self::All => 2, + Self::Read => 3, + Self::Write => 4, + Self::Create => 5, + Self::Drop => 6, + Self::Alter => 7, + Self::Describe => 8, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 1 => Ok(Self::Any), + 2 => Ok(Self::All), + 3 => Ok(Self::Read), + 4 => Ok(Self::Write), + 5 => Ok(Self::Create), + 6 => Ok(Self::Drop), + 7 => Ok(Self::Alter), + 8 => Ok(Self::Describe), + _ => Err(Error::IllegalArgument { + message: format!("Unknown operation type code: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.PermissionType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PermissionType { + Any, + Allow, +} + +impl PermissionType { + pub fn to_i32(self) -> i32 { + match self { + Self::Any => 1, + Self::Allow => 2, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 1 => Ok(Self::Any), + 2 => Ok(Self::Allow), + _ => Err(Error::IllegalArgument { + message: format!("Unknown permission type: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.AclBinding` (concrete ACL entry). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclInfo { + pub resource_name: String, + pub resource_type: ResourceType, + pub principal_name: String, + pub principal_type: String, + pub host: String, + pub operation_type: OperationType, + pub permission_type: PermissionType, +} + +impl AclInfo { + pub fn to_pb(&self) -> PbAclInfo { + PbAclInfo { + resource_name: self.resource_name.clone(), + resource_type: self.resource_type.to_i32(), + principal_name: self.principal_name.clone(), + principal_type: self.principal_type.clone(), + host: self.host.clone(), + operation_type: self.operation_type.to_i32(), + permission_type: self.permission_type.to_i32(), + } + } + + pub fn from_pb(pb: &PbAclInfo) -> Result { + Ok(Self { + resource_name: pb.resource_name.clone(), + resource_type: ResourceType::try_from_i32(pb.resource_type)?, + principal_name: pb.principal_name.clone(), + principal_type: pb.principal_type.clone(), + host: pb.host.clone(), + operation_type: OperationType::try_from_i32(pb.operation_type)?, + permission_type: PermissionType::try_from_i32(pb.permission_type)?, + }) + } +} + +/// Mirrors Java `org.apache.fluss.security.acl.AclBindingFilter` — like `AclInfo` but with +/// optional name/principal/host fields for partial matching. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclFilter { + pub resource_name: Option, + pub resource_type: ResourceType, + pub principal_name: Option, + pub principal_type: Option, + pub host: Option, + pub operation_type: OperationType, + pub permission_type: PermissionType, +} + +impl AclFilter { + pub fn to_pb(&self) -> PbAclFilter { + PbAclFilter { + resource_name: self.resource_name.clone(), + resource_type: self.resource_type.to_i32(), + principal_name: self.principal_name.clone(), + principal_type: self.principal_type.clone(), + host: self.host.clone(), + operation_type: self.operation_type.to_i32(), + permission_type: self.permission_type.to_i32(), + } + } + + pub fn from_pb(pb: &PbAclFilter) -> Result { + Ok(Self { + resource_name: pb.resource_name.clone(), + resource_type: ResourceType::try_from_i32(pb.resource_type)?, + principal_name: pb.principal_name.clone(), + principal_type: pb.principal_type.clone(), + host: pb.host.clone(), + operation_type: OperationType::try_from_i32(pb.operation_type)?, + permission_type: PermissionType::try_from_i32(pb.permission_type)?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resource_type_roundtrip() { + for v in [ + ResourceType::Any, + ResourceType::Cluster, + ResourceType::Database, + ResourceType::Table, + ] { + assert_eq!(ResourceType::try_from_i32(v.to_i32()).unwrap(), v); + } + assert!(ResourceType::try_from_i32(0).is_err()); + } + + #[test] + fn test_operation_type_roundtrip() { + for v in [ + OperationType::Any, + OperationType::All, + OperationType::Read, + OperationType::Write, + OperationType::Create, + OperationType::Drop, + OperationType::Alter, + OperationType::Describe, + ] { + assert_eq!(OperationType::try_from_i32(v.to_i32()).unwrap(), v); + } + assert!(OperationType::try_from_i32(0).is_err()); + } + + #[test] + fn test_permission_type_roundtrip() { + for v in [PermissionType::Any, PermissionType::Allow] { + assert_eq!(PermissionType::try_from_i32(v.to_i32()).unwrap(), v); + } + assert!(PermissionType::try_from_i32(0).is_err()); + } + + #[test] + fn test_acl_info_pb_roundtrip() { + let original = AclInfo { + resource_name: "topic-a".to_string(), + resource_type: ResourceType::Table, + principal_name: "alice".to_string(), + principal_type: "User".to_string(), + host: "*".to_string(), + operation_type: OperationType::Read, + permission_type: PermissionType::Allow, + }; + let pb = original.to_pb(); + assert_eq!(AclInfo::from_pb(&pb).unwrap(), original); + } + + #[test] + fn test_acl_filter_pb_roundtrip() { + let original = AclFilter { + resource_name: None, + resource_type: ResourceType::Any, + principal_name: Some("alice".to_string()), + principal_type: None, + host: Some("*".to_string()), + operation_type: OperationType::Any, + permission_type: PermissionType::Any, + }; + let pb = original.to_pb(); + assert_eq!(AclFilter::from_pb(&pb).unwrap(), original); + } +} diff --git a/crates/fluss/src/metadata/config.rs b/crates/fluss/src/metadata/config.rs new file mode 100644 index 00000000..4e045c45 --- /dev/null +++ b/crates/fluss/src/metadata/config.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::PbAlterConfig; + +/// Mirrors Java `org.apache.fluss.config.cluster.AlterConfigOpType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AlterConfigOpType { + Set, + Delete, + Append, + Subtract, +} + +impl AlterConfigOpType { + pub fn to_i32(self) -> i32 { + match self { + Self::Set => 0, + Self::Delete => 1, + Self::Append => 2, + Self::Subtract => 3, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Set), + 1 => Ok(Self::Delete), + 2 => Ok(Self::Append), + 3 => Ok(Self::Subtract), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported AlterConfigOpType: {value}"), + }), + } + } +} + +/// Mirrors Java `org.apache.fluss.config.cluster.AlterConfig`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AlterConfig { + pub config_key: String, + pub config_value: Option, + pub op_type: AlterConfigOpType, +} + +impl AlterConfig { + pub fn new>( + config_key: K, + config_value: Option, + op_type: AlterConfigOpType, + ) -> Self { + Self { + config_key: config_key.into(), + config_value, + op_type, + } + } + + pub fn to_pb(&self) -> PbAlterConfig { + PbAlterConfig { + config_key: self.config_key.clone(), + config_value: self.config_value.clone(), + op_type: self.op_type.to_i32(), + } + } + + pub fn from_pb(pb: &PbAlterConfig) -> Result { + Ok(Self { + config_key: pb.config_key.clone(), + config_value: pb.config_value.clone(), + op_type: AlterConfigOpType::try_from_i32(pb.op_type)?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_alter_config_op_type_roundtrip() { + for op in [ + AlterConfigOpType::Set, + AlterConfigOpType::Delete, + AlterConfigOpType::Append, + AlterConfigOpType::Subtract, + ] { + assert_eq!(AlterConfigOpType::try_from_i32(op.to_i32()).unwrap(), op); + } + } + + #[test] + fn test_alter_config_op_type_unknown() { + assert!(AlterConfigOpType::try_from_i32(99).is_err()); + } + + #[test] + fn test_alter_config_pb_roundtrip() { + let cases = [ + AlterConfig::new("foo", Some("bar".to_string()), AlterConfigOpType::Set), + AlterConfig::new("baz", None, AlterConfigOpType::Delete), + ]; + for original in cases { + let pb = original.to_pb(); + let restored = AlterConfig::from_pb(&pb).unwrap(); + assert_eq!(original, restored); + } + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index c1d1b72c..28cda723 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +mod acl; +mod config; mod data_lake_format; mod database; mod datatype; @@ -22,7 +24,11 @@ mod json_serde; mod partition; mod schema_util; mod table; +mod table_change; +mod table_stats; +pub use acl::*; +pub use config::*; pub use data_lake_format::*; pub use database::*; pub use datatype::*; @@ -30,3 +36,5 @@ pub use json_serde::*; pub use partition::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use table::*; +pub use table_change::*; +pub use table_stats::*; diff --git a/crates/fluss/src/metadata/table_change.rs b/crates/fluss/src/metadata/table_change.rs new file mode 100644 index 00000000..2ecf6cfa --- /dev/null +++ b/crates/fluss/src/metadata/table_change.rs @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::{PbAddColumn, PbDropColumn, PbModifyColumn, PbRenameColumn}; + +/// Mirrors Java `org.apache.fluss.config.cluster.ColumnPositionType`. +/// +/// The Java server today only handles `Last`; `First` and `After` are reserved by +/// the proto schema (`column_position_type = 0=LAST, 1=FIRST, 3=AFTER`) for future +/// use. Sending other variants will be rejected by the server. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ColumnPositionType { + Last, +} + +impl ColumnPositionType { + pub fn to_i32(self) -> i32 { + match self { + Self::Last => 0, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Last), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported ColumnPositionType: {value}"), + }), + } + } +} + +/// Add a column. Mirrors the `AddColumn` variant of Java `TableChange`. +/// +/// `data_type_json` carries the column's `DataType` already serialized to its JSON +/// representation (matching the Java client, which transports `DataType` as JSON over +/// the wire). Callers can produce it via `DataType::serialize_json` + `serde_json::to_vec`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AddColumn { + pub column_name: String, + pub data_type_json: Vec, + pub comment: Option, + pub position: ColumnPositionType, +} + +impl AddColumn { + pub fn to_pb(&self) -> PbAddColumn { + PbAddColumn { + column_name: self.column_name.clone(), + data_type_json: self.data_type_json.clone(), + comment: self.comment.clone(), + column_position_type: self.position.to_i32(), + } + } + + pub fn from_pb(pb: &PbAddColumn) -> Result { + Ok(Self { + column_name: pb.column_name.clone(), + data_type_json: pb.data_type_json.clone(), + comment: pb.comment.clone(), + position: ColumnPositionType::try_from_i32(pb.column_position_type)?, + }) + } +} + +/// Drop a column. Mirrors the `DropColumn` variant of Java `TableChange`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropColumn { + pub column_name: String, +} + +impl DropColumn { + pub fn to_pb(&self) -> PbDropColumn { + PbDropColumn { + column_name: self.column_name.clone(), + } + } + + pub fn from_pb(pb: &PbDropColumn) -> Self { + Self { + column_name: pb.column_name.clone(), + } + } +} + +/// Rename a column. Mirrors the `RenameColumn` variant of Java `TableChange`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RenameColumn { + pub old_column_name: String, + pub new_column_name: String, +} + +impl RenameColumn { + pub fn to_pb(&self) -> PbRenameColumn { + PbRenameColumn { + old_column_name: self.old_column_name.clone(), + new_column_name: self.new_column_name.clone(), + } + } + + pub fn from_pb(pb: &PbRenameColumn) -> Self { + Self { + old_column_name: pb.old_column_name.clone(), + new_column_name: pb.new_column_name.clone(), + } + } +} + +/// Modify a column's type/comment/position. Mirrors the `ModifyColumn` variant of +/// Java `TableChange`. All fields except `column_name` are optional — only the +/// non-`None` ones are applied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ModifyColumn { + pub column_name: String, + pub data_type_json: Option>, + pub comment: Option, + pub position: Option, +} + +impl ModifyColumn { + pub fn to_pb(&self) -> PbModifyColumn { + PbModifyColumn { + column_name: self.column_name.clone(), + data_type_json: self.data_type_json.clone(), + comment: self.comment.clone(), + column_position_type: self.position.map(|p| p.to_i32()), + } + } + + pub fn from_pb(pb: &PbModifyColumn) -> Result { + Ok(Self { + column_name: pb.column_name.clone(), + data_type_json: pb.data_type_json.clone(), + comment: pb.comment.clone(), + position: pb + .column_position_type + .map(ColumnPositionType::try_from_i32) + .transpose()?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_column_position_type_roundtrip() { + assert_eq!( + ColumnPositionType::try_from_i32(ColumnPositionType::Last.to_i32()).unwrap(), + ColumnPositionType::Last + ); + assert!(ColumnPositionType::try_from_i32(1).is_err()); + assert!(ColumnPositionType::try_from_i32(3).is_err()); + } + + #[test] + fn test_add_column_pb_roundtrip() { + let original = AddColumn { + column_name: "amount".to_string(), + data_type_json: b"{\"type\":\"BIGINT\"}".to_vec(), + comment: Some("the amount".to_string()), + position: ColumnPositionType::Last, + }; + let pb = original.to_pb(); + assert_eq!(AddColumn::from_pb(&pb).unwrap(), original); + } + + #[test] + fn test_drop_column_pb_roundtrip() { + let original = DropColumn { + column_name: "obsolete".to_string(), + }; + let pb = original.to_pb(); + assert_eq!(DropColumn::from_pb(&pb), original); + } + + #[test] + fn test_rename_column_pb_roundtrip() { + let original = RenameColumn { + old_column_name: "old_name".to_string(), + new_column_name: "new_name".to_string(), + }; + let pb = original.to_pb(); + assert_eq!(RenameColumn::from_pb(&pb), original); + } + + #[test] + fn test_modify_column_pb_roundtrip_full() { + let original = ModifyColumn { + column_name: "x".to_string(), + data_type_json: Some(b"{\"type\":\"DOUBLE\"}".to_vec()), + comment: Some("changed".to_string()), + position: Some(ColumnPositionType::Last), + }; + let pb = original.to_pb(); + assert_eq!(ModifyColumn::from_pb(&pb).unwrap(), original); + } + + #[test] + fn test_modify_column_pb_roundtrip_empty() { + let original = ModifyColumn { + column_name: "x".to_string(), + data_type_json: None, + comment: None, + position: None, + }; + let pb = original.to_pb(); + assert_eq!(ModifyColumn::from_pb(&pb).unwrap(), original); + } +} diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs new file mode 100644 index 00000000..ad2d1fc2 --- /dev/null +++ b/crates/fluss/src/metadata/table_stats.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::PbTableStatsReqForBucket; + +/// Per-bucket request item for `GetTableStats`. +/// Mirrors the bucket-stats request shape used by the Java client. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStatsRequest { + pub partition_id: Option, + pub bucket_id: i32, +} + +impl BucketStatsRequest { + pub fn new(partition_id: Option, bucket_id: i32) -> Self { + Self { + partition_id, + bucket_id, + } + } + + pub fn to_pb(&self) -> PbTableStatsReqForBucket { + PbTableStatsReqForBucket { + partition_id: self.partition_id, + bucket_id: self.bucket_id, + } + } + + pub fn from_pb(pb: &PbTableStatsReqForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_stats_request_pb_roundtrip() { + for req in [ + BucketStatsRequest::new(None, 0), + BucketStatsRequest::new(Some(42), 7), + ] { + let pb = req.to_pb(); + assert_eq!(BucketStatsRequest::from_pb(&pb), req); + } + } +} diff --git a/crates/fluss/src/rpc/convert.rs b/crates/fluss/src/rpc/convert.rs index 441645c2..246c6418 100644 --- a/crates/fluss/src/rpc/convert.rs +++ b/crates/fluss/src/rpc/convert.rs @@ -19,14 +19,14 @@ use crate::cluster::{ServerNode, ServerType}; use crate::metadata::TablePath; use crate::proto::{PbServerNode, PbTablePath}; -pub fn to_table_path(table_path: &TablePath) -> PbTablePath { +pub(crate) fn to_table_path(table_path: &TablePath) -> PbTablePath { PbTablePath { database_name: table_path.database().to_string(), table_name: table_path.table().to_string(), } } -pub fn from_pb_server_node(pb_server_node: PbServerNode, server_type: ServerType) -> ServerNode { +pub(crate) fn from_pb_server_node(pb_server_node: PbServerNode, server_type: ServerType) -> ServerNode { ServerNode::new( pb_server_node.node_id, pb_server_node.host, @@ -35,7 +35,7 @@ pub fn from_pb_server_node(pb_server_node: PbServerNode, server_type: ServerType ) } -pub fn from_pb_table_path(pb_table_path: &PbTablePath) -> TablePath { +pub(crate) fn from_pb_table_path(pb_table_path: &PbTablePath) -> TablePath { TablePath::new( pb_table_path.database_name.to_string(), pb_table_path.table_name.to_string(), diff --git a/crates/fluss/src/rpc/message/alter_cluster_configs.rs b/crates/fluss/src/rpc/message/alter_cluster_configs.rs new file mode 100644 index 00000000..6f14d261 --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_cluster_configs.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AlterConfig; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AlterClusterConfigsRequest { + pub(crate) inner_request: proto::AlterClusterConfigsRequest, +} + +impl AlterClusterConfigsRequest { + pub fn new(alter_configs: Vec) -> Self { + AlterClusterConfigsRequest { + inner_request: proto::AlterClusterConfigsRequest { + alter_configs: alter_configs.iter().map(AlterConfig::to_pb).collect(), + }, + } + } +} + +impl RequestBody for AlterClusterConfigsRequest { + type ResponseBody = proto::AlterClusterConfigsResponse; + const API_KEY: ApiKey = ApiKey::AlterClusterConfigs; +} + +impl_write_type!(AlterClusterConfigsRequest); +impl_read_type!(proto::AlterClusterConfigsResponse); diff --git a/crates/fluss/src/rpc/message/alter_database.rs b/crates/fluss/src/rpc/message/alter_database.rs new file mode 100644 index 00000000..3eb08163 --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_database.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AlterConfig; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AlterDatabaseRequest { + pub(crate) inner_request: proto::AlterDatabaseRequest, +} + +impl AlterDatabaseRequest { + pub fn new( + database_name: &str, + ignore_if_not_exists: bool, + config_changes: Vec, + ) -> Self { + AlterDatabaseRequest { + inner_request: proto::AlterDatabaseRequest { + database_name: database_name.to_string(), + ignore_if_not_exists, + config_changes: config_changes.iter().map(AlterConfig::to_pb).collect(), + comment: None, + }, + } + } +} + +impl RequestBody for AlterDatabaseRequest { + type ResponseBody = proto::AlterDatabaseResponse; + const API_KEY: ApiKey = ApiKey::AlterDatabase; +} + +impl_write_type!(AlterDatabaseRequest); +impl_read_type!(proto::AlterDatabaseResponse); diff --git a/crates/fluss/src/rpc/message/alter_table.rs b/crates/fluss/src/rpc/message/alter_table.rs new file mode 100644 index 00000000..64e9ad09 --- /dev/null +++ b/crates/fluss/src/rpc/message/alter_table.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{AddColumn, AlterConfig, DropColumn, ModifyColumn, RenameColumn, TablePath}; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AlterTableRequest { + pub(crate) inner_request: proto::AlterTableRequest, +} + +impl AlterTableRequest { + pub fn new( + table_path: &TablePath, + ignore_if_not_exists: bool, + config_changes: Vec, + add_columns: Vec, + drop_columns: Vec, + rename_columns: Vec, + modify_columns: Vec, + ) -> Self { + AlterTableRequest { + inner_request: proto::AlterTableRequest { + table_path: to_table_path(table_path), + ignore_if_not_exists, + config_changes: config_changes.iter().map(AlterConfig::to_pb).collect(), + add_columns: add_columns.iter().map(AddColumn::to_pb).collect(), + drop_columns: drop_columns.iter().map(DropColumn::to_pb).collect(), + rename_columns: rename_columns.iter().map(RenameColumn::to_pb).collect(), + modify_columns: modify_columns.iter().map(ModifyColumn::to_pb).collect(), + }, + } + } +} + +impl RequestBody for AlterTableRequest { + type ResponseBody = proto::AlterTableResponse; + const API_KEY: ApiKey = ApiKey::AlterTable; +} + +impl_write_type!(AlterTableRequest); +impl_read_type!(proto::AlterTableResponse); diff --git a/crates/fluss/src/rpc/message/api_versions.rs b/crates/fluss/src/rpc/message/api_versions.rs index 579c66a7..42139a3a 100644 --- a/crates/fluss/src/rpc/message/api_versions.rs +++ b/crates/fluss/src/rpc/message/api_versions.rs @@ -27,7 +27,7 @@ use prost::Message; #[derive(Debug, Clone)] pub struct ApiVersionsRequest { - pub inner_request: ProtoApiVersionsRequest, + pub(crate) inner_request: ProtoApiVersionsRequest, } impl ApiVersionsRequest { diff --git a/crates/fluss/src/rpc/message/authenticate.rs b/crates/fluss/src/rpc/message/authenticate.rs index 1874b304..dca76256 100644 --- a/crates/fluss/src/rpc/message/authenticate.rs +++ b/crates/fluss/src/rpc/message/authenticate.rs @@ -25,7 +25,7 @@ use prost::Message; #[derive(Debug, Clone)] pub struct AuthenticateRequest { - pub inner_request: ProtoAuthenticateRequest, + pub(crate) inner_request: ProtoAuthenticateRequest, } impl AuthenticateRequest { diff --git a/crates/fluss/src/rpc/message/create_acls.rs b/crates/fluss/src/rpc/message/create_acls.rs new file mode 100644 index 00000000..011f76f2 --- /dev/null +++ b/crates/fluss/src/rpc/message/create_acls.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AclInfo; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct CreateAclsRequest { + pub(crate) inner_request: proto::CreateAclsRequest, +} + +impl CreateAclsRequest { + pub fn new(acl: Vec) -> Self { + CreateAclsRequest { + inner_request: proto::CreateAclsRequest { + acl: acl.iter().map(AclInfo::to_pb).collect(), + }, + } + } +} + +impl RequestBody for CreateAclsRequest { + type ResponseBody = proto::CreateAclsResponse; + const API_KEY: ApiKey = ApiKey::CreateAcls; +} + +impl_write_type!(CreateAclsRequest); +impl_read_type!(proto::CreateAclsResponse); diff --git a/crates/fluss/src/rpc/message/create_database.rs b/crates/fluss/src/rpc/message/create_database.rs index ed0868da..6fb982b1 100644 --- a/crates/fluss/src/rpc/message/create_database.rs +++ b/crates/fluss/src/rpc/message/create_database.rs @@ -30,7 +30,7 @@ use prost::Message; #[derive(Debug)] pub struct CreateDatabaseRequest { - pub inner_request: proto::CreateDatabaseRequest, + pub(crate) inner_request: proto::CreateDatabaseRequest, } impl CreateDatabaseRequest { diff --git a/crates/fluss/src/rpc/message/create_partition.rs b/crates/fluss/src/rpc/message/create_partition.rs index 1646f333..32dbced1 100644 --- a/crates/fluss/src/rpc/message/create_partition.rs +++ b/crates/fluss/src/rpc/message/create_partition.rs @@ -27,7 +27,7 @@ use prost::Message; #[derive(Debug)] pub struct CreatePartitionRequest { - pub inner_request: proto::CreatePartitionRequest, + pub(crate) inner_request: proto::CreatePartitionRequest, } impl CreatePartitionRequest { diff --git a/crates/fluss/src/rpc/message/create_table.rs b/crates/fluss/src/rpc/message/create_table.rs index 4647fec6..3189c14f 100644 --- a/crates/fluss/src/rpc/message/create_table.rs +++ b/crates/fluss/src/rpc/message/create_table.rs @@ -31,7 +31,7 @@ use prost::Message; #[derive(Debug)] pub struct CreateTableRequest { - pub inner_request: proto::CreateTableRequest, + pub(crate) inner_request: proto::CreateTableRequest, } impl CreateTableRequest { diff --git a/crates/fluss/src/rpc/message/database_exists.rs b/crates/fluss/src/rpc/message/database_exists.rs index 4a9588a2..296d1560 100644 --- a/crates/fluss/src/rpc/message/database_exists.rs +++ b/crates/fluss/src/rpc/message/database_exists.rs @@ -26,7 +26,7 @@ use prost::Message; #[derive(Debug)] pub struct DatabaseExistsRequest { - pub inner_request: proto::DatabaseExistsRequest, + pub(crate) inner_request: proto::DatabaseExistsRequest, } impl DatabaseExistsRequest { diff --git a/crates/fluss/src/rpc/message/describe_cluster_configs.rs b/crates/fluss/src/rpc/message/describe_cluster_configs.rs new file mode 100644 index 00000000..8e6ea4e2 --- /dev/null +++ b/crates/fluss/src/rpc/message/describe_cluster_configs.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct DescribeClusterConfigsRequest { + pub(crate) inner_request: proto::DescribeClusterConfigsRequest, +} + +impl DescribeClusterConfigsRequest { + pub fn new() -> Self { + DescribeClusterConfigsRequest { + inner_request: proto::DescribeClusterConfigsRequest {}, + } + } +} + +impl RequestBody for DescribeClusterConfigsRequest { + type ResponseBody = proto::DescribeClusterConfigsResponse; + const API_KEY: ApiKey = ApiKey::DescribeClusterConfigs; +} + +impl_write_type!(DescribeClusterConfigsRequest); +impl_read_type!(proto::DescribeClusterConfigsResponse); diff --git a/crates/fluss/src/rpc/message/drop_acls.rs b/crates/fluss/src/rpc/message/drop_acls.rs new file mode 100644 index 00000000..47068108 --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_acls.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AclFilter; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DropAclsRequest { + pub(crate) inner_request: proto::DropAclsRequest, +} + +impl DropAclsRequest { + pub fn new(acl_filter: Vec) -> Self { + DropAclsRequest { + inner_request: proto::DropAclsRequest { + acl_filter: acl_filter.iter().map(AclFilter::to_pb).collect(), + }, + } + } +} + +impl RequestBody for DropAclsRequest { + type ResponseBody = proto::DropAclsResponse; + const API_KEY: ApiKey = ApiKey::DropAcls; +} + +impl_write_type!(DropAclsRequest); +impl_read_type!(proto::DropAclsResponse); diff --git a/crates/fluss/src/rpc/message/drop_database.rs b/crates/fluss/src/rpc/message/drop_database.rs index bf7477f3..8a777dfd 100644 --- a/crates/fluss/src/rpc/message/drop_database.rs +++ b/crates/fluss/src/rpc/message/drop_database.rs @@ -26,7 +26,7 @@ use prost::Message; #[derive(Debug)] pub struct DropDatabaseRequest { - pub inner_request: proto::DropDatabaseRequest, + pub(crate) inner_request: proto::DropDatabaseRequest, } impl DropDatabaseRequest { diff --git a/crates/fluss/src/rpc/message/drop_partition.rs b/crates/fluss/src/rpc/message/drop_partition.rs index c7494acb..699c4562 100644 --- a/crates/fluss/src/rpc/message/drop_partition.rs +++ b/crates/fluss/src/rpc/message/drop_partition.rs @@ -27,7 +27,7 @@ use prost::Message; #[derive(Debug)] pub struct DropPartitionRequest { - pub inner_request: proto::DropPartitionRequest, + pub(crate) inner_request: proto::DropPartitionRequest, } impl DropPartitionRequest { diff --git a/crates/fluss/src/rpc/message/drop_table.rs b/crates/fluss/src/rpc/message/drop_table.rs index b452cf07..fcc9c2e4 100644 --- a/crates/fluss/src/rpc/message/drop_table.rs +++ b/crates/fluss/src/rpc/message/drop_table.rs @@ -31,7 +31,7 @@ use prost::Message; #[derive(Debug)] pub struct DropTableRequest { - pub inner_request: proto::DropTableRequest, + pub(crate) inner_request: proto::DropTableRequest, } impl DropTableRequest { diff --git a/crates/fluss/src/rpc/message/fetch.rs b/crates/fluss/src/rpc/message/fetch.rs index 67930f84..ed6a6a4d 100644 --- a/crates/fluss/src/rpc/message/fetch.rs +++ b/crates/fluss/src/rpc/message/fetch.rs @@ -34,11 +34,11 @@ const LOG_FETCH_MIN_BYTES: i32 = 1; const LOG_FETCH_WAIT_MAX_TIME: i32 = 500; pub struct FetchLogRequest { - pub inner_request: proto::FetchLogRequest, + pub(crate) inner_request: proto::FetchLogRequest, } impl FetchLogRequest { - pub fn new(fetch_log_request: proto::FetchLogRequest) -> Self { + pub(crate) fn new(fetch_log_request: proto::FetchLogRequest) -> Self { Self { inner_request: fetch_log_request, } diff --git a/crates/fluss/src/rpc/message/get_database_info.rs b/crates/fluss/src/rpc/message/get_database_info.rs index 63647d52..35b0eb22 100644 --- a/crates/fluss/src/rpc/message/get_database_info.rs +++ b/crates/fluss/src/rpc/message/get_database_info.rs @@ -26,7 +26,7 @@ use prost::Message; #[derive(Debug)] pub struct GetDatabaseInfoRequest { - pub inner_request: proto::GetDatabaseInfoRequest, + pub(crate) inner_request: proto::GetDatabaseInfoRequest, } impl GetDatabaseInfoRequest { diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs index a23a985d..75f70349 100644 --- a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs +++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs @@ -30,7 +30,7 @@ use prost::Message; #[derive(Debug)] pub struct GetLatestLakeSnapshotRequest { - pub inner_request: proto::GetLatestLakeSnapshotRequest, + pub(crate) inner_request: proto::GetLatestLakeSnapshotRequest, } impl GetLatestLakeSnapshotRequest { diff --git a/crates/fluss/src/rpc/message/get_security_token.rs b/crates/fluss/src/rpc/message/get_security_token.rs index 741c8482..2c825ac3 100644 --- a/crates/fluss/src/rpc/message/get_security_token.rs +++ b/crates/fluss/src/rpc/message/get_security_token.rs @@ -25,7 +25,7 @@ use prost::Message; #[derive(Debug)] pub struct GetSecurityTokenRequest { - pub inner_request: GetFileSystemSecurityTokenRequest, + pub(crate) inner_request: GetFileSystemSecurityTokenRequest, } impl GetSecurityTokenRequest { diff --git a/crates/fluss/src/rpc/message/get_table.rs b/crates/fluss/src/rpc/message/get_table.rs index a7562f92..6165e8d7 100644 --- a/crates/fluss/src/rpc/message/get_table.rs +++ b/crates/fluss/src/rpc/message/get_table.rs @@ -29,7 +29,7 @@ use prost::Message; #[derive(Debug)] pub struct GetTableRequest { - pub inner_request: GetTableInfoRequest, + pub(crate) inner_request: GetTableInfoRequest, } impl GetTableRequest { diff --git a/crates/fluss/src/rpc/message/get_table_schema.rs b/crates/fluss/src/rpc/message/get_table_schema.rs index 1c7c00b7..cd2f20ad 100644 --- a/crates/fluss/src/rpc/message/get_table_schema.rs +++ b/crates/fluss/src/rpc/message/get_table_schema.rs @@ -30,7 +30,7 @@ use prost::Message; /// `schema_id = None` requests the latest schema. #[derive(Debug)] pub struct GetTableSchemaRequestMsg { - pub inner_request: GetTableSchemaRequest, + pub(crate) inner_request: GetTableSchemaRequest, } impl GetTableSchemaRequestMsg { diff --git a/crates/fluss/src/rpc/message/get_table_stats.rs b/crates/fluss/src/rpc/message/get_table_stats.rs new file mode 100644 index 00000000..c5ef610a --- /dev/null +++ b/crates/fluss/src/rpc/message/get_table_stats.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::BucketStatsRequest; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetTableStatsRequest { + pub(crate) inner_request: proto::GetTableStatsRequest, +} + +impl GetTableStatsRequest { + pub fn new( + table_id: i64, + buckets_req: Vec, + target_columns: Vec, + ) -> Self { + GetTableStatsRequest { + inner_request: proto::GetTableStatsRequest { + table_id, + buckets_req: buckets_req.iter().map(BucketStatsRequest::to_pb).collect(), + target_columns, + }, + } + } +} + +impl RequestBody for GetTableStatsRequest { + type ResponseBody = proto::GetTableStatsResponse; + const API_KEY: ApiKey = ApiKey::GetTableStats; +} + +impl_write_type!(GetTableStatsRequest); +impl_read_type!(proto::GetTableStatsResponse); diff --git a/crates/fluss/src/rpc/message/init_writer.rs b/crates/fluss/src/rpc/message/init_writer.rs index b2e64a5f..ec30de5d 100644 --- a/crates/fluss/src/rpc/message/init_writer.rs +++ b/crates/fluss/src/rpc/message/init_writer.rs @@ -24,11 +24,11 @@ use bytes::{Buf, BufMut}; use prost::Message; pub struct InitWriterRequest { - pub inner_request: proto::InitWriterRequest, + pub(crate) inner_request: proto::InitWriterRequest, } impl InitWriterRequest { - pub fn new(table_paths: Vec) -> Self { + pub(crate) fn new(table_paths: Vec) -> Self { InitWriterRequest { inner_request: proto::InitWriterRequest { table_path: table_paths, diff --git a/crates/fluss/src/rpc/message/limit_scan.rs b/crates/fluss/src/rpc/message/limit_scan.rs index c71b03c3..cf9a0363 100644 --- a/crates/fluss/src/rpc/message/limit_scan.rs +++ b/crates/fluss/src/rpc/message/limit_scan.rs @@ -27,7 +27,7 @@ use prost::Message; use bytes::{Buf, BufMut}; pub struct LimitScanRequest { - pub inner_request: proto::LimitScanRequest, + pub(crate) inner_request: proto::LimitScanRequest, } impl LimitScanRequest { diff --git a/crates/fluss/src/rpc/message/list_acls.rs b/crates/fluss/src/rpc/message/list_acls.rs new file mode 100644 index 00000000..1b4bdc9f --- /dev/null +++ b/crates/fluss/src/rpc/message/list_acls.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::AclFilter; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListAclsRequest { + pub(crate) inner_request: proto::ListAclsRequest, +} + +impl ListAclsRequest { + pub fn new(acl_filter: AclFilter) -> Self { + ListAclsRequest { + inner_request: proto::ListAclsRequest { + acl_filter: acl_filter.to_pb(), + }, + } + } +} + +impl RequestBody for ListAclsRequest { + type ResponseBody = proto::ListAclsResponse; + const API_KEY: ApiKey = ApiKey::ListAcls; +} + +impl_write_type!(ListAclsRequest); +impl_read_type!(proto::ListAclsResponse); diff --git a/crates/fluss/src/rpc/message/list_database_summaries.rs b/crates/fluss/src/rpc/message/list_database_summaries.rs new file mode 100644 index 00000000..6acea72f --- /dev/null +++ b/crates/fluss/src/rpc/message/list_database_summaries.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::WriteError; +use crate::rpc::message::{RequestBody, WriteType}; +use crate::{impl_write_type, proto}; +use bytes::BufMut; +use prost::Message; + +#[derive(Debug, Default)] +pub struct ListDatabaseSummariesRequest { + pub(crate) inner_request: proto::ListDatabasesRequest, +} + +impl ListDatabaseSummariesRequest { + pub fn new() -> Self { + ListDatabaseSummariesRequest { + inner_request: proto::ListDatabasesRequest { + include_summary: Some(true), + }, + } + } +} + +impl RequestBody for ListDatabaseSummariesRequest { + type ResponseBody = proto::ListDatabasesResponse; + const API_KEY: ApiKey = ApiKey::ListDatabases; +} + +impl_write_type!(ListDatabaseSummariesRequest); diff --git a/crates/fluss/src/rpc/message/list_databases.rs b/crates/fluss/src/rpc/message/list_databases.rs index 74ca4944..064a1e94 100644 --- a/crates/fluss/src/rpc/message/list_databases.rs +++ b/crates/fluss/src/rpc/message/list_databases.rs @@ -26,7 +26,7 @@ use prost::Message; #[derive(Debug, Default)] pub struct ListDatabasesRequest { - pub inner_request: proto::ListDatabasesRequest, + pub(crate) inner_request: proto::ListDatabasesRequest, } impl ListDatabasesRequest { diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs index 2ec14370..a690b82e 100644 --- a/crates/fluss/src/rpc/message/list_offsets.rs +++ b/crates/fluss/src/rpc/message/list_offsets.rs @@ -68,7 +68,7 @@ impl OffsetSpec { #[derive(Debug)] pub struct ListOffsetsRequest { - pub inner_request: proto::ListOffsetsRequest, + pub(crate) inner_request: proto::ListOffsetsRequest, } impl ListOffsetsRequest { diff --git a/crates/fluss/src/rpc/message/list_partition_infos.rs b/crates/fluss/src/rpc/message/list_partition_infos.rs index cf24f466..091af6da 100644 --- a/crates/fluss/src/rpc/message/list_partition_infos.rs +++ b/crates/fluss/src/rpc/message/list_partition_infos.rs @@ -27,7 +27,7 @@ use prost::Message; #[derive(Debug)] pub struct ListPartitionInfosRequest { - pub inner_request: proto::ListPartitionInfosRequest, + pub(crate) inner_request: proto::ListPartitionInfosRequest, } impl ListPartitionInfosRequest { diff --git a/crates/fluss/src/rpc/message/list_tables.rs b/crates/fluss/src/rpc/message/list_tables.rs index 8ff72141..126c8001 100644 --- a/crates/fluss/src/rpc/message/list_tables.rs +++ b/crates/fluss/src/rpc/message/list_tables.rs @@ -29,7 +29,7 @@ use prost::Message; #[derive(Debug)] pub struct ListTablesRequest { - pub inner_request: proto::ListTablesRequest, + pub(crate) inner_request: proto::ListTablesRequest, } impl ListTablesRequest { diff --git a/crates/fluss/src/rpc/message/lookup.rs b/crates/fluss/src/rpc/message/lookup.rs index e205fa6b..2f058cde 100644 --- a/crates/fluss/src/rpc/message/lookup.rs +++ b/crates/fluss/src/rpc/message/lookup.rs @@ -28,7 +28,7 @@ use prost::Message; use bytes::{Buf, BufMut}; pub struct LookupRequest { - pub inner_request: proto::LookupRequest, + pub(crate) inner_request: proto::LookupRequest, } impl LookupRequest { diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index 096066ed..f126d214 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -19,12 +19,18 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use bytes::{Buf, BufMut}; +mod alter_cluster_configs; +mod alter_database; +mod alter_table; mod api_versions; mod authenticate; +mod create_acls; mod create_database; mod create_partition; mod create_table; mod database_exists; +mod describe_cluster_configs; +mod drop_acls; mod drop_database; mod drop_partition; mod drop_table; @@ -34,9 +40,12 @@ mod get_latest_lake_snapshot; mod get_security_token; mod get_table; mod get_table_schema; +mod get_table_stats; mod header; mod init_writer; mod limit_scan; +mod list_acls; +mod list_database_summaries; mod list_databases; mod list_offsets; mod list_partition_infos; @@ -49,12 +58,18 @@ mod table_exists; mod update_metadata; pub use crate::rpc::RpcError; +pub use alter_cluster_configs::*; +pub use alter_database::*; +pub use alter_table::*; pub use api_versions::*; pub use authenticate::*; +pub use create_acls::*; pub use create_database::*; pub use create_partition::*; pub use create_table::*; pub use database_exists::*; +pub use describe_cluster_configs::*; +pub use drop_acls::*; pub use drop_database::*; pub use drop_partition::*; pub use drop_table::*; @@ -64,9 +79,12 @@ pub use get_latest_lake_snapshot::*; pub use get_security_token::*; pub use get_table::*; pub use get_table_schema::*; +pub use get_table_stats::*; pub use header::*; pub use init_writer::*; pub use limit_scan::*; +pub use list_acls::*; +pub use list_database_summaries::*; pub use list_databases::*; pub use list_offsets::*; pub use list_partition_infos::*; diff --git a/crates/fluss/src/rpc/message/prefix_lookup.rs b/crates/fluss/src/rpc/message/prefix_lookup.rs index e71ffe7c..afafbbf7 100644 --- a/crates/fluss/src/rpc/message/prefix_lookup.rs +++ b/crates/fluss/src/rpc/message/prefix_lookup.rs @@ -28,7 +28,7 @@ use prost::Message; use bytes::{Buf, BufMut}; pub struct PrefixLookupRequest { - pub inner_request: proto::PrefixLookupRequest, + pub(crate) inner_request: proto::PrefixLookupRequest, } impl PrefixLookupRequest { diff --git a/crates/fluss/src/rpc/message/produce_log.rs b/crates/fluss/src/rpc/message/produce_log.rs index 8be24638..dc3acb72 100644 --- a/crates/fluss/src/rpc/message/produce_log.rs +++ b/crates/fluss/src/rpc/message/produce_log.rs @@ -28,7 +28,7 @@ use bytes::{Buf, BufMut}; use prost::Message; pub struct ProduceLogRequest { - pub inner_request: proto::ProduceLogRequest, + pub(crate) inner_request: proto::ProduceLogRequest, } impl ProduceLogRequest { diff --git a/crates/fluss/src/rpc/message/put_kv.rs b/crates/fluss/src/rpc/message/put_kv.rs index e76496d1..d9f2ad3f 100644 --- a/crates/fluss/src/rpc/message/put_kv.rs +++ b/crates/fluss/src/rpc/message/put_kv.rs @@ -27,7 +27,7 @@ use prost::Message; #[allow(dead_code)] pub struct PutKvRequest { - pub inner_request: proto::PutKvRequest, + pub(crate) inner_request: proto::PutKvRequest, } #[allow(dead_code)] diff --git a/crates/fluss/src/rpc/message/table_exists.rs b/crates/fluss/src/rpc/message/table_exists.rs index 5bc848e3..95bee24a 100644 --- a/crates/fluss/src/rpc/message/table_exists.rs +++ b/crates/fluss/src/rpc/message/table_exists.rs @@ -30,7 +30,7 @@ use bytes::{Buf, BufMut}; use prost::Message; #[derive(Debug)] pub struct TableExistsRequest { - pub inner_request: proto::TableExistsRequest, + pub(crate) inner_request: proto::TableExistsRequest, } impl TableExistsRequest { diff --git a/crates/fluss/src/rpc/message/update_metadata.rs b/crates/fluss/src/rpc/message/update_metadata.rs index fd96ca5e..797fd4a0 100644 --- a/crates/fluss/src/rpc/message/update_metadata.rs +++ b/crates/fluss/src/rpc/message/update_metadata.rs @@ -29,7 +29,7 @@ use bytes::{Buf, BufMut}; use prost::Message; pub struct UpdateMetadataRequest { - pub inner_request: proto::MetadataRequest, + pub(crate) inner_request: proto::MetadataRequest, } impl UpdateMetadataRequest { diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs index 6f3a88d1..a4b191d8 100644 --- a/crates/fluss/src/rpc/mod.rs +++ b/crates/fluss/src/rpc/mod.rs @@ -29,4 +29,4 @@ pub use server_connection::*; mod convert; mod transport; -pub use convert::*; +pub(crate) use convert::*; diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index a1ea4657..a40fd58d 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -85,7 +85,7 @@ pub struct ServerApiVersions { impl ServerApiVersions { /// Build from the server's advertised API version list. - pub fn new(server_versions: &[PbApiVersion]) -> Self { + pub(crate) fn new(server_versions: &[PbApiVersion]) -> Self { let mut versions = HashMap::new(); for sv in server_versions { let api_key = ApiKey::from(i16::try_from(sv.api_key).unwrap()); @@ -207,7 +207,7 @@ impl RpcClient { self } - pub async fn get_connection( + pub(crate) async fn get_connection( &self, server_node: &ServerNode, ) -> Result { @@ -534,7 +534,7 @@ where matches!(*guard, ConnectionState::Poison(_)) } - pub async fn request(&self, msg: R) -> Result + pub(crate) async fn request(&self, msg: R) -> Result where R: RequestBody + Send + WriteType>, R::ResponseBody: ReadType>>,