diff --git a/crates/fluss-test-cluster/src/lib.rs b/crates/fluss-test-cluster/src/lib.rs
index 041c21b0..a6d68638 100644
--- a/crates/fluss-test-cluster/src/lib.rs
+++ b/crates/fluss-test-cluster/src/lib.rs
@@ -18,13 +18,66 @@
 use fluss::client::FlussConnection;
 use fluss::config::Config;
 use std::collections::HashMap;
+use std::future::Future;
 use std::mem::ManuallyDrop;
 use std::sync::Arc;
 use std::time::Duration;
+use testcontainers::bollard::Docker;
+use testcontainers::bollard::query_parameters::{
+    ListContainersOptionsBuilder, RemoveContainerOptionsBuilder,
+};
 use testcontainers::core::ContainerPort;
+use testcontainers::core::client::docker_client_instance;
 use testcontainers::runners::AsyncRunner;
 use testcontainers::{ContainerAsync, GenericImage, ImageExt};
 
+/// testcontainers' own shared `bollard::Docker` — the same client (and daemon
+/// resolution) it uses for `image.start()`, so teardown and existence checks act on
+/// the exact daemon `start` created containers on. `None` (with a warning) if the
+/// client is unreachable.
+async fn docker_client() -> Option<Docker> {
+    match docker_client_instance().await {
+        Ok(docker) => Some(docker),
+        Err(e) => {
+            eprintln!("warning: cannot reach the testcontainers Docker client: {e}");
+            None
+        }
+    }
+}
+
+/// Force-removes a container by name. Best-effort: a 404 (already gone) is success;
+/// other errors warn rather than panic.
+async fn force_remove_container(docker: &Docker, name: &str) {
+    let options = RemoveContainerOptionsBuilder::default().force(true).build();
+    match docker.remove_container(name, Some(options)).await {
+        Ok(())
+        | Err(testcontainers::bollard::errors::Error::DockerResponseServerError {
+            status_code: 404,
+            ..
+        }) => {}
+        Err(e) => eprintln!("warning: failed to remove container '{name}': {e}"),
+    }
+}
+
+/// Runs an async teardown future to completion from a sync caller. `stop()` runs
+/// both inside a tokio runtime (async tests) and from a runtime-less atexit handler,
+/// so we use a dedicated thread with its own runtime instead of blocking the caller.
+fn run_blocking<F>(future: F) -> F::Output
+where
+    F: Future + Send + 'static,
+    F::Output: Send + 'static,
+{
+    std::thread::spawn(move || {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .expect("failed to build tokio runtime for container teardown")
+            .block_on(future)
+    })
+    .join()
+    .expect("container teardown thread panicked")
+}
+
 pub const FLUSS_IMAGE: &str = env!("FLUSS_IMAGE");
 pub const FLUSS_VERSION: &str = env!("FLUSS_VERSION");
 pub const ZOOKEEPER_IMAGE: &str = env!("ZOOKEEPER_IMAGE");
@@ -165,21 +218,32 @@ impl FlussTestingClusterBuilder {
         }
     }
 
