diff --git a/Cargo.lock b/Cargo.lock index 5c5d66f..59836d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1165,9 +1165,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hotdata" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96620c427754f23659e79a45b4ae3e9d25e927e476e68af60683f46e460491cc" +checksum = "17ba716cff96f209c53979615cac927933b43ce3d905ede86efadee28990b8db" dependencies = [ "arrow-array", "arrow-ipc", diff --git a/Cargo.toml b/Cargo.toml index 34dd2c1..f3f2b94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ path = "src/main.rs" # behind a shared multi-thread tokio runtime. The `arrow` feature backs result # decode; `upload_stream` carries `content_length` (sized body, not chunked, so # the server can fast-fail an oversized upload — see src/sdk.rs::Api::upload_stream). -hotdata = { version = "0.1.1", features = ["arrow"] } +hotdata = { version = "0.2.0", features = ["arrow"] } # Shared multi-thread runtime for the sync wrapper; block_on is called # concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the # mpsc channel that bridges the blocking upload reader into an async byte stream. diff --git a/src/query.rs b/src/query.rs index f9dd783..20f6491 100644 --- a/src/query.rs +++ b/src/query.rs @@ -205,6 +205,45 @@ pub(crate) fn fetch_arrow_result(api: &Api, result_id: &str) -> QueryResponse { arrow_result_to_query_response(result, result_id.to_owned()) } +/// Resolve an inline (HTTP 200) query response for display. +/// +/// A non-truncated response carries the whole result in `rows`, so it's shown +/// as-is. A truncated one (#640) carries only a bounded preview — the full set +/// is persisted under `result_id` — so follow it to the full result via Arrow, +/// the same path the async (202) branch uses. Truncation rides on result *size* +/// while `async_after_ms` gates on *time*, so a fast-completing but large query +/// returns a truncated inline 200; without this follow the CLI would silently +/// print only the preview rows. +/// +/// If a truncated response has no `result_id` (persistence could not be +/// initiated — see the SDK's `warning` field), the full result is unfetchable, +/// so fall back to the preview and surface a warning rather than failing. +fn resolve_inline(api: &Api, resp: hotdata::models::QueryResponse) -> QueryResponse { + if !resp.truncated { + return query_response_from_sdk(resp); + } + match resp.result_id.clone().flatten() { + Some(result_id) => { + // The Arrow fetch returns only schema + rows; carry the query-level + // warning and execution time the inline response reported, which + // `arrow_result_to_query_response` otherwise hardcodes to None. + let mut full = fetch_arrow_result(api, &result_id); + full.warning = resp.warning.flatten(); + full.execution_time_ms = Some(resp.execution_time_ms.max(0) as u64); + full + } + None => { + let mut preview = query_response_from_sdk(resp); + let note = "result truncated to a preview; full result unavailable (persistence not initiated)"; + preview.warning = Some(match preview.warning { + Some(w) => format!("{w}; {note}"), + None => note.to_string(), + }); + preview + } + } +} + pub fn execute(sql: &str, workspace_id: &str, database: Option<&str>, format: &str) { let api = Api::new(Some(workspace_id)); @@ -223,9 +262,11 @@ pub fn execute(sql: &str, workspace_id: &str, database: Option<&str>, format: &s spinner.finish_and_clear(); let async_resp = match outcome { - // Completed within async_after_ms — inline results. + // Completed within async_after_ms — inline results. A large result can + // come back truncated to a preview even on this fast path, so follow it + // to the full set (resolve_inline) rather than printing the preview. hotdata::QueryOutcome::Inline(resp) => { - print_result(&query_response_from_sdk(resp), format); + print_result(&resolve_inline(&api, resp), format); return; } // Still running — poll the query run, then fetch the result as Arrow. @@ -397,3 +438,151 @@ pub fn print_result(result: &QueryResponse, format: &str) { _ => unreachable!(), } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::sdk::Api; + use std::sync::Arc; + + /// A truncated inline 200: one preview row standing in for a larger result. + /// `result_id` uses the wire double-option (`Some(None)` = field present but + /// null, i.e. persistence not initiated). + fn truncated_preview(result_id: Option<&str>) -> hotdata::models::QueryResponse { + let mut resp = hotdata::models::QueryResponse::new( + vec!["id".to_string()], // columns + 5, // execution_time_ms + vec![false], // nullable + 1, // preview_row_count + "qrun_1".to_string(), // query_run_id + 1, // row_count (deprecated, == preview) + vec![vec![serde_json::json!(1)]], // rows (preview only) + true, // truncated + ); + resp.result_id = Some(result_id.map(|s| s.to_string())); + resp + } + + #[test] + fn resolve_inline_follows_truncated_result_to_full_arrow() { + use arrow::array::{Int64Array, RecordBatch}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::ipc::writer::StreamWriter; + + // Full result has 3 rows — more than the 1-row inline preview. + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let mut ipc: Vec = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut ipc, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + + let mut server = mockito::Server::new(); + let m = server + .mock("GET", "/v1/results/res_1") + .match_query(mockito::Matcher::UrlEncoded( + "format".into(), + "arrow".into(), + )) + .with_status(200) + .with_header("content-type", "application/vnd.apache.arrow.stream") + .with_body(ipc) + .create(); + + // The inline response carries a query-level warning and execution time + // (execution_time_ms=5 from `truncated_preview`) that must survive the + // Arrow follow, which otherwise hardcodes them to None. + let mut resp = truncated_preview(Some("res_1")); + resp.warning = Some(Some("approximate aggregate".to_string())); + + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + let resolved = resolve_inline(&api, resp); + + // Followed the truncated preview to the full 3-row result. + assert_eq!(resolved.row_count, 3); + assert_eq!(resolved.rows.len(), 3); + assert_eq!(resolved.result_id.as_deref(), Some("res_1")); + // Inline warning + timing carried through, not dropped by the fetch. + assert_eq!(resolved.warning.as_deref(), Some("approximate aggregate")); + assert_eq!(resolved.execution_time_ms, Some(5)); + m.assert(); + } + + #[test] + fn resolve_inline_returns_untruncated_preview_without_fetching() { + // truncated=false short-circuits before any network call; point the Api + // at a server with no mocks so an erroneous fetch would fail loudly. + let server = mockito::Server::new(); + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + + let mut resp = hotdata::models::QueryResponse::new( + vec!["x".to_string()], + 5, + vec![false], + 2, + "qrun_2".to_string(), + 2, + vec![vec![serde_json::json!(1)], vec![serde_json::json!(2)]], + false, // not truncated + ); + resp.result_id = Some(Some("res_2".to_string())); + + let resolved = resolve_inline(&api, resp); + assert_eq!(resolved.row_count, 2); + assert_eq!( + resolved.rows, + vec![vec![serde_json::json!(1)], vec![serde_json::json!(2)]] + ); + assert_eq!(resolved.result_id.as_deref(), Some("res_2")); + } + + #[test] + fn resolve_inline_truncated_without_result_id_warns_and_keeps_preview() { + // Truncated but persistence never started (result_id is null): the full + // result is unfetchable, so keep the preview and surface a warning. + let server = mockito::Server::new(); + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + + let resolved = resolve_inline(&api, truncated_preview(None)); + assert_eq!(resolved.row_count, 1); + assert_eq!(resolved.rows.len(), 1); + assert!( + resolved + .warning + .as_deref() + .unwrap_or("") + .contains("truncated") + ); + } + + #[test] + fn resolve_inline_preserves_existing_warning_when_following_fails() { + // A truncated response with no result_id often arrives with an SDK + // warning explaining why persistence didn't start. The truncation note + // is appended to it, not allowed to clobber it. + let server = mockito::Server::new(); + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + + let mut resp = truncated_preview(None); + resp.warning = Some(Some( + "result persistence could not be initiated".to_string(), + )); + + let resolved = resolve_inline(&api, resp); + let warning = resolved.warning.as_deref().unwrap_or(""); + assert!( + warning.contains("result persistence could not be initiated"), + "original warning dropped: {warning:?}" + ); + assert!( + warning.contains("truncated"), + "truncation note missing: {warning:?}" + ); + } +}