Merged
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions crates/core/metadata-db/src/jobs/job_descriptor.rs
@@ -54,6 +54,18 @@ impl<'a> JobDescriptorRaw<'a> {
    }
}

impl JobDescriptorRaw<'static> {
    /// Consume and return the inner `Box<RawValue>`.
    ///
    /// Only available on owned (`'static`) descriptors where the `Cow` is guaranteed `Owned`.
    pub fn into_inner(self) -> Box<RawValue> {
        match self.0 {
            Cow::Owned(boxed) => boxed,
            Cow::Borrowed(_) => unreachable!("'static lifetime guarantees Cow::Owned"),
        }
    }
}
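For context, a minimal sketch of the invariant `into_inner` relies on. The constructor names mirror `from_owned_unchecked` and `from_ref_unchecked` used elsewhere in this PR, but the definitions here are illustrative stand-ins, not the real metadata-db source:

```rust
use std::borrow::Cow;

use serde_json::value::RawValue;

// Illustrative mirror of the descriptor type; the real definition lives in
// metadata-db. `RawValue: ToOwned<Owned = Box<RawValue>>`, so `Cow` applies.
pub struct JobDescriptorRaw<'a>(Cow<'a, RawValue>);
pub type JobDescriptorRawOwned = JobDescriptorRaw<'static>;

impl JobDescriptorRaw<'static> {
    /// Owned construction is the path that yields a `'static` descriptor,
    /// which is what keeps the `unreachable!` in `into_inner` from firing.
    pub fn from_owned_unchecked(raw: Box<RawValue>) -> Self {
        Self(Cow::Owned(raw))
    }
}

impl<'a> JobDescriptorRaw<'a> {
    /// Borrowed construction ties the descriptor to the caller's lifetime,
    /// so borrowed descriptors never reach the owned (`'static`) API.
    pub fn from_ref_unchecked(raw: &'a RawValue) -> Self {
        Self(Cow::Borrowed(raw))
    }
}
```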

impl<'a> AsRef<RawValue> for JobDescriptorRaw<'a> {
    fn as_ref(&self) -> &RawValue {
        &self.0
2 changes: 2 additions & 0 deletions crates/services/admin-api/Cargo.toml
@@ -9,6 +9,8 @@ amp-data-store = { path = "../../core/data-store" }
amp-datasets-registry = { path = "../../core/datasets-registry" }
amp-providers-common = { path = "../../core/providers-common" }
amp-providers-registry = { path = "../../core/providers-registry" }
amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" }
amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" }
async-trait.workspace = true
axum.workspace = true
common = { path = "../../core/common" }
31 changes: 24 additions & 7 deletions crates/services/admin-api/src/handlers/datasets/deploy.rs
@@ -1,6 +1,8 @@
use std::fmt::Debug;

use amp_datasets_registry::error::{ListVersionTagsError, ResolveRevisionError};
use amp_worker_datasets_derived::job_descriptor::JobDescriptor as MaterializeDerivedDatasetJobDescriptor;
use amp_worker_datasets_raw::job_descriptor::JobDescriptor as MaterializeRawDatasetJobDescriptor;
use axum::{
    Json,
    extract::{
@@ -11,6 +13,7 @@ use axum::{
};
use common::dataset_store::GetDatasetError;
use datasets_common::{name::Name, namespace::Namespace, reference::Reference, revision::Revision};
use datasets_derived::DerivedDatasetKind;
use monitoring::logging;
use worker::job::JobId;

@@ -143,16 +146,30 @@ pub async fn handler(
        .await
        .map_err(Error::GetDataset)?;

    // Build the job descriptor based on dataset kind
    let job_descriptor = if dataset.kind() == DerivedDatasetKind {
        MaterializeDerivedDatasetJobDescriptor {
            end_block: end_block.into(),
            dataset_namespace: reference.namespace().clone(),
            dataset_name: reference.name().clone(),
            manifest_hash: reference.hash().clone(),
        }
        .into()
    } else {
        MaterializeRawDatasetJobDescriptor {
            end_block: end_block.into(),
            max_writers: parallelism,
            dataset_namespace: reference.namespace().clone(),
            dataset_name: reference.name().clone(),
            manifest_hash: reference.hash().clone(),
        }
        .into()
    };
Comment on lines +150 to +167

Contributor:

Can we move this logic to a JobDescriptor::materialise_dataset() fn?

@LNSD (Contributor, Author) replied on Feb 23, 2026:
Keeping this in the handler intentionally.

JobDescriptor is a thin, type-erased bridge. It shouldn't know about MaterializeRawDatasetJobDescriptor or MaterializeDerivedDatasetJobDescriptor internal details. Construction must use the actual typed worker descriptors at the call site where the dataset-kind context lives.

Eventually, these will be extracted directly from the request body, so the handler is the right place for this logic.
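For contrast, the reviewer's suggestion would look roughly like the hypothetical constructor below, as a method in admin-api's scheduler module. The kind check and field names are copied from this PR's handler; the function itself was declined:

```rust
use datasets_common::{
    dataset_kind_str::DatasetKindStr, end_block::EndBlock, hash_reference::HashReference,
};
use datasets_derived::DerivedDatasetKind;

impl JobDescriptor {
    // Hypothetical alternative: kind dispatch inside the bridge type. This is
    // exactly the coupling the author declined, since it makes JobDescriptor
    // depend on both typed worker descriptor crates.
    pub fn materialise_dataset(
        dataset_kind: DatasetKindStr,
        end_block: EndBlock,
        max_writers: u16,
        reference: &HashReference,
    ) -> Self {
        if dataset_kind == DerivedDatasetKind {
            amp_worker_datasets_derived::job_descriptor::JobDescriptor {
                end_block,
                dataset_namespace: reference.namespace().clone(),
                dataset_name: reference.name().clone(),
                manifest_hash: reference.hash().clone(),
            }
            .into()
        } else {
            amp_worker_datasets_raw::job_descriptor::JobDescriptor {
                end_block,
                max_writers,
                dataset_namespace: reference.namespace().clone(),
                dataset_name: reference.name().clone(),
                manifest_hash: reference.hash().clone(),
            }
            .into()
        }
    }
}
```

Keeping construction at the call site leaves the bridge type free of those dependencies.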


    // Schedule the extraction job using the scheduler
    let job_id = ctx
        .scheduler
        .schedule_dataset_sync_job(
            reference.clone(),
            dataset.kind().clone(),
            end_block.into(),
            parallelism,
            worker_id,
        )
        .schedule_job(reference.clone(), job_descriptor, worker_id)
        .await
        .map_err(|err| {
            tracing::error!(
73 changes: 57 additions & 16 deletions crates/services/admin-api/src/scheduler.rs
@@ -25,10 +25,10 @@

use async_trait::async_trait;
use datasets_common::{
    dataset_kind_str::DatasetKindStr, end_block::EndBlock, hash::Hash,
    hash_reference::HashReference, name::Name, namespace::Namespace,
    hash::Hash, hash_reference::HashReference, name::Name, namespace::Namespace,
};
use metadata_db::workers::Worker;
use serde_json::value::RawValue;
use worker::{
    job::{Job, JobId, JobStatus},
    node_id::{InvalidIdError, NodeId, validate_node_id},
@@ -50,13 +50,11 @@ impl<T> Scheduler for T where T: SchedulerJobs + SchedulerWorkers {}
// because const generics make the trait not dyn-compatible.
#[async_trait]
pub trait SchedulerJobs: Send + Sync {
    /// Schedule a dataset synchronization job
    async fn schedule_dataset_sync_job(
    /// Schedule a job with a pre-built descriptor
    async fn schedule_job(
        &self,
        dataset_reference: HashReference,
        dataset_kind: DatasetKindStr,
        end_block: EndBlock,
        max_writers: u16,
        job_descriptor: JobDescriptor,
        worker_id: Option<NodeSelector>,
    ) -> Result<JobId, ScheduleJobError>;

@@ -102,6 +100,58 @@ pub trait SchedulerJobs: Send + Sync {
    ) -> Result<Vec<Job>, ListJobsByDatasetError>;
}
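The comment above notes the trait must stay dyn-compatible; the type-erased `JobDescriptor` parameter is what keeps `schedule_job` object-safe. A sketch of a hypothetical call site holding the scheduler behind a trait object (the `scheduler` field mirrors the handler's `ctx`; the surrounding struct is illustrative):

```rust
use std::sync::Arc;

use admin_api::scheduler::{JobDescriptor, ScheduleJobError, SchedulerJobs};
use datasets_common::hash_reference::HashReference;
use worker::job::JobId;

// Hypothetical application context; only the trait and its method come from this PR.
struct AppCtx {
    scheduler: Arc<dyn SchedulerJobs>,
}

impl AppCtx {
    async fn submit(
        &self,
        reference: HashReference,
        descriptor: JobDescriptor,
    ) -> Result<JobId, ScheduleJobError> {
        // Works through a trait object because schedule_job takes the concrete,
        // type-erased JobDescriptor rather than a generic typed descriptor.
        self.scheduler.schedule_job(reference, descriptor, None).await
    }
}
```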

/// Invariant-preserving bridge between typed worker job descriptors and the scheduler.
///
/// Acts as the admin-api's canonical job descriptor type, decoupling the scheduler trait
/// interface from both the typed worker descriptor crates (`amp-worker-datasets-raw`,
/// `amp-worker-datasets-derived`) and the metadata-db storage type (`JobDescriptorRawOwned`).
///
/// Invariants are established at the point of construction (via the `From` impls from typed
/// worker descriptors) and preserved throughout. All subsequent conversions to/from
/// `JobDescriptorRawOwned` or `JobDescriptorRaw` are infallible.
#[derive(Clone, Debug)]
pub struct JobDescriptor(Box<RawValue>);

impl From<JobDescriptor> for metadata_db::jobs::JobDescriptorRawOwned {
    fn from(value: JobDescriptor) -> Self {
        // SAFETY: JobDescriptor inner data is valid JSON from trusted sources
        metadata_db::jobs::JobDescriptorRaw::from_owned_unchecked(value.0)
    }
}

impl<'a> From<&'a JobDescriptor> for metadata_db::jobs::JobDescriptorRaw<'a> {
    fn from(value: &'a JobDescriptor) -> Self {
        // SAFETY: JobDescriptor inner data is valid JSON from trusted sources
        metadata_db::jobs::JobDescriptorRaw::from_ref_unchecked(&value.0)
    }
}

impl From<metadata_db::jobs::JobDescriptorRawOwned> for JobDescriptor {
    fn from(value: metadata_db::jobs::JobDescriptorRawOwned) -> Self {
        Self(value.into_inner())
    }
}

impl From<&metadata_db::jobs::JobDescriptorRaw<'_>> for JobDescriptor {
    fn from(value: &metadata_db::jobs::JobDescriptorRaw<'_>) -> Self {
        Self(value.as_raw().to_owned())
    }
}

impl From<amp_worker_datasets_raw::job_descriptor::JobDescriptor> for JobDescriptor {
    fn from(desc: amp_worker_datasets_raw::job_descriptor::JobDescriptor) -> Self {
        let raw: metadata_db::jobs::JobDescriptorRawOwned = desc.into();
        Self(raw.into_inner())
    }
}

impl From<amp_worker_datasets_derived::job_descriptor::JobDescriptor> for JobDescriptor {
    fn from(desc: amp_worker_datasets_derived::job_descriptor::JobDescriptor) -> Self {
        let raw: metadata_db::jobs::JobDescriptorRawOwned = desc.into();
        Self(raw.into_inner())
    }
}
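Taken together, these impls give the conversion chain below; a sketch assuming the worker crates are in scope, with field names taken from the deploy handler above:

```rust
use admin_api::scheduler::JobDescriptor;
use datasets_common::{end_block::EndBlock, hash_reference::HashReference};

// Typed worker descriptor -> type-erased JobDescriptor at the call site.
fn derived_descriptor(end_block: EndBlock, reference: &HashReference) -> JobDescriptor {
    amp_worker_datasets_derived::job_descriptor::JobDescriptor {
        end_block,
        dataset_namespace: reference.namespace().clone(),
        dataset_name: reference.name().clone(),
        manifest_hash: reference.hash().clone(),
    }
    .into()
}

// JobDescriptor -> metadata-db storage type; infallible because validity was
// established when the typed descriptor was serialized.
fn persistable(descriptor: JobDescriptor) -> metadata_db::jobs::JobDescriptorRawOwned {
    descriptor.into()
}
```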

/// Errors that can occur when scheduling a dataset dump job
#[derive(Debug, thiserror::Error)]
pub enum ScheduleJobError {
@@ -140,15 +190,6 @@ pub enum ScheduleJobError {
#[error("specified worker '{0}' not found or inactive")]
WorkerNotAvailable(NodeId),

    /// Failed to serialize job descriptor to JSON
    ///
    /// This occurs when:
    /// - JobDescriptor cannot be serialized to JSON
    /// - Invalid UTF-8 characters in job parameters
    /// - Serialization buffer overflow
    #[error("failed to serialize job descriptor: {0}")]
    SerializeJobDescriptor(#[source] serde_json::Error),

    /// Failed to register job in the metadata database
    ///
    /// This occurs when:
4 changes: 0 additions & 4 deletions crates/services/controller/Cargo.toml
@@ -8,19 +8,15 @@ license-file.workspace = true
admin-api = { path = "../admin-api" }
amp-data-store = { path = "../../core/data-store" }
amp-datasets-registry = { version = "0.1.0", path = "../../core/datasets-registry" }
amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" }
amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" }
amp-providers-registry = { version = "0.1.0", path = "../../core/providers-registry" }
async-trait.workspace = true
axum.workspace = true
common = { path = "../../core/common" }
datasets-common = { version = "0.1.0", path = "../../core/datasets-common" }
datasets-derived = { version = "0.1.0", path = "../../core/datasets-derived" }
metadata-db = { version = "0.1.0", path = "../../core/metadata-db" }
monitoring = { path = "../../core/monitoring" }
opentelemetry-instrumentation-tower = { version = "0.17.0", features = ["axum"] }
rand.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tower-http = { workspace = true, features = ["cors"] }
56 changes: 13 additions & 43 deletions crates/services/controller/src/scheduler.rs
@@ -27,16 +27,14 @@
use std::time::Duration;

use admin_api::scheduler::{
    DeleteJobError, DeleteJobsByStatusError, GetJobError, GetWorkerError, ListJobsByDatasetError,
    ListJobsError, ListWorkersError, NodeSelector, ScheduleJobError, SchedulerJobs,
    SchedulerWorkers, StopJobError,
    DeleteJobError, DeleteJobsByStatusError, GetJobError, GetWorkerError, JobDescriptor,
    ListJobsByDatasetError, ListJobsError, ListWorkersError, NodeSelector, ScheduleJobError,
    SchedulerJobs, SchedulerWorkers, StopJobError,
};
use async_trait::async_trait;
use datasets_common::{
    dataset_kind_str::DatasetKindStr, end_block::EndBlock, hash::Hash,
    hash_reference::HashReference, name::Name, namespace::Namespace,
    hash::Hash, hash_reference::HashReference, name::Name, namespace::Namespace,
};
use datasets_derived::DerivedDatasetKind;
use metadata_db::{
    Error as MetadataDbError, MetadataDb, jobs::JobStatusUpdateError, workers::Worker,
};
@@ -69,19 +67,18 @@ impl Scheduler {
        Self { metadata_db }
    }

    /// Schedule a dataset synchronization job
    /// Schedule a job with a pre-built descriptor
    ///
    /// Checks for existing scheduled or running jobs to avoid duplicates, selects an available
    /// worker node (either randomly, by exact worker_id, or by matching a glob pattern) and
    /// registers the job in the metadata database.
    async fn schedule_dataset_sync_job_impl(
    async fn schedule_job_impl(
        &self,
        end_block: EndBlock,
        max_writers: u16,
        hash_reference: HashReference,
        dataset_kind: DatasetKindStr,
        job_descriptor: JobDescriptor,
        worker_id: Option<NodeSelector>,
    ) -> Result<JobId, ScheduleJobError> {
        // Avoid re-scheduling jobs in a scheduled or running state.
        // TODO: Deduplicate jobs based on an idempotency key (not in the job descriptor)
        let existing_jobs =
            metadata_db::jobs::get_by_dataset(&self.metadata_db, hash_reference.hash())
                .await
@@ -135,32 +132,13 @@
            }
        };

        let job_desc = if dataset_kind == DerivedDatasetKind {
            amp_worker_datasets_derived::job_descriptor::JobDescriptor {
                end_block,
                dataset_namespace: hash_reference.namespace().clone(),
                dataset_name: hash_reference.name().clone(),
                manifest_hash: hash_reference.hash().clone(),
            }
            .into()
        } else {
            amp_worker_datasets_raw::job_descriptor::JobDescriptor {
                end_block,
                max_writers,
                dataset_namespace: hash_reference.namespace().clone(),
                dataset_name: hash_reference.name().clone(),
                manifest_hash: hash_reference.hash().clone(),
            }
            .into()
        };

        let mut tx = self
            .metadata_db
            .begin_txn()
            .await
            .map_err(ScheduleJobError::BeginTransaction)?;

        let job_id = metadata_db::jobs::register(&mut tx, &node_id, &job_desc)
        let job_id = metadata_db::jobs::register(&mut tx, &node_id, job_descriptor)
            .await
            .map(Into::into)
            .map_err(ScheduleJobError::RegisterJob)?;
@@ -347,22 +325,14 @@ pub enum RescheduleJobError {

#[async_trait]
impl SchedulerJobs for Scheduler {
    async fn schedule_dataset_sync_job(
    async fn schedule_job(
        &self,
        dataset_reference: HashReference,
        dataset_kind: DatasetKindStr,
        end_block: EndBlock,
        max_writers: u16,
        job_descriptor: JobDescriptor,
        worker_id: Option<NodeSelector>,
    ) -> Result<JobId, ScheduleJobError> {
        self.schedule_dataset_sync_job_impl(
            end_block,
            max_writers,
            dataset_reference,
            dataset_kind,
            worker_id,
        )
        .await
        self.schedule_job_impl(dataset_reference, job_descriptor, worker_id)
            .await
    }

    async fn stop_job(&self, job_id: JobId) -> Result<(), StopJobError> {