diff --git a/bindings/elixir/lib/fluss/server_node.ex b/bindings/elixir/lib/fluss/server_node.ex index 5441aa23..3ce70418 100644 --- a/bindings/elixir/lib/fluss/server_node.ex +++ b/bindings/elixir/lib/fluss/server_node.ex @@ -29,7 +29,7 @@ defmodule Fluss.ServerNode do uid: String.t(), host: String.t(), port: non_neg_integer(), - server_type: :tablet_server | :coordinator_server + server_type: :tablet_server | :coordinator_server | :unknown } @spec url(t()) :: String.t() diff --git a/bindings/elixir/native/fluss_nif/src/admin.rs b/bindings/elixir/native/fluss_nif/src/admin.rs index 7a206a5d..6f79a97f 100644 --- a/bindings/elixir/native/fluss_nif/src/admin.rs +++ b/bindings/elixir/native/fluss_nif/src/admin.rs @@ -30,6 +30,7 @@ use std::sync::Arc; pub enum NifServerType { TabletServer, CoordinatorServer, + Unknown, } #[derive(NifStruct)] @@ -52,6 +53,7 @@ impl NifServerNode { server_type: match node.server_type() { ServerType::TabletServer => NifServerType::TabletServer, ServerType::CoordinatorServer => NifServerType::CoordinatorServer, + ServerType::Unknown => NifServerType::Unknown, }, } } diff --git a/crates/fluss-test-cluster/src/lib.rs b/crates/fluss-test-cluster/src/lib.rs index 041c21b0..fbff0acb 100644 --- a/crates/fluss-test-cluster/src/lib.rs +++ b/crates/fluss-test-cluster/src/lib.rs @@ -165,6 +165,18 @@ impl FlussTestingClusterBuilder { } } + fn plaintext_tablet_bootstrap_servers(&self) -> HashMap { + let base_port = self.plain_client_port.unwrap_or(self.coordinator_host_port); + (0..self.number_of_tablet_servers) + .map(|server_id| { + ( + server_id, + format!("127.0.0.1:{}", base_port + 1 + server_id), + ) + }) + .collect() + } + fn all_containers_exist(&self) -> bool { self.container_names().iter().all(|name| { std::process::Command::new("docker") @@ -206,12 +218,14 @@ impl FlussTestingClusterBuilder { } let (bootstrap_servers, sasl_bootstrap_servers) = self.bootstrap_addresses(); + let plaintext_tablet_bootstrap_servers = self.plaintext_tablet_bootstrap_servers(); FlussTestingCluster { zookeeper, coordinator_server, tablet_servers, bootstrap_servers, + plaintext_tablet_bootstrap_servers, sasl_bootstrap_servers, remote_data_dir: self.remote_data_dir.clone(), sasl_users: self.sasl_users.clone(), @@ -388,6 +402,7 @@ pub struct FlussTestingCluster { coordinator_server: Arc>, tablet_servers: HashMap>>, bootstrap_servers: String, + plaintext_tablet_bootstrap_servers: HashMap, sasl_bootstrap_servers: Option, remote_data_dir: Option, sasl_users: Vec<(String, String)>, @@ -414,6 +429,12 @@ impl FlussTestingCluster { &self.bootstrap_servers } + pub fn plaintext_tablet_bootstrap_server(&self, server_id: u16) -> Option<&str> { + self.plaintext_tablet_bootstrap_servers + .get(&server_id) + .map(String::as_str) + } + pub async fn get_fluss_connection(&self) -> FlussConnection { let config = Config { writer_acks: "all".to_string(), diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 1e3ee7fe..e27392f1 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -88,7 +88,7 @@ impl Metadata { -1, socket_address.ip().to_string(), socket_address.port() as u32, - ServerType::CoordinatorServer, + ServerType::Unknown, ); let con = connections.get_connection(&server_node).await?; diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs index d5518709..1b6a3685 100644 --- a/crates/fluss/src/cluster/cluster.rs +++ b/crates/fluss/src/cluster/cluster.rs @@ -483,6 +483,14 @@ mod tests { ); } + #[test] + fn test_server_type_from_type_id() { + assert_eq!(ServerType::from_type_id(1), ServerType::CoordinatorServer); + assert_eq!(ServerType::from_type_id(2), ServerType::TabletServer); + assert_eq!(ServerType::from_type_id(-1), ServerType::Unknown); + assert_eq!(ServerType::from_type_id(99), ServerType::Unknown); + } + #[test] fn test_get_server_nodes_with_coordinator_and_tablets() { let cluster = Cluster::new( diff --git a/crates/fluss/src/cluster/mod.rs b/crates/fluss/src/cluster/mod.rs index 863f8ed5..d300f7f9 100644 --- a/crates/fluss/src/cluster/mod.rs +++ b/crates/fluss/src/cluster/mod.rs @@ -36,12 +36,14 @@ pub struct ServerNode { impl ServerNode { pub fn new(id: i32, host: String, port: u32, server_type: ServerType) -> ServerNode { + let uid = match &server_type { + ServerType::CoordinatorServer => format!("cs-{id}"), + ServerType::TabletServer => format!("ts-{id}"), + ServerType::Unknown => format!("unknown-{host}:{port}"), + }; ServerNode { id, - uid: match server_type { - ServerType::CoordinatorServer => format!("cs-{id}"), - ServerType::TabletServer => format!("ts-{id}"), - }, + uid, host, port, server_type, @@ -77,6 +79,7 @@ impl ServerNode { pub enum ServerType { TabletServer, CoordinatorServer, + Unknown, } impl ServerType { @@ -84,14 +87,15 @@ impl ServerType { match self { ServerType::CoordinatorServer => 1, ServerType::TabletServer => 2, + ServerType::Unknown => -1, } } - pub fn from_type_id(type_id: i32) -> Option { + pub fn from_type_id(type_id: i32) -> ServerType { match type_id { - 1 => Some(ServerType::CoordinatorServer), - 2 => Some(ServerType::TabletServer), - _ => None, + 1 => ServerType::CoordinatorServer, + 2 => ServerType::TabletServer, + _ => ServerType::Unknown, } } } @@ -101,6 +105,7 @@ impl fmt::Display for ServerType { match self { ServerType::TabletServer => write!(f, "TabletServer"), ServerType::CoordinatorServer => write!(f, "CoordinatorServer"), + ServerType::Unknown => write!(f, "Unknown"), } } } diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 762c7ea9..a1ea4657 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -156,20 +156,21 @@ fn validate_server_type( expected: &ServerType, response_server_type: Option, ) -> Result<(), Error> { + if *expected == ServerType::Unknown { + return Ok(()); + } + // For forward-compat with servers that do not populate `server_type`, validation is skipped. let Some(type_id) = response_server_type else { return Ok(()); }; let actual = ServerType::from_type_id(type_id); - if actual.as_ref() == Some(expected) { + if &actual == expected { return Ok(()); } - let actual_desc = actual - .map(|t| t.to_string()) - .unwrap_or_else(|| format!("Unknown(type_id={type_id})")); Err(Error::InvalidServerType { message: format!( - "Expected server type {expected} but the server advertised {actual_desc}. \ + "Expected server type {expected} but the server advertised {actual}. \ The client may be talking to the wrong endpoint \ (e.g. coordinator vs tablet server)." ), @@ -1268,6 +1269,21 @@ mod tests { ) .is_ok() ); + assert!( + validate_server_type( + &ServerType::Unknown, + Some(ServerType::CoordinatorServer.to_type_id()), + ) + .is_ok() + ); + assert!( + validate_server_type( + &ServerType::Unknown, + Some(ServerType::TabletServer.to_type_id()), + ) + .is_ok() + ); + assert!(validate_server_type(&ServerType::Unknown, Some(99),).is_ok()); // Mismatch: connected to a coordinator while expecting a tablet server // (and vice versa). @@ -1290,8 +1306,8 @@ mod tests { )); validate_server_type(&ServerType::TabletServer, None).ok(); - // Unknown / unmapped type id still fails, with the raw id surfaced so - // operators can diagnose protocol drift. + // Unknown / unmapped type id is treated as Unknown, which still fails + // when the client expects a concrete server type. assert!(matches!( validate_server_type(&ServerType::CoordinatorServer, Some(99),), Err(Error::InvalidServerType { .. }) diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index 0860cbef..dfb00b0d 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -18,6 +18,8 @@ #[cfg(test)] mod admin_test { use crate::integration::utils::get_shared_cluster; + use fluss::client::FlussConnection; + use fluss::config::Config; use fluss::error::FlussError; use fluss::metadata::{ DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, PartitionSpec, Schema, @@ -521,6 +523,39 @@ mod admin_test { } } + #[tokio::test] + async fn test_bootstrap_from_tablet_server() { + let cluster = get_shared_cluster(); + let bootstrap_servers = cluster + .plaintext_tablet_bootstrap_server(0) + .expect("shared cluster should expose tablet server bootstrap") + .to_string(); + + let connection = FlussConnection::new(Config { + writer_acks: "all".to_string(), + bootstrap_servers, + ..Default::default() + }) + .await + .expect("should bootstrap from tablet server"); + let admin = connection.get_admin().expect("should get admin"); + + let nodes = admin + .get_server_nodes() + .await + .expect("should get server nodes"); + + let has_coordinator = nodes + .iter() + .any(|n| *n.server_type() == fluss::ServerType::CoordinatorServer); + assert!(has_coordinator, "Expected a coordinator server node"); + + let has_tablet = nodes + .iter() + .any(|n| *n.server_type() == fluss::ServerType::TabletServer); + assert!(has_tablet, "Expected a tablet server node"); + } + #[tokio::test] async fn test_error_table_not_partitioned() { let cluster = get_shared_cluster(); diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 7ef39577..49c54823 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -99,9 +99,12 @@ Complete API reference for the Fluss Rust client. | `fn id(&self) -> i32` | Server node ID | | `fn host(&self) -> &str` | Hostname of the server | | `fn port(&self) -> u32` | Port number | -| `fn server_type(&self) -> &ServerType` | Server type (`CoordinatorServer` or `TabletServer`) | +| `fn server_type(&self) -> &ServerType` | Server type (`CoordinatorServer`, `TabletServer`, or `Unknown`) | | `fn uid(&self) -> &str` | Unique identifier (e.g. `"cs-0"`, `"ts-1"`) | +`ServerType::Unknown` is used when the client has not yet determined the endpoint +type, such as bootstrap endpoints. + ## `FlussTable<'a>` | Method | Description |