Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions connect-lib/bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ async fn run() -> n0_error::Result<()> {
Commands::List => {
let node = ListenNode::new(repo.clone()).await?;
let service = TunnelService::new(datum.clone(), node.clone());
let tunnels = service.list_active().await?;
let output: Vec<serde_json::Value> = tunnels
let (tunnels, orphans) = service.list_active_with_orphans().await?;
let mut output: Vec<serde_json::Value> = tunnels
.iter()
.map(|t| {
let status = if t.accepted && t.programmed && t.connector_metadata_programmed {
Expand All @@ -221,17 +221,34 @@ async fn run() -> n0_error::Result<()> {
} else {
"pending"
};
let connector = if t.connector_ready { "ok" } else { "stale" };
serde_json::json!({
"type": "tunnel",
"id": t.id,
"label": t.label,
"endpoint": t.endpoint,
"status": status,
"enabled": t.enabled,
"hostnames": t.hostnames
"hostnames": t.hostnames,
"connector": connector,
"connector_name": t.connector_name
})
})
.collect();
for o in &orphans {
let connector = if o.ready { "ok" } else { "stale" };
output.push(serde_json::json!({
"type": "orphaned_connector",
"id": o.name,
"label": "",
"endpoint": "",
"status": "orphaned",
"enabled": false,
"hostnames": [],
"connector": connector,
"connector_name": o.name
}));
}
if json {
println!("{}", serde_json::to_string_pretty(&output).anyerr()?);
} else {
Expand Down Expand Up @@ -287,6 +304,8 @@ async fn run() -> n0_error::Result<()> {
accepted: false,
programmed: false,
connector_metadata_programmed: false,
connector_ready: false,
connector_name: None,
}));
ep
}
Expand Down
4 changes: 2 additions & 2 deletions connect-lib/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub use project_control_plane::ProjectControlPlaneClient;
pub use repo::{MissingConnectDir, Repo};
pub use state::{Advertisment, SelectedContext, State, StateWrapper, TcpProxyData};
pub use tunnels::{
ProgressStep, ProgressStepKind, StepStatus, TunnelDeleteOutcome, TunnelProgress, TunnelService,
TunnelSummary,
OrphanedConnector, ProgressStep, ProgressStepKind, StepStatus, TunnelDeleteOutcome,
TunnelProgress, TunnelService, TunnelSummary,
};

/// The root domain for datum connect URLs to subdomain from. A proxy URL will
Expand Down
101 changes: 99 additions & 2 deletions connect-lib/lib/src/tunnels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ pub struct TunnelSummary {
pub accepted: bool,
pub programmed: bool,
pub connector_metadata_programmed: bool,
/// True when the backing Connector's `Ready` condition is `True`.
/// False means the connector lease has expired and the tunnel agent is
/// offline (traffic will be dropped).
pub connector_ready: bool,
/// The name of the Connector resource backing this tunnel.
pub connector_name: Option<String>,
}

/// A Connector that exists in the project but is not referenced by any tunnel.
/// These are typically left over from a previous tunnel that was deleted without
/// a clean shutdown (e.g. the agent was killed before it could run cleanup).
#[derive(Debug, Clone, PartialEq)]
pub struct OrphanedConnector {
pub name: String,
/// True when the connector's `Ready` condition is `True`.
pub ready: bool,
}

impl TunnelSummary {
Expand Down Expand Up @@ -509,10 +525,23 @@ impl TunnelService {
}

pub async fn list_project(&self, project_id: &str) -> Result<Vec<TunnelSummary>> {
let (tunnels, _) = self.list_project_with_orphans(project_id).await?;
Ok(tunnels)
}

/// Like [`list_project`] but also returns any [`OrphanedConnector`]s found
/// in the project — connectors that exist but are not referenced by any
/// tunnel's HTTPProxy. These are typically left over from a previous tunnel
/// that exited uncleanly.
pub async fn list_project_with_orphans(
&self,
project_id: &str,
) -> Result<(Vec<TunnelSummary>, Vec<OrphanedConnector>)> {
let pcp = self.datum.project_control_plane_client(project_id).await?;
let client = pcp.client();
let proxies: Api<HTTPProxy> = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE);
let ads: Api<ConnectorAdvertisement> = Api::namespaced(client, DEFAULT_PCP_NAMESPACE);
let ads: Api<ConnectorAdvertisement> = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE);
let connectors_api: Api<Connector> = Api::namespaced(client, DEFAULT_PCP_NAMESPACE);

let proxy_list = proxies
.list(&ListParams::default())
Expand All @@ -529,7 +558,29 @@ impl TunnelService {
.filter_map(|item| item.metadata.name.clone().map(|name| (name, item)))
.collect();

// Fetch all connectors so we can check their Ready condition and detect orphans.
let connector_list = connectors_api
.list(&ListParams::default())
.await
.std_context("Failed to list Connector objects")?;
let connector_ready_by_name: std::collections::HashMap<String, bool> = connector_list
.items
.iter()
.filter_map(|c| {
let name = c.metadata.name.clone()?;
let ready = condition_is_true(
c.status
.as_ref()
.and_then(|s| s.conditions.as_deref()),
CONNECTOR_CONDITION_READY,
);
Some((name, ready))
})
.collect();

let mut tunnels = Vec::new();
let mut referenced_connector_names = std::collections::HashSet::new();

for proxy in proxy_list.items {
let Some(name) = proxy.metadata.name.clone() else {
continue;
Expand Down Expand Up @@ -568,6 +619,14 @@ impl TunnelService {
HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED,
);
let enabled = enabled_by_name.contains_key(&name);
let connector_name = proxy_connector_name(&proxy);
let connector_ready = connector_name
.as_deref()
.and_then(|cn| connector_ready_by_name.get(cn).copied())
.unwrap_or(false);
if let Some(cn) = &connector_name {
referenced_connector_names.insert(cn.clone());
}
tunnels.push(TunnelSummary {
id: name,
label,
Expand All @@ -577,10 +636,37 @@ impl TunnelService {
accepted,
programmed,
connector_metadata_programmed,
connector_ready,
connector_name,
});
}

Ok(tunnels)
// Any connector not referenced by a tunnel is orphaned.
let orphans: Vec<OrphanedConnector> = connector_list
.items
.into_iter()
.filter_map(|c| {
let name = c.metadata.name?;
if referenced_connector_names.contains(&name) {
return None;
}
let ready = *connector_ready_by_name.get(&name).unwrap_or(&false);
Some(OrphanedConnector { name, ready })
})
.collect();

Ok((tunnels, orphans))
}

/// Like [`list_active`] but also returns orphaned connectors.
/// Used by the `list` subcommand to show stale connector warnings.
pub async fn list_active_with_orphans(
&self,
) -> Result<(Vec<TunnelSummary>, Vec<OrphanedConnector>)> {
let Some(selected) = self.datum.selected_context() else {
return Ok((Vec::new(), Vec::new()));
};
self.list_project_with_orphans(&selected.project_id).await
}

pub async fn create_project(
Expand Down Expand Up @@ -788,6 +874,10 @@ impl TunnelService {
.and_then(|status| status.conditions.as_deref()),
HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED,
),
// connector_ready is not checked at creation time; the heartbeat
// agent will establish the lease shortly after.
connector_ready: false,
connector_name: Some(connector_name.clone()),
})
}

Expand Down Expand Up @@ -868,6 +958,7 @@ impl TunnelService {
}

let enabled = existing_ad.is_some();
let connector_name = proxy_connector_name(&existing);

let summary = TunnelSummary {
id: tunnel_id.to_string(),
Expand Down Expand Up @@ -896,6 +987,8 @@ impl TunnelService {
.and_then(|status| status.conditions.as_deref()),
HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED,
),
connector_ready: false,
connector_name,
};

if !self.publish_tickets
Expand Down Expand Up @@ -1004,6 +1097,8 @@ impl TunnelService {
.std_context("Failed to delete ConnectorAdvertisement")?;
}

let connector_name = proxy_connector_name(&proxy);

let summary = TunnelSummary {
id: tunnel_id.to_string(),
label,
Expand Down Expand Up @@ -1031,6 +1126,8 @@ impl TunnelService {
.and_then(|status| status.conditions.as_deref()),
HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED,
),
connector_ready: false,
connector_name,
};

if !self.publish_tickets
Expand Down
39 changes: 32 additions & 7 deletions connect-plugin/internal/output/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,30 @@ func ParseJSON(data []byte) (map[string]interface{}, error) {
return result, nil
}

// RenderTable takes a JSON array of tunnel objects and renders them
// RenderTable takes a JSON array of tunnel/connector objects and renders them
// as a human-readable table to the given writer.
// Expected input: [{"id":"...","label":"...","endpoint":"...","status":"...","enabled":true,"hostnames":["..."]}]
// Expected input: [{"type":"tunnel","id":"...","label":"...","endpoint":"...","status":"...","enabled":true,"hostnames":["..."],"connector":"ok|stale"}]
// Orphaned connectors have type "orphaned_connector" and are rendered in a
// separate section below the tunnel rows.
func RenderTable(data []byte, w *tabwriter.Writer) error {
var tunnels []map[string]interface{}
if err := json.Unmarshal(data, &tunnels); err != nil {
var items []map[string]interface{}
if err := json.Unmarshal(data, &items); err != nil {
return fmt.Errorf("failed to parse tunnel list: %w", err)
}

var tunnels []map[string]interface{}
var orphans []map[string]interface{}
for _, item := range items {
if fmt.Sprintf("%v", item["type"]) == "orphaned_connector" {
orphans = append(orphans, item)
} else {
tunnels = append(tunnels, item)
}
}

// Header
fmt.Fprintln(w, "ID\tLABEL\tENDPOINT\tSTATUS\tENABLED\tHOSTNAMES")
fmt.Fprintln(w, "--\t-----\t--------\t------\t-------\t---------")
fmt.Fprintln(w, "ID\tLABEL\tENDPOINT\tSTATUS\tENABLED\tCONNECTOR\tHOSTNAMES")
fmt.Fprintln(w, "--\t-----\t--------\t------\t-------\t---------\t---------")

for _, t := range tunnels {
id := fmt.Sprintf("%v", t["id"])
Expand All @@ -57,6 +69,7 @@ func RenderTable(data []byte, w *tabwriter.Writer) error {
if enabledVal, ok := t["enabled"].(bool); ok && enabledVal {
enabled = "yes"
}
connector := fmt.Sprintf("%v", t["connector"])
hostnames := "\u2014"
if hnArr, ok := t["hostnames"].([]interface{}); ok && len(hnArr) > 0 {
hnStrs := make([]string, len(hnArr))
Expand All @@ -65,7 +78,19 @@ func RenderTable(data []byte, w *tabwriter.Writer) error {
}
hostnames = strings.Join(hnStrs, ",")
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", id, label, endpoint, status, enabled, hostnames)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", id, label, endpoint, status, enabled, connector, hostnames)
}

if len(orphans) > 0 {
fmt.Fprintln(w, "")
fmt.Fprintln(w, "ORPHANED CONNECTORS (no tunnel — safe to delete)")
fmt.Fprintln(w, "NAME\tSTATUS")
fmt.Fprintln(w, "----\t------")
for _, o := range orphans {
name := fmt.Sprintf("%v", o["id"])
connector := fmt.Sprintf("%v", o["connector"])
fmt.Fprintf(w, "%s\t%s\n", name, connector)
}
}

return w.Flush()
Expand Down
Loading