Skip to content

Commit d75ef69

Browse files
Update crates/fluss/src/client/connection.rs
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 351f6d9 commit d75ef69

2 files changed

Lines changed: 60 additions & 45 deletions

File tree

crates/fluss/src/client/admin.rs

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,30 +39,27 @@ use tokio::task::JoinHandle;
3939

4040
#[derive(Clone)]
4141
pub struct FlussAdmin {
42-
admin_gateway: ServerConnection,
43-
#[allow(dead_code)]
4442
metadata: Arc<Metadata>,
45-
#[allow(dead_code)]
4643
rpc_client: Arc<RpcClient>,
4744
}
4845

4946
impl FlussAdmin {
50-
pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
51-
let admin_con =
52-
connections
53-
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
54-
|| Error::UnexpectedError {
55-
message: "Coordinator server not found in cluster metadata".to_string(),
56-
source: None,
57-
},
58-
)?)
59-
.await?;
60-
61-
Ok(FlussAdmin {
62-
admin_gateway: admin_con,
47+
pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
48+
FlussAdmin {
6349
metadata,
6450
rpc_client: connections,
65-
})
51+
}
52+
}
53+
54+
async fn admin_gateway(&self) -> Result<ServerConnection> {
55+
let cluster = self.metadata.get_cluster();
56+
let coordinator = cluster.get_coordinator_server().ok_or_else(|| {
57+
Error::UnexpectedError {
58+
message: "Coordinator server not found in cluster metadata".to_string(),
59+
source: None,
60+
}
61+
})?;
62+
self.rpc_client.get_connection(coordinator).await
6663
}
6764

