From ae512ec15a6b74f3d491c13a751d1910d0263408 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Wed, 18 Feb 2026 14:56:34 +0530 Subject: [PATCH] feat(data-store): add namespace prefix to object storage paths --- crates/core/data-store/src/lib.rs | 68 ++++++- crates/core/data-store/src/physical_table.rs | 52 ++++-- crates/core/metadata-db/src/physical_table.rs | 18 ++ .../metadata-db/src/physical_table/sql.rs | 35 ++++ .../src/handlers/datasets/restore.rs | 111 ++++++++++-- docs/openapi-specs/admin.spec.json | 2 +- tests/src/tests/it_admin_api_restore.rs | 171 ++++++++++++++++++ tests/src/tests/mod.rs | 1 + 8 files changed, 425 insertions(+), 33 deletions(-) create mode 100644 tests/src/tests/it_admin_api_restore.rs diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs index 448ab1cac..447ef0c6c 100644 --- a/crates/core/data-store/src/lib.rs +++ b/crates/core/data-store/src/lib.rs @@ -188,7 +188,8 @@ impl DataStore { table_name: &TableName, ) -> Result { let revision_id = Uuid::now_v7(); - let path = PhyTableRevisionPath::new(dataset.name(), table_name, revision_id); + let path = + PhyTableRevisionPath::new(dataset.namespace(), dataset.name(), table_name, revision_id); let url = PhyTableUrl::new(self.url(), &path); let location_id = self @@ -255,14 +256,28 @@ impl DataStore { dataset: &HashReference, table_name: &TableName, ) -> Result, RestoreLatestTableRevisionError> { - let table_path = PhyTablePath::new(dataset.name(), table_name); + let table_path = PhyTablePath::new(dataset.namespace(), dataset.name(), table_name); - let Some(path) = self + let path = match self .find_latest_table_revision_in_object_store(&table_path) .await .map_err(RestoreLatestTableRevisionError::FindLatestRevision)? - else { - return Ok(None); + { + Some(path) => path, + None => { + // Check for legacy path (without namespace) + let legacy_path = PhyTablePath::from_legacy(dataset.name(), table_name); + + let Some(path) = self + .find_latest_table_revision_in_object_store(&legacy_path) + .await + .map_err(RestoreLatestTableRevisionError::FindLatestRevision)? + else { + return Ok(None); + }; + + path + } }; let url = PhyTableUrl::new(self.url(), &path); @@ -305,6 +320,35 @@ impl DataStore { .map_err(ListAllTableRevisionsError) } + /// Gets a table revision by matching JSONB metadata fields (manifest_hash, table_name). + /// + /// Returns the matching revision if one exists, or None if not found. + pub async fn get_table_revision( + &self, + dataset_ref: &HashReference, + table_name: &TableName, + ) -> Result, GetTableRevisionError> { + let Some(row) = metadata_db::physical_table::get_revision( + &self.metadata_db, + dataset_ref.hash(), + table_name, + ) + .await + .map_err(GetTableRevisionError)? + else { + return Ok(None); + }; + + let path: PhyTableRevisionPath = row.path.into(); + let url = PhyTableUrl::new(self.url(), &path); + + Ok(Some(PhyTableRevision { + location_id: row.id, + path, + url, + })) + } + /// Gets the active revision of a table from the metadata database. /// /// Returns the active revision info if one exists, or None if not found. @@ -836,6 +880,20 @@ pub enum CreateAndActivateTableRevisionError { #[error("Failed to get revision by location ID from metadata database")] pub struct GetRevisionByLocationIdError(#[source] metadata_db::Error); +/// Failed to retrieve physical table revision from metadata database +/// +/// This error occurs when querying the metadata database for the physical table revision +/// by matching JSONB metadata fields (manifest_hash, table_name). +/// +/// Common causes: +/// - Database connection lost during query +/// - Database server unreachable +/// - Network connectivity issues +/// - Invalid manifest hash or table name format +#[derive(Debug, thiserror::Error)] +#[error("Failed to get revision from metadata database")] +pub struct GetTableRevisionError(#[source] metadata_db::Error); + /// Failed to retrieve active physical table revision from metadata database /// /// This error occurs when querying the metadata database for the currently active diff --git a/crates/core/data-store/src/physical_table.rs b/crates/core/data-store/src/physical_table.rs index fb1bf464a..7908ac25a 100644 --- a/crates/core/data-store/src/physical_table.rs +++ b/crates/core/data-store/src/physical_table.rs @@ -1,4 +1,4 @@ -use datasets_common::{name::Name, table_name::TableName}; +use datasets_common::{name::Name, namespace::Namespace, table_name::TableName}; use object_store::path::Path; use url::Url; use uuid::Uuid; @@ -10,19 +10,20 @@ use uuid::Uuid; /// /// ## URL Format /// -/// `////` +/// `/////` /// /// Where: /// - `store_base_url`: Object store base URL, may include path prefix after bucket /// (e.g., `s3://bucket/prefix`, `file:///data/subdir`) -/// - `dataset_name`: Dataset name (without namespace) +/// - `namespace`: Dataset namespace +/// - `dataset_name`: Dataset name /// - `table_name`: Table name /// - `revision_id`: Unique identifier for this table revision (typically UUIDv7) /// /// ## Example /// /// ```text -/// s3://my-bucket/prefix/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/ +/// s3://my-bucket/prefix/default/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/ /// ``` #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PhyTableUrl(Url); @@ -75,7 +76,10 @@ impl std::fmt::Display for PhyTableUrl { /// Path to a table directory in object storage (without revision). /// /// Represents the parent directory containing all revisions of a table. -/// Format: `/` +/// Format: `//` +/// +/// For backward compatibility with revisions stored before the namespace prefix was added, +/// use [`PhyTablePath::from_legacy`] which produces the legacy format: `/`. /// /// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and /// trailing slashes, so the string representation will not contain a trailing slash. @@ -83,14 +87,38 @@ impl std::fmt::Display for PhyTableUrl { /// ## Example /// /// ```text -/// ethereum_mainnet/logs +/// edgeandnode/ethereum_mainnet/logs /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct PhyTablePath(Path); impl PhyTablePath { - /// Constructs the path to a table directory (without revision). - pub fn new(dataset_name: impl AsRef, table_name: impl AsRef) -> Self { + /// Constructs the path to a table directory including namespace. + /// + /// Format: `//` + pub fn new( + namespace: impl AsRef, + dataset_name: impl AsRef, + table_name: impl AsRef, + ) -> Self { + Self( + format!( + "{}/{}/{}", + namespace.as_ref(), + dataset_name.as_ref(), + table_name.as_ref() + ) + .into(), + ) + } + + /// Constructs the legacy path to a table directory (without namespace). + /// + /// Format: `/` + /// + /// This is kept for backward compatibility with revisions stored before + /// the namespace prefix was added to the storage layout. + pub fn from_legacy(dataset_name: impl AsRef, table_name: impl AsRef) -> Self { Self(format!("{}/{}", dataset_name.as_ref(), table_name.as_ref()).into()) } @@ -128,7 +156,7 @@ impl std::ops::Deref for PhyTablePath { /// Path to a table revision directory in object storage. /// /// Represents a specific revision of a table, identified by a UUID. -/// Format: `//` +/// Format: `///` /// /// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and /// trailing slashes, so the string representation will not contain a trailing slash. @@ -136,7 +164,7 @@ impl std::ops::Deref for PhyTablePath { /// ## Example /// /// ```text -/// ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef +/// edgeandnode/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct PhyTableRevisionPath(Path); @@ -144,13 +172,15 @@ pub struct PhyTableRevisionPath(Path); impl PhyTableRevisionPath { /// Constructs the path to a table revision directory. pub fn new( + dataset_namespace: impl AsRef, dataset_name: impl AsRef, table_name: impl AsRef, revision_id: impl AsRef, ) -> Self { Self( format!( - "{}/{}/{}", + "{}/{}/{}/{}", + dataset_namespace.as_ref(), dataset_name.as_ref(), table_name.as_ref(), revision_id.as_ref() diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs index 6e8e82ee4..56f6e50ba 100644 --- a/crates/core/metadata-db/src/physical_table.rs +++ b/crates/core/metadata-db/src/physical_table.rs @@ -177,6 +177,24 @@ where .map_err(Error::Database) } +/// Get a physical table revision by matching JSONB metadata fields. +/// +/// Queries the `physical_table_revisions` table by `manifest_hash` and `table_name` +/// stored in the JSONB `metadata` column. Returns the most recent matching revision. +#[tracing::instrument(skip(exe), err)] +pub async fn get_revision<'c, E>( + exe: E, + manifest_hash: impl Into> + std::fmt::Debug, + table_name: impl Into> + std::fmt::Debug, +) -> Result, Error> +where + E: Executor<'c>, +{ + sql::get_revision(exe, manifest_hash.into(), table_name.into()) + .await + .map_err(Error::Database) +} + /// Mark all active locations for a table as inactive /// /// This is typically used before marking a new location as active, ensuring diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs index 875c33fba..db6c69fd6 100644 --- a/crates/core/metadata-db/src/physical_table/sql.rs +++ b/crates/core/metadata-db/src/physical_table/sql.rs @@ -301,6 +301,41 @@ where .await } +/// Get a physical table revision by JSONB metadata fields (manifest_hash, table_name). +/// +/// Returns the most recent matching revision, or `None` if no match exists. +pub async fn get_revision<'c, E>( + exe: E, + manifest_hash: ManifestHash<'_>, + table_name: Name<'_>, +) -> Result, sqlx::Error> +where + E: Executor<'c, Database = Postgres>, +{ + let query = indoc::indoc! {" + SELECT + ptr.id, + ptr.path, + EXISTS ( + SELECT 1 + FROM physical_tables + WHERE active_revision_id = ptr.id + ) AS active, + ptr.writer, + ptr.metadata + FROM physical_table_revisions ptr + WHERE ptr.metadata->>'manifest_hash' = $1 AND ptr.metadata->>'table_name' = $2 + ORDER BY ptr.id DESC + LIMIT 1 + "}; + + sqlx::query_as(query) + .bind(manifest_hash) + .bind(table_name) + .fetch_optional(exe) + .await +} + /// Deactivate all revisions for a specific table (set active_revision_id to NULL) pub async fn mark_inactive_by_table_name<'c, E>( exe: E, diff --git a/crates/services/admin-api/src/handlers/datasets/restore.rs b/crates/services/admin-api/src/handlers/datasets/restore.rs index c00192def..efc05f481 100644 --- a/crates/services/admin-api/src/handlers/datasets/restore.rs +++ b/crates/services/admin-api/src/handlers/datasets/restore.rs @@ -44,9 +44,14 @@ use crate::{ /// ## Error Codes /// - `INVALID_PATH`: Invalid path parameters (namespace, name, or revision) /// - `DATASET_NOT_FOUND`: The specified dataset or revision does not exist +/// - `RESOLVE_REVISION_ERROR`: Failed to resolve revision /// - `GET_DATASET_ERROR`: Failed to load dataset from store -/// - `RESTORE_TABLE_ERROR`: Failed to restore a table from storage +/// - `GET_REVISION_ERROR`: Failed to get revision of table +/// - `CREATE_AND_ACTIVATE_REVISION_ERROR`: Failed to create and activate table revision +/// - `RESTORE_TABLE_REVISION_ERROR`: Failed to restore a table revision from storage +/// - `REGISTER_FILES_ERROR`: Failed to register files for table /// - `TABLE_NOT_FOUND`: Table data not found in object storage +/// - `TASK_JOIN_ERROR`: Internal task join failure /// /// ## Behavior /// This endpoint restores dataset physical tables from object storage: @@ -82,8 +87,8 @@ use crate::{ ) )] pub async fn handler( - path: Result, PathRejection>, State(ctx): State, + path: Result, PathRejection>, ) -> Result<(StatusCode, Json), ErrorResponse> { let reference = match path { Ok(Path((namespace, name, revision))) => Reference::new(namespace, name, revision), @@ -134,28 +139,64 @@ pub async fn handler( let task = tokio::spawn(async move { let sql_table_ref_schema = dataset_ref.to_reference().to_string(); - // Restore latest revision from object storage - let info = data_store - .restore_latest_table_revision(&dataset_ref, table_def.name()) + let revision = data_store + .get_table_revision(&dataset_ref, table_def.name()) .await .map_err(|err| { tracing::error!( + table = %table_name, error = %err, error_source = logging::error_source(&err), - table = %table_name, - "failed to restore table revision from storage" + "failed to get active revision of table" ); - Error::RestoreTableRevision { + + Error::GetRevision { table: table_name.clone(), source: err, } - })? - .ok_or_else(|| Error::TableNotFound { - table: table_name.clone(), })?; + let revision = if let Some(revision) = revision { + // Register physical table for existing revision + data_store + .create_and_activate_table_revision(&dataset_ref, &table_name, &revision.path) + .await + .map_err(|err| { + tracing::error!( + table = %table_name, + error = %err, + error_source = logging::error_source(&err), + "failed to create and activate table revision" + ); + Error::CreateAndActivateTableRevision { + table: table_name.clone(), + source: err, + } + })?; + revision + } else { + data_store + .restore_latest_table_revision(&dataset_ref, table_def.name()) + .await + .map_err(|err| { + tracing::error!( + table = %table_name, + error = %err, + error_source = logging::error_source(&err), + "failed to restore table revision from storage" + ); + Error::RestoreTableRevision { + table: table_name.clone(), + source: err, + } + })? + .ok_or_else(|| Error::TableNotFound { + table: table_name.clone(), + })? + }; + // Register all files in the revision - register_revision_files(&data_store, &info) + register_revision_files(&data_store, &revision) .await .map_err(|err| { tracing::error!( @@ -176,7 +217,7 @@ pub async fn handler( dataset_ref.clone(), start_block, table_def.clone(), - info, + revision, sql_table_ref_schema, ); @@ -253,7 +294,7 @@ pub enum Error { /// This occurs when: /// - The namespace, name, or revision in the URL path is invalid /// - Path parameter parsing fails - #[error("Invalid path parameters: {0}")] + #[error("Invalid path parameters")] InvalidPath(#[source] PathRejection), /// Dataset or revision not found @@ -275,7 +316,7 @@ pub enum Error { /// - Failed to resolve revision to manifest hash /// - Database connection issues /// - Internal database errors - #[error("Failed to resolve revision: {0}")] + #[error("Failed to resolve revision")] ResolveRevision(#[source] ResolveRevisionError), /// Dataset store operation error when loading dataset @@ -284,9 +325,43 @@ pub enum Error { /// - Failed to load dataset configuration from manifest /// - Manifest parsing errors /// - Invalid dataset structure - #[error("Failed to load dataset: {0}")] + #[error("Failed to load dataset")] GetDataset(#[source] common::dataset_store::GetDatasetError), + /// Failed to get revision of table + /// + /// This occurs when querying the metadata database for an existing revision + /// by matching JSONB metadata fields (manifest_hash, table_name). + /// + /// Common causes: + /// - Database connection lost during query + /// - Database server unreachable + /// - Network connectivity issues + #[error("Failed to get revision of table '{table}'")] + GetRevision { + /// Name of the table whose revision lookup failed + table: TableName, + /// The underlying data store error + #[source] + source: amp_data_store::GetTableRevisionError, + }, + + /// Failed to create and activate table revision + /// + /// This occurs when: + /// - Transaction failure during table revision creation + /// - Failed to register physical table in metadata database + /// - Failed to mark existing revisions as inactive + /// - Failed to mark new revision as active + #[error("Failed to create and activate table revision for '{table}'")] + CreateAndActivateTableRevision { + /// Name of the table whose revision activation failed + table: TableName, + /// The underlying data store error + #[source] + source: amp_data_store::CreateAndActivateTableRevisionError, + }, + /// Failed to restore table revision from storage /// /// This occurs when: @@ -339,6 +414,8 @@ impl IntoErrorResponse for Error { Error::NotFound { .. } => "DATASET_NOT_FOUND", Error::ResolveRevision(_) => "RESOLVE_REVISION_ERROR", Error::GetDataset(_) => "GET_DATASET_ERROR", + Error::GetRevision { .. } => "GET_REVISION_ERROR", + Error::CreateAndActivateTableRevision { .. } => "CREATE_AND_ACTIVATE_REVISION_ERROR", Error::RestoreTableRevision { .. } => "RESTORE_TABLE_REVISION_ERROR", Error::RegisterFiles { .. } => "REGISTER_FILES_ERROR", Error::TableNotFound { .. } => "TABLE_NOT_FOUND", @@ -352,6 +429,8 @@ impl IntoErrorResponse for Error { Error::NotFound { .. } => StatusCode::NOT_FOUND, Error::ResolveRevision(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::GetDataset(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::GetRevision { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Error::CreateAndActivateTableRevision { .. } => StatusCode::INTERNAL_SERVER_ERROR, Error::RestoreTableRevision { .. } => StatusCode::INTERNAL_SERVER_ERROR, Error::RegisterFiles { .. } => StatusCode::INTERNAL_SERVER_ERROR, Error::TableNotFound { .. } => StatusCode::NOT_FOUND, diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index 8ce12f2f6..20f206839 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -531,7 +531,7 @@ "datasets" ], "summary": "Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/restore` endpoint", - "description": "Restores physical table locations from object storage into the metadata database.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, latest, or dev)\n\n## Response\n- **202 Accepted**: Physical tables successfully restored from storage\n- **400 Bad Request**: Invalid path parameters\n- **404 Not Found**: Dataset or revision not found, or no tables found in storage\n- **500 Internal Server Error**: Database or storage error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `RESTORE_TABLE_ERROR`: Failed to restore a table from storage\n- `TABLE_NOT_FOUND`: Table data not found in object storage\n\n## Behavior\nThis endpoint restores dataset physical tables from object storage:\n1. Resolves the revision to find the corresponding dataset\n2. Scans object storage for existing physical table files\n3. Re-indexes all Parquet file metadata from storage\n4. Registers the physical table locations in the metadata database\n5. Marks the restored locations as active\n\nThis is useful for:\n- Recovering from metadata database loss\n- Setting up a new system with pre-existing data\n- Re-syncing metadata after storage restoration", + "description": "Restores physical table locations from object storage into the metadata database.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, latest, or dev)\n\n## Response\n- **202 Accepted**: Physical tables successfully restored from storage\n- **400 Bad Request**: Invalid path parameters\n- **404 Not Found**: Dataset or revision not found, or no tables found in storage\n- **500 Internal Server Error**: Database or storage error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `GET_REVISION_ERROR`: Failed to get revision of table\n- `CREATE_AND_ACTIVATE_REVISION_ERROR`: Failed to create and activate table revision\n- `RESTORE_TABLE_REVISION_ERROR`: Failed to restore a table revision from storage\n- `REGISTER_FILES_ERROR`: Failed to register files for table\n- `TABLE_NOT_FOUND`: Table data not found in object storage\n- `TASK_JOIN_ERROR`: Internal task join failure\n\n## Behavior\nThis endpoint restores dataset physical tables from object storage:\n1. Resolves the revision to find the corresponding dataset\n2. Scans object storage for existing physical table files\n3. Re-indexes all Parquet file metadata from storage\n4. Registers the physical table locations in the metadata database\n5. Marks the restored locations as active\n\nThis is useful for:\n- Recovering from metadata database loss\n- Setting up a new system with pre-existing data\n- Re-syncing metadata after storage restoration", "operationId": "restore_dataset", "parameters": [ { diff --git a/tests/src/tests/it_admin_api_restore.rs b/tests/src/tests/it_admin_api_restore.rs new file mode 100644 index 000000000..ed713b1be --- /dev/null +++ b/tests/src/tests/it_admin_api_restore.rs @@ -0,0 +1,171 @@ +use datasets_common::reference::Reference; +use monitoring::logging; + +use crate::testlib::{self, ctx::TestCtxBuilder, fixtures::Ampctl}; + +/// Test restoring a dataset from legacy storage paths (without namespace prefix). +/// +/// The `eth_rpc` snapshot is stored at `eth_rpc/blocks/UUID/file.parquet` (no namespace). +/// The restore handler first tries the namespace-prefixed path, falls back to the legacy path. +#[tokio::test] +async fn restore_dataset_from_legacy_path_succeeds() { + logging::init(); + + //* Given + let ctx = TestCtx::setup("restore_dataset_from_legacy_path_succeeds").await; + + //* When + let restored_tables = ctx.restore_dataset().await; + + //* Then + assert_eq!( + restored_tables.len(), + 3, + "should restore 3 tables (blocks, logs, transactions)" + ); + + let table_names: Vec<&str> = restored_tables + .iter() + .map(|t| t.table_name.as_str()) + .collect(); + assert!( + table_names.contains(&"blocks"), + "should contain blocks table" + ); + assert!(table_names.contains(&"logs"), "should contain logs table"); + assert!( + table_names.contains(&"transactions"), + "should contain transactions table" + ); + + for table in &restored_tables { + assert!(table.location_id > 0, "location_id should be positive"); + assert!(!table.url.is_empty(), "url should not be empty"); + } + + // Verify tables are queryable + let (_, row_count) = ctx + .run_query("SELECT block_num FROM eth_rpc.blocks LIMIT 1") + .await + .expect("blocks table should be queryable after restore"); + assert!(row_count > 0, "blocks table should have data"); +} + +/// Test restoring a dataset from custom registered revision paths. +/// +/// Uses `eth_rpc_custom` snapshot with pre-registered revision paths. +/// The restore handler finds existing revisions via `get_table_revision` and re-activates them. +#[tokio::test] +async fn restore_dataset_from_custom_path_succeeds() { + logging::init(); + + //* Given — register revisions at custom paths + let ctx = + TestCtx::setup_with_custom_snapshot("restore_dataset_from_custom_path_succeeds").await; + + let tables = ["blocks", "logs", "transactions"]; + for table_name in &tables { + ctx.ampctl + .revisions() + .register( + "_/eth_rpc@0.0.0", + table_name, + &format!("eth_rpc_custom/{}", table_name), + ) + .await + .unwrap_or_else(|err| { + panic!("failed to register revision for {}: {}", table_name, err) + }); + } + + //* When + let restored_tables = ctx.restore_dataset().await; + + //* Then + assert_eq!( + restored_tables.len(), + 3, + "should restore 3 tables (blocks, logs, transactions)" + ); + + let table_names: Vec<&str> = restored_tables + .iter() + .map(|t| t.table_name.as_str()) + .collect(); + assert!( + table_names.contains(&"blocks"), + "should contain blocks table" + ); + assert!(table_names.contains(&"logs"), "should contain logs table"); + assert!( + table_names.contains(&"transactions"), + "should contain transactions table" + ); + + for table in &restored_tables { + assert!(table.location_id > 0, "location_id should be positive"); + assert!(!table.url.is_empty(), "url should not be empty"); + } + + // Verify tables are queryable + let (_, row_count) = ctx + .run_query("SELECT block_num FROM eth_rpc.blocks LIMIT 1") + .await + .expect("blocks table should be queryable after restore"); + assert!(row_count > 0, "blocks table should have data"); +} + +struct TestCtx { + ctx: testlib::ctx::TestCtx, + ampctl: Ampctl, +} + +impl TestCtx { + /// Setup for legacy path test: manifest + legacy snapshot (no namespace prefix). + async fn setup(test_name: &str) -> Self { + let ctx = TestCtxBuilder::new(test_name) + .with_dataset_manifests(["eth_rpc"]) + .with_dataset_snapshots(["eth_rpc"]) + .build() + .await + .expect("failed to build test context"); + + let ampctl = ctx.new_ampctl(); + + Self { ctx, ampctl } + } + + /// Setup for custom path test: manifest + custom snapshot. + async fn setup_with_custom_snapshot(test_name: &str) -> Self { + let ctx = TestCtxBuilder::new(test_name) + .with_dataset_manifests(["eth_rpc"]) + .with_dataset_snapshots(["eth_rpc_custom"]) + .build() + .await + .expect("failed to build test context"); + + let ampctl = ctx.new_ampctl(); + + Self { ctx, ampctl } + } + + fn dataset_ref(&self) -> Reference { + "_/eth_rpc@0.0.0".parse().expect("valid reference") + } + + async fn restore_dataset(&self) -> Vec { + self.ampctl + .restore_dataset(&self.dataset_ref()) + .await + .expect("failed to restore dataset") + } + + async fn run_query(&self, query: &str) -> Result<(serde_json::Value, usize), anyhow::Error> { + let mut client = self + .ctx + .new_flight_client() + .await + .expect("failed to create flight client"); + client.run_query(query, None).await + } +} diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs index 58fa9921f..e7ca8cc1a 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -3,6 +3,7 @@ mod it_admin_api_datasets_manifest; mod it_admin_api_datasets_register; mod it_admin_api_datasets_stop_job; mod it_admin_api_jobs_progress; +mod it_admin_api_restore; mod it_admin_api_revisions; mod it_admin_api_schema; mod it_ampctl_gen_manifest;