diff --git a/Cargo.lock b/Cargo.lock index 417db829c..65cadb36e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,8 @@ dependencies = [ "amp-datasets-registry", "amp-providers-common", "amp-providers-registry", + "amp-worker-datasets-derived", + "amp-worker-datasets-raw", "async-trait", "axum", "common", @@ -3127,18 +3129,14 @@ dependencies = [ "amp-data-store", "amp-datasets-registry", "amp-providers-registry", - "amp-worker-datasets-derived", - "amp-worker-datasets-raw", "async-trait", "axum", "common", "datasets-common", - "datasets-derived", "metadata-db", "monitoring", "opentelemetry-instrumentation-tower", "rand 0.9.2", - "serde_json", "thiserror 2.0.18", "tokio", "tower-http", diff --git a/crates/core/metadata-db/src/jobs/job_descriptor.rs b/crates/core/metadata-db/src/jobs/job_descriptor.rs index fe8474e2e..bfaec3426 100644 --- a/crates/core/metadata-db/src/jobs/job_descriptor.rs +++ b/crates/core/metadata-db/src/jobs/job_descriptor.rs @@ -54,6 +54,18 @@ impl<'a> JobDescriptorRaw<'a> { } } +impl JobDescriptorRaw<'static> { + /// Consume and return the inner `Box`. + /// + /// Only available on owned (`'static`) descriptors where the `Cow` is guaranteed `Owned`. + pub fn into_inner(self) -> Box { + match self.0 { + Cow::Owned(boxed) => boxed, + Cow::Borrowed(_) => unreachable!("'static lifetime guarantees Cow::Owned"), + } + } +} + impl<'a> AsRef for JobDescriptorRaw<'a> { fn as_ref(&self) -> &RawValue { &self.0 diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index d5f4cdf93..9df8c8899 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -9,6 +9,8 @@ amp-data-store = { path = "../../core/data-store" } amp-datasets-registry = { path = "../../core/datasets-registry" } amp-providers-common = { path = "../../core/providers-common" } amp-providers-registry = { path = "../../core/providers-registry" } +amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } +amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/admin-api/src/handlers/datasets/deploy.rs b/crates/services/admin-api/src/handlers/datasets/deploy.rs index 3b832e1a5..3ffc8b18e 100644 --- a/crates/services/admin-api/src/handlers/datasets/deploy.rs +++ b/crates/services/admin-api/src/handlers/datasets/deploy.rs @@ -1,6 +1,8 @@ use std::fmt::Debug; use amp_datasets_registry::error::{ListVersionTagsError, ResolveRevisionError}; +use amp_worker_datasets_derived::job_descriptor::JobDescriptor as MaterializeDerivedDatasetJobDescriptor; +use amp_worker_datasets_raw::job_descriptor::JobDescriptor as MaterializeRawDatasetJobDescriptor; use axum::{ Json, extract::{ @@ -11,6 +13,7 @@ use axum::{ }; use common::dataset_store::GetDatasetError; use datasets_common::{name::Name, namespace::Namespace, reference::Reference, revision::Revision}; +use datasets_derived::DerivedDatasetKind; use monitoring::logging; use worker::job::JobId; @@ -143,16 +146,30 @@ pub async fn handler( .await .map_err(Error::GetDataset)?; + // Build the job descriptor based on dataset kind + let job_descriptor = if dataset.kind() == DerivedDatasetKind { + MaterializeDerivedDatasetJobDescriptor { + end_block: end_block.into(), + dataset_namespace: reference.namespace().clone(), + dataset_name: reference.name().clone(), + manifest_hash: reference.hash().clone(), + } + .into() + } else { + MaterializeRawDatasetJobDescriptor { + end_block: end_block.into(), + max_writers: parallelism, + dataset_namespace: reference.namespace().clone(), + dataset_name: reference.name().clone(), + manifest_hash: reference.hash().clone(), + } + .into() + }; + // Schedule the extraction job using the scheduler let job_id = ctx .scheduler - .schedule_dataset_sync_job( - reference.clone(), - dataset.kind().clone(), - end_block.into(), - parallelism, - worker_id, - ) + .schedule_job(reference.clone(), job_descriptor, worker_id) .await .map_err(|err| { tracing::error!( diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index d176f5b61..fccc38664 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -25,10 +25,10 @@ use async_trait::async_trait; use datasets_common::{ - dataset_kind_str::DatasetKindStr, end_block::EndBlock, hash::Hash, - hash_reference::HashReference, name::Name, namespace::Namespace, + hash::Hash, hash_reference::HashReference, name::Name, namespace::Namespace, }; use metadata_db::workers::Worker; +use serde_json::value::RawValue; use worker::{ job::{Job, JobId, JobStatus}, node_id::{InvalidIdError, NodeId, validate_node_id}, @@ -50,13 +50,11 @@ impl Scheduler for T where T: SchedulerJobs + SchedulerWorkers {} // because const generics make the trait not dyn-compatible. #[async_trait] pub trait SchedulerJobs: Send + Sync { - /// Schedule a dataset synchronization job - async fn schedule_dataset_sync_job( + /// Schedule a job with a pre-built descriptor + async fn schedule_job( &self, dataset_reference: HashReference, - dataset_kind: DatasetKindStr, - end_block: EndBlock, - max_writers: u16, + job_descriptor: JobDescriptor, worker_id: Option, ) -> Result; @@ -102,6 +100,58 @@ pub trait SchedulerJobs: Send + Sync { ) -> Result, ListJobsByDatasetError>; } +/// Invariant-preserving bridge between typed worker job descriptors and the scheduler. +/// +/// Acts as the admin-api's canonical job descriptor type, decoupling the scheduler trait +/// interface from both the typed worker descriptor crates (`amp-worker-datasets-raw`, +/// `amp-worker-datasets-derived`) and the metadata-db storage type (`JobDescriptorRawOwned`). +/// +/// Invariants are established at the point of construction (via the `From` impls from typed +/// worker descriptors) and preserved throughout. All subsequent conversions to/from +/// `JobDescriptorRawOwned` or `JobDescriptorRaw` are infallible. +#[derive(Clone, Debug)] +pub struct JobDescriptor(Box); + +impl From for metadata_db::jobs::JobDescriptorRawOwned { + fn from(value: JobDescriptor) -> Self { + // SAFETY: JobDescriptor inner data is valid JSON from trusted sources + metadata_db::jobs::JobDescriptorRaw::from_owned_unchecked(value.0) + } +} + +impl<'a> From<&'a JobDescriptor> for metadata_db::jobs::JobDescriptorRaw<'a> { + fn from(value: &'a JobDescriptor) -> Self { + // SAFETY: JobDescriptor inner data is valid JSON from trusted sources + metadata_db::jobs::JobDescriptorRaw::from_ref_unchecked(&value.0) + } +} + +impl From for JobDescriptor { + fn from(value: metadata_db::jobs::JobDescriptorRawOwned) -> Self { + Self(value.into_inner()) + } +} + +impl From<&metadata_db::jobs::JobDescriptorRaw<'_>> for JobDescriptor { + fn from(value: &metadata_db::jobs::JobDescriptorRaw<'_>) -> Self { + Self(value.as_raw().to_owned()) + } +} + +impl From for JobDescriptor { + fn from(desc: amp_worker_datasets_raw::job_descriptor::JobDescriptor) -> Self { + let raw: metadata_db::jobs::JobDescriptorRawOwned = desc.into(); + Self(raw.into_inner()) + } +} + +impl From for JobDescriptor { + fn from(desc: amp_worker_datasets_derived::job_descriptor::JobDescriptor) -> Self { + let raw: metadata_db::jobs::JobDescriptorRawOwned = desc.into(); + Self(raw.into_inner()) + } +} + /// Errors that can occur when scheduling a dataset dump job #[derive(Debug, thiserror::Error)] pub enum ScheduleJobError { @@ -140,15 +190,6 @@ pub enum ScheduleJobError { #[error("specified worker '{0}' not found or inactive")] WorkerNotAvailable(NodeId), - /// Failed to serialize job descriptor to JSON - /// - /// This occurs when: - /// - JobDescriptor cannot be serialized to JSON - /// - Invalid UTF-8 characters in job parameters - /// - Serialization buffer overflow - #[error("failed to serialize job descriptor: {0}")] - SerializeJobDescriptor(#[source] serde_json::Error), - /// Failed to register job in the metadata database /// /// This occurs when: diff --git a/crates/services/controller/Cargo.toml b/crates/services/controller/Cargo.toml index 1aeffe287..7ad533512 100644 --- a/crates/services/controller/Cargo.toml +++ b/crates/services/controller/Cargo.toml @@ -8,19 +8,15 @@ license-file.workspace = true admin-api = { path = "../admin-api" } amp-data-store = { path = "../../core/data-store" } amp-datasets-registry = { version = "0.1.0", path = "../../core/datasets-registry" } -amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } -amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } amp-providers-registry = { version = "0.1.0", path = "../../core/providers-registry" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } datasets-common = { version = "0.1.0", path = "../../core/datasets-common" } -datasets-derived = { version = "0.1.0", path = "../../core/datasets-derived" } metadata-db = { version = "0.1.0", path = "../../core/metadata-db" } monitoring = { path = "../../core/monitoring" } opentelemetry-instrumentation-tower = { version = "0.17.0", features = ["axum"] } rand.workspace = true -serde_json.workspace = true thiserror.workspace = true tokio.workspace = true tower-http = { workspace = true, features = ["cors"] } diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 38f1a3707..d7ed3a4b4 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -27,16 +27,14 @@ use std::time::Duration; use admin_api::scheduler::{ - DeleteJobError, DeleteJobsByStatusError, GetJobError, GetWorkerError, ListJobsByDatasetError, - ListJobsError, ListWorkersError, NodeSelector, ScheduleJobError, SchedulerJobs, - SchedulerWorkers, StopJobError, + DeleteJobError, DeleteJobsByStatusError, GetJobError, GetWorkerError, JobDescriptor, + ListJobsByDatasetError, ListJobsError, ListWorkersError, NodeSelector, ScheduleJobError, + SchedulerJobs, SchedulerWorkers, StopJobError, }; use async_trait::async_trait; use datasets_common::{ - dataset_kind_str::DatasetKindStr, end_block::EndBlock, hash::Hash, - hash_reference::HashReference, name::Name, namespace::Namespace, + hash::Hash, hash_reference::HashReference, name::Name, namespace::Namespace, }; -use datasets_derived::DerivedDatasetKind; use metadata_db::{ Error as MetadataDbError, MetadataDb, jobs::JobStatusUpdateError, workers::Worker, }; @@ -69,19 +67,18 @@ impl Scheduler { Self { metadata_db } } - /// Schedule a dataset synchronization job + /// Schedule a job with a pre-built descriptor /// /// Checks for existing scheduled or running jobs to avoid duplicates, selects an available /// worker node (either randomly, by exact worker_id, or by matching a glob pattern) and registers the job in the metadata database. - async fn schedule_dataset_sync_job_impl( + async fn schedule_job_impl( &self, - end_block: EndBlock, - max_writers: u16, hash_reference: HashReference, - dataset_kind: DatasetKindStr, + job_descriptor: JobDescriptor, worker_id: Option, ) -> Result { // Avoid re-scheduling jobs in a scheduled or running state. + // TODO: Deduplicate jobs based on an idempotency key (not in the job descriptor) let existing_jobs = metadata_db::jobs::get_by_dataset(&self.metadata_db, hash_reference.hash()) .await @@ -135,32 +132,13 @@ impl Scheduler { } }; - let job_desc = if dataset_kind == DerivedDatasetKind { - amp_worker_datasets_derived::job_descriptor::JobDescriptor { - end_block, - dataset_namespace: hash_reference.namespace().clone(), - dataset_name: hash_reference.name().clone(), - manifest_hash: hash_reference.hash().clone(), - } - .into() - } else { - amp_worker_datasets_raw::job_descriptor::JobDescriptor { - end_block, - max_writers, - dataset_namespace: hash_reference.namespace().clone(), - dataset_name: hash_reference.name().clone(), - manifest_hash: hash_reference.hash().clone(), - } - .into() - }; - let mut tx = self .metadata_db .begin_txn() .await .map_err(ScheduleJobError::BeginTransaction)?; - let job_id = metadata_db::jobs::register(&mut tx, &node_id, &job_desc) + let job_id = metadata_db::jobs::register(&mut tx, &node_id, job_descriptor) .await .map(Into::into) .map_err(ScheduleJobError::RegisterJob)?; @@ -347,22 +325,14 @@ pub enum RescheduleJobError { #[async_trait] impl SchedulerJobs for Scheduler { - async fn schedule_dataset_sync_job( + async fn schedule_job( &self, dataset_reference: HashReference, - dataset_kind: DatasetKindStr, - end_block: EndBlock, - max_writers: u16, + job_descriptor: JobDescriptor, worker_id: Option, ) -> Result { - self.schedule_dataset_sync_job_impl( - end_block, - max_writers, - dataset_reference, - dataset_kind, - worker_id, - ) - .await + self.schedule_job_impl(dataset_reference, job_descriptor, worker_id) + .await } async fn stop_job(&self, job_id: JobId) -> Result<(), StopJobError> {