From d921ecb776898f022d544c80609be24a7a78d04a Mon Sep 17 00:00:00 2001 From: juaristi22 Date: Thu, 26 Mar 2026 17:06:51 +0530 Subject: [PATCH 1/3] add run_id to artifact directories --- modal_app/data_build.py | 3 +++ modal_app/local_area.py | 8 ++++-- modal_app/pipeline.py | 34 ++++++++++++++++++-------- modal_app/remote_calibration_runner.py | 27 ++++++++++++++++---- 4 files changed, 55 insertions(+), 17 deletions(-) diff --git a/modal_app/data_build.py b/modal_app/data_build.py index e5047aca..a423761e 100644 --- a/modal_app/data_build.py +++ b/modal_app/data_build.py @@ -350,6 +350,7 @@ def build_datasets( clear_checkpoints: bool = False, skip_tests: bool = False, skip_enhanced_cps: bool = False, + run_id: str = "", ): """Build all datasets with preemption-resilient checkpointing. @@ -593,6 +594,8 @@ def build_datasets( # failure does not block downstream calibration steps. print("Copying pipeline artifacts to shared volume...") artifacts_dir = Path(PIPELINE_MOUNT) / "artifacts" + if run_id: + artifacts_dir = artifacts_dir / run_id artifacts_dir.mkdir(parents=True, exist_ok=True) # Copy all intermediate H5 datasets for lineage tracing diff --git a/modal_app/local_area.py b/modal_app/local_area.py index 8a058be2..a69dae8e 100644 --- a/modal_app/local_area.py +++ b/modal_app/local_area.py @@ -646,7 +646,9 @@ def coordinate_publish( version_dir = staging_dir / version pipeline_volume.reload() - artifacts = Path("/pipeline/artifacts") + artifacts = ( + Path(f"/pipeline/artifacts/{run_id}") if run_id else Path("/pipeline/artifacts") + ) weights_path = artifacts / "calibration_weights.npy" db_path = artifacts / "policy_data.db" dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5" @@ -929,7 +931,9 @@ def coordinate_national_publish( staging_dir = Path(VOLUME_MOUNT) pipeline_volume.reload() - artifacts = Path("/pipeline/artifacts") + artifacts = ( + Path(f"/pipeline/artifacts/{run_id}") if run_id else Path("/pipeline/artifacts") + ) weights_path = artifacts / "national_calibration_weights.npy" db_path = artifacts / "policy_data.db" dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5" diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index f5fbe361..1abed68f 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -67,10 +67,21 @@ REPO_URL = "https://github.com/PolicyEngine/policyengine-us-data.git" PIPELINE_MOUNT = "/pipeline" STAGING_MOUNT = "/staging" -ARTIFACTS_DIR = f"{PIPELINE_MOUNT}/artifacts" +ARTIFACTS_BASE = f"{PIPELINE_MOUNT}/artifacts" RUNS_DIR = f"{PIPELINE_MOUNT}/runs" +def artifacts_dir_for_run(run_id: str) -> str: + """Return the run-scoped artifacts directory. + + When run_id is empty, falls back to the flat base path + for backward compatibility with standalone invocations. + """ + if run_id: + return f"{ARTIFACTS_BASE}/{run_id}" + return ARTIFACTS_BASE + + # ── Run metadata ───────────────────────────────────────────────── @@ -302,7 +313,7 @@ def stage_base_datasets( version: Package version string for the commit. branch: Git branch for repo clone. """ - artifacts = Path(ARTIFACTS_DIR) + artifacts = Path(artifacts_dir_for_run(run_id)) files_with_paths = [] @@ -666,8 +677,8 @@ def run_pipeline( run_dir.mkdir(parents=True, exist_ok=True) (run_dir / "diagnostics").mkdir(exist_ok=True) - # Create artifacts directory - Path(ARTIFACTS_DIR).mkdir(parents=True, exist_ok=True) + # Create run-scoped artifacts directory + Path(artifacts_dir_for_run(run_id)).mkdir(parents=True, exist_ok=True) write_run_meta(meta, pipeline_volume) @@ -704,6 +715,7 @@ def run_pipeline( clear_checkpoints=clear_checkpoints, skip_tests=True, skip_enhanced_cps=False, + run_id=run_id, ) # The build_datasets step produces files in its @@ -732,6 +744,7 @@ def run_pipeline( branch=branch, workers=num_workers, n_clones=n_clones, + run_id=run_id, ) print(f" Package at: {pkg_path}") @@ -750,7 +763,7 @@ def run_pipeline( print("\n[Step 3/5] Fitting calibration weights...") step_start = time.time() - vol_path = "/pipeline/artifacts/calibration_package.pkl" + vol_path = f"{artifacts_dir_for_run(run_id)}/calibration_package.pkl" target_cfg = "policyengine_us_data/calibration/target_config.yaml" # Spawn regional fit @@ -794,16 +807,17 @@ def run_pipeline( regional_result = regional_handle.get() print(" Regional fit complete. Writing to volume...") - # Write regional results to pipeline volume + # Write regional results to pipeline volume (run-scoped) + artifacts_rel = f"artifacts/{run_id}" if run_id else "artifacts" with pipeline_volume.batch_upload(force=True) as batch: batch.put_file( BytesIO(regional_result["weights"]), - "artifacts/calibration_weights.npy", + f"{artifacts_rel}/calibration_weights.npy", ) if regional_result.get("config"): batch.put_file( BytesIO(regional_result["config"]), - "artifacts/unified_run_config.json", + f"{artifacts_rel}/unified_run_config.json", ) archive_diagnostics( @@ -822,12 +836,12 @@ def run_pipeline( with pipeline_volume.batch_upload(force=True) as batch: batch.put_file( BytesIO(national_result["weights"]), - "artifacts/national_calibration_weights.npy", + f"{artifacts_rel}/national_calibration_weights.npy", ) if national_result.get("config"): batch.put_file( BytesIO(national_result["config"]), - "artifacts/national_unified_run_config.json", + f"{artifacts_rel}/national_unified_run_config.json", ) archive_diagnostics( diff --git a/modal_app/remote_calibration_runner.py b/modal_app/remote_calibration_runner.py index 41cfc476..b2962e32 100644 --- a/modal_app/remote_calibration_runner.py +++ b/modal_app/remote_calibration_runner.py @@ -156,12 +156,13 @@ def _fit_weights_impl( log_freq: int = None, skip_county: bool = True, workers: int = 8, + artifacts_dir: str = "", ) -> dict: """Full pipeline: read data from pipeline volume, build matrix, fit.""" _setup_repo() pipeline_vol.reload() - artifacts = f"{PIPELINE_MOUNT}/artifacts" + artifacts = artifacts_dir if artifacts_dir else f"{PIPELINE_MOUNT}/artifacts" db_path = f"{artifacts}/policy_data.db" dataset_path = f"{artifacts}/source_imputed_stratified_extended_cps.h5" for label, p in [("database", db_path), ("dataset", dataset_path)]: @@ -324,12 +325,15 @@ def _build_package_impl( skip_county: bool = True, workers: int = 8, n_clones: int = 430, + run_id: str = "", ) -> str: """Read data from pipeline volume, build X matrix, save package.""" _setup_repo() pipeline_vol.reload() artifacts = f"{PIPELINE_MOUNT}/artifacts" + if run_id: + artifacts = f"{artifacts}/{run_id}" db_path = f"{artifacts}/policy_data.db" dataset_path = f"{artifacts}/source_imputed_stratified_extended_cps.h5" for label, p in [("database", db_path), ("dataset", dataset_path)]: @@ -338,7 +342,7 @@ def _build_package_impl( f"Missing {label} on pipeline volume: {p}. Run data_build first." ) - pkg_path = f"{PIPELINE_MOUNT}/artifacts/calibration_package.pkl" + pkg_path = f"{artifacts}/calibration_package.pkl" script_path = "policyengine_us_data/calibration/unified_calibration.py" cmd = [ "uv", @@ -405,6 +409,7 @@ def build_package_remote( skip_county: bool = True, workers: int = 8, n_clones: int = 430, + run_id: str = "", ) -> str: return _build_package_impl( branch, @@ -412,6 +417,7 @@ def build_package_remote( skip_county=skip_county, workers=workers, n_clones=n_clones, + run_id=run_id, ) @@ -421,7 +427,7 @@ def build_package_remote( volumes={PIPELINE_MOUNT: pipeline_vol}, nonpreemptible=True, ) -def check_volume_package() -> dict: +def check_volume_package(artifacts_dir: str = "") -> dict: """Check if a calibration package exists on the volume. Reads the lightweight JSON sidecar for provenance fields. @@ -430,8 +436,9 @@ def check_volume_package() -> dict: import datetime import json - pkg_path = f"{PIPELINE_MOUNT}/artifacts/calibration_package.pkl" - sidecar_path = f"{PIPELINE_MOUNT}/artifacts/calibration_package_meta.json" + base = artifacts_dir if artifacts_dir else f"{PIPELINE_MOUNT}/artifacts" + pkg_path = f"{base}/calibration_package.pkl" + sidecar_path = f"{base}/calibration_package_meta.json" if not os.path.exists(pkg_path): return {"exists": False} @@ -485,6 +492,7 @@ def fit_weights_t4( log_freq: int = None, skip_county: bool = True, workers: int = 8, + artifacts_dir: str = "", ) -> dict: return _fit_weights_impl( branch, @@ -497,6 +505,7 @@ def fit_weights_t4( log_freq, skip_county=skip_county, workers=workers, + artifacts_dir=artifacts_dir, ) @@ -520,6 +529,7 @@ def fit_weights_a10( log_freq: int = None, skip_county: bool = True, workers: int = 8, + artifacts_dir: str = "", ) -> dict: return _fit_weights_impl( branch, @@ -532,6 +542,7 @@ def fit_weights_a10( log_freq, skip_county=skip_county, workers=workers, + artifacts_dir=artifacts_dir, ) @@ -555,6 +566,7 @@ def fit_weights_a100_40( log_freq: int = None, skip_county: bool = True, workers: int = 8, + artifacts_dir: str = "", ) -> dict: return _fit_weights_impl( branch, @@ -567,6 +579,7 @@ def fit_weights_a100_40( log_freq, skip_county=skip_county, workers=workers, + artifacts_dir=artifacts_dir, ) @@ -590,6 +603,7 @@ def fit_weights_a100_80( log_freq: int = None, skip_county: bool = True, workers: int = 8, + artifacts_dir: str = "", ) -> dict: return _fit_weights_impl( branch, @@ -602,6 +616,7 @@ def fit_weights_a100_80( log_freq, skip_county=skip_county, workers=workers, + artifacts_dir=artifacts_dir, ) @@ -625,6 +640,7 @@ def fit_weights_h100( log_freq: int = None, skip_county: bool = True, workers: int = 8, + artifacts_dir: str = "", ) -> dict: return _fit_weights_impl( branch, @@ -637,6 +653,7 @@ def fit_weights_h100( log_freq, skip_county=skip_county, workers=workers, + artifacts_dir=artifacts_dir, ) From cecef561dd9d11214a5c2ab7f2652b1cd8b4d50f Mon Sep 17 00:00:00 2001 From: juaristi22 Date: Thu, 26 Mar 2026 18:02:05 +0530 Subject: [PATCH 2/3] adding changelog fragment --- changelog.d/scope-pipeline-artifacts-by-run-id.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/scope-pipeline-artifacts-by-run-id.fixed.md diff --git a/changelog.d/scope-pipeline-artifacts-by-run-id.fixed.md b/changelog.d/scope-pipeline-artifacts-by-run-id.fixed.md new file mode 100644 index 00000000..dfca7a6f --- /dev/null +++ b/changelog.d/scope-pipeline-artifacts-by-run-id.fixed.md @@ -0,0 +1 @@ +Scope pipeline artifact directory by run ID to prevent concurrent runs from clobbering each other's H5 files, calibration packages, and weights. \ No newline at end of file From 8b9d2313a1c79c560047994089e1ee08b0036388 Mon Sep 17 00:00:00 2001 From: juaristi22 Date: Thu, 26 Mar 2026 20:54:32 +0530 Subject: [PATCH 3/3] increase worker timeout --- modal_app/local_area.py | 6 +++--- modal_app/remote_calibration_runner.py | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/modal_app/local_area.py b/modal_app/local_area.py index a69dae8e..caabfc99 100644 --- a/modal_app/local_area.py +++ b/modal_app/local_area.py @@ -294,7 +294,7 @@ def run_phase( }, memory=16384, cpu=4.0, - timeout=14400, + timeout=28800, nonpreemptible=True, ) def build_areas_worker( @@ -438,7 +438,7 @@ def validate_staging(branch: str, version: str, run_id: str = "") -> Dict: secrets=[hf_secret], volumes={VOLUME_MOUNT: staging_volume}, memory=8192, - timeout=14400, + timeout=28800, nonpreemptible=True, ) def upload_to_staging( @@ -902,7 +902,7 @@ def main( "/pipeline": pipeline_volume, }, memory=16384, - timeout=14400, + timeout=28800, nonpreemptible=True, ) def coordinate_national_publish( diff --git a/modal_app/remote_calibration_runner.py b/modal_app/remote_calibration_runner.py index b2962e32..30126e24 100644 --- a/modal_app/remote_calibration_runner.py +++ b/modal_app/remote_calibration_runner.py @@ -478,7 +478,7 @@ def check_volume_package(artifacts_dir: str = "") -> dict: memory=32768, cpu=8.0, gpu="T4", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_weights_t4( @@ -515,7 +515,7 @@ def fit_weights_t4( memory=32768, cpu=8.0, gpu="A10", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_weights_a10( @@ -552,7 +552,7 @@ def fit_weights_a10( memory=32768, cpu=8.0, gpu="A100-40GB", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_weights_a100_40( @@ -589,7 +589,7 @@ def fit_weights_a100_40( memory=32768, cpu=8.0, gpu="A100-80GB", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_weights_a100_80( @@ -626,7 +626,7 @@ def fit_weights_a100_80( memory=32768, cpu=8.0, gpu="H100", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_weights_h100( @@ -674,7 +674,7 @@ def fit_weights_h100( memory=32768, cpu=8.0, gpu="T4", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_from_package_t4( @@ -706,7 +706,7 @@ def fit_from_package_t4( memory=32768, cpu=8.0, gpu="A10", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_from_package_a10( @@ -738,7 +738,7 @@ def fit_from_package_a10( memory=32768, cpu=8.0, gpu="A100-40GB", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_from_package_a100_40( @@ -770,7 +770,7 @@ def fit_from_package_a100_40( memory=32768, cpu=8.0, gpu="A100-80GB", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_from_package_a100_80( @@ -802,7 +802,7 @@ def fit_from_package_a100_80( memory=32768, cpu=8.0, gpu="H100", - timeout=14400, + timeout=28800, volumes={PIPELINE_MOUNT: pipeline_vol}, ) def fit_from_package_h100(