Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/pipeline-hf-archival.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Archive pipeline runs, diagnostics, and intermediate build artifacts to a dedicated HuggingFace repo (`PolicyEngine/policyengine-us-data-pipeline`) for durable run history that survives Modal volume deletion.
154 changes: 146 additions & 8 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,30 @@ def generate_run_id(version: str, sha: str) -> str:
def write_run_meta(
meta: RunMetadata,
vol: modal.Volume,
mirror: bool = True,
) -> None:
"""Write run metadata to the pipeline volume."""
"""Write run metadata to the pipeline volume and optionally mirror to HF.

Args:
meta: Run metadata to persist.
vol: Modal volume to write to.
mirror: If True, also upload meta.json to the pipeline
archival HF repo. Set to False in error handlers to
avoid network calls that could hang.
"""
run_dir = Path(RUNS_DIR) / meta.run_id
run_dir.mkdir(parents=True, exist_ok=True)
meta_path = run_dir / "meta.json"
with open(meta_path, "w") as f:
json.dump(meta.to_dict(), f, indent=2)
vol.commit()

if mirror:
_mirror_to_pipeline_repo(
meta.run_id,
[[str(meta_path), "meta.json"]],
)


def read_run_meta(
run_id: str,
Expand Down Expand Up @@ -170,7 +185,7 @@ def archive_diagnostics(
vol: modal.Volume,
prefix: str = "",
) -> None:
"""Archive calibration diagnostics to the run directory."""
"""Archive calibration diagnostics to the run directory and mirror to HF."""
diag_dir = Path(RUNS_DIR) / run_id / "diagnostics"
diag_dir.mkdir(parents=True, exist_ok=True)

Expand All @@ -180,16 +195,21 @@ def archive_diagnostics(
"config": f"{prefix}unified_run_config.json",
}

written_files = []
for key, filename in file_map.items():
data = result_bytes.get(key)
if data:
path = diag_dir / filename
with open(path, "wb") as f:
f.write(data)
print(f" Archived {filename} ({len(data):,} bytes)")
written_files.append([str(path), f"diagnostics/{filename}"])

vol.commit()

if written_files:
_mirror_to_pipeline_repo(run_id, written_files)


def _step_completed(meta: RunMetadata, step: str) -> bool:
"""Check if a step is marked completed in metadata."""
Expand Down Expand Up @@ -248,6 +268,111 @@ def _record_step(
write_run_meta(meta, vol)


# ── Pipeline repo archival ────────────────────────────────────────


_MIRROR_SCRIPT = """\
import os, json
from policyengine_us_data.utils.data_upload import upload_to_pipeline_repo

pairs = json.loads(os.environ["_MIRROR_PAIRS"])
run_id = os.environ["_MIRROR_RUN_ID"]
count = upload_to_pipeline_repo(pairs, run_id)
print(f"Mirrored {count} file(s) to pipeline repo")
"""

# 10-minute timeout for non-critical archival subprocess calls.
_MIRROR_TIMEOUT_S = 600


def _mirror_to_pipeline_repo(
run_id: str,
files_with_paths: list[list[str]],
) -> None:
"""Upload files to the pipeline archival HF repo.

Non-fatal: logs warnings on failure but does not raise.
Uses environment variables (not f-string interpolation) to
pass data to the subprocess, avoiding injection risks.

Args:
run_id: Pipeline run identifier.
files_with_paths: List of [local_path, path_within_run] pairs.
"""
if not files_with_paths:
return

env = os.environ.copy()
env["_MIRROR_PAIRS"] = json.dumps(files_with_paths)
env["_MIRROR_RUN_ID"] = run_id

try:
result = subprocess.run(
["uv", "run", "python", "-c", _MIRROR_SCRIPT],
cwd="/root/policyengine-us-data",
capture_output=True,
text=True,
env=env,
timeout=_MIRROR_TIMEOUT_S,
)
if result.returncode != 0:
print(f" WARNING: Pipeline repo mirror failed: {result.stderr[:500]}")
elif result.stdout.strip():
print(f" {result.stdout.strip()}")
except subprocess.TimeoutExpired:
print(f" WARNING: Pipeline repo mirror timed out after {_MIRROR_TIMEOUT_S}s")
except (subprocess.SubprocessError, OSError) as e:
print(f" WARNING: Pipeline repo mirror error: {e}")


# Intermediate artifacts from Step 1 to archive (not shipped elsewhere).
# Maps local filename -> repo-relative path inside artifacts/.
STEP1_ARTIFACTS: dict[str, str] = {
"acs_2022.h5": "artifacts/acs_2022.h5",
"irs_puf_2015.h5": "artifacts/irs_puf_2015.h5",
"puf_2024.h5": "artifacts/puf_2024.h5",
"extended_cps_2024.h5": "artifacts/extended_cps_2024.h5",
"stratified_extended_cps_2024.h5": "artifacts/stratified_extended_cps_2024.h5",
"build_log.txt": "artifacts/build_log.txt",
"calibration_log.csv": "artifacts/calibration_log_legacy.csv",
"uprating_factors.csv": "artifacts/uprating_factors.csv",
}

STEP2_ARTIFACTS: dict[str, str] = {
"calibration_package_meta.json": "artifacts/calibration_package_meta.json",
}


def _archive_artifacts(
run_id: str,
artifact_names: dict[str, str],
) -> None:
"""Archive named artifacts from ARTIFACTS_DIR to the pipeline HF repo.

Args:
run_id: Pipeline run identifier.
artifact_names: Mapping of {local_filename: repo_relative_path}.
"""
artifacts_dir = Path(ARTIFACTS_DIR)
files: list[list[str]] = []
for name, repo_path in artifact_names.items():
path = artifacts_dir / name
if not path.exists():
continue
try:
size = path.stat().st_size
except OSError:
continue
files.append([str(path), repo_path])
print(f" Archiving {name} ({size:,} bytes)")

if files:
print(f" Uploading {len(files)} artifact(s)...")
_mirror_to_pipeline_repo(run_id, files)
else:
print(" No artifacts found to archive")


# ── Include other Modal apps ─────────────────────────────────────
# app.include() merges functions from other apps into this one,
# ensuring Modal mounts their files and registers their functions
Expand Down Expand Up @@ -470,6 +595,8 @@ def _write_validation_diagnostics(
diag_dir = Path(RUNS_DIR) / run_id / "diagnostics"
diag_dir.mkdir(parents=True, exist_ok=True)

mirror_files: list[list[str]] = []

# Write regional validation CSV
if validation_rows:
csv_columns = [
Expand All @@ -496,6 +623,7 @@ def _write_validation_diagnostics(
for row in validation_rows:
writer.writerow({k: row.get(k, "") for k in csv_columns})
print(f" Wrote {len(validation_rows)} rows to {csv_path}")
mirror_files.append([str(csv_path), "diagnostics/validation_results.csv"])

# Compute summary
n_sanity_fail = sum(
Expand Down Expand Up @@ -567,9 +695,14 @@ def _write_validation_diagnostics(
with open(nat_path, "w") as f:
f.write(national_output)
print(f" Wrote national validation to {nat_path}")
mirror_files.append([str(nat_path), "diagnostics/national_validation.txt"])

vol.commit()

# Mirror validation files to pipeline archival HF repo
if mirror_files:
_mirror_to_pipeline_repo(run_id, mirror_files)


# ── Orchestrator ─────────────────────────────────────────────────

Expand Down Expand Up @@ -706,11 +839,6 @@ def run_pipeline(
skip_enhanced_cps=False,
)

# The build_datasets step produces files in its
# own volume. Key outputs (source_imputed,
# policy_data.db) are staged to HF in step 4.
# TODO(#617): When pipeline_artifacts.py lands,
# call mirror_to_pipeline() here for audit trail.
_record_step(
meta,
"build_datasets",
Expand All @@ -720,6 +848,11 @@ def run_pipeline(
print(
f" Completed in {meta.step_timings['build_datasets']['duration_s']}s"
)

# Archive intermediate build artifacts to pipeline HF repo
print(" Archiving intermediate artifacts...")
pipeline_volume.reload()
_archive_artifacts(run_id, STEP1_ARTIFACTS)
else:
print("\n[Step 1/5] Build datasets (skipped - completed)")

Expand All @@ -742,6 +875,11 @@ def run_pipeline(
pipeline_volume,
)
print(f" Completed in {meta.step_timings['build_package']['duration_s']}s")

# Archive package metadata to pipeline HF repo
print(" Archiving package metadata...")
pipeline_volume.reload()
_archive_artifacts(run_id, STEP2_ARTIFACTS)
else:
print("\n[Step 2/5] Build package (skipped - completed)")

Expand Down Expand Up @@ -962,7 +1100,7 @@ def run_pipeline(
except Exception as e:
meta.status = "failed"
meta.error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
write_run_meta(meta, pipeline_volume)
write_run_meta(meta, pipeline_volume, mirror=False)
print(f"\nPIPELINE FAILED: {e}")
print(f"Resume with: --resume-run-id {run_id}")
raise
Expand Down
Loading
Loading