From 01b1fba96fb3106411d50772b6f34974e1251517 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Fri, 20 Feb 2026 18:25:47 +0530 Subject: [PATCH] feat(admin-api): add single-table restore endpoint --- crates/bin/ampctl/src/cmd/dataset/restore.rs | 103 +++-- .../src/cmd/dataset/restore__after_help.md | 21 +- crates/clients/admin/src/datasets.rs | 259 ++++++++++++ crates/core/data-store/src/lib.rs | 128 ++++++ crates/core/metadata-db/src/physical_table.rs | 16 +- .../admin-api/src/handlers/datasets.rs | 1 + .../src/handlers/datasets/restore_table.rs | 394 ++++++++++++++++++ crates/services/admin-api/src/lib.rs | 6 + docs/features/admin-dataset.md | 8 +- docs/openapi-specs/admin.spec.json | 113 +++++ .../tests/it_admin_api_datasets_restore.rs | 272 ++++++++++++ tests/src/tests/mod.rs | 1 + 12 files changed, 1287 insertions(+), 35 deletions(-) create mode 100644 crates/services/admin-api/src/handlers/datasets/restore_table.rs create mode 100644 tests/src/tests/it_admin_api_datasets_restore.rs diff --git a/crates/bin/ampctl/src/cmd/dataset/restore.rs b/crates/bin/ampctl/src/cmd/dataset/restore.rs index 478af695d..a7c43eb28 100644 --- a/crates/bin/ampctl/src/cmd/dataset/restore.rs +++ b/crates/bin/ampctl/src/cmd/dataset/restore.rs @@ -5,6 +5,9 @@ //! 2. POSTing to admin API `/datasets/{namespace}/{name}/versions/{version}/restore` endpoint //! 3. Returning information about the restored tables //! +//! Optionally targets a single table with `--table` and can activate a specific revision +//! with `--location-id`. +//! //! # Dataset Reference Format //! //! 
`namespace/name@version` (e.g., `graph/eth_mainnet@1.0.0`)
@@ -29,9 +32,17 @@ pub struct Args {
     /// Examples: my_namespace/my_dataset@1.0.0, my_namespace/my_dataset@latest
     #[arg(value_name = "REFERENCE", required = true, value_parser = clap::value_parser!(Reference))]
     pub dataset_ref: Reference,
+
+    /// Restore a specific table only (instead of all tables)
+    #[arg(long, value_name = "TABLE")]
+    pub table: Option<String>,
+
+    /// Activate a specific location ID for the table (requires --table)
+    #[arg(long, value_name = "ID", requires = "table")]
+    pub location_id: Option<i64>,
 }
 
-/// Result of a dataset restore operation.
+/// Result of a dataset restore operation (all tables).
 #[derive(serde::Serialize)]
 struct RestoreResult {
     tables: Vec<String>,
@@ -66,9 +77,39 @@ impl std::fmt::Display for RestoreResult {
     }
 }
 
+/// Result of a single table restore operation.
+#[derive(serde::Serialize)]
+struct RestoreTableResult {
+    table: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    location_id: Option<i64>,
+}
+
+impl std::fmt::Display for RestoreTableResult {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        writeln!(
+            f,
+            "{} Table '{}' restored successfully",
+            console::style("✓").green().bold(),
+            console::style(&self.table).yellow(),
+        )?;
+        if let Some(id) = self.location_id {
+            writeln!(
+                f,
+                "{} Activated location_id: {}",
+                console::style("→").cyan(),
+                id
+            )?;
+        }
+        Ok(())
+    }
+}
+
 /// Restore dataset physical tables from object storage.
 ///
 /// Re-indexes physical table metadata from storage into the database.
+/// Optionally targets a single table with `--table`, and can activate
+/// a specific revision with `--location-id`.
 ///
 /// # Errors
 ///
@@ -78,36 +119,44 @@ pub async fn run(
     Args {
         global,
         dataset_ref,
+        table,
+        location_id,
     }: Args,
 ) -> Result<(), Error> {
-    tracing::debug!(%dataset_ref, "Restoring dataset");
+    let client = global.build_client().map_err(Error::ClientBuild)?;
 
-    let tables = restore_dataset(&global, &dataset_ref).await?;
-    let result = RestoreResult { tables };
-    global.print(&result).map_err(Error::JsonSerialization)?;
+    if let Some(table_name) = table {
+        tracing::debug!(%dataset_ref, %table_name, ?location_id, "restoring single table");
+
+        client
+            .datasets()
+            .restore_table(&dataset_ref, &table_name, location_id)
+            .await
+            .map_err(Error::RestoreTable)?;
+
+        let result = RestoreTableResult {
+            table: table_name,
+            location_id,
+        };
+        global.print(&result).map_err(Error::JsonSerialization)?;
+    } else {
+        tracing::debug!(%dataset_ref, "restoring dataset");
+
+        let response = client
+            .datasets()
+            .restore(&dataset_ref)
+            .await
+            .map_err(Error::Restore)?;
+
+        let result = RestoreResult {
+            tables: response.tables,
+        };
+        global.print(&result).map_err(Error::JsonSerialization)?;
+    }
 
     Ok(())
 }
 
-/// Restore dataset tables via the admin API.
-///
-/// POSTs to the `/datasets/{namespace}/{name}/versions/{version}/restore` endpoint
-/// and returns the list of restored tables.
-#[tracing::instrument(skip_all, fields(%dataset_ref))]
-async fn restore_dataset(
-    global: &GlobalArgs,
-    dataset_ref: &Reference,
-) -> Result<Vec<String>, Error> {
-    let client = global.build_client().map_err(Error::ClientBuild)?;
-    let response = client
-        .datasets()
-        .restore(dataset_ref)
-        .await
-        .map_err(Error::Restore)?;
-
-    Ok(response.tables)
-}
-
 /// Errors for dataset restore operations.
