Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ fn list_database_summaries(api: &Api) -> Result<Vec<DatabaseSummary>, ApiError>
.map(|r| r.databases.into_iter().map(DatabaseSummary::from).collect())
}

/// Return the id of every managed database in the workspace.
///
/// Backs the whole-workspace `indexes list` scan (#168). A managed database's
/// connection is hidden from `connections list`, and its tables are missing
/// from the unscoped `information_schema` enumeration, so that scan has to
/// rediscover managed databases through this listing and then look up each
/// one's `default_connection_id` with [`get_database`]. Only ids are returned
/// because the list summary does not carry the connection id.
///
/// # Errors
///
/// Passes through, unchanged, whatever error the underlying database listing
/// reports.
pub(crate) fn list_database_ids(api: &Api) -> Result<Vec<String>, ApiError> {
    let summaries = list_database_summaries(api)?;
    Ok(summaries.into_iter().map(|summary| summary.id).collect())
}

/// Look up one database by id, delegating any failure to the error's own
/// `exit()` handler instead of returning it to the caller.
fn fetch_database(api: &Api, id: &str) -> Database {
    match get_database(api, id) {
        Ok(database) => database,
        Err(err) => err.exit(),
    }
}
Expand Down
294 changes: 278 additions & 16 deletions src/indexes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::databases;
use crate::sdk::{Api, block, block_with_wakeup, none_if_404};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -108,34 +109,106 @@ fn scan_connection_id<'a>(
.unwrap_or(label)
}

/// A single table queued for an index scan, bundled with the connection id
/// that its per-table index request has to be addressed to.
///
/// The id is stored separately because the table's own `connection` field is
/// only a display label (a connection name, or a managed database's internal
/// `__db_*` label) and may not match the real connection id.
struct ScanTarget {
    /// Real connection id used for the per-table index API call.
    conn_id: String,
    /// The enumerated table; `table.connection` holds the display label.
    table: InfoTable,
}

/// Look up, in parallel, the `default_connection_id` of each managed database
/// in the workspace.
///
/// The unscoped scan has no other way to find these connections (#168): the
/// whole-workspace `information_schema` enumeration leaves them out and
/// `connections list` hides them. Because `databases list` summaries lack the
/// connection id, every database costs one extra `databases get`.
///
/// A database that disappears between the list and the get (404) is silently
/// dropped. Every other failure — from the list or from a get — is handed to
/// the error's `exit()` so it surfaces loudly, like the rest of this path.
fn managed_db_connection_ids(api: &Api) -> Vec<String> {
    let database_ids = match databases::list_database_ids(api) {
        Ok(found) => found,
        Err(err) => err.exit(),
    };
    database_ids
        .par_iter()
        .filter_map(|database_id| {
            let lookup = none_if_404(databases::get_database(api, database_id));
            match lookup {
                // `None` here means the database was deleted mid-scan (404).
                Ok(maybe_db) => maybe_db.map(|db| db.default_connection_id),
                Err(err) => err.exit(),
            }
        })
        .collect()
}

/// Assemble the list of tables to scan for a whole-workspace (unscoped)
/// `indexes list`.
///
/// Two sources are combined:
///
/// 1. Regular connections, from a single workspace-wide `information_schema`
///    enumeration. Each table's label (the connection name) is translated back
///    to a connection id, falling back to the label itself (#161).
/// 2. Managed databases, which that enumeration never returns and whose
///    connections `connections list` hides (#168). They are found through
///    [`managed_db_connection_ids`] and each one is enumerated with a
///    connection-scoped `information_schema` call, the same way the
///    `--connection-id` path works.
///
/// The two sets do not overlap, because a managed database's connection is
/// never part of `connections list`; regular targets come first in the result,
/// followed by the managed ones.
fn workspace_scan_targets(api: &Api, schema: Option<&str>, table: Option<&str>) -> Vec<ScanTarget> {
    // Regular connections: resolve each enumerated label to its connection id.
    let name_to_id = connection_lookup(api);
    let mut targets: Vec<ScanTarget> = Vec::new();
    for info in collect_tables(api, None, schema, table) {
        let conn_id = scan_connection_id(None, &info.connection, &name_to_id).to_string();
        targets.push(ScanTarget {
            conn_id,
            table: info,
        });
    }

    // Managed databases: the scoped enumeration is what exposes `__db_*`
    // tables, so each hidden connection is listed on its own, in parallel.
    let managed_conns = managed_db_connection_ids(api);
    let managed_targets: Vec<ScanTarget> = managed_conns
        .par_iter()
        .flat_map(|managed_conn| {
            let mut scoped: Vec<ScanTarget> = Vec::new();
            for info in collect_tables(api, Some(managed_conn), schema, table) {
                scoped.push(ScanTarget {
                    conn_id: managed_conn.clone(),
                    table: info,
                });
            }
            scoped
        })
        .collect();

    targets.extend(managed_targets);
    targets
}

