Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/elixir/lib/fluss/server_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Fluss.ServerNode do
uid: String.t(),
host: String.t(),
port: non_neg_integer(),
server_type: :tablet_server | :coordinator_server
server_type: :tablet_server | :coordinator_server | :unknown
}

@spec url(t()) :: String.t()
Expand Down
2 changes: 2 additions & 0 deletions bindings/elixir/native/fluss_nif/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::sync::Arc;
pub enum NifServerType {
TabletServer,
CoordinatorServer,
Unknown,
}

#[derive(NifStruct)]
Expand All @@ -52,6 +53,7 @@ impl NifServerNode {
server_type: match node.server_type() {
ServerType::TabletServer => NifServerType::TabletServer,
ServerType::CoordinatorServer => NifServerType::CoordinatorServer,
ServerType::Unknown => NifServerType::Unknown,
},
}
}
Expand Down
21 changes: 21 additions & 0 deletions crates/fluss-test-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ impl FlussTestingClusterBuilder {
}
}

fn plaintext_tablet_bootstrap_servers(&self) -> HashMap<u16, String> {
let base_port = self.plain_client_port.unwrap_or(self.coordinator_host_port);
(0..self.number_of_tablet_servers)
.map(|server_id| {
(
server_id,
format!("127.0.0.1:{}", base_port + 1 + server_id),
)
})
.collect()
}

fn all_containers_exist(&self) -> bool {
self.container_names().iter().all(|name| {
std::process::Command::new("docker")
Expand Down Expand Up @@ -206,12 +218,14 @@ impl FlussTestingClusterBuilder {
}

let (bootstrap_servers, sasl_bootstrap_servers) = self.bootstrap_addresses();
let plaintext_tablet_bootstrap_servers = self.plaintext_tablet_bootstrap_servers();

FlussTestingCluster {
zookeeper,
coordinator_server,
tablet_servers,
bootstrap_servers,
plaintext_tablet_bootstrap_servers,
sasl_bootstrap_servers,
remote_data_dir: self.remote_data_dir.clone(),
sasl_users: self.sasl_users.clone(),
Expand Down Expand Up @@ -388,6 +402,7 @@ pub struct FlussTestingCluster {
coordinator_server: Arc<ContainerAsync<GenericImage>>,
tablet_servers: HashMap<u16, Arc<ContainerAsync<GenericImage>>>,
bootstrap_servers: String,
plaintext_tablet_bootstrap_servers: HashMap<u16, String>,
sasl_bootstrap_servers: Option<String>,
remote_data_dir: Option<std::path::PathBuf>,
sasl_users: Vec<(String, String)>,
Expand All @@ -414,6 +429,12 @@ impl FlussTestingCluster {
&self.bootstrap_servers
}

pub fn plaintext_tablet_bootstrap_server(&self, server_id: u16) -> Option<&str> {
self.plaintext_tablet_bootstrap_servers
.get(&server_id)
.map(String::as_str)
}

pub async fn get_fluss_connection(&self) -> FlussConnection {
let config = Config {
writer_acks: "all".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Metadata {
-1,
socket_address.ip().to_string(),
socket_address.port() as u32,
ServerType::CoordinatorServer,
ServerType::Unknown,
);
let con = connections.get_connection(&server_node).await?;

Expand Down
14 changes: 10 additions & 4 deletions crates/fluss/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ pub struct ServerNode {

impl ServerNode {
pub fn new(id: i32, host: String, port: u32, server_type: ServerType) -> ServerNode {
let uid = match &server_type {
ServerType::CoordinatorServer => format!("cs-{id}"),
ServerType::TabletServer => format!("ts-{id}"),
ServerType::Unknown => format!("unknown-{host}:{port}"),
};
ServerNode {
id,
uid: match server_type {
ServerType::CoordinatorServer => format!("cs-{id}"),
ServerType::TabletServer => format!("ts-{id}"),
},
uid,
host,
port,
server_type,
Expand Down Expand Up @@ -77,20 +79,23 @@ impl ServerNode {
pub enum ServerType {
TabletServer,
CoordinatorServer,
Unknown,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should update api-reference for this too

}

impl ServerType {
pub fn to_type_id(&self) -> i32 {
match self {
ServerType::CoordinatorServer => 1,
ServerType::TabletServer => 2,
ServerType::Unknown => -1,
}
}

pub fn from_type_id(type_id: i32) -> Option<ServerType> {
match type_id {
1 => Some(ServerType::CoordinatorServer),
2 => Some(ServerType::TabletServer),
-1 => Some(ServerType::Unknown),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for any unrecognized id, should we return None or Unknown?

_ => None,
}
}
Expand All @@ -101,6 +106,7 @@ impl fmt::Display for ServerType {
match self {
ServerType::TabletServer => write!(f, "TabletServer"),
ServerType::CoordinatorServer => write!(f, "CoordinatorServer"),
ServerType::Unknown => write!(f, "Unknown"),
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions crates/fluss/src/rpc/server_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ fn validate_server_type(
expected: &ServerType,
response_server_type: Option<i32>,
) -> Result<(), Error> {
if *expected == ServerType::Unknown {
return Ok(());
}

// For forward-compat with servers that do not populate `server_type`, validation is skipped.
let Some(type_id) = response_server_type else {
return Ok(());
Expand Down Expand Up @@ -1268,6 +1272,21 @@ mod tests {
)
.is_ok()
);
assert!(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should a simple integration test case be made in existing IT testing with unknown type/bootstrapping against tablet server?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an integration test covering bootstrapping from a tablet server endpoint. The test uses the shared cluster's tablet plaintext listener as bootstrap_servers, creates a FlussConnection, and verifies server node discovery returns both coordinator and tablet nodes.

validate_server_type(
&ServerType::Unknown,
Some(ServerType::CoordinatorServer.to_type_id()),
)
.is_ok()
);
assert!(
validate_server_type(
&ServerType::Unknown,
Some(ServerType::TabletServer.to_type_id()),
)
.is_ok()
);
assert!(validate_server_type(&ServerType::Unknown, Some(99),).is_ok());

// Mismatch: connected to a coordinator while expecting a tablet server
// (and vice versa).
Expand Down
35 changes: 35 additions & 0 deletions crates/fluss/tests/integration/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#[cfg(test)]
mod admin_test {
use crate::integration::utils::get_shared_cluster;
use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::FlussError;
use fluss::metadata::{
DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, PartitionSpec, Schema,
Expand Down Expand Up @@ -521,6 +523,39 @@ mod admin_test {
}
}

#[tokio::test]
async fn test_bootstrap_from_tablet_server() {
let cluster = get_shared_cluster();
let bootstrap_servers = cluster
.plaintext_tablet_bootstrap_server(0)
.expect("shared cluster should expose tablet server bootstrap")
.to_string();

let connection = FlussConnection::new(Config {
writer_acks: "all".to_string(),
bootstrap_servers,
..Default::default()
})
.await
.expect("should bootstrap from tablet server");
let admin = connection.get_admin().expect("should get admin");

let nodes = admin
.get_server_nodes()
.await
.expect("should get server nodes");

let has_coordinator = nodes
.iter()
.any(|n| *n.server_type() == fluss::ServerType::CoordinatorServer);
assert!(has_coordinator, "Expected a coordinator server node");

let has_tablet = nodes
.iter()
.any(|n| *n.server_type() == fluss::ServerType::TabletServer);
assert!(has_tablet, "Expected a tablet server node");
}

#[tokio::test]
async fn test_error_table_not_partitioned() {
let cluster = get_shared_cluster();
Expand Down
Loading