Skip to content
276 changes: 274 additions & 2 deletions src/datasets/api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::HashSet;

use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use urlencoding::encode;

use crate::http::ApiClient;
use crate::http::{ApiClient, HttpError};

use super::records::DATASET_RECORD_FIELDS;

Expand Down Expand Up @@ -72,11 +73,51 @@ impl Dataset {
}
}

/// A dataset snapshot record as exchanged with the `/v1/dataset_snapshot`
/// endpoint (see `list_dataset_snapshots` and `create_dataset_snapshot`).
///
/// All fields except `description` and `created` are required when
/// deserializing; the two `Option` fields accept a missing key or JSON `null`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatasetSnapshot {
/// Identifier of this snapshot record (presumably a UUID string — confirm against the API).
pub id: String,
/// Name given to the snapshot.
pub name: String,
/// Identifier of the dataset the snapshot belongs to.
pub dataset_id: String,
/// Optional free-form description; `None` when absent or `null`.
pub description: Option<String>,
/// The `xact_id` the snapshot refers to, kept as a string rather than a number.
pub xact_id: String,
/// Creation value reported by the service, if any (format not established here).
pub created: Option<String>,
}

/// Outcome of `create_dataset_snapshot`: the snapshot returned by the service
/// plus a flag derived from the `x-bt-found-existing` response header.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateDatasetSnapshotResult {
/// Snapshot parsed from the response body.
pub dataset_snapshot: DatasetSnapshot,
/// `true` when the response carried `x-bt-found-existing` set to `true`
/// (case-insensitive) or `1`; `false` otherwise.
pub found_existing: bool,
}

/// Response body of the dataset restore preview endpoint
/// (`/v1/dataset/{id}/restore/preview`), as returned by `preview_dataset_restore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatasetRestorePreview {
/// Count reported under `rows_to_restore` (presumably rows a restore would bring back).
pub rows_to_restore: usize,
/// Count reported under `rows_to_delete` (presumably rows a restore would remove).
pub rows_to_delete: usize,
}

/// Response body of the dataset restore endpoint (`/v1/dataset/{id}/restore`),
/// as returned by `restore_dataset`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatasetRestoreResult {
/// The `xact_id` reported by the service for the restore, kept as a string.
pub xact_id: String,
/// Count reported under `rows_restored`.
pub rows_restored: usize,
/// Count reported under `rows_deleted`.
pub rows_deleted: usize,
}

/// Private deserialization envelope for a JSON object whose `objects` key
/// holds an array of `Dataset` values.
#[derive(Debug, Deserialize)]
struct ListResponse {
/// The datasets contained in the response.
objects: Vec<Dataset>,
}

/// Private deserialization envelope for a JSON object whose `objects` key
/// holds an array of `T`; used by `list_dataset_snapshots` with
/// `T = DatasetSnapshot`.
#[derive(Debug, Deserialize)]
struct ListResponseGeneric<T> {
/// The items contained in the response.
objects: Vec<T>,
}

/// One row of the result of the query built by `build_dataset_head_xact_query`,
/// which selects only the `_xact_id` column.
#[derive(Debug, Deserialize)]
struct DatasetHeadXactRow {
/// Value of the row's `_xact_id` key; `None` when the key is missing or `null`.
#[serde(rename = "_xact_id", default)]
xact_id: Option<String>,
}

pub async fn list_datasets(client: &ApiClient, project_id: &str) -> Result<Vec<Dataset>> {
let path = format!(
"/v1/dataset?org_name={}&project_id={}",
Expand Down Expand Up @@ -216,6 +257,87 @@ pub async fn delete_dataset(client: &ApiClient, dataset_id: &str) -> Result<()>
client.delete(&path).await
}

pub async fn list_dataset_snapshots(
client: &ApiClient,
dataset_id: &str,
) -> Result<Vec<DatasetSnapshot>> {
let path = format!("/v1/dataset_snapshot?dataset_id={}", encode(dataset_id));
let list: ListResponseGeneric<DatasetSnapshot> = client.get(&path).await?;
Ok(list.objects)
}

pub async fn create_dataset_snapshot(
client: &ApiClient,
dataset_id: &str,
name: &str,
description: Option<&str>,
xact_id: &str,
) -> Result<CreateDatasetSnapshotResult> {
let mut body = serde_json::json!({
"dataset_id": dataset_id,
"name": name,
"xact_id": xact_id,
});
if let Some(description) = description {
body["description"] = Value::String(description.to_string());
}
let response = client
.post_with_headers_raw("/v1/dataset_snapshot", &body, &[])
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(HttpError { status, body }.into());
}

let found_existing = found_existing_snapshot_header(response.headers());
let dataset_snapshot = response.json().await.context("failed to parse response")?;
Ok(CreateDatasetSnapshotResult {
dataset_snapshot,
found_existing,
})
}

