Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions crates/core/data-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ impl DataStore {
table_name: &TableName,
) -> Result<PhyTableRevision, CreateNewTableRevisionError> {
let revision_id = Uuid::now_v7();
let path = PhyTableRevisionPath::new(dataset.name(), table_name, revision_id);
let path =
PhyTableRevisionPath::new(dataset.namespace(), dataset.name(), table_name, revision_id);
let url = PhyTableUrl::new(self.url(), &path);

let location_id = self
Expand Down Expand Up @@ -255,14 +256,28 @@ impl DataStore {
dataset: &HashReference,
table_name: &TableName,
) -> Result<Option<PhyTableRevision>, RestoreLatestTableRevisionError> {
let table_path = PhyTablePath::new(dataset.name(), table_name);
let table_path = PhyTablePath::new(dataset.namespace(), dataset.name(), table_name);

let Some(path) = self
let path = match self
.find_latest_table_revision_in_object_store(&table_path)
.await
.map_err(RestoreLatestTableRevisionError::FindLatestRevision)?
else {
return Ok(None);
{
Some(path) => path,
None => {
// Check for legacy path (without namespace)
let legacy_path = PhyTablePath::from_legacy(dataset.name(), table_name);

let Some(path) = self
.find_latest_table_revision_in_object_store(&legacy_path)
.await
.map_err(RestoreLatestTableRevisionError::FindLatestRevision)?
else {
return Ok(None);
};

path
}
};

let url = PhyTableUrl::new(self.url(), &path);
Expand Down Expand Up @@ -305,6 +320,35 @@ impl DataStore {
.map_err(ListAllTableRevisionsError)
}

/// Gets a table revision by matching JSONB metadata fields (manifest_hash, table_name).
///
/// Returns the matching revision if one exists, or None if not found.
pub async fn get_table_revision(
&self,
dataset_ref: &HashReference,
table_name: &TableName,
) -> Result<Option<PhyTableRevision>, GetTableRevisionError> {
let Some(row) = metadata_db::physical_table::get_revision(
&self.metadata_db,
dataset_ref.hash(),
table_name,
)
.await
.map_err(GetTableRevisionError)?
else {
return Ok(None);
};

let path: PhyTableRevisionPath = row.path.into();
let url = PhyTableUrl::new(self.url(), &path);

Ok(Some(PhyTableRevision {
location_id: row.id,
path,
url,
}))
}

/// Gets the active revision of a table from the metadata database.
///
/// Returns the active revision info if one exists, or None if not found.
Expand Down Expand Up @@ -836,6 +880,20 @@ pub enum CreateAndActivateTableRevisionError {
#[error("Failed to get revision by location ID from metadata database")]
pub struct GetRevisionByLocationIdError(#[source] metadata_db::Error);

/// Failed to retrieve physical table revision from metadata database
///
/// This error occurs when querying the metadata database for the physical table revision
/// by matching JSONB metadata fields (manifest_hash, table_name).
///
/// Common causes:
/// - Database connection lost during query
/// - Database server unreachable
/// - Network connectivity issues
/// - Invalid manifest hash or table name format
#[derive(Debug, thiserror::Error)]
#[error("Failed to get revision from metadata database")]
pub struct GetTableRevisionError(#[source] metadata_db::Error);

/// Failed to retrieve active physical table revision from metadata database
///
/// This error occurs when querying the metadata database for the currently active
Expand Down
52 changes: 41 additions & 11 deletions crates/core/data-store/src/physical_table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use datasets_common::{name::Name, table_name::TableName};
use datasets_common::{name::Name, namespace::Namespace, table_name::TableName};
use object_store::path::Path;
use url::Url;
use uuid::Uuid;
Expand All @@ -10,19 +10,20 @@ use uuid::Uuid;
///
/// ## URL Format
///
/// `<store_base_url>/<dataset_name>/<table_name>/<revision_id>/`
/// `<store_base_url>/<namespace>/<dataset_name>/<table_name>/<revision_id>/`
///
/// Where:
/// - `store_base_url`: Object store base URL, may include path prefix after bucket
/// (e.g., `s3://bucket/prefix`, `file:///data/subdir`)
/// - `dataset_name`: Dataset name (without namespace)
/// - `namespace`: Dataset namespace
/// - `dataset_name`: Dataset name
/// - `table_name`: Table name
/// - `revision_id`: Unique identifier for this table revision (typically UUIDv7)
///
/// ## Example
///
/// ```text
/// s3://my-bucket/prefix/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/
/// s3://my-bucket/prefix/default/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhyTableUrl(Url);
Expand Down Expand Up @@ -75,22 +76,49 @@ impl std::fmt::Display for PhyTableUrl {
/// Path to a table directory in object storage (without revision).
///
/// Represents the parent directory containing all revisions of a table.
/// Format: `<dataset_name>/<table_name>`
/// Format: `<namespace>/<dataset_name>/<table_name>`
///
/// For backward compatibility with revisions stored before the namespace prefix was added,
/// use [`PhyTablePath::from_legacy`] which produces the legacy format: `<dataset_name>/<table_name>`.
///
/// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and
/// trailing slashes, so the string representation will not contain a trailing slash.
///
/// ## Example
///
/// ```text
/// ethereum_mainnet/logs
/// edgeandnode/ethereum_mainnet/logs
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PhyTablePath(Path);

impl PhyTablePath {
/// Constructs the path to a table directory (without revision).
pub fn new(dataset_name: impl AsRef<Name>, table_name: impl AsRef<TableName>) -> Self {
/// Constructs the path to a table directory including namespace.
///
/// Format: `<namespace>/<dataset_name>/<table_name>`
pub fn new(
namespace: impl AsRef<Namespace>,
dataset_name: impl AsRef<Name>,
table_name: impl AsRef<TableName>,
) -> Self {
Self(
format!(
"{}/{}/{}",
namespace.as_ref(),
dataset_name.as_ref(),
table_name.as_ref()
)
.into(),
)
}

/// Constructs the legacy path to a table directory (without namespace).
///
/// Format: `<dataset_name>/<table_name>`
///
/// This is kept for backward compatibility with revisions stored before
/// the namespace prefix was added to the storage layout.
pub fn from_legacy(dataset_name: impl AsRef<Name>, table_name: impl AsRef<TableName>) -> Self {
Self(format!("{}/{}", dataset_name.as_ref(), table_name.as_ref()).into())
}

Expand Down Expand Up @@ -128,29 +156,31 @@ impl std::ops::Deref for PhyTablePath {
/// Path to a table revision directory in object storage.
///
/// Represents a specific revision of a table, identified by a UUID.
/// Format: `<dataset_name>/<table_name>/<revision_uuid>`
/// Format: `<namespace>/<dataset_name>/<table_name>/<revision_uuid>`
///
/// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and
/// trailing slashes, so the string representation will not contain a trailing slash.
///
/// ## Example
///
/// ```text
/// ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef
/// edgeandnode/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PhyTableRevisionPath(Path);

impl PhyTableRevisionPath {
/// Constructs the path to a table revision directory.
pub fn new(
dataset_namespace: impl AsRef<Namespace>,
dataset_name: impl AsRef<Name>,
table_name: impl AsRef<TableName>,
revision_id: impl AsRef<Uuid>,
) -> Self {
Self(
format!(
"{}/{}/{}",
"{}/{}/{}/{}",
dataset_namespace.as_ref(),
dataset_name.as_ref(),
table_name.as_ref(),
revision_id.as_ref()
Expand Down
18 changes: 18 additions & 0 deletions crates/core/metadata-db/src/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,24 @@ where
.map_err(Error::Database)
}

/// Get a physical table revision by matching JSONB metadata fields.
///
/// Queries the `physical_table_revisions` table by `manifest_hash` and `table_name`
/// stored in the JSONB `metadata` column. Returns the most recent matching revision.
#[tracing::instrument(skip(exe), err)]
pub async fn get_revision<'c, E>(
exe: E,
manifest_hash: impl Into<ManifestHash<'_>> + std::fmt::Debug,
table_name: impl Into<TableName<'_>> + std::fmt::Debug,
) -> Result<Option<PhysicalTableRevision>, Error>
where
E: Executor<'c>,
{
sql::get_revision(exe, manifest_hash.into(), table_name.into())
.await
.map_err(Error::Database)
}

/// Mark all active locations for a table as inactive
///
/// This is typically used before marking a new location as active, ensuring
Expand Down
35 changes: 35 additions & 0 deletions crates/core/metadata-db/src/physical_table/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,41 @@ where
.await
}

/// Get a physical table revision by JSONB metadata fields (manifest_hash, table_name).
///
/// Returns the most recent matching revision, or `None` if no match exists.
pub async fn get_revision<'c, E>(
exe: E,
manifest_hash: ManifestHash<'_>,
table_name: Name<'_>,
) -> Result<Option<PhysicalTableRevision>, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
let query = indoc::indoc! {"
SELECT
ptr.id,
ptr.path,
EXISTS (
SELECT 1
FROM physical_tables
WHERE active_revision_id = ptr.id
) AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
WHERE ptr.metadata->>'manifest_hash' = $1 AND ptr.metadata->>'table_name' = $2
ORDER BY ptr.id DESC
LIMIT 1
"};

sqlx::query_as(query)
.bind(manifest_hash)
.bind(table_name)
.fetch_optional(exe)
.await
}

/// Deactivate all revisions for a specific table (set active_revision_id to NULL)
pub async fn mark_inactive_by_table_name<'c, E>(
exe: E,
Expand Down
Loading