6865
pub async fn create_database(
@@ -72,7 +69,8 @@ impl FlussAdmin {
7269
ignore_if_exists: bool,
7370
) -> Result<()> {
7471
let _response = self
75-
.admin_gateway
72+
.admin_gateway()
73+
.await?
7674
.request(CreateDatabaseRequest::new(
7775
database_name,
7876
database_descriptor,
@@ -89,7 +87,8 @@ impl FlussAdmin {
8987
ignore_if_exists: bool,
9088
) -> Result<()> {
9189
let _response = self
92-
.admin_gateway
90+
.admin_gateway()
91+
.await?
9392
.request(CreateTableRequest::new(
9493
table_path,
9594
table_descriptor,
@@ -105,15 +104,17 @@ impl FlussAdmin {
105104
ignore_if_not_exists: bool,
106105
) -> Result<()> {
107106
let _response = self
108-
.admin_gateway
107+
.admin_gateway()
108+
.await?
109109
.request(DropTableRequest::new(table_path, ignore_if_not_exists))
110110
.await?;
111111
Ok(())
112112
}
113113

114114
pub async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo> {
115115
let response = self
116-
.admin_gateway
116+
.admin_gateway()
117+
.await?
117118
.request(GetTableRequest::new(table_path))
118119
.await?;
119120

@@ -145,7 +146,8 @@ impl FlussAdmin {
145146
/// List all tables in the given database
146147
pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
147148
let response = self
148-
.admin_gateway
149+
.admin_gateway()
150+
.await?
149151
.request(ListTablesRequest::new(database_name))
150152
.await?;
151153
Ok(response.table_name)
@@ -163,7 +165,8 @@ impl FlussAdmin {
163165
partial_partition_spec: Option<&PartitionSpec>,
164166
) -> Result<Vec<PartitionInfo>> {
165167
let response = self
166-
.admin_gateway
168+
.admin_gateway()
169+
.await?
167170
.request(ListPartitionInfosRequest::new(
168171
table_path,
169172
partial_partition_spec,
@@ -180,7 +183,8 @@ impl FlussAdmin {
180183
ignore_if_exists: bool,
181184
) -> Result<()> {
182185
let _response = self
183-
.admin_gateway
186+
.admin_gateway()
187+
.await?
184188
.request(CreatePartitionRequest::new(
185189
table_path,
186190
partition_spec,
@@ -198,7 +202,8 @@ impl FlussAdmin {
198202
ignore_if_not_exists: bool,
199203
) -> Result<()> {
200204
let _response = self
201-
.admin_gateway
205+
.admin_gateway()
206+
.await?
202207
.request(DropPartitionRequest::new(
203208
table_path,
204209
partition_spec,
@@ -211,7 +216,8 @@ impl FlussAdmin {
211216
/// Check if a table exists
212217
pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
213218
let response = self
214-
.admin_gateway
219+
.admin_gateway()
220+
.await?
215221
.request(TableExistsRequest::new(table_path))
216222
.await?;
217223
Ok(response.exists)
@@ -225,7 +231,8 @@ impl FlussAdmin {
225231
cascade: bool,
226232
) -> Result<()> {
227233
let _response = self
228-
.admin_gateway
234+
.admin_gateway()
235+
.await?
229236
.request(DropDatabaseRequest::new(
230237
database_name,
231238
ignore_if_not_exists,
@@ -238,7 +245,8 @@ impl FlussAdmin {
238245
/// List all databases
239246
pub async fn list_databases(&self) -> Result<Vec<String>> {
240247
let response = self
241-
.admin_gateway
248+
.admin_gateway()
249+
.await?
242250
.request(ListDatabasesRequest::new())
243251
.await?;
244252
Ok(response.database_name)
@@ -247,7 +255,8 @@ impl FlussAdmin {
247255
/// Check if a database exists
248256
pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
249257
let response = self
250-
.admin_gateway
258+
.admin_gateway()
259+
.await?
251260
.request(DatabaseExistsRequest::new(database_name))
252261
.await?;
253262
Ok(response.exists)
@@ -256,7 +265,7 @@ impl FlussAdmin {
256265
/// Get database information
257266
pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> {
258267
let request = GetDatabaseInfoRequest::new(database_name);
259-
let response = self.admin_gateway.request(request).await?;
268+
let response = self.admin_gateway().await?.request(request).await?;
260269

261270
// Convert proto response to DatabaseInfo
262271
let database_descriptor = DatabaseDescriptor::from_json_bytes(&response.database_json)?;
@@ -279,7 +288,8 @@ impl FlussAdmin {
279288
/// Get the latest lake snapshot for a table
280289
pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
281290
let response = self
282-
.admin_gateway
291+
.admin_gateway()
292+
.await?
283293
.request(GetLatestLakeSnapshotRequest::new(table_path))
284294
.await?;
285295

crates/fluss/src/client/connection.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use crate::config::Config;
2323
use crate::rpc::RpcClient;
2424
use parking_lot::RwLock;
2525
use std::sync::Arc;
26-
use tokio::sync::OnceCell;
2726
use std::time::Duration;
28-
2927
use crate::error::{Error, FlussError, Result};
3028
use crate::metadata::TablePath;
3129

@@ -34,7 +32,7 @@ pub struct FlussConnection {
3432
network_connects: Arc<RpcClient>,
3533
args: Config,
3634
writer_client: RwLock<Option<Arc<WriterClient>>>,
37-
admin_client: OnceCell<FlussAdmin>,
35+
admin_client: RwLock<Option<FlussAdmin>>,
3836
}
3937

4038
impl FlussConnection {
@@ -62,7 +60,7 @@ impl FlussConnection {
6260
network_connects: connections.clone(),
6361
args: arg.clone(),
6462
writer_client: Default::default(),
65-
admin_client: OnceCell::new(),
63+
admin_client: RwLock::new(None),
6664
})
6765
}
6866

@@ -79,16 +77,23 @@ impl FlussConnection {
7977
}
8078

8179
pub async fn get_admin(&self) -> Result<FlussAdmin> {
82-
// Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin
83-
// holds a reference to RpcClient, which manages connection reuse and re-acquisition
84-
// when a cached connection becomes poisoned. Subsequent calls clone cheaply —
85-
// all internal fields (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are
86-
// Arc-backed so cloning is just a reference-count bump.
87-
let admin = self
88-
.admin_client
89-
.get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
90-
.await?;
91-
Ok(admin.clone())
80+
// 1. Fast path: return cached instance if already initialized.
81+
if let Some(admin) = self.admin_client.read().as_ref() {
82+
return Ok(admin.clone());
83+
}
84+
85+
// 2. Slow path: acquire write lock.
86+
let mut admin_guard = self.admin_client.write();
87+
88+
// 3. Double-check: another thread may have initialized while we waited.
89+
if let Some(admin) = admin_guard.as_ref() {
90+
return Ok(admin.clone());
91+
}
92+
93+
// 4. Initialize and cache.
94+
let admin = FlussAdmin::new(self.network_connects.clone(), self.metadata.clone());
95+
*admin_guard = Some(admin.clone());
96+
Ok(admin)
9297
}
9398

9499
pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {

0 commit comments

Comments
 (0)