Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 78 additions & 25 deletions crates/bin/ampctl/src/cmd/dataset/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
//! 2. POSTing to admin API `/datasets/{namespace}/{name}/versions/{version}/restore` endpoint
//! 3. Returning information about the restored tables
//!
//! Optionally targets a single table with `--table` and can activate a specific revision
//! with `--location-id`.
//!
//! # Dataset Reference Format
//!
//! `namespace/name@version` (e.g., `graph/eth_mainnet@1.0.0`)
Expand All @@ -29,9 +32,17 @@ pub struct Args {
/// Examples: my_namespace/my_dataset@1.0.0, my_namespace/my_dataset@latest
#[arg(value_name = "REFERENCE", required = true, value_parser = clap::value_parser!(Reference))]
pub dataset_ref: Reference,

/// Restore a specific table only (instead of all tables)
#[arg(long, value_name = "TABLE")]
pub table: Option<String>,

/// Activate a specific location ID for the table (requires --table)
#[arg(long, value_name = "ID", requires = "table")]
pub location_id: Option<i64>,
}

/// Result of a dataset restore operation.
/// Result of a dataset restore operation (all tables).
#[derive(serde::Serialize)]
struct RestoreResult {
tables: Vec<RestoredTableInfo>,
Expand Down Expand Up @@ -66,9 +77,39 @@ impl std::fmt::Display for RestoreResult {
}
}

/// Result of a single table restore operation.
///
/// Built by `run` once the single-table restore call has succeeded, then handed to
/// `global.print`. Derives `serde::Serialize` and also implements `Display`.
#[derive(serde::Serialize)]
struct RestoreTableResult {
/// Name of the table that was restored (the value given via `--table`).
table: String,
/// Location ID passed via `--location-id`, if any.
///
/// Left out of the serialized output entirely when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
location_id: Option<i64>,
}

impl std::fmt::Display for RestoreTableResult {
    /// Writes the human-readable summary of a single-table restore.
    ///
    /// Always emits one line confirming the table was restored. When a
    /// `location_id` is present, a second line reports the activated ID.
    /// Every emitted line ends with a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Styled fragments are prepared up front so the format call stays short.
        let check_mark = console::style("✓").green().bold();
        let table_name = console::style(&self.table).yellow();
        writeln!(f, "{} Table '{}' restored successfully", check_mark, table_name)?;

        match self.location_id {
            Some(id) => {
                let arrow = console::style("→").cyan();
                writeln!(f, "{} Activated location_id: {}", arrow, id)
            }
            // No location was activated, so there is nothing more to report.
            None => Ok(()),
        }
    }
}

/// Restore dataset physical tables from object storage.
///
/// Re-indexes physical table metadata from storage into the database.
/// Optionally targets a single table with `--table`, and can activate
/// a specific revision with `--location-id`.
///
/// # Errors
///
Expand All @@ -78,47 +119,59 @@ pub async fn run(
Args {
global,
dataset_ref,
table,
location_id,
}: Args,
) -> Result<(), Error> {
tracing::debug!(%dataset_ref, "Restoring dataset");
let client = global.build_client().map_err(Error::ClientBuild)?;

let tables = restore_dataset(&global, &dataset_ref).await?;
let result = RestoreResult { tables };
global.print(&result).map_err(Error::JsonSerialization)?;
if let Some(table_name) = table {
tracing::debug!(%dataset_ref, %table_name, ?location_id, "restoring single table");

client
.datasets()
.restore_table(&dataset_ref, &table_name, location_id)
.await
.map_err(Error::RestoreTable)?;

let result = RestoreTableResult {
table: table_name,
location_id,
};
global.print(&result).map_err(Error::JsonSerialization)?;
} else {
tracing::debug!(%dataset_ref, "restoring dataset");

let response = client
.datasets()
.restore(&dataset_ref)
.await
.map_err(Error::Restore)?;

let result = RestoreResult {
tables: response.tables,
};
global.print(&result).map_err(Error::JsonSerialization)?;
}

Ok(())
}

/// Restore dataset tables via the admin API.
///
/// POSTs to the `/datasets/{namespace}/{name}/versions/{version}/restore` endpoint
/// and returns the list of restored tables.
#[tracing::instrument(skip_all, fields(%dataset_ref))]
async fn restore_dataset(
global: &GlobalArgs,
dataset_ref: &Reference,
) -> Result<Vec<RestoredTableInfo>, Error> {
let client = global.build_client().map_err(Error::ClientBuild)?;
let response = client
.datasets()
.restore(dataset_ref)
.await
.map_err(Error::Restore)?;

Ok(response.tables)
}

/// Errors for dataset restore operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed to build client
#[error("failed to build admin API client")]
ClientBuild(#[source] crate::args::BuildClientError),

/// Restore error from the client
/// Restore error from the client (all tables)
#[error("restore failed")]
Restore(#[source] crate::client::datasets::RestoreError),

/// Restore table error from the client (single table)
#[error("restore table failed")]
RestoreTable(#[source] crate::client::datasets::RestoreTableError),

/// Failed to serialize result to JSON
#[error("failed to serialize result to JSON")]
JsonSerialization(#[source] serde_json::Error),
Expand Down
21 changes: 20 additions & 1 deletion crates/bin/ampctl/src/cmd/dataset/restore__after_help.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
## Examples

Restore dataset physical tables from storage:
Restore all dataset physical tables from storage:

```
ampctl dataset restore my_namespace/my_dataset@1.0.0
```

Restore a single table from storage (UUID heuristic):

```
ampctl dataset restore my_namespace/my_dataset@1.0.0 --table logs
```

Activate a specific revision for a table:

```
ampctl dataset restore my_namespace/my_dataset@1.0.0 --table logs --location-id 42
```

Restore latest version of a dataset:

```
Expand Down Expand Up @@ -40,6 +52,13 @@ ampctl dataset restore production/eth_mainnet@2.1.0
ampctl dataset restore analytics/uniswap_v3@1.0.0
```

**Activating a specific revision for a table:**

```
# Switch a table to a known-good revision
ampctl dataset restore production/eth_mainnet@2.1.0 --table blocks --location-id 123
```

**Re-syncing metadata after storage restoration:**

```
Expand Down
Loading
Loading