From b9e4d7945d434be08be23018da0a292f01fbe84c Mon Sep 17 00:00:00 2001 From: Drew Raines Date: Sat, 20 Jun 2026 22:30:11 +0000 Subject: [PATCH] feat: warn on stale/orphaned connectors in tunnel list Tunnels whose connector lease has expired show 'stale' in a new CONNECTOR column. Connectors that have no associated tunnel (left over from an unclean shutdown) are shown in a separate ORPHANED CONNECTORS section at the bottom of the table. Changes: - TunnelSummary gains connector_ready and connector_name fields - New OrphanedConnector struct - list_project_with_orphans() fetches all Connectors in one extra API call, joins Ready status onto each tunnel, and collects any Connector not referenced by a tunnel as an orphan - Commands::List uses list_active_with_orphans(); emits connector field ('ok'/'stale') and orphaned_connector entries - RenderTable adds CONNECTOR column and orphan section --- connect-lib/bin/src/main.rs | 25 +++++- connect-lib/lib/src/lib.rs | 4 +- connect-lib/lib/src/tunnels.rs | 101 +++++++++++++++++++++- connect-plugin/internal/output/convert.go | 39 +++++++-- 4 files changed, 155 insertions(+), 14 deletions(-) diff --git a/connect-lib/bin/src/main.rs b/connect-lib/bin/src/main.rs index e2b5c0c..41262b8 100644 --- a/connect-lib/bin/src/main.rs +++ b/connect-lib/bin/src/main.rs @@ -210,8 +210,8 @@ async fn run() -> n0_error::Result<()> { Commands::List => { let node = ListenNode::new(repo.clone()).await?; let service = TunnelService::new(datum.clone(), node.clone()); - let tunnels = service.list_active().await?; - let output: Vec = tunnels + let (tunnels, orphans) = service.list_active_with_orphans().await?; + let mut output: Vec = tunnels .iter() .map(|t| { let status = if t.accepted && t.programmed && t.connector_metadata_programmed { @@ -221,6 +221,7 @@ async fn run() -> n0_error::Result<()> { } else { "pending" }; + let connector = if t.connector_ready { "ok" } else { "stale" }; serde_json::json!({ "type": "tunnel", "id": t.id, @@ -228,10 +229,26 @@ async fn run() -> n0_error::Result<()> { "endpoint": t.endpoint, "status": status, "enabled": t.enabled, - "hostnames": t.hostnames + "hostnames": t.hostnames, + "connector": connector, + "connector_name": t.connector_name }) }) .collect(); + for o in &orphans { + let connector = if o.ready { "ok" } else { "stale" }; + output.push(serde_json::json!({ + "type": "orphaned_connector", + "id": o.name, + "label": "", + "endpoint": "", + "status": "orphaned", + "enabled": false, + "hostnames": [], + "connector": connector, + "connector_name": o.name + })); + } if json { println!("{}", serde_json::to_string_pretty(&output).anyerr()?); } else { @@ -287,6 +304,8 @@ async fn run() -> n0_error::Result<()> { accepted: false, programmed: false, connector_metadata_programmed: false, + connector_ready: false, + connector_name: None, })); ep } diff --git a/connect-lib/lib/src/lib.rs b/connect-lib/lib/src/lib.rs index ffabaa6..ea4c36e 100644 --- a/connect-lib/lib/src/lib.rs +++ b/connect-lib/lib/src/lib.rs @@ -19,8 +19,8 @@ pub use project_control_plane::ProjectControlPlaneClient; pub use repo::{MissingConnectDir, Repo}; pub use state::{Advertisment, SelectedContext, State, StateWrapper, TcpProxyData}; pub use tunnels::{ - ProgressStep, ProgressStepKind, StepStatus, TunnelDeleteOutcome, TunnelProgress, TunnelService, - TunnelSummary, + OrphanedConnector, ProgressStep, ProgressStepKind, StepStatus, TunnelDeleteOutcome, + TunnelProgress, TunnelService, TunnelSummary, }; /// The root domain for datum connect URLs to subdomain from. A proxy URL will diff --git a/connect-lib/lib/src/tunnels.rs b/connect-lib/lib/src/tunnels.rs index 8e5bd58..6772109 100644 --- a/connect-lib/lib/src/tunnels.rs +++ b/connect-lib/lib/src/tunnels.rs @@ -67,6 +67,22 @@ pub struct TunnelSummary { pub accepted: bool, pub programmed: bool, pub connector_metadata_programmed: bool, + /// True when the backing Connector's `Ready` condition is `True`. + /// False means the connector lease has expired and the tunnel agent is + /// offline (traffic will be dropped). + pub connector_ready: bool, + /// The name of the Connector resource backing this tunnel. + pub connector_name: Option, +} + +/// A Connector that exists in the project but is not referenced by any tunnel. +/// These are typically left over from a previous tunnel that was deleted without +/// a clean shutdown (e.g. the agent was killed before it could run cleanup). +#[derive(Debug, Clone, PartialEq)] +pub struct OrphanedConnector { + pub name: String, + /// True when the connector's `Ready` condition is `True`. + pub ready: bool, } impl TunnelSummary { @@ -509,10 +525,23 @@ impl TunnelService { } pub async fn list_project(&self, project_id: &str) -> Result> { + let (tunnels, _) = self.list_project_with_orphans(project_id).await?; + Ok(tunnels) + } + + /// Like [`list_project`] but also returns any [`OrphanedConnector`]s found + /// in the project — connectors that exist but are not referenced by any + /// tunnel's HTTPProxy. These are typically left over from a previous tunnel + /// that exited uncleanly. + pub async fn list_project_with_orphans( + &self, + project_id: &str, + ) -> Result<(Vec, Vec)> { let pcp = self.datum.project_control_plane_client(project_id).await?; let client = pcp.client(); let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let ads: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + let ads: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let connectors_api: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); let proxy_list = proxies .list(&ListParams::default()) @@ -529,7 +558,29 @@ impl TunnelService { .filter_map(|item| item.metadata.name.clone().map(|name| (name, item))) .collect(); + // Fetch all connectors so we can check their Ready condition and detect orphans. + let connector_list = connectors_api + .list(&ListParams::default()) + .await + .std_context("Failed to list Connector objects")?; + let connector_ready_by_name: std::collections::HashMap = connector_list + .items + .iter() + .filter_map(|c| { + let name = c.metadata.name.clone()?; + let ready = condition_is_true( + c.status + .as_ref() + .and_then(|s| s.conditions.as_deref()), + CONNECTOR_CONDITION_READY, + ); + Some((name, ready)) + }) + .collect(); + let mut tunnels = Vec::new(); + let mut referenced_connector_names = std::collections::HashSet::new(); + for proxy in proxy_list.items { let Some(name) = proxy.metadata.name.clone() else { continue; @@ -568,6 +619,14 @@ impl TunnelService { HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, ); let enabled = enabled_by_name.contains_key(&name); + let connector_name = proxy_connector_name(&proxy); + let connector_ready = connector_name + .as_deref() + .and_then(|cn| connector_ready_by_name.get(cn).copied()) + .unwrap_or(false); + if let Some(cn) = &connector_name { + referenced_connector_names.insert(cn.clone()); + } tunnels.push(TunnelSummary { id: name, label, @@ -577,10 +636,37 @@ impl TunnelService { accepted, programmed, connector_metadata_programmed, + connector_ready, + connector_name, }); } - Ok(tunnels) + // Any connector not referenced by a tunnel is orphaned. + let orphans: Vec = connector_list + .items + .into_iter() + .filter_map(|c| { + let name = c.metadata.name?; + if referenced_connector_names.contains(&name) { + return None; + } + let ready = *connector_ready_by_name.get(&name).unwrap_or(&false); + Some(OrphanedConnector { name, ready }) + }) + .collect(); + + Ok((tunnels, orphans)) + } + + /// Like [`list_active`] but also returns orphaned connectors. + /// Used by the `list` subcommand to show stale connector warnings. + pub async fn list_active_with_orphans( + &self, + ) -> Result<(Vec, Vec)> { + let Some(selected) = self.datum.selected_context() else { + return Ok((Vec::new(), Vec::new())); + }; + self.list_project_with_orphans(&selected.project_id).await } pub async fn create_project( @@ -788,6 +874,10 @@ impl TunnelService { .and_then(|status| status.conditions.as_deref()), HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, ), + // connector_ready is not checked at creation time; the heartbeat + // agent will establish the lease shortly after. + connector_ready: false, + connector_name: Some(connector_name.clone()), }) } @@ -868,6 +958,7 @@ impl TunnelService { } let enabled = existing_ad.is_some(); + let connector_name = proxy_connector_name(&existing); let summary = TunnelSummary { id: tunnel_id.to_string(), @@ -896,6 +987,8 @@ impl TunnelService { .and_then(|status| status.conditions.as_deref()), HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, ), + connector_ready: false, + connector_name, }; if !self.publish_tickets @@ -1004,6 +1097,8 @@ impl TunnelService { .std_context("Failed to delete ConnectorAdvertisement")?; } + let connector_name = proxy_connector_name(&proxy); + let summary = TunnelSummary { id: tunnel_id.to_string(), label, @@ -1031,6 +1126,8 @@ impl TunnelService { .and_then(|status| status.conditions.as_deref()), HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, ), + connector_ready: false, + connector_name, }; if !self.publish_tickets diff --git a/connect-plugin/internal/output/convert.go b/connect-plugin/internal/output/convert.go index e4f2e7f..e876cb0 100644 --- a/connect-plugin/internal/output/convert.go +++ b/connect-plugin/internal/output/convert.go @@ -35,18 +35,30 @@ func ParseJSON(data []byte) (map[string]interface{}, error) { return result, nil } -// RenderTable takes a JSON array of tunnel objects and renders them +// RenderTable takes a JSON array of tunnel/connector objects and renders them // as a human-readable table to the given writer. -// Expected input: [{"id":"...","label":"...","endpoint":"...","status":"...","enabled":true,"hostnames":["..."]}] +// Expected input: [{"type":"tunnel","id":"...","label":"...","endpoint":"...","status":"...","enabled":true,"hostnames":["..."],"connector":"ok|stale"}] +// Orphaned connectors have type "orphaned_connector" and are rendered in a +// separate section below the tunnel rows. func RenderTable(data []byte, w *tabwriter.Writer) error { - var tunnels []map[string]interface{} - if err := json.Unmarshal(data, &tunnels); err != nil { + var items []map[string]interface{} + if err := json.Unmarshal(data, &items); err != nil { return fmt.Errorf("failed to parse tunnel list: %w", err) } + var tunnels []map[string]interface{} + var orphans []map[string]interface{} + for _, item := range items { + if fmt.Sprintf("%v", item["type"]) == "orphaned_connector" { + orphans = append(orphans, item) + } else { + tunnels = append(tunnels, item) + } + } + // Header - fmt.Fprintln(w, "ID\tLABEL\tENDPOINT\tSTATUS\tENABLED\tHOSTNAMES") - fmt.Fprintln(w, "--\t-----\t--------\t------\t-------\t---------") + fmt.Fprintln(w, "ID\tLABEL\tENDPOINT\tSTATUS\tENABLED\tCONNECTOR\tHOSTNAMES") + fmt.Fprintln(w, "--\t-----\t--------\t------\t-------\t---------\t---------") for _, t := range tunnels { id := fmt.Sprintf("%v", t["id"]) @@ -57,6 +69,7 @@ func RenderTable(data []byte, w *tabwriter.Writer) error { if enabledVal, ok := t["enabled"].(bool); ok && enabledVal { enabled = "yes" } + connector := fmt.Sprintf("%v", t["connector"]) hostnames := "\u2014" if hnArr, ok := t["hostnames"].([]interface{}); ok && len(hnArr) > 0 { hnStrs := make([]string, len(hnArr)) @@ -65,7 +78,19 @@ func RenderTable(data []byte, w *tabwriter.Writer) error { } hostnames = strings.Join(hnStrs, ",") } - fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", id, label, endpoint, status, enabled, hostnames) + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", id, label, endpoint, status, enabled, connector, hostnames) + } + + if len(orphans) > 0 { + fmt.Fprintln(w, "") + fmt.Fprintln(w, "ORPHANED CONNECTORS (no tunnel — safe to delete)") + fmt.Fprintln(w, "NAME\tSTATUS") + fmt.Fprintln(w, "----\t------") + for _, o := range orphans { + name := fmt.Sprintf("%v", o["id"]) + connector := fmt.Sprintf("%v", o["connector"]) + fmt.Fprintf(w, "%s\t%s\n", name, connector) + } } return w.Flush()