Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 126 additions & 34 deletions crates/fluss-test-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,66 @@
use fluss::client::FlussConnection;
use fluss::config::Config;
use std::collections::HashMap;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::time::Duration;
use testcontainers::bollard::Docker;
use testcontainers::bollard::query_parameters::{
ListContainersOptionsBuilder, RemoveContainerOptionsBuilder,
};
use testcontainers::core::ContainerPort;
use testcontainers::core::client::docker_client_instance;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage, ImageExt};

/// Returns the shared `bollard::Docker` client owned by testcontainers.
///
/// This is the client testcontainers itself uses for `image.start()`, so
/// teardown and existence checks are issued against the same daemon the
/// containers were started on. When the client cannot be obtained, a warning is
/// written to stderr and `None` is returned so callers can degrade gracefully.
async fn docker_client() -> Option<Docker> {
    docker_client_instance()
        .await
        .map_err(|e| {
            // Best-effort: report the failure, then let `.ok()` turn it into `None`.
            eprintln!("warning: cannot reach the testcontainers Docker client: {e}");
        })
        .ok()
}

/// Force-removes a container by name. Best-effort: a 404 (already gone) is success;
/// other errors warn rather than panic.
async fn force_remove_container(docker: &Docker, name: &str) {
let options = RemoveContainerOptionsBuilder::default().force(true).build();
match docker.remove_container(name, Some(options)).await {
Ok(())
| Err(testcontainers::bollard::errors::Error::DockerResponseServerError {
status_code: 404,
..
}) => {}
Err(e) => eprintln!("warning: failed to remove container '{name}': {e}"),
}
}

/// Drives an async teardown future to completion on behalf of a synchronous caller.
///
/// `stop()` is invoked both from inside a tokio runtime (async tests) and from a
/// runtime-less atexit handler, so the future is executed on a dedicated thread
/// that owns its own current-thread runtime; the caller simply waits for that
/// thread to finish and receives the future's output.
///
/// # Panics
///
/// Panics if the runtime cannot be built or if the teardown thread panics.
fn run_blocking<F>(future: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let worker = std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build tokio runtime for container teardown");
        runtime.block_on(future)
    });

    worker.join().expect("container teardown thread panicked")
}

pub const FLUSS_IMAGE: &str = env!("FLUSS_IMAGE");
pub const FLUSS_VERSION: &str = env!("FLUSS_VERSION");
pub const ZOOKEEPER_IMAGE: &str = env!("ZOOKEEPER_IMAGE");
Expand Down Expand Up @@ -165,21 +218,32 @@ impl FlussTestingClusterBuilder {
}
}

fn all_containers_exist(&self) -> bool {
self.container_names().iter().all(|name| {
std::process::Command::new("docker")
.args(["ps", "-q", "--filter", &format!("name=^{}$", name)])
.output()
.map(|o| !String::from_utf8_lossy(&o.stdout).trim().is_empty())
.unwrap_or(false)
})
async fn all_containers_exist(&self) -> bool {
let Some(docker) = docker_client().await else {
return false;
};
for name in self.container_names() {
// Anchored exact-name match; `all(false)` = running only, so a stopped
// leftover counts as absent and gets recreated.
let mut filters = HashMap::new();
filters.insert("name".to_string(), vec![format!("^{name}$")]);
let options = ListContainersOptionsBuilder::default()
.all(false)
.filters(&filters)
.build();
match docker.list_containers(Some(options)).await {
Ok(list) if !list.is_empty() => continue,
_ => return false,
}
}
Comment on lines +225 to +238

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Seems like we're making a list_containers call per container. Any way to make this more efficient?

true
}

async fn start_all_containers(&mut self) -> Vec<ContainerAsync<GenericImage>> {
for name in &self.container_names() {
let _ = std::process::Command::new("docker")
.args(["rm", "-f", name])
.output();
if let Some(docker) = docker_client().await {
for name in self.container_names() {
force_remove_container(&docker, &name).await;
}
}
self.inject_sasl_conf();

Expand Down Expand Up @@ -222,7 +286,7 @@ impl FlussTestingClusterBuilder {
/// Containers outlive the process. Clean up via `stop_cluster()`.
/// Idempotent: if the cluster is already running, returns its info.
pub async fn build_detached(&mut self) -> ClusterInfo {
if !self.all_containers_exist() {
if !self.all_containers_exist().await {
let containers = self.start_all_containers().await;
let _ = ManuallyDrop::new(containers);
}
Expand Down Expand Up @@ -396,11 +460,14 @@ pub struct FlussTestingCluster {

impl FlussTestingCluster {
pub fn stop(&self) {
for name in &self.container_names {
let _ = std::process::Command::new("docker")
.args(["rm", "-f", name])
.output();
}
let names = self.container_names.clone();
run_blocking(async move {
if let Some(docker) = docker_client().await {
for name in &names {
force_remove_container(&docker, name).await;
}
}
});
if let Some(ref dir) = self.remote_data_dir {
let _ = std::fs::remove_dir_all(dir);
}
Expand Down Expand Up @@ -493,22 +560,47 @@ impl FlussTestingCluster {
}

pub fn stop_cluster(name: &str) {
let prefixes = [
format!("zookeeper-{}", name),
format!("coordinator-server-{}", name),
format!("tablet-server-{}-", name),
];
for prefix in &prefixes {
if let Ok(output) = std::process::Command::new("docker")
.args(["ps", "-aq", "--filter", &format!("name={}", prefix)])
.output()
{
let ids = String::from_utf8_lossy(&output.stdout);
for id in ids.split_whitespace() {
let _ = std::process::Command::new("docker")
.args(["rm", "-f", id])
.output();
}
let name = name.to_string();
run_blocking(async move { stop_cluster_async(&name).await });
}

/// Force-removes every container of cluster `name` (matched by name prefix) on the
/// testcontainers daemon — the same daemon `build_detached` started them on.
async fn stop_cluster_async(name: &str) {
let Some(docker) = docker_client().await else {
return;
};

// Multiple values for the `name` filter are OR'd by the daemon; these prefixes
// cover zookeeper, coordinator, and any number of tablet servers.
let mut filters = HashMap::new();
filters.insert(
"name".to_string(),
vec![
format!("zookeeper-{name}"),
format!("coordinator-server-{name}"),
format!("tablet-server-{name}-"),
],
);
let options = ListContainersOptionsBuilder::default()
.all(true)
.filters(&filters)
.build();

let containers = match docker.list_containers(Some(options)).await {
Ok(containers) => containers,
Err(e) => {
eprintln!("warning: failed to list cluster containers: {e}");
return;
}
};
Comment on lines +576 to +596

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This looks similar to parts of all_containers_exist, can these be refactored?


for container in containers {
// Prefer the container name (daemon prefixes it with '/'); fall back to id.
if let Some(cname) = container.names.and_then(|n| n.into_iter().next()) {
force_remove_container(&docker, cname.trim_start_matches('/')).await;
} else if let Some(id) = container.id {
force_remove_container(&docker, &id).await;
}
}
}
Loading