-    fn all_containers_exist(&self) -> bool {
-        self.container_names().iter().all(|name| {
-            std::process::Command::new("docker")
-                .args(["ps", "-q", "--filter", &format!("name=^{}$", name)])
-                .output()
-                .map(|o| !String::from_utf8_lossy(&o.stdout).trim().is_empty())
-                .unwrap_or(false)
-        })
+    async fn all_containers_exist(&self) -> bool {
+        let Some(docker) = docker_client().await else {
+            return false;
+        };
+        for name in self.container_names() {
+            // Anchored exact-name match; `all(false)` = running only, so a stopped
+            // leftover counts as absent and gets recreated.
+            let mut filters = HashMap::new();
+            filters.insert("name".to_string(), vec![format!("^{name}$")]);
+            let options = ListContainersOptionsBuilder::default()
+                .all(false)
+                .filters(&filters)
+                .build();
+            match docker.list_containers(Some(options)).await {
+                Ok(list) if !list.is_empty() => continue,
+                _ => return false,
+            }
+        }
+        true
     }
 
     async fn start_all_containers(&mut self) -> Vec<ContainerAsync<GenericImage>> {
-        for name in &self.container_names() {
-            let _ = std::process::Command::new("docker")
-                .args(["rm", "-f", name])
-                .output();
+        if let Some(docker) = docker_client().await {
+            for name in self.container_names() {
+                force_remove_container(&docker, &name).await;
+            }
         }
 
         self.inject_sasl_conf();
@@ -222,7 +286,7 @@ impl FlussTestingClusterBuilder {
     /// Containers outlive the process. Clean up via `stop_cluster()`.
     /// Idempotent: if the cluster is already running, returns its info.
     pub async fn build_detached(&mut self) -> ClusterInfo {
-        if !self.all_containers_exist() {
+        if !self.all_containers_exist().await {
             let containers = self.start_all_containers().await;
             let _ = ManuallyDrop::new(containers);
         }
@@ -396,11 +460,14 @@ pub struct FlussTestingCluster {
 
 impl FlussTestingCluster {
     pub fn stop(&self) {
-        for name in &self.container_names {
-            let _ = std::process::Command::new("docker")
-                .args(["rm", "-f", name])
-                .output();
-        }
+        let names = self.container_names.clone();
+        run_blocking(async move {
+            if let Some(docker) = docker_client().await {
+                for name in &names {
+                    force_remove_container(&docker, name).await;
+                }
+            }
+        });
         if let Some(ref dir) = self.remote_data_dir {
             let _ = std::fs::remove_dir_all(dir);
         }
@@ -493,22 +560,47 @@ impl FlussTestingCluster {
 }
 
 pub fn stop_cluster(name: &str) {
-    let prefixes = [
-        format!("zookeeper-{}", name),
-        format!("coordinator-server-{}", name),
-        format!("tablet-server-{}-", name),
-    ];
-    for prefix in &prefixes {
-        if let Ok(output) = std::process::Command::new("docker")
-            .args(["ps", "-aq", "--filter", &format!("name={}", prefix)])
-            .output()
-        {
-            let ids = String::from_utf8_lossy(&output.stdout);
-            for id in ids.split_whitespace() {
-                let _ = std::process::Command::new("docker")
-                    .args(["rm", "-f", id])
-                    .output();
-            }
+    let name = name.to_string();
+    run_blocking(async move { stop_cluster_async(&name).await });
+}
+
+/// Force-removes every container of cluster `name` (matched by name prefix) on the
+/// testcontainers daemon — the same daemon `build_detached` started them on.
+async fn stop_cluster_async(name: &str) {
+    let Some(docker) = docker_client().await else {
+        return;
+    };
+
+    // Multiple values for the `name` filter are OR'd by the daemon; these prefixes
+    // cover zookeeper, coordinator, and any number of tablet servers.
+    let mut filters = HashMap::new();
+    filters.insert(
+        "name".to_string(),
+        vec![
+            format!("zookeeper-{name}"),
+            format!("coordinator-server-{name}"),
+            format!("tablet-server-{name}-"),
+        ],
+    );
+    let options = ListContainersOptionsBuilder::default()
+        .all(true)
+        .filters(&filters)
+        .build();
+
+    let containers = match docker.list_containers(Some(options)).await {
+        Ok(containers) => containers,
+        Err(e) => {
+            eprintln!("warning: failed to list cluster containers: {e}");
+            return;
+        }
+    };
+
+    for container in containers {
+        // Prefer the container name (daemon prefixes it with '/'); fall back to id.
+        if let Some(cname) = container.names.and_then(|n| n.into_iter().next()) {
+            force_remove_container(&docker, cname.trim_start_matches('/')).await;
+        } else if let Some(id) = container.id {
+            force_remove_container(&docker, &id).await;
         }
     }
 }