/// Gather index rows across a connection's (or the workspace's) tables — the
/// `indexes list` path when no full `connection.schema.table` triple is given.
///
/// Enumerates tables via `information_schema`, then fetches each table's indexes
/// by the connection id chosen in [`scan_connection_id`] (the supplied
/// `--connection-id` wins, fixing the database-scoped case in #161). Skipped
/// connections / missing tables surface as no rows for that table, not an error.
/// With a `--connection-id`, enumerates that connection's tables and fetches
/// each table's indexes against it (the database-scoped case fixed in #161).
/// Without one, [`workspace_scan_targets`] assembles the list across both
/// regular connections and managed databases (#168). Skipped connections /
/// missing tables surface as no rows for that table, not an error.
fn collect_connection_wide(
api: &Api,
connection_id: Option<&str>,
schema: Option<&str>,
table: Option<&str>,
) -> Vec<IndexRow> {
let tables = collect_tables(api, connection_id, schema, table);
// See `scan_connection_id`: a supplied `--connection-id` is used directly,
// so the name→id map is only needed (and only worth a round-trip) for the
// unscoped, list-everything case (#161).
let conn_ids = if connection_id.is_some() {
HashMap::new()
} else {
connection_lookup(api)
let targets = match connection_id {
Some(cid) => collect_tables(api, Some(cid), schema, table)
.into_iter()
.map(|t| ScanTarget {
conn_id: cid.to_string(),
table: t,
})
.collect(),
None => workspace_scan_targets(api, schema, table),
};
let per_table: Vec<(String, Vec<Index>)> = tables
let per_table: Vec<(String, Vec<Index>)> = targets
.par_iter()
.map(|t| {
let cid = scan_connection_id(connection_id, &t.connection, &conn_ids);
.map(|tg| {
let t = &tg.table;
let full = format!("{}.{}.{}", t.connection, t.schema, t.table);
let indexes = list_one_table_scan(api, cid, &t.schema, &t.table);
let indexes = list_one_table_scan(api, &tg.conn_id, &t.schema, &t.table);
(full, indexes)
})
.collect();
Expand Down Expand Up @@ -701,6 +774,195 @@ mod tests {
assert_eq!(rows[0].table.as_deref(), Some("__db_abc.public.vec_mid"));
}

#[test]
fn collect_connection_wide_unscoped_discovers_managed_db_indexes() {
// #168: unscoped `indexes list` in a managed-only workspace (the real
// production shape — `connections list` is empty because it hides
// database-scoped connections, and the workspace-wide
// `information_schema` returns no managed tables). The scan must
// rediscover the managed database via `databases list` → `databases get`
// → default_connection_id, then a connection-scoped `information_schema`
// surfaces its `__db_*` table and the per-table indexes call resolves.
let mut server = mockito::Server::new();
// No regular connections.
let conns = server
.mock("GET", "/v1/connections")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"connections":[]}"#)
.create();
// Workspace-wide enumeration (no connection_id query) → no tables.
let info_ws = server
.mock("GET", "/v1/information_schema")
.match_query(mockito::Matcher::Exact(String::new()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"count":0,"limit":100,"tables":[],"has_more":false,"next_cursor":null}"#)
.create();
// The managed database is discovered here.
let dbs = server
.mock("GET", "/v1/databases")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"databases":[{"id":"dbidabc","name":"airbnb","default_catalog":"default"}]}"#,
)
.create();
// `databases get` for that id supplies the `default_connection_id`
// (`conn-managed`) that the list response above does not include.
let db = server
.mock("GET", "/v1/databases/dbidabc")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"id":"dbidabc","name":"airbnb","default_catalog":"default",
"default_connection_id":"conn-managed","attachments":[]}"#,
)
.create();
// Connection-scoped enumeration surfaces the managed table.
let info_scoped = server
.mock("GET", "/v1/information_schema")
.match_query(mockito::Matcher::UrlEncoded(
"connection_id".into(),
"conn-managed".into(),
))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"count":1,"limit":100,"tables":[
{"connection":"__db_abc","schema":"public","table":"listings","synced":true}
],"has_more":false,"next_cursor":null}"#,
)
.create();
// Per-table index lookup. The path uses the resolved connection id
// (`conn-managed`), not the `__db_abc` display label from the enumeration.
let idx = server
.mock(
"GET",
"/v1/connections/conn-managed/tables/public/listings/indexes",
)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"indexes":[{"index_name":"listings_desc_bm25","index_type":"bm25",
"columns":["description"],"metric":null,"status":"ready",
"created_at":"2020-01-01T00:00:00Z","updated_at":"2020-01-01T00:00:00Z"}]}"#,
)
.create();