/// Asks the service what restoring `dataset_id` to `xact_id` would do.
///
/// POSTs `{"version": xact_id}` to `/v1/dataset/{dataset_id}/restore/preview`
/// (dataset id URL-encoded) and decodes the response as a
/// [`DatasetRestorePreview`].
///
/// # Errors
///
/// Propagates whatever error `client.post` reports.
pub async fn preview_dataset_restore(
    client: &ApiClient,
    dataset_id: &str,
    xact_id: &str,
) -> Result<DatasetRestorePreview> {
    let body = serde_json::json!({ "version": xact_id });
    let path = format!("/v1/dataset/{}/restore/preview", encode(dataset_id));
    client.post(&path, &body).await
}

/// Requests a restore of `dataset_id` to `xact_id`.
///
/// POSTs `{"version": xact_id}` to `/v1/dataset/{dataset_id}/restore`
/// (dataset id URL-encoded) and decodes the response as a
/// [`DatasetRestoreResult`].
///
/// # Errors
///
/// Propagates whatever error `client.post` reports.
pub async fn restore_dataset(
    client: &ApiClient,
    dataset_id: &str,
    xact_id: &str,
) -> Result<DatasetRestoreResult> {
    let body = serde_json::json!({ "version": xact_id });
    let path = format!("/v1/dataset/{}/restore", encode(dataset_id));
    client.post(&path, &body).await
}

/// Looks up the greatest `_xact_id` returned for `dataset_id`.
///
/// Runs the query from `build_dataset_head_xact_query`, trims every returned
/// `_xact_id`, discards missing or blank values, and returns the maximum
/// according to `compare_xact_ids`. Yields `Ok(None)` when no usable value
/// comes back.
///
/// # Errors
///
/// Propagates whatever error `client.btql_structured` reports.
pub async fn get_dataset_head_xact_id(
    client: &ApiClient,
    dataset_id: &str,
) -> Result<Option<String>> {
    let query = build_dataset_head_xact_query(dataset_id);
    let rows = client
        .btql_structured::<DatasetHeadXactRow, _>(&query)
        .await?
        .data;

    let candidates = rows.into_iter().filter_map(|row| {
        let trimmed = row.xact_id?.trim().to_string();
        (!trimmed.is_empty()).then_some(trimmed)
    });

    Ok(candidates.max_by(compare_xact_ids))
}

fn resolve_dataset_rows_page_limit(max_rows: Option<usize>, loaded_rows: usize) -> Option<usize> {
match max_rows {
None => Some(MAX_DATASET_ROWS_PAGE_LIMIT),
Expand Down Expand Up @@ -269,6 +391,44 @@ fn dataset_rows_select_fields() -> Vec<Value> {
.collect()
}

/// Builds the structured query used to find a dataset's newest `_xact_id`.
///
/// The query selects only `_xact_id` from `dataset(<dataset_id>)`, keeps rows
/// whose `created` is at or after `DATASET_ROWS_SINCE`, sorts by `_xact_id`
/// descending and limits the result to one row. `dataset_id` is embedded as a
/// literal value, never spliced into query text.
fn build_dataset_head_xact_query(dataset_id: &str) -> Value {
    let xact_id_ident = json!({"op": "ident", "name": ["_xact_id"]});

    let projection = json!({
        "expr": xact_id_ident.clone(),
        "alias": "_xact_id",
    });
    let source = json!({
        "op": "function",
        "name": {"op": "ident", "name": ["dataset"]},
        "args": [{"op": "literal", "value": dataset_id}]
    });
    let created_since = json!({
        "op": "ge",
        "left": {"op": "ident", "name": ["created"]},
        "right": {"op": "literal", "value": DATASET_ROWS_SINCE}
    });
    let newest_first = json!({
        "expr": xact_id_ident,
        "dir": "desc",
    });

    json!({
        "select": [projection],
        "from": source,
        "filter": created_since,
        "sort": [newest_first],
        "limit": 1
    })
}

/// Orders two xact ids, numerically when both parse as `u64`.
///
/// If either side fails to parse, the two original strings are compared
/// lexicographically instead. The parameters stay `&String` so the function
/// can be handed directly to `Iterator::max_by` over `String` items.
fn compare_xact_ids(left: &String, right: &String) -> std::cmp::Ordering {
    let as_numbers = left.parse::<u64>().ok().zip(right.parse::<u64>().ok());
    match as_numbers {
        Some((lhs, rhs)) => lhs.cmp(&rhs),
        // At least one side is not a plain unsigned integer.
        None => left.cmp(right),
    }
}

/// Reports whether the `x-bt-found-existing` header is present and truthy.
///
/// Truthy means the value is `true` in any ASCII casing, or exactly `1`.
/// A missing header, or a value `to_str` rejects, yields `false`.
fn found_existing_snapshot_header(headers: &HeaderMap) -> bool {
    let Some(raw) = headers.get("x-bt-found-existing") else {
        return false;
    };
    match raw.to_str() {
        Ok(text) => text.eq_ignore_ascii_case("true") || text == "1",
        Err(_) => false,
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -351,6 +511,99 @@ mod tests {
);
}

#[test]
fn dataset_head_query_includes_required_filter_and_limit() {
    // Full expected query: the `created` filter, descending sort and
    // `limit: 1` must all be present.
    let expected = serde_json::json!({
        "select": [{
            "expr": {"op": "ident", "name": ["_xact_id"]},
            "alias": "_xact_id",
        }],
        "from": {
            "op": "function",
            "name": {"op": "ident", "name": ["dataset"]},
            "args": [{"op": "literal", "value": "dataset-id"}]
        },
        "filter": {
            "op": "ge",
            "left": {"op": "ident", "name": ["created"]},
            "right": {"op": "literal", "value": "1970-01-01T00:00:00Z"}
        },
        "sort": [{
            "expr": {"op": "ident", "name": ["_xact_id"]},
            "dir": "desc",
        }],
        "limit": 1
    });

    let actual = build_dataset_head_xact_query("dataset-id");
    assert_eq!(actual, expected);
}

#[test]
fn dataset_head_query_keeps_dataset_id_as_literal() {
    // An id containing a quote must arrive unchanged in the literal argument.
    let tricky_id = "dataset'with-quote";
    let query = build_dataset_head_xact_query(tricky_id);
    let literal = query.pointer("/from/args/0/value").and_then(Value::as_str);
    assert_eq!(literal, Some(tricky_id));
}

#[test]
fn compare_xact_ids_prefers_numeric_order_when_possible() {
    // First pair is numeric ("10" > "2"); second falls back to string order.
    let greater_pairs = [("10", "2"), ("b", "a")];
    for (left, right) in greater_pairs {
        assert_eq!(
            compare_xact_ids(&left.to_string(), &right.to_string()),
            std::cmp::Ordering::Greater
        );
    }
}

#[test]
fn dataset_snapshot_deserializes_service_schema() {
    // Payload with both nullable fields set to `null`.
    let payload = serde_json::json!({
        "id": "01926568-8088-7109-99ab-123456789abc",
        "dataset_id": "01926568-8088-7109-99ab-abcdef012345",
        "name": "baseline",
        "description": null,
        "xact_id": "1000192656880881099",
        "created": null
    });
    let snapshot: DatasetSnapshot =
        serde_json::from_value(payload).expect("deserialize snapshot");

    assert_eq!(snapshot.dataset_id, "01926568-8088-7109-99ab-abcdef012345");
    assert_eq!(snapshot.name, "baseline");
    assert_eq!(snapshot.description, None);
    assert_eq!(snapshot.xact_id, "1000192656880881099");
    assert_eq!(snapshot.created, None);
}

#[test]
fn dataset_restore_preview_deserializes_count_fields() {
    let payload = serde_json::json!({
        "rows_to_restore": 7,
        "rows_to_delete": 2
    });
    let preview: DatasetRestorePreview =
        serde_json::from_value(payload).expect("deserialize preview");

    // Both counters must map onto their same-named fields.
    assert_eq!(
        (preview.rows_to_restore, preview.rows_to_delete),
        (7, 2)
    );
}

#[test]
fn dataset_restore_result_deserializes_count_fields() {
    let payload = serde_json::json!({
        "xact_id": "1000192656880881099",
        "rows_restored": 7,
        "rows_deleted": 2
    });
    let outcome: DatasetRestoreResult =
        serde_json::from_value(payload).expect("deserialize result");

    // The id stays a string; the counters map onto their same-named fields.
    assert_eq!(outcome.xact_id, "1000192656880881099");
    assert_eq!((outcome.rows_restored, outcome.rows_deleted), (7, 2));
}

#[test]
fn dataset_rows_page_limit_defaults_to_api_max() {
assert_eq!(
Expand All @@ -369,4 +622,23 @@ mod tests {
fn dataset_rows_page_limit_stops_when_limit_reached() {
    // With 200 rows loaded against a cap of 200, no page limit is returned.
    let next_limit = resolve_dataset_rows_page_limit(Some(200), 200);
    assert_eq!(next_limit, None);
}

#[test]
fn found_existing_snapshot_header_accepts_true_and_one() {
    // Each accepted spelling is checked against its own header map.
    for accepted in ["true", "1"] {
        let mut headers = HeaderMap::new();
        headers.insert("x-bt-found-existing", accepted.parse().expect("header"));
        assert!(found_existing_snapshot_header(&headers));
    }
}

#[test]
fn found_existing_snapshot_header_rejects_missing_or_false() {
    // No header at all.
    let empty = HeaderMap::new();
    assert!(!found_existing_snapshot_header(&empty));

    // Header present but explicitly false.
    let mut with_false = HeaderMap::new();
    with_false.insert("x-bt-found-existing", "false".parse().expect("header"));
    assert!(!found_existing_snapshot_header(&with_false));
}
}
Loading
Loading