#[derive(Debug, thiserror::Error)] pub enum Error { @@ -115,10 +164,14 @@ pub enum Error { #[error("failed to build admin API client")] ClientBuild(#[source] crate::args::BuildClientError), - /// Restore error from the client + /// Restore error from the client (all tables) #[error("restore failed")] Restore(#[source] crate::client::datasets::RestoreError), + /// Restore table error from the client (single table) + #[error("restore table failed")] + RestoreTable(#[source] crate::client::datasets::RestoreTableError), + /// Failed to serialize result to JSON #[error("failed to serialize result to JSON")] JsonSerialization(#[source] serde_json::Error), diff --git a/crates/bin/ampctl/src/cmd/dataset/restore__after_help.md b/crates/bin/ampctl/src/cmd/dataset/restore__after_help.md index ad9589ff7..3bbe92971 100644 --- a/crates/bin/ampctl/src/cmd/dataset/restore__after_help.md +++ b/crates/bin/ampctl/src/cmd/dataset/restore__after_help.md @@ -1,11 +1,23 @@ ## Examples -Restore dataset physical tables from storage: +Restore all dataset physical tables from storage: ``` ampctl dataset restore my_namespace/my_dataset@1.0.0 ``` +Restore a single table from storage (UUID heuristic): + +``` +ampctl dataset restore my_namespace/my_dataset@1.0.0 --table logs +``` + +Activate a specific revision for a table: + +``` +ampctl dataset restore my_namespace/my_dataset@1.0.0 --table logs --location-id 42 +``` + Restore latest version of a dataset: ``` @@ -40,6 +52,13 @@ ampctl dataset restore production/eth_mainnet@2.1.0 ampctl dataset restore analytics/uniswap_v3@1.0.0 ``` +**Activating a specific revision for a table:** + +``` +# Switch a table to a known good revision +ampctl dataset restore production/eth_mainnet@2.1.0 --table blocks --location-id 123 +``` + **Re-syncing metadata after storage restoration:** ``` diff --git a/crates/clients/admin/src/datasets.rs b/crates/clients/admin/src/datasets.rs index 97b7ef1c4..d764ce6bb 100644 --- a/crates/clients/admin/src/datasets.rs +++ 
b/crates/clients/admin/src/datasets.rs @@ -93,6 +93,18 @@ fn dataset_restore(namespace: &Namespace, name: &Name, version: &Revision) -> St format!("datasets/{namespace}/{name}/versions/{version}/restore") } +/// Build URL path for restoring a single table in a dataset version. +/// +/// POST `/datasets/{namespace}/{name}/versions/{version}/tables/{table_name}/restore` +fn dataset_restore_table( + namespace: &Namespace, + name: &Name, + version: &Revision, + table_name: &str, +) -> String { + format!("datasets/{namespace}/{name}/versions/{version}/tables/{table_name}/restore") +} + /// Build URL path for listing jobs for a dataset revision. /// /// GET `/datasets/{namespace}/{name}/versions/{revision}/jobs` @@ -437,6 +449,137 @@ impl<'a> DatasetsClient<'a> { } } + /// Restore a single table in a dataset version. + /// + /// POSTs to `/datasets/{namespace}/{name}/versions/{version}/tables/{table_name}/restore` endpoint. + /// + /// If `location_id` is provided, activates that existing revision for the table. + /// If omitted, discovers the latest revision from object storage via UUID heuristic. + /// + /// # Errors + /// + /// Returns [`RestoreTableError`] for network errors, API errors (400/404/500), + /// or unexpected responses. 
+    #[tracing::instrument(skip(self), fields(dataset_ref = %dataset_ref, table_name = %table_name, ?location_id))]
+    pub async fn restore_table(
+        &self,
+        dataset_ref: &Reference,
+        table_name: &str,
+        location_id: Option<i64>,
+    ) -> Result<(), RestoreTableError> {
+        let namespace = dataset_ref.namespace();
+        let name = dataset_ref.name();
+        let version = dataset_ref.revision();
+
+        let url = self
+            .client
+            .base_url()
+            .join(&dataset_restore_table(namespace, name, version, table_name))
+            .expect("valid URL");
+
+        tracing::debug!("sending restore table request");
+
+        let mut request = self.client.http().post(url.as_str());
+
+        if let Some(id) = location_id {
+            request = request.json(&RestoreTablePayload { location_id: id });
+        }
+
+        let response = request
+            .send()
+            .await
+            .map_err(|err| RestoreTableError::Network {
+                url: url.to_string(),
+                source: err,
+            })?;
+
+        let status = response.status();
+        tracing::debug!(status = %status, "received API response");
+
+        match status.as_u16() {
+            200 => {
+                tracing::debug!("table restored");
+                Ok(())
+            }
+            400 | 404 | 500 => {
+                let text = response.text().await.map_err(|err| {
+                    tracing::error!(
+                        status = %status,
+                        error = %err,
+                        error_source = logging::error_source(&err),
+                        "failed to read error response"
+                    );
+                    RestoreTableError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: format!("Failed to read error response: {}", err),
+                    }
+                })?;
+
+                let error_response: ErrorResponse = serde_json::from_str(&text).map_err(|err| {
+                    tracing::error!(
+                        status = %status,
+                        error = %err,
+                        error_source = logging::error_source(&err),
+                        "failed to parse error response"
+                    );
+                    RestoreTableError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: text.clone(),
+                    }
+                })?;
+
+                match error_response.error_code.as_str() {
+                    "INVALID_PATH" => Err(RestoreTableError::InvalidPath(error_response.into())),
+                    "INVALID_BODY" => Err(RestoreTableError::InvalidBody(error_response.into())),
+                    "DATASET_NOT_FOUND" => {
+                        Err(RestoreTableError::DatasetNotFound(error_response.into()))
+                    }
+                    "RESOLVE_REVISION_ERROR" => Err(RestoreTableError::ResolveRevisionError(
+                        error_response.into(),
+                    )),
+                    "GET_DATASET_ERROR" => {
+                        Err(RestoreTableError::GetDatasetError(error_response.into()))
+                    }
+                    "TABLE_NOT_IN_MANIFEST" => {
+                        Err(RestoreTableError::TableNotInManifest(error_response.into()))
+                    }
+                    "TABLE_NOT_FOUND" => {
+                        Err(RestoreTableError::TableNotFound(error_response.into()))
+                    }
+                    "GET_REVISION_ERROR" => {
+                        Err(RestoreTableError::GetRevisionError(error_response.into()))
+                    }
+                    "REVISION_NOT_FOUND" => {
+                        Err(RestoreTableError::RevisionNotFound(error_response.into()))
+                    }
+                    "REGISTER_AND_ACTIVATE_ERROR" => Err(
+                        RestoreTableError::RegisterAndActivateError(error_response.into()),
+                    ),
+                    "RESTORE_TABLE_REVISION_ERROR" => Err(
+                        RestoreTableError::RestoreTableRevisionError(error_response.into()),
+                    ),
+                    "REGISTER_FILES_ERROR" => {
+                        Err(RestoreTableError::RegisterFilesError(error_response.into()))
+                    }
+                    _ => Err(RestoreTableError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: text,
+                    }),
+                }
+            }
+            _ => {
+                let text = response
+                    .text()
+                    .await
+                    .unwrap_or_else(|_| String::from("Failed to read response body"));
+                Err(RestoreTableError::UnexpectedResponse {
+                    status: status.as_u16(),
+                    message: text,
+                })
+            }
+        }
+    }
+
     /// List all registered datasets.
     ///
     /// GETs from `/datasets` endpoint.