// Run the unscoped scan: no connection id, no schema filter, no table filter.
let api = Api::test_new(&server.url(), "k", Some("ws"));
let rows = collect_connection_wide(&api, None, None, None);
// Each mocked endpoint must actually have been requested by the scan.
conns.assert();
info_ws.assert();
dbs.assert();
db.assert();
info_scoped.assert();
idx.assert();
// Exactly one index row comes back, and its table path is built from the
// display label (`__db_abc`) rather than the connection id.
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].inner.index_name, "listings_desc_bm25");
assert_eq!(rows[0].table.as_deref(), Some("__db_abc.public.listings"));
}

#[test]
fn collect_connection_wide_unscoped_unions_regular_and_managed() {
// The unscoped scan unions regular-connection tables (workspace-wide
// enumeration, label = connection name mapped to its id) with managed
// databases (discovered separately, #168). The two sets are disjoint, so
// both indexes appear exactly once.
let mut server = mockito::Server::new();
// One regular connection: name `Warehouse`, id `conn-reg`.
let conns = server
.mock("GET", "/v1/connections")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"connections":[{"id":"conn-reg","name":"Warehouse","source_type":"postgres"}]}"#,
)
.create();
// Workspace-wide enumeration returns the regular connection's table.
let info_ws = server
.mock("GET", "/v1/information_schema")
.match_query(mockito::Matcher::Exact(String::new()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"count":1,"limit":100,"tables":[
{"connection":"Warehouse","schema":"public","table":"events","synced":true}
],"has_more":false,"next_cursor":null}"#,
)
.create();
// Index lookup for the regular table. The path carries the id `conn-reg`,
// so the `Warehouse` label must have been mapped back to its connection id.
let reg_idx = server
.mock(
"GET",
"/v1/connections/conn-reg/tables/public/events/indexes",
)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"indexes":[{"index_name":"events_bm25","index_type":"bm25",
"columns":["body"],"metric":null,"status":"ready",
"created_at":"2020-01-01T00:00:00Z","updated_at":"2020-01-01T00:00:00Z"}]}"#,
)
.create();
// `databases list` exposes the managed database's id.
let dbs = server
.mock("GET", "/v1/databases")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"databases":[{"id":"dbidabc","name":"airbnb","default_catalog":"default"}]}"#,
)
.create();
// `databases get` resolves that id to its `default_connection_id`.
let db = server
.mock("GET", "/v1/databases/dbidabc")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"id":"dbidabc","name":"airbnb","default_catalog":"default",
"default_connection_id":"conn-managed","attachments":[]}"#,
)
.create();
// Connection-scoped enumeration for `conn-managed` returns the `__db_abc` table.
let info_scoped = server
.mock("GET", "/v1/information_schema")
.match_query(mockito::Matcher::UrlEncoded(
"connection_id".into(),
"conn-managed".into(),
))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"count":1,"limit":100,"tables":[
{"connection":"__db_abc","schema":"public","table":"listings","synced":true}
],"has_more":false,"next_cursor":null}"#,
)
.create();
// Index lookup for the managed table, addressed by `conn-managed`.
let managed_idx = server
.mock(
"GET",
"/v1/connections/conn-managed/tables/public/listings/indexes",
)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"indexes":[{"index_name":"listings_desc_bm25","index_type":"bm25",
"columns":["description"],"metric":null,"status":"ready",
"created_at":"2020-01-01T00:00:00Z","updated_at":"2020-01-01T00:00:00Z"}]}"#,
)
.create();

// Run the unscoped scan: no connection id, no schema filter, no table filter.
let api = Api::test_new(&server.url(), "k", Some("ws"));
let mut rows = collect_connection_wide(&api, None, None, None);
// Each mocked endpoint must actually have been requested by the scan.
conns.assert();
info_ws.assert();
reg_idx.assert();
dbs.assert();
db.assert();
info_scoped.assert();
managed_idx.assert();
// Sort by index name so the positional assertions below do not depend on
// the order in which the scan returns rows.
rows.sort_by(|a, b| a.inner.index_name.cmp(&b.inner.index_name));
// One row per source: the regular connection's index and the managed
// database's index, each labelled with its display label.
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].inner.index_name, "events_bm25");
assert_eq!(rows[0].table.as_deref(), Some("Warehouse.public.events"));
assert_eq!(rows[1].inner.index_name, "listings_desc_bm25");
assert_eq!(rows[1].table.as_deref(), Some("__db_abc.public.listings"));
}

#[test]
fn collect_tables_single_page() {
let mut server = mockito::Server::new();
Expand Down
Loading