Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ path = "src/main.rs"
# behind a shared multi-thread tokio runtime. The `arrow` feature backs result
# decode; `upload_stream` carries `content_length` (sized body, not chunked, so
# the server can fast-fail an oversized upload — see src/sdk.rs::Api::upload_stream).
hotdata = { version = "0.1.1", features = ["arrow"] }
hotdata = { version = "0.2.0", features = ["arrow"] }
# Shared multi-thread runtime for the sync wrapper; block_on is called
# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the
# mpsc channel that bridges the blocking upload reader into an async byte stream.
Expand Down
193 changes: 191 additions & 2 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,45 @@ pub(crate) fn fetch_arrow_result(api: &Api, result_id: &str) -> QueryResponse {
arrow_result_to_query_response(result, result_id.to_owned())
}

/// Resolve an inline (HTTP 200) query response for display.
///
/// A non-truncated response carries the whole result in `rows`, so it's shown
/// as-is. A truncated one (#640) carries only a bounded preview — the full set
/// is persisted under `result_id` — so follow it to the full result via Arrow,
/// the same path the async (202) branch uses. Truncation rides on result *size*
/// while `async_after_ms` gates on *time*, so a fast-completing but large query
/// returns a truncated inline 200; without this follow the CLI would silently
/// print only the preview rows.
///
/// If a truncated response has no `result_id` (persistence could not be
/// initiated — see the SDK's `warning` field), the full result is unfetchable,
/// so fall back to the preview and surface a warning rather than failing.
fn resolve_inline(api: &Api, resp: hotdata::models::QueryResponse) -> QueryResponse {
if !resp.truncated {
return query_response_from_sdk(resp);
}
match resp.result_id.clone().flatten() {
Some(result_id) => {
// The Arrow fetch returns only schema + rows; carry the query-level
// warning and execution time the inline response reported, which
// `arrow_result_to_query_response` otherwise hardcodes to None.
let mut full = fetch_arrow_result(api, &result_id);
full.warning = resp.warning.flatten();
full.execution_time_ms = Some(resp.execution_time_ms.max(0) as u64);
full
}
None => {
let mut preview = query_response_from_sdk(resp);
let note = "result truncated to a preview; full result unavailable (persistence not initiated)";
preview.warning = Some(match preview.warning {
Some(w) => format!("{w}; {note}"),
None => note.to_string(),
});
preview
}
}
}

pub fn execute(sql: &str, workspace_id: &str, database: Option<&str>, format: &str) {
let api = Api::new(Some(workspace_id));

Expand All @@ -223,9 +262,11 @@ pub fn execute(sql: &str, workspace_id: &str, database: Option<&str>, format: &s
spinner.finish_and_clear();

let async_resp = match outcome {
// Completed within async_after_ms — inline results.
// Completed within async_after_ms — inline results. A large result can
// come back truncated to a preview even on this fast path, so follow it
// to the full set (resolve_inline) rather than printing the preview.
hotdata::QueryOutcome::Inline(resp) => {
print_result(&query_response_from_sdk(resp), format);
print_result(&resolve_inline(&api, resp), format);
return;
}
// Still running — poll the query run, then fetch the result as Arrow.
Expand Down Expand Up @@ -397,3 +438,151 @@ pub fn print_result(result: &QueryResponse, format: &str) {
_ => unreachable!(),
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::sdk::Api;
use std::sync::Arc;

/// A truncated inline 200: one preview row standing in for a larger result.
/// `result_id` uses the wire double-option (`Some(None)` = field present but
/// null, i.e. persistence not initiated).
fn truncated_preview(result_id: Option<&str>) -> hotdata::models::QueryResponse {
let mut resp = hotdata::models::QueryResponse::new(
vec!["id".to_string()], // columns
5, // execution_time_ms
vec![false], // nullable
1, // preview_row_count
"qrun_1".to_string(), // query_run_id
1, // row_count (deprecated, == preview)
vec![vec![serde_json::json!(1)]], // rows (preview only)
true, // truncated
);
resp.result_id = Some(result_id.map(|s| s.to_string()));
resp
}

#[test]
fn resolve_inline_follows_truncated_result_to_full_arrow() {
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;

// Full result has 3 rows — more than the 1-row inline preview.
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
)
.unwrap();
let mut ipc: Vec<u8> = Vec::new();
{
let mut writer = StreamWriter::try_new(&mut ipc, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}

let mut server = mockito::Server::new();
let m = server
.mock("GET", "/v1/results/res_1")
.match_query(mockito::Matcher::UrlEncoded(
"format".into(),
"arrow".into(),
))
.with_status(200)
.with_header("content-type", "application/vnd.apache.arrow.stream")
.with_body(ipc)
.create();

// The inline response carries a query-level warning and execution time
// (execution_time_ms=5 from `truncated_preview`) that must survive the
// Arrow follow, which otherwise hardcodes them to None.
let mut resp = truncated_preview(Some("res_1"));
resp.warning = Some(Some("approximate aggregate".to_string()));

let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1"));
let resolved = resolve_inline(&api, resp);

// Followed the truncated preview to the full 3-row result.
assert_eq!(resolved.row_count, 3);
assert_eq!(resolved.rows.len(), 3);
assert_eq!(resolved.result_id.as_deref(), Some("res_1"));
// Inline warning + timing carried through, not dropped by the fetch.
assert_eq!(resolved.warning.as_deref(), Some("approximate aggregate"));
assert_eq!(resolved.execution_time_ms, Some(5));
m.assert();
}

#[test]
fn resolve_inline_returns_untruncated_preview_without_fetching() {
// truncated=false short-circuits before any network call; point the Api
// at a server with no mocks so an erroneous fetch would fail loudly.
let server = mockito::Server::new();
let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1"));

let mut resp = hotdata::models::QueryResponse::new(
vec!["x".to_string()],
5,
vec![false],
2,
"qrun_2".to_string(),
2,
vec![vec![serde_json::json!(1)], vec![serde_json::json!(2)]],
false, // not truncated
);
resp.result_id = Some(Some("res_2".to_string()));

let resolved = resolve_inline(&api, resp);
assert_eq!(resolved.row_count, 2);
assert_eq!(
resolved.rows,
vec![vec![serde_json::json!(1)], vec![serde_json::json!(2)]]
);
assert_eq!(resolved.result_id.as_deref(), Some("res_2"));
}

#[test]
fn resolve_inline_truncated_without_result_id_warns_and_keeps_preview() {
// Truncated but persistence never started (result_id is null): the full
// result is unfetchable, so keep the preview and surface a warning.
let server = mockito::Server::new();
let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1"));

let resolved = resolve_inline(&api, truncated_preview(None));
assert_eq!(resolved.row_count, 1);
assert_eq!(resolved.rows.len(), 1);
assert!(
resolved
.warning
.as_deref()
.unwrap_or("")
.contains("truncated")
);
}

#[test]
fn resolve_inline_preserves_existing_warning_when_following_fails() {
// A truncated response with no result_id often arrives with an SDK
// warning explaining why persistence didn't start. The truncation note
// is appended to it, not allowed to clobber it.
let server = mockito::Server::new();
let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1"));

let mut resp = truncated_preview(None);
resp.warning = Some(Some(
"result persistence could not be initiated".to_string(),
));

let resolved = resolve_inline(&api, resp);
let warning = resolved.warning.as_deref().unwrap_or("");
assert!(
warning.contains("result persistence could not be initiated"),
"original warning dropped: {warning:?}"
);
assert!(
warning.contains("truncated"),
"truncation note missing: {warning:?}"
);
}
}
Loading