@@ -1411,6 +1554,122 @@ pub enum RestoreError {
     UnexpectedResponse { status: u16, message: String },
 }
 
+/// Request payload for restoring a single table with a specific location ID.
+#[derive(Debug, serde::Serialize)]
+struct RestoreTablePayload {
+    location_id: i64,
+}
+
+/// Errors that can occur when restoring a single table.
+#[derive(Debug, thiserror::Error)] +pub enum RestoreTableError { + /// Invalid path parameters (400, INVALID_PATH) + /// + /// This occurs when: + /// - The namespace, name, revision, or table_name in the URL path is invalid + /// - Path parameter parsing fails + #[error("invalid path")] + InvalidPath(#[source] ApiError), + + /// Invalid request body (400, INVALID_BODY) + /// + /// This occurs when: + /// - The request body is not valid JSON + /// - The JSON structure doesn't match the expected schema + /// - Required fields are missing or have invalid types + #[error("invalid body")] + InvalidBody(#[source] ApiError), + + /// Dataset or revision not found (404, DATASET_NOT_FOUND) + /// + /// This occurs when: + /// - The combination of namespace, name, and revision doesn't match any dataset + /// - The dataset has not been registered + #[error("dataset not found")] + DatasetNotFound(#[source] ApiError), + + /// Failed to resolve revision (500, RESOLVE_REVISION_ERROR) + /// + /// This occurs when: + /// - The datasets registry could not resolve the revision reference + /// - The revision string is valid but cannot be mapped to a manifest hash + #[error("resolve revision error")] + ResolveRevisionError(#[source] ApiError), + + /// Failed to load dataset (500, GET_DATASET_ERROR) + /// + /// This occurs when: + /// - Failed to load dataset configuration from manifest + /// - Manifest parsing errors + /// - Invalid dataset structure + #[error("get dataset error")] + GetDatasetError(#[source] ApiError), + + /// Table not found in dataset manifest (404, TABLE_NOT_IN_MANIFEST) + /// + /// This occurs when: + /// - The table name doesn't match any table in the dataset definition + /// - The dataset was resolved but doesn't define the requested table + #[error("table not in manifest")] + TableNotInManifest(#[source] ApiError), + + /// Table data not found in object storage (404, TABLE_NOT_FOUND) + /// + /// This occurs when: + /// - No physical table files exist in storage 
for this table + /// - The UUID heuristic scan found no revision directories + #[error("table not found")] + TableNotFound(#[source] ApiError), + + /// Failed to look up revision by location ID (500, GET_REVISION_ERROR) + /// + /// This occurs when: + /// - The metadata database query for the location_id failed + /// - Database connection or query errors + #[error("get revision error")] + GetRevisionError(#[source] ApiError), + + /// No revision found for the given location ID (404, REVISION_NOT_FOUND) + /// + /// This occurs when: + /// - The provided location_id doesn't correspond to any registered revision + /// - The revision was deleted or never existed + #[error("revision not found")] + RevisionNotFound(#[source] ApiError), + + /// Failed to register and activate physical table (500, REGISTER_AND_ACTIVATE_ERROR) + /// + /// This occurs when: + /// - The transactional upsert-and-activate operation failed + /// - Database connection or constraint errors during activation + #[error("register and activate error")] + RegisterAndActivateError(#[source] ApiError), + + /// Failed to restore table revision from storage (500, RESTORE_TABLE_REVISION_ERROR) + /// + /// This occurs when: + /// - Error scanning object storage for table files + /// - Error registering location in metadata database + #[error("restore table revision error")] + RestoreTableRevisionError(#[source] ApiError), + + /// Failed to register files (500, REGISTER_FILES_ERROR) + /// + /// This occurs when: + /// - Error re-indexing Parquet file metadata from storage + /// - Database errors while registering file metadata + #[error("register files error")] + RegisterFilesError(#[source] ApiError), + + /// Network or connection error + #[error("network error connecting to {url}")] + Network { url: String, source: reqwest::Error }, + + /// Unexpected response from API + #[error("unexpected response (status {status}): {message}")] + UnexpectedResponse { status: u16, message: String }, +} + /// Response from 
GET /datasets endpoint. #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct DatasetsResponse { diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs index 8556f9731..5b035fe25 100644 --- a/crates/core/data-store/src/lib.rs +++ b/crates/core/data-store/src/lib.rs @@ -381,6 +381,69 @@ impl DataStore { Ok(()) } + /// Upserts the physical table entry and activates a specific revision by location ID. + /// + /// This atomically: + /// 1. Upserts the `physical_tables` row (ensures it exists) + /// 2. Marks all existing revisions for the table as inactive + /// 3. Marks the specified revision as active + /// + /// Unlike [`activate_table_revision`](Self::activate_table_revision), this also + /// ensures the `physical_tables` row exists before activation. + pub async fn register_and_activate_physical_table( + &self, + dataset: &HashReference, + table_name: &TableName, + location_id: LocationId, + ) -> Result<(), RegisterAndActivatePhysicalTableError> { + let mut tx = self + .metadata_db + .begin_txn() + .await + .map_err(RegisterAndActivatePhysicalTableError::TransactionBegin)?; + + metadata_db::physical_table::register( + &mut tx, + dataset.namespace(), + dataset.name(), + dataset.hash(), + table_name, + ) + .await + .map_err(RegisterAndActivatePhysicalTableError::RegisterPhysicalTable)?; + + metadata_db::physical_table::mark_inactive_by_table_name( + &mut tx, + dataset.namespace(), + dataset.name(), + dataset.hash(), + table_name, + ) + .await + .map_err(RegisterAndActivatePhysicalTableError::MarkInactive)?; + + let updated = metadata_db::physical_table::mark_active_by_id( + &mut tx, + location_id, + dataset.namespace(), + dataset.name(), + dataset.hash(), + table_name, + ) + .await + .map_err(RegisterAndActivatePhysicalTableError::MarkActive)?; + + if !updated { + return Err(RegisterAndActivatePhysicalTableError::PhysicalTableNotFound); + } + + tx.commit() + .await + 
.map_err(RegisterAndActivatePhysicalTableError::TransactionCommit)?; + + Ok(()) + } + /// Deactivates all revisions for a table. /// /// Marks all revisions for the given dataset table as inactive. This is a @@ -934,6 +997,71 @@ pub enum ActivateTableRevisionError { TransactionCommit(#[source] metadata_db::Error), } +/// Errors that occur when upserting a physical table and activating a revision +/// +/// This error type is used by `DataStore::register_and_activate_physical_table()`. +/// The operation runs as a single database transaction: on any failure, +/// PostgreSQL guarantees all changes are rolled back. +#[derive(Debug, thiserror::Error)] +pub enum RegisterAndActivatePhysicalTableError { + /// Failed to begin a database transaction + /// + /// Common causes: + /// - Database connection pool exhausted + /// - Database server unreachable + #[error("Failed to begin transaction")] + TransactionBegin(#[source] metadata_db::Error), + + /// Failed to upsert the physical table record + /// + /// The UPSERT ensures a `physical_tables` row exists for the + /// dataset+table combination before attempting to activate a revision. 
+ /// + /// Common causes: + /// - Database connection lost during insert + /// - Database constraint violation + #[error("Failed to register physical table in metadata database")] + RegisterPhysicalTable(#[source] metadata_db::Error), + + /// Failed to mark existing active revisions as inactive + /// + /// Common causes: + /// - Database connection lost during update + /// - Lock timeout on physical_table_revisions rows + /// - Database constraint violation + #[error("Failed to mark existing physical tables as inactive")] + MarkInactive(#[source] metadata_db::Error), + + /// Failed to mark the specified revision as active + /// + /// Common causes: + /// - Database connection lost during update + /// - Lock timeout on physical_table_revisions row + /// - Database constraint violation + #[error("Failed to mark table revision as active: {0}")] + MarkActive(#[source] metadata_db::Error), + + /// No physical table found with the given location ID + /// + /// The `mark_active_by_id` update matched zero rows, meaning the provided + /// `location_id` does not correspond to a registered physical table for this table. + #[error("No physical table found")] + PhysicalTableNotFound, + + /// Failed to commit transaction after successful database operations + /// + /// When a commit fails, PostgreSQL guarantees that all changes are rolled back. + /// Neither the upsert, deactivation, nor the activation will be persisted. + /// + /// Common causes: + /// - Database connection lost during commit + /// - Transaction conflict with concurrent operations + /// + /// The operation is safe to retry from the beginning as no partial state was persisted. + #[error("Failed to commit transaction")] + TransactionCommit(#[source] metadata_db::Error), +} + /// Errors that occur when deactivating a physical table revision /// /// This error type is used by `DataStore::deactivate_table_revision()`. 
diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs index a002c0d3b..2f18ceddc 100644 --- a/crates/core/metadata-db/src/physical_table.rs +++ b/crates/core/metadata-db/src/physical_table.rs @@ -73,15 +73,15 @@ where } } -/// Mark all active locations for a table as inactive +/// Mark all active physical tables for a table as inactive /// -/// This is typically used before marking a new location as active, ensuring -/// only one location per table is active at a time. +/// This is typically used before marking a new physical table as active, ensuring +/// only one physical table per table is active at a time. /// /// # Transaction Boundaries /// /// This operation should typically be performed within a transaction along with -/// `mark_active_by_id()` to ensure atomicity when switching active locations. +/// `mark_active_by_id()` to ensure atomicity when switching active physical tables. #[tracing::instrument(skip(exe), err)] pub async fn mark_inactive_by_table_name<'c, E>( exe: E, @@ -104,15 +104,15 @@ where .map_err(Error::Database) } -/// Mark a specific location as active +/// Mark a specific physical table as active /// -/// This does not automatically deactivate other locations. Use `mark_inactive_by_table_id()` -/// first within a transaction to ensure only one location is active. +/// This does not automatically deactivate other physical tables. Use `mark_inactive_by_table_name()` +/// first within a transaction to ensure only one physical table is active. /// /// # Transaction Boundaries /// /// This operation should typically be performed within a transaction along with -/// `mark_inactive_by_table_id()` to ensure atomicity when switching active locations. +/// `mark_inactive_by_table_name()` to ensure atomicity when switching active physical tables. 
#[tracing::instrument(skip(exe), err)] pub async fn mark_active_by_id<'c, E>( exe: E, diff --git a/crates/services/admin-api/src/handlers/datasets.rs b/crates/services/admin-api/src/handlers/datasets.rs index 9fdeef1e1..0f284f560 100644 --- a/crates/services/admin-api/src/handlers/datasets.rs +++ b/crates/services/admin-api/src/handlers/datasets.rs @@ -11,3 +11,4 @@ pub mod list_jobs; pub mod list_versions; pub mod register; pub mod restore; +pub mod restore_table; diff --git a/crates/services/admin-api/src/handlers/datasets/restore_table.rs b/crates/services/admin-api/src/handlers/datasets/restore_table.rs new file mode 100644 index 000000000..2ae1d415f --- /dev/null +++ b/crates/services/admin-api/src/handlers/datasets/restore_table.rs @@ -0,0 +1,394 @@ +use amp_datasets_registry::error::ResolveRevisionError; +use axum::{ + body::Bytes, + extract::{Path, State, rejection::PathRejection}, + http::StatusCode, +}; +use datasets_common::{ + name::Name, namespace::Namespace, reference::Reference, revision::Revision, + table_name::TableName, +}; +use metadata_db::physical_table_revision::LocationId; +use monitoring::logging; + +use crate::{ + ctx::Ctx, + handlers::{ + common::{RegisterRevisionFilesError, register_revision_files}, + error::{ErrorResponse, IntoErrorResponse}, + }, +}; + +/// Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/restore` endpoint +/// +/// Restores a single physical table, either by activating a known revision or +/// by discovering the latest revision from object storage. +/// +/// ## Path Parameters +/// - `namespace`: Dataset namespace +/// - `name`: Dataset name +/// - `revision`: Revision (version, hash, latest, or dev) +/// - `table_name`: Table name +/// +/// ## Request Body (optional JSON) +/// - `location_id` (optional, i64): If provided, activates this existing revision +/// for the table. If omitted, discovers the latest revision from object storage +/// via UUID heuristic. 
+/// +/// ## Response +/// - **200 OK**: Table successfully restored +/// - **400 Bad Request**: Invalid path parameters or request body +/// - **404 Not Found**: Dataset, revision, or table not found +/// - **500 Internal Server Error**: Database or storage error +/// +/// ## Error Codes +/// - `INVALID_PATH`: Invalid path parameters (namespace, name, revision, or table_name) +/// - `INVALID_BODY`: Invalid request body +/// - `DATASET_NOT_FOUND`: The specified dataset or revision does not exist +/// - `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash +/// - `GET_DATASET_ERROR`: Failed to load dataset from store +/// - `TABLE_NOT_IN_MANIFEST`: Table not found in the dataset manifest +/// - `TABLE_NOT_FOUND`: Table data not found in object storage +/// - `GET_REVISION_ERROR`: Failed to look up revision by location ID +/// - `REVISION_NOT_FOUND`: No revision exists for the given location ID +/// - `REGISTER_AND_ACTIVATE_ERROR`: Failed to register and activate physical table +/// - `RESTORE_TABLE_REVISION_ERROR`: Failed to restore a table from storage +/// - `REGISTER_FILES_ERROR`: Failed to register files for a table +/// +/// ## Behavior +/// +/// **With `location_id`:** +/// 1. Resolves the revision and loads the dataset +/// 2. Validates the table exists in the dataset manifest +/// 3. Upserts the `physical_tables` entry +/// 4. Marks all existing revisions for the table as inactive +/// 5. Marks the specified `location_id` as active +/// +/// **Without `location_id`:** +/// 1. Resolves the revision and loads the dataset +/// 2. Validates the table exists in the dataset manifest +/// 3. Scans object storage for the latest revision (by UUID ordering) +/// 4. Registers and activates the discovered revision +/// 5. 
Re-indexes all Parquet file metadata from storage
+///
+/// This is useful for:
+/// - Recovering from metadata database loss
+/// - Activating a specific known revision for a table
+/// - Setting up a new system with pre-existing data
+/// - Re-syncing metadata after storage restoration
+#[tracing::instrument(skip_all, err)]
+#[cfg_attr(
+    feature = "utoipa",
+    utoipa::path(
+        post,
+        path = "/datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/restore",
+        tag = "datasets",
+        operation_id = "restore_table",
+        params(
+            ("namespace" = String, Path, description = "Dataset namespace"),
+            ("name" = String, Path, description = "Dataset name"),
+            ("revision" = String, Path, description = "Revision (version, hash, latest, or dev)"),
+            ("table_name" = String, Path, description = "Table name"),
+        ),
+        request_body(content = Option<RestoreTablePayload>, content_type = "application/json"),
+        responses(
+            (status = 200, description = "Table successfully restored"),
+            (status = 400, description = "Bad request (invalid parameters)", body = ErrorResponse),
+            (status = 404, description = "Dataset, revision, or table not found", body = ErrorResponse),
+            (status = 500, description = "Internal server error", body = ErrorResponse),
+        )
+    )
+)]
+pub async fn handler(
+    path: Result<Path<(Namespace, Name, Revision, TableName)>, PathRejection>,
+    State(ctx): State<Ctx>,
+    body: Bytes,
+) -> Result<StatusCode, ErrorResponse> {
+    let (reference, table_name) = match path {
+        Ok(Path((namespace, name, revision, table_name))) => {
+            (Reference::new(namespace, name, revision), table_name)
+        }
+        Err(err) => {
+            tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid path parameters");
+            return Err(Error::InvalidPath(err).into());
+        }
+    };
+
+    let location_id = if body.is_empty() {
+        None
+    } else {
+        let payload: RestoreTablePayload = serde_json::from_slice(&body).map_err(|err| {
+            tracing::debug!(error = %err, error_source = logging::error_source(&err), "invalid request body");
+            Error::InvalidBody(err)
+        })?;
+        Some(payload.location_id)
+    };
+
+    
tracing::debug!( + dataset_reference = %reference, + table_name = %table_name, + ?location_id, + "restoring table" + ); + + let namespace = reference.namespace().clone(); + let name = reference.name().clone(); + let revision = reference.revision().clone(); + + // Resolve reference to hash reference + let dataset_ref = ctx + .datasets_registry + .resolve_revision(&reference) + .await + .map_err(Error::ResolveRevision)? + .ok_or_else(|| Error::NotFound { + namespace: namespace.clone(), + name: name.clone(), + revision: revision.clone(), + })?; + + // Load the full dataset object using the resolved hash reference + let dataset = ctx + .dataset_store + .get_dataset(&dataset_ref) + .await + .map_err(Error::GetDataset)?; + + // Validate the table exists in the dataset manifest + dataset + .tables() + .iter() + .find(|t| *t.name() == table_name) + .ok_or_else(|| Error::TableNotInManifest { + table: table_name.clone(), + })?; + + if let Some(location_id) = location_id { + // Check if the revision with the given location id exists + let revision = ctx + .data_store + .get_revision_by_location_id(location_id) + .await + .map_err(|err| Error::GetRevisionByLocationId { + location_id, + source: err, + })?; + if revision.is_none() { + return Err(Error::RevisionNotFound { location_id }.into()); + } + + // With location_id: UPSERT physical_tables → mark inactive → mark active + tracing::debug!(%dataset_ref, %table_name, %location_id, "activating existing revision"); + + ctx.data_store + .register_and_activate_physical_table(&dataset_ref, &table_name, location_id) + .await + .map_err(Error::RegisterAndActivatePhysicalTable)?; + + tracing::info!(%dataset_ref, %table_name, %location_id, "table revision activated"); + } else { + // Without location_id: full restore via UUID heuristic + tracing::debug!(%dataset_ref, %table_name, "restoring from object storage"); + + let data_store = ctx.data_store.clone(); + + let info = data_store + .restore_latest_table_revision(&dataset_ref, 
&table_name) + .await + .map_err(|err| { + tracing::error!( + table_name = %table_name, + error = %err, + error_source = logging::error_source(&err), + "failed to restore table revision from storage" + ); + Error::RestoreTableRevision { + table: table_name.clone(), + source: err, + } + })? + .ok_or_else(|| Error::TableNotFound { + table: table_name.clone(), + })?; + + // Register all files in the revision + register_revision_files(&data_store, &info) + .await + .map_err(|err| { + tracing::error!( + table_name = %table_name, + error = %err, + error_source = logging::error_source(&err), + "failed to register files for table" + ); + Error::RegisterFiles { + table: table_name.clone(), + source: err, + } + })?; + + tracing::info!( + %dataset_ref, + %table_name, + location_id = %info.location_id, + path = %info.path, + "table restored from storage" + ); + } + + Ok(StatusCode::OK) +} + +/// Optional request body for restore table operation +#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] +pub struct RestoreTablePayload { + /// Location ID of the revision to activate. If omitted, the latest + /// revision is discovered from object storage. + #[cfg_attr(feature = "utoipa", schema(value_type = i64))] + pub location_id: LocationId, +} + +/// Errors that can occur when restoring a table +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Invalid path parameters + /// + /// One or more path segments (namespace, name, revision, or table_name) + /// could not be parsed into their expected types. 
+ #[error("Invalid path parameters: {0}")] + InvalidPath(#[source] PathRejection), + + /// Invalid request body + /// + /// This occurs when: + /// - The request body is not valid JSON + /// - The JSON structure doesn't match the expected schema + /// - Required fields are missing or have invalid types + #[error("Invalid request body: {0}")] + InvalidBody(#[source] serde_json::Error), + + /// Dataset or revision not found + /// + /// The combination of namespace, name, and revision does not match + /// any registered dataset in the registry. + #[error("Dataset '{namespace}/{name}' at revision '{revision}' not found")] + NotFound { + namespace: Namespace, + name: Name, + revision: Revision, + }, + + /// Failed to resolve revision to a manifest hash + /// + /// The datasets registry could not resolve the given revision reference + /// (version string, hash, "latest", or "dev") to a concrete manifest hash. + #[error("Failed to resolve revision: {0}")] + ResolveRevision(#[source] ResolveRevisionError), + + /// Failed to load dataset from the dataset store + /// + /// The resolved hash reference exists but the dataset object could not + /// be loaded from the backing store. + #[error("Failed to load dataset: {0}")] + GetDataset(#[source] common::dataset_store::GetDatasetError), + + /// Table not found in the dataset manifest + /// + /// The dataset was loaded successfully but does not contain a table + /// definition matching the requested table name. + #[error("Table '{table}' not found in dataset manifest")] + TableNotInManifest { table: TableName }, + + /// Failed to look up a revision by its location ID + /// + /// The metadata database query for the given `location_id` failed. 
+ #[error("Failed to get revision by location ID: {location_id}")] + GetRevisionByLocationId { + location_id: LocationId, + #[source] + source: amp_data_store::GetRevisionByLocationIdError, + }, + + /// No revision exists for the given location ID + /// + /// The provided `location_id` does not correspond to any registered + /// physical table revision in the metadata database. + #[error("No revision found for location_id {location_id}")] + RevisionNotFound { location_id: LocationId }, + + /// Failed to upsert physical table and activate revision + /// + /// The transactional upsert-and-activate operation failed. This covers + /// registering the physical table row, deactivating existing revisions, + /// and activating the specified `location_id`. + #[error("Failed to register and activate physical table: {0}")] + RegisterAndActivatePhysicalTable( + #[source] amp_data_store::RegisterAndActivatePhysicalTableError, + ), + + /// Failed to restore table revision from object storage + /// + /// The UUID heuristic scan of object storage failed, or no revision + /// directories were found for the table. + #[error("Failed to restore table revision for '{table}'")] + RestoreTableRevision { + table: TableName, + #[source] + source: amp_data_store::RestoreLatestTableRevisionError, + }, + + /// Failed to register parquet files for a restored table revision + /// + /// The revision was discovered and activated, but re-indexing the + /// parquet file metadata from storage into the database failed. + #[error("Failed to register files for table '{table}'")] + RegisterFiles { + table: TableName, + #[source] + source: RegisterRevisionFilesError, + }, + + /// Table data not found in object storage + /// + /// The UUID heuristic scan completed but found no revision directories + /// for the table in object storage. 
+ #[error("Table '{table}' not found in object storage")] + TableNotFound { table: TableName }, +} + +impl IntoErrorResponse for Error { + fn error_code(&self) -> &'static str { + match self { + Error::InvalidPath(_) => "INVALID_PATH", + Error::InvalidBody(_) => "INVALID_BODY", + Error::NotFound { .. } => "DATASET_NOT_FOUND", + Error::ResolveRevision(_) => "RESOLVE_REVISION_ERROR", + Error::GetDataset(_) => "GET_DATASET_ERROR", + Error::TableNotInManifest { .. } => "TABLE_NOT_IN_MANIFEST", + Error::GetRevisionByLocationId { .. } => "GET_REVISION_ERROR", + Error::RevisionNotFound { .. } => "REVISION_NOT_FOUND", + Error::RegisterAndActivatePhysicalTable(_) => "REGISTER_AND_ACTIVATE_ERROR", + Error::RestoreTableRevision { .. } => "RESTORE_TABLE_REVISION_ERROR", + Error::RegisterFiles { .. } => "REGISTER_FILES_ERROR", + Error::TableNotFound { .. } => "TABLE_NOT_FOUND", + } + } + + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidPath(_) => StatusCode::BAD_REQUEST, + Error::InvalidBody(_) => StatusCode::BAD_REQUEST, + Error::NotFound { .. } => StatusCode::NOT_FOUND, + Error::ResolveRevision(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::GetDataset(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::TableNotInManifest { .. } => StatusCode::NOT_FOUND, + Error::GetRevisionByLocationId { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Error::RevisionNotFound { .. } => StatusCode::NOT_FOUND, + Error::RegisterAndActivatePhysicalTable(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::RestoreTableRevision { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Error::RegisterFiles { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Error::TableNotFound { .. 
} => StatusCode::NOT_FOUND, + } + } +} diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index bd95faed5..ef5448e99 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -46,6 +46,10 @@ pub fn router(ctx: Ctx) -> Router<()> { "/datasets/{namespace}/{name}/versions/{revision}/restore", post(datasets::restore::handler), ) + .route( + "/datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/restore", + post(datasets::restore_table::handler), + ) .route( "/datasets/{namespace}/{name}/versions/{revision}/jobs", get(datasets::list_jobs::handler), @@ -121,6 +125,7 @@ pub fn router(ctx: Ctx) -> Router<()> { handlers::datasets::register::handler, handlers::datasets::deploy::handler, handlers::datasets::restore::handler, + handlers::datasets::restore_table::handler, handlers::datasets::delete::handler, handlers::datasets::delete_version::handler, // Manifest endpoints @@ -178,6 +183,7 @@ pub fn router(ctx: Ctx) -> Router<()> { handlers::datasets::deploy::DeployResponse, handlers::datasets::restore::RestoreResponse, handlers::datasets::restore::RestoredTableInfo, + handlers::datasets::restore_table::RestoreTablePayload, // Job schemas handlers::jobs::progress::JobProgressResponse, handlers::jobs::progress::TableProgress, diff --git a/docs/features/admin-dataset.md b/docs/features/admin-dataset.md index 9d4ac9c17..99f211f11 100644 --- a/docs/features/admin-dataset.md +++ b/docs/features/admin-dataset.md @@ -123,11 +123,17 @@ ampctl dataset manifest my_namespace/my_dataset@1.2.0 Re-index dataset metadata from existing data in object storage. Useful for recovery after metadata loss, setting up new systems with pre-existing data, or re-syncing after storage restoration. 
```bash -# Restore a specific version +# Restore all tables for a specific version ampctl dataset restore my_namespace/my_dataset@1.0.0 # Restore latest version ampctl dataset restore my_namespace/my_dataset@latest + +# Restore a single table (discovers latest revision from storage) +ampctl dataset restore my_namespace/my_dataset@1.0.0 --table blocks + +# Restore a single table with a specific location ID +ampctl dataset restore my_namespace/my_dataset@1.0.0 --table blocks --location-id 42 ``` **JSON output for scripting:** diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index 53f08164b..fa469f86d 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -606,6 +606,105 @@ } } }, + "/datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/restore": { + "post": { + "tags": [ + "datasets" + ], + "summary": "Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/tables/{table_name}/restore` endpoint", + "description": "Restores a single physical table, either by activating a known revision or\nby discovering the latest revision from object storage.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, latest, or dev)\n- `table_name`: Table name\n\n## Request Body (optional JSON)\n- `location_id` (optional, i64): If provided, activates this existing revision\n for the table. 
If omitted, discovers the latest revision from object storage\n via UUID heuristic.\n\n## Response\n- **200 OK**: Table successfully restored\n- **400 Bad Request**: Invalid path parameters or request body\n- **404 Not Found**: Dataset, revision, or table not found\n- **500 Internal Server Error**: Database or storage error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, revision, or table_name)\n- `INVALID_BODY`: Invalid request body\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `TABLE_NOT_IN_MANIFEST`: Table not found in the dataset manifest\n- `TABLE_NOT_FOUND`: Table data not found in object storage\n- `GET_REVISION_ERROR`: Failed to look up revision by location ID\n- `REVISION_NOT_FOUND`: No revision exists for the given location ID\n- `REGISTER_AND_ACTIVATE_ERROR`: Failed to register and activate physical table\n- `RESTORE_TABLE_REVISION_ERROR`: Failed to restore a table from storage\n- `REGISTER_FILES_ERROR`: Failed to register files for a table\n\n## Behavior\n\n**With `location_id`:**\n1. Resolves the revision and loads the dataset\n2. Validates the table exists in the dataset manifest\n3. Upserts the `physical_tables` entry\n4. Marks all existing revisions for the table as inactive\n5. Marks the specified `location_id` as active\n\n**Without `location_id`:**\n1. Resolves the revision and loads the dataset\n2. Validates the table exists in the dataset manifest\n3. Scans object storage for the latest revision (by UUID ordering)\n4. Registers and activates the discovered revision\n5. 
Re-indexes all Parquet file metadata from storage\n\nThis is useful for:\n- Recovering from metadata database loss\n- Activating a specific known revision for a table\n- Setting up a new system with pre-existing data\n- Re-syncing metadata after storage restoration", + "operationId": "restore_table", + "parameters": [ + { + "name": "namespace", + "in": "path", + "description": "Dataset namespace", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "name", + "in": "path", + "description": "Dataset name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "revision", + "in": "path", + "description": "Revision (version, hash, latest, or dev)", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "table_name", + "in": "path", + "description": "Table name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/RestoreTablePayload" + } + ] + } + } + } + }, + "responses": { + "200": { + "description": "Table successfully restored" + }, + "400": { + "description": "Bad request (invalid parameters)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "404": { + "description": "Dataset, revision, or table not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + } + } + } + }, "/datasets/{namespace}/{name}/versions/{version}": { "delete": { "tags": [ @@ -2633,6 +2732,20 @@ } } }, + "RestoreTablePayload": { + "type": "object", + "description": "Optional request body for restore table operation", + "required": [ + "location_id" + ], + "properties": 
{ + "location_id": { + "type": "integer", + "format": "int64", + "description": "Location ID of the revision to activate. If omitted, the latest\nrevision is discovered from object storage." + } + } + }, "RestoredTableInfo": { "type": "object", "description": "Information about a restored physical table", diff --git a/tests/src/tests/it_admin_api_datasets_restore.rs b/tests/src/tests/it_admin_api_datasets_restore.rs new file mode 100644 index 000000000..dc99dd974 --- /dev/null +++ b/tests/src/tests/it_admin_api_datasets_restore.rs @@ -0,0 +1,272 @@ +use ampctl::client::{ + self, + datasets::RestoreTableError, + revisions::{RegisterError, RegisterResponse, RestoreError, RestoreResponse}, +}; +use datasets_common::reference::Reference; +use monitoring::logging; + +use crate::testlib::ctx::TestCtxBuilder; + +#[tokio::test] +async fn restore_table_from_uuid_path_succeeds() { + logging::init(); + + //* Given — register and restore a revision for blocks only + let ctx = TestCtx::setup("restore_table_from_uuid_path_succeeds").await; + let reg = ctx + .register_revision("_/eth_rpc@0.0.0", "blocks", "eth_rpc_custom/blocks") + .await + .expect("failed to register blocks revision"); + ctx.restore_revision(reg.location_id) + .await + .expect("failed to restore blocks revision files"); + + //* When — restore the blocks table via the dataset endpoint (UUID heuristic) + let result = ctx.restore_table("blocks", None).await; + + //* Then — blocks queries succeed, logs queries fail + assert!( + result.is_ok(), + "restore_table should succeed, got: {:?}", + result.err() + ); + + let blocks = ctx + .run_query("SELECT block_num FROM eth_rpc.blocks LIMIT 1") + .await; + assert!( + blocks.is_ok(), + "query on blocks should succeed after restore: {:?}", + blocks.err() + ); + + // Query non restored table + let logs = ctx + .run_query("SELECT log_index FROM eth_rpc.logs LIMIT 1") + .await; + assert!( + logs.is_err(), + "query on logs should fail (not restored), but got: {:?}", + logs.ok() 
+ ); +} + +#[tokio::test] +async fn restore_table_with_location_id_succeeds() { + logging::init(); + + //* Given — register and restore a revision for blocks, get its location_id + let ctx = TestCtx::setup_with_custom_snapshots("restore_table_with_location_id_succeeds").await; + let reg = ctx + .register_revision("_/eth_rpc@0.0.0", "blocks", "eth_rpc_custom/blocks") + .await + .expect("failed to register blocks revision"); + ctx.restore_revision(reg.location_id) + .await + .expect("failed to restore blocks revision files"); + + //* When — restore the blocks table using the known location_id + let result = ctx.restore_table("blocks", Some(reg.location_id)).await; + + //* Then — blocks queries succeed, logs queries fail + assert!( + result.is_ok(), + "restore_table with location_id should succeed, got: {:?}", + result.err() + ); + + let blocks = ctx + .run_query("SELECT block_num FROM eth_rpc.blocks LIMIT 1") + .await; + assert!( + blocks.is_ok(), + "query on blocks should succeed after restore: {:?}", + blocks.err() + ); + + // Query non restored table + let logs = ctx + .run_query("SELECT log_index FROM eth_rpc.logs LIMIT 1") + .await; + assert!( + logs.is_err(), + "query on logs should fail (not restored), but got: {:?}", + logs.ok() + ); +} + +#[tokio::test] +async fn restore_table_with_nonexistent_table_returns_404() { + logging::init(); + + //* Given + let ctx = TestCtx::setup("restore_table_with_nonexistent_table_returns_404").await; + + //* When + let resp = ctx.restore_table("nonexistent_table", None).await; + + //* Then + assert!( + resp.is_err(), + "restore with nonexistent table should return error" + ); + let err = resp.expect_err("expected error response"); + match err { + RestoreTableError::TableNotInManifest(api_err) => { + assert_eq!( + api_err.error_code, "TABLE_NOT_IN_MANIFEST", + "Expected TABLE_NOT_IN_MANIFEST error code, got: {}", + api_err.error_code + ); + } + _ => panic!("Expected TableNotInManifest error, got: {:?}", err), + } +} + 
+#[tokio::test] +async fn restore_table_with_nonexistent_location_id_returns_404() { + logging::init(); + + //* Given + let ctx = TestCtx::setup("restore_table_with_nonexistent_location_id_returns_404").await; + + //* When + let resp = ctx.restore_table("blocks", Some(999999)).await; + + //* Then + assert!( + resp.is_err(), + "restore with nonexistent location_id should return error" + ); + let err = resp.expect_err("expected error response"); + match err { + RestoreTableError::RevisionNotFound(api_err) => { + assert_eq!( + api_err.error_code, "REVISION_NOT_FOUND", + "Expected REVISION_NOT_FOUND error code, got: {}", + api_err.error_code + ); + } + _ => panic!("Expected RevisionNotFound error, got: {:?}", err), + } +} + +#[tokio::test] +async fn restore_table_with_nonexistent_dataset_returns_404() { + logging::init(); + + //* Given + let ctx = TestCtx::setup("restore_table_with_nonexistent_dataset_returns_404").await; + let invalid_ref: Reference = "_/nonexistent@0.0.0".parse().expect("valid reference"); + + //* When + let resp = ctx + .client + .datasets() + .restore_table(&invalid_ref, "blocks", None) + .await; + + //* Then + assert!( + resp.is_err(), + "restore with nonexistent dataset should return error" + ); + let err = resp.expect_err("expected error response"); + match err { + RestoreTableError::DatasetNotFound(api_err) => { + assert_eq!( + api_err.error_code, "DATASET_NOT_FOUND", + "Expected DATASET_NOT_FOUND error code, got: {}", + api_err.error_code + ); + } + _ => panic!("Expected DatasetNotFound error, got: {:?}", err), + } +} + +struct TestCtx { + ctx: crate::testlib::ctx::TestCtx, + client: client::Client, +} + +impl TestCtx { + /// Standard setup: eth_rpc manifest + eth_rpc snapshot. 
+ async fn setup(test_name: &str) -> Self { + let ctx = TestCtxBuilder::new(test_name) + .with_dataset_manifests(["eth_rpc"]) + .with_dataset_snapshots(["eth_rpc"]) + .build() + .await + .expect("failed to build test context"); + + let base_url = ctx + .daemon_controller() + .admin_api_url() + .parse() + .expect("valid admin API URL"); + let client = client::Client::new(base_url); + + Self { ctx, client } + } + + /// Setup with custom snapshots: eth_rpc manifest + eth_rpc_custom snapshot. + async fn setup_with_custom_snapshots(test_name: &str) -> Self { + let ctx = TestCtxBuilder::new(test_name) + .with_dataset_manifests(["eth_rpc"]) + .with_dataset_snapshots(["eth_rpc_custom"]) + .build() + .await + .expect("failed to build test context"); + + let base_url = ctx + .daemon_controller() + .admin_api_url() + .parse() + .expect("valid admin API URL"); + let client = client::Client::new(base_url); + + Self { ctx, client } + } + + /// Restore a single table via the dataset restore_table endpoint. + async fn restore_table( + &self, + table_name: &str, + location_id: Option, + ) -> Result<(), RestoreTableError> { + let dataset_ref: Reference = "_/eth_rpc@0.0.0".parse().expect("valid reference"); + self.client + .datasets() + .restore_table(&dataset_ref, table_name, location_id) + .await + } + + /// Register a table revision via the revisions API. + async fn register_revision( + &self, + dataset: &str, + table_name: &str, + path: &str, + ) -> Result { + self.client + .revisions() + .register(dataset, table_name, path) + .await + } + + /// Restore a revision's files from object storage. + async fn restore_revision(&self, location_id: i64) -> Result { + self.client.revisions().restore(location_id).await + } + + /// Execute a SQL query via the Flight client. 
+ async fn run_query(&self, query: &str) -> Result<(serde_json::Value, usize), anyhow::Error> { + let mut client = self + .ctx + .new_flight_client() + .await + .expect("failed to create flight client"); + client.run_query(query, None).await + } +} diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs index 07ba79c43..dcf968a96 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -1,6 +1,7 @@ mod it_admin_api_datasets_list_jobs; mod it_admin_api_datasets_manifest; mod it_admin_api_datasets_register; +mod it_admin_api_datasets_restore; mod it_admin_api_datasets_stop_job; mod it_admin_api_jobs_progress; mod it_admin_api_revisions;