From 9d2a5aff42068fa6c8fad1d7985f0df015964676 Mon Sep 17 00:00:00 2001 From: Xiaobing Fang Date: Mon, 15 Jun 2026 21:08:20 +0800 Subject: [PATCH 1/2] fix: allow unknown bootstrap server type --- bindings/elixir/lib/fluss/server_node.ex | 2 +- bindings/elixir/native/fluss_nif/src/admin.rs | 2 ++ crates/fluss/src/client/metadata.rs | 2 +- crates/fluss/src/cluster/mod.rs | 14 ++++++++++---- crates/fluss/src/rpc/server_connection.rs | 19 +++++++++++++++++++ 5 files changed, 33 insertions(+), 6 deletions(-) diff --git a/bindings/elixir/lib/fluss/server_node.ex b/bindings/elixir/lib/fluss/server_node.ex index 5441aa23..3ce70418 100644 --- a/bindings/elixir/lib/fluss/server_node.ex +++ b/bindings/elixir/lib/fluss/server_node.ex @@ -29,7 +29,7 @@ defmodule Fluss.ServerNode do uid: String.t(), host: String.t(), port: non_neg_integer(), - server_type: :tablet_server | :coordinator_server + server_type: :tablet_server | :coordinator_server | :unknown } @spec url(t()) :: String.t() diff --git a/bindings/elixir/native/fluss_nif/src/admin.rs b/bindings/elixir/native/fluss_nif/src/admin.rs index 7a206a5d..6f79a97f 100644 --- a/bindings/elixir/native/fluss_nif/src/admin.rs +++ b/bindings/elixir/native/fluss_nif/src/admin.rs @@ -30,6 +30,7 @@ use std::sync::Arc; pub enum NifServerType { TabletServer, CoordinatorServer, + Unknown, } #[derive(NifStruct)] @@ -52,6 +53,7 @@ impl NifServerNode { server_type: match node.server_type() { ServerType::TabletServer => NifServerType::TabletServer, ServerType::CoordinatorServer => NifServerType::CoordinatorServer, + ServerType::Unknown => NifServerType::Unknown, }, } } diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 1e3ee7fe..e27392f1 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -88,7 +88,7 @@ impl Metadata { -1, socket_address.ip().to_string(), socket_address.port() as u32, - ServerType::CoordinatorServer, + ServerType::Unknown, ); let con = connections.get_connection(&server_node).await?; diff --git a/crates/fluss/src/cluster/mod.rs b/crates/fluss/src/cluster/mod.rs index 863f8ed5..1f5cf4b2 100644 --- a/crates/fluss/src/cluster/mod.rs +++ b/crates/fluss/src/cluster/mod.rs @@ -36,12 +36,14 @@ pub struct ServerNode { impl ServerNode { pub fn new(id: i32, host: String, port: u32, server_type: ServerType) -> ServerNode { + let uid = match &server_type { + ServerType::CoordinatorServer => format!("cs-{id}"), + ServerType::TabletServer => format!("ts-{id}"), + ServerType::Unknown => format!("unknown-{host}:{port}"), + }; ServerNode { id, - uid: match server_type { - ServerType::CoordinatorServer => format!("cs-{id}"), - ServerType::TabletServer => format!("ts-{id}"), - }, + uid, host, port, server_type, @@ -77,6 +79,7 @@ impl ServerNode { pub enum ServerType { TabletServer, CoordinatorServer, + Unknown, } impl ServerType { @@ -84,6 +87,7 @@ impl ServerType { match self { ServerType::CoordinatorServer => 1, ServerType::TabletServer => 2, + ServerType::Unknown => -1, } } @@ -91,6 +95,7 @@ impl ServerType { match type_id { 1 => Some(ServerType::CoordinatorServer), 2 => Some(ServerType::TabletServer), + -1 => Some(ServerType::Unknown), _ => None, } } @@ -101,6 +106,7 @@ impl fmt::Display for ServerType { match self { ServerType::TabletServer => write!(f, "TabletServer"), ServerType::CoordinatorServer => write!(f, "CoordinatorServer"), + ServerType::Unknown => write!(f, "Unknown"), } } } diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 762c7ea9..12d411c5 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -156,6 +156,10 @@ fn validate_server_type( expected: &ServerType, response_server_type: Option, ) -> Result<(), Error> { + if *expected == ServerType::Unknown { + return Ok(()); + } + // For forward-compat with servers that do not populate `server_type`, validation is skipped. let Some(type_id) = response_server_type else { return Ok(()); @@ -1268,6 +1272,21 @@ mod tests { ) .is_ok() ); + assert!( + validate_server_type( + &ServerType::Unknown, + Some(ServerType::CoordinatorServer.to_type_id()), + ) + .is_ok() + ); + assert!( + validate_server_type( + &ServerType::Unknown, + Some(ServerType::TabletServer.to_type_id()), + ) + .is_ok() + ); + assert!(validate_server_type(&ServerType::Unknown, Some(99),).is_ok()); // Mismatch: connected to a coordinator while expecting a tablet server // (and vice versa). From 69b8eb472da5f9e142e01068cbb93e2acccc11ea Mon Sep 17 00:00:00 2001 From: Xiaobing Fang Date: Tue, 16 Jun 2026 10:07:50 +0800 Subject: [PATCH 2/2] test: cover tablet bootstrap server endpoint --- crates/fluss-test-cluster/src/lib.rs | 21 +++++++++++++++ crates/fluss/tests/integration/admin.rs | 35 +++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/crates/fluss-test-cluster/src/lib.rs b/crates/fluss-test-cluster/src/lib.rs index 041c21b0..fbff0acb 100644 --- a/crates/fluss-test-cluster/src/lib.rs +++ b/crates/fluss-test-cluster/src/lib.rs @@ -165,6 +165,18 @@ impl FlussTestingClusterBuilder { } } + fn plaintext_tablet_bootstrap_servers(&self) -> HashMap { + let base_port = self.plain_client_port.unwrap_or(self.coordinator_host_port); + (0..self.number_of_tablet_servers) + .map(|server_id| { + ( + server_id, + format!("127.0.0.1:{}", base_port + 1 + server_id), + ) + }) + .collect() + } + fn all_containers_exist(&self) -> bool { self.container_names().iter().all(|name| { std::process::Command::new("docker") @@ -206,12 +218,14 @@ impl FlussTestingClusterBuilder { } let (bootstrap_servers, sasl_bootstrap_servers) = self.bootstrap_addresses(); + let plaintext_tablet_bootstrap_servers = self.plaintext_tablet_bootstrap_servers(); FlussTestingCluster { zookeeper, coordinator_server, tablet_servers, bootstrap_servers, + plaintext_tablet_bootstrap_servers, sasl_bootstrap_servers, remote_data_dir: self.remote_data_dir.clone(), sasl_users: self.sasl_users.clone(), @@ -388,6 +402,7 @@ pub struct FlussTestingCluster { coordinator_server: Arc>, tablet_servers: HashMap>>, bootstrap_servers: String, + plaintext_tablet_bootstrap_servers: HashMap, sasl_bootstrap_servers: Option, remote_data_dir: Option, sasl_users: Vec<(String, String)>, @@ -414,6 +429,12 @@ impl FlussTestingCluster { &self.bootstrap_servers } + pub fn plaintext_tablet_bootstrap_server(&self, server_id: u16) -> Option<&str> { + self.plaintext_tablet_bootstrap_servers + .get(&server_id) + .map(String::as_str) + } + pub async fn get_fluss_connection(&self) -> FlussConnection { let config = Config { writer_acks: "all".to_string(), diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index 0860cbef..dfb00b0d 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -18,6 +18,8 @@ #[cfg(test)] mod admin_test { use crate::integration::utils::get_shared_cluster; + use fluss::client::FlussConnection; + use fluss::config::Config; use fluss::error::FlussError; use fluss::metadata::{ DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, PartitionSpec, Schema, @@ -521,6 +523,39 @@ mod admin_test { } } + #[tokio::test] + async fn test_bootstrap_from_tablet_server() { + let cluster = get_shared_cluster(); + let bootstrap_servers = cluster + .plaintext_tablet_bootstrap_server(0) + .expect("shared cluster should expose tablet server bootstrap") + .to_string(); + + let connection = FlussConnection::new(Config { + writer_acks: "all".to_string(), + bootstrap_servers, + ..Default::default() + }) + .await + .expect("should bootstrap from tablet server"); + let admin = connection.get_admin().expect("should get admin"); + + let nodes = admin + .get_server_nodes() + .await + .expect("should get server nodes"); + + let has_coordinator = nodes + .iter() + .any(|n| *n.server_type() == fluss::ServerType::CoordinatorServer); + assert!(has_coordinator, "Expected a coordinator server node"); + + let has_tablet = nodes + .iter() + .any(|n| *n.server_type() == fluss::ServerType::TabletServer); + assert!(has_tablet, "Expected a tablet server node"); + } + #[tokio::test] async fn test_error_table_not_partitioned() { let cluster = get_shared_cluster();