Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/fluss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]
storage-oss = ["opendal/services-oss"]
integration_tests = []
# Gates tests that exercise APIs only available on Fluss 1.x servers.
# Enable alongside `integration_tests` when running against a 1.x server image.
fluss_v1 = []

[dependencies]
arrow = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/fluss/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ fn main() -> Result<()> {
config.bytes([
".proto.PbProduceLogReqForBucket.records",
".proto.PbPutKvReqForBucket.records",
".proto.PbLookupReqForBucket.key",
".proto.PbLookupReqForBucket.keys",
".proto.PbPrefixLookupReqForBucket.keys",
".proto.ScanKvResponse.records",
]);
config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?;
Ok(())
Expand Down
339 changes: 336 additions & 3 deletions crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,24 @@ use crate::metadata::{
PhysicalTablePath, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath,
};
use crate::rpc::message::{
AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest,
AlterDatabaseRequest, AlterTableRequest, CancelRebalanceRequest, CreateAclsRequest,
CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest,
DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest,
GetLatestLakeSnapshotRequest, GetTableRequest, GetTableSchemaRequestMsg, ListDatabasesRequest,
ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest,
DeleteProducerOffsetsRequest, DescribeClusterConfigsRequest, DropAclsRequest,
DropDatabaseRequest, DropKvSnapshotLeaseRequest, DropPartitionRequest, DropTableRequest,
GetClusterHealthRequest, GetDatabaseInfoRequest, GetKvSnapshotMetadataRequest,
GetLakeSnapshotRequest, GetLatestKvSnapshotsRequest, GetLatestLakeSnapshotRequest,
GetProducerOffsetsRequest, GetTableRequest, GetTableSchemaRequestMsg, GetTableStatsRequest,
ListAclsRequest, ListDatabaseSummariesRequest, ListDatabasesRequest, ListKvSnapshotsRequest,
ListPartitionInfosRequest, ListRebalanceProgressRequest, ListRemoteLogManifestsRequest,
ListTablesRequest, RebalanceRequest, RegisterProducerOffsetsRequest,
ReleaseKvSnapshotLeaseRequest, RemoveServerTagRequest, TableExistsRequest,
};
use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::rpc::{RpcClient, ServerConnection};

use crate::error::{Error, Result};
use crate::proto;
use crate::proto::GetTableInfoResponse;
use crate::{BucketId, PartitionId, TableId};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -150,6 +159,7 @@ impl FlussAdmin {
table_json,
created_time,
modified_time,
..
} = response;
let v: &[u8] = &table_json[..];
let table_descriptor =
Expand Down Expand Up @@ -482,4 +492,327 @@ impl FlussAdmin {
}
Ok(tasks)
}

/// List database summaries (name, created_time, table_count).
pub async fn list_database_summaries(&self) -> Result<Vec<proto::PbDatabaseSummary>> {
let response = self
.admin_gateway()
.await?
.request(ListDatabaseSummariesRequest::new())
.await?;
Ok(response.database_summary)
}

/// Alter a database's configuration.
pub async fn alter_database(
&self,
name: &str,
config_changes: Vec<proto::PbAlterConfig>,
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(AlterDatabaseRequest::new(
name,
ignore_if_not_exists,
config_changes,
))
.await?;
Ok(())
}

/// Alter a table (config changes, add columns).
pub async fn alter_table(
&self,
table_path: &TablePath,
config_changes: Vec<proto::PbAlterConfig>,
add_columns: Vec<proto::PbAddColumn>,
) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(AlterTableRequest::new(
table_path,
false,
config_changes,
add_columns,
vec![],
vec![],
vec![],
))
.await?;
Ok(())
}

/// Get table statistics for buckets.
pub async fn get_table_stats(
&self,
table_id: i64,
buckets_req: Vec<proto::PbTableStatsReqForBucket>,
) -> Result<proto::GetTableStatsResponse> {
self.admin_gateway()
.await?
.request(GetTableStatsRequest::new(table_id, buckets_req))
.await
}

/// Get the latest KV snapshots for a table.
pub async fn get_latest_kv_snapshots(
&self,
table_path: &TablePath,
partition_name: Option<&str>,
) -> Result<proto::GetLatestKvSnapshotsResponse> {
self.admin_gateway()
.await?
.request(GetLatestKvSnapshotsRequest::new(table_path, partition_name))
.await
}

/// Get KV snapshot metadata.
pub async fn get_kv_snapshot_metadata(
&self,
table_id: i64,
partition_id: Option<i64>,
bucket_id: i32,
snapshot_id: i64,
) -> Result<proto::GetKvSnapshotMetadataResponse> {
self.admin_gateway()
.await?
.request(GetKvSnapshotMetadataRequest::new(
table_id,
partition_id,
bucket_id,
snapshot_id,
))
.await
}

/// Acquire a KV snapshot lease.
pub async fn create_kv_snapshot_lease(
&self,
lease_id: &str,
lease_duration_ms: i64,
snapshots_to_lease: Vec<proto::PbKvSnapshotLeaseForTable>,
) -> Result<proto::AcquireKvSnapshotLeaseResponse> {
self.admin_gateway()
.await?
.request(AcquireKvSnapshotLeaseRequest::new(
lease_id,
lease_duration_ms,
snapshots_to_lease,
))
.await
}

/// Get a lake snapshot for a table.
pub async fn get_lake_snapshot(
&self,
table_path: &TablePath,
snapshot_id: Option<i64>,
) -> Result<proto::GetLakeSnapshotResponse> {
self.admin_gateway()
.await?
.request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None))
.await
}

/// Create ACLs.
pub async fn create_acls(
&self,
acl: Vec<proto::PbAclInfo>,
) -> Result<proto::CreateAclsResponse> {
self.admin_gateway()
.await?
.request(CreateAclsRequest::new(acl))
.await
}

/// List ACLs matching a filter.
pub async fn list_acls(
&self,
acl_filter: proto::PbAclFilter,
) -> Result<proto::ListAclsResponse> {
self.admin_gateway()
.await?
.request(ListAclsRequest::new(acl_filter))
.await
}

/// Drop ACLs matching filters.
pub async fn drop_acls(
&self,
acl_filter: Vec<proto::PbAclFilter>,
) -> Result<proto::DropAclsResponse> {
self.admin_gateway()
.await?
.request(DropAclsRequest::new(acl_filter))
.await
}

/// Describe cluster configuration.
pub async fn describe_cluster_configs(&self) -> Result<proto::DescribeClusterConfigsResponse> {
self.admin_gateway()
.await?
.request(DescribeClusterConfigsRequest::new())
.await
}

/// Alter cluster configuration.
pub async fn alter_cluster_configs(
&self,
alter_configs: Vec<proto::PbAlterConfig>,
) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(AlterClusterConfigsRequest::new(alter_configs))
.await?;
Ok(())
}

/// Add a tag to servers.
pub async fn add_server_tag(&self, server_ids: Vec<i32>, server_tag: i32) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(AddServerTagRequest::new(server_ids, server_tag))
.await?;
Ok(())
}

/// Remove a tag from servers.
pub async fn remove_server_tag(&self, server_ids: Vec<i32>, server_tag: i32) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(RemoveServerTagRequest::new(server_ids, server_tag))
.await?;
Ok(())
}

/// Trigger a rebalance.
pub async fn rebalance(&self, goals: Vec<i32>) -> Result<proto::RebalanceResponse> {
self.admin_gateway()
.await?
.request(RebalanceRequest::new(goals))
.await
}

/// List rebalance progress.
pub async fn list_rebalance_progress(
&self,
rebalance_id: Option<&str>,
) -> Result<proto::ListRebalanceProgressResponse> {
self.admin_gateway()
.await?
.request(ListRebalanceProgressRequest::new(rebalance_id))
.await
}

/// Cancel a rebalance.
pub async fn cancel_rebalance(&self, rebalance_id: Option<&str>) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(CancelRebalanceRequest::new(rebalance_id))
.await?;
Ok(())
}

/// Register producer offsets.
pub async fn register_producer_offsets(
&self,
producer_id: &str,
table_offsets: Vec<proto::PbProducerTableOffsets>,
) -> Result<proto::RegisterProducerOffsetsResponse> {
self.admin_gateway()
.await?
.request(RegisterProducerOffsetsRequest::new(
producer_id,
table_offsets,
None,
))
.await
}

/// Get producer offsets.
pub async fn get_producer_offsets(
&self,
producer_id: &str,
) -> Result<proto::GetProducerOffsetsResponse> {
self.admin_gateway()
.await?
.request(GetProducerOffsetsRequest::new(producer_id))
.await
}

/// Delete producer offsets.
pub async fn delete_producer_offsets(&self, producer_id: &str) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(DeleteProducerOffsetsRequest::new(producer_id))
.await?;
Ok(())
}

/// Get cluster health status.
pub async fn get_cluster_health(&self) -> Result<proto::GetClusterHealthResponse> {
self.admin_gateway()
.await?
.request(GetClusterHealthRequest::new())
.await
}

/// List remote log manifests for a table.
pub async fn list_remote_log_manifests(
&self,
table_id: i64,
partition_id: Option<i64>,
) -> Result<proto::ListRemoteLogManifestsResponse> {
self.admin_gateway()
.await?
.request(ListRemoteLogManifestsRequest::new(table_id, partition_id))
.await
}

/// List active KV snapshots for a table.
pub async fn list_kv_snapshots(
&self,
table_id: i64,
partition_id: Option<i64>,
) -> Result<proto::ListKvSnapshotsResponse> {
self.admin_gateway()
.await?
.request(ListKvSnapshotsRequest::new(table_id, partition_id))
.await
}

/// Release specific bucket snapshots from a KV snapshot lease.
pub async fn release_kv_snapshot_lease(
&self,
lease_id: &str,
buckets_to_release: Vec<proto::PbTableBucket>,
) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(ReleaseKvSnapshotLeaseRequest::new(
lease_id,
buckets_to_release,
))
.await?;
Ok(())
}

/// Drop an entire KV snapshot lease.
pub async fn drop_kv_snapshot_lease(&self, lease_id: &str) -> Result<()> {
let _response = self
.admin_gateway()
.await?
.request(DropKvSnapshotLeaseRequest::new(lease_id))
.await?;
Ok(())
}
}
Loading