diff --git a/changelog.d/pipeline-hf-archival.added.md b/changelog.d/pipeline-hf-archival.added.md new file mode 100644 index 00000000..7ec9947f --- /dev/null +++ b/changelog.d/pipeline-hf-archival.added.md @@ -0,0 +1 @@ +Archive pipeline runs, diagnostics, and intermediate build artifacts to a dedicated HuggingFace repo (`PolicyEngine/policyengine-us-data-pipeline`) for durable run history that survives Modal volume deletion. diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index f5fbe361..3c92c39c 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -107,8 +107,17 @@ def generate_run_id(version: str, sha: str) -> str: def write_run_meta( meta: RunMetadata, vol: modal.Volume, + mirror: bool = True, ) -> None: - """Write run metadata to the pipeline volume.""" + """Write run metadata to the pipeline volume and optionally mirror to HF. + + Args: + meta: Run metadata to persist. + vol: Modal volume to write to. + mirror: If True, also upload meta.json to the pipeline + archival HF repo. Set to False in error handlers to + avoid network calls that could hang. + """ run_dir = Path(RUNS_DIR) / meta.run_id run_dir.mkdir(parents=True, exist_ok=True) meta_path = run_dir / "meta.json" @@ -116,6 +125,12 @@ def write_run_meta( json.dump(meta.to_dict(), f, indent=2) vol.commit() + if mirror: + _mirror_to_pipeline_repo( + meta.run_id, + [[str(meta_path), "meta.json"]], + ) + def read_run_meta( run_id: str, @@ -170,7 +185,7 @@ def archive_diagnostics( vol: modal.Volume, prefix: str = "", ) -> None: - """Archive calibration diagnostics to the run directory.""" + """Archive calibration diagnostics to the run directory and mirror to HF.""" diag_dir = Path(RUNS_DIR) / run_id / "diagnostics" diag_dir.mkdir(parents=True, exist_ok=True) @@ -180,6 +195,7 @@ def archive_diagnostics( "config": f"{prefix}unified_run_config.json", } + written_files = [] for key, filename in file_map.items(): data = result_bytes.get(key) if data: @@ -187,9 +203,13 @@ def archive_diagnostics( with open(path, "wb") as f: f.write(data) print(f" Archived {filename} ({len(data):,} bytes)") + written_files.append([str(path), f"diagnostics/{filename}"]) vol.commit() + if written_files: + _mirror_to_pipeline_repo(run_id, written_files) + def _step_completed(meta: RunMetadata, step: str) -> bool: """Check if a step is marked completed in metadata.""" @@ -248,6 +268,111 @@ def _record_step( write_run_meta(meta, vol) +# ── Pipeline repo archival ──────────────────────────────────────── + + +_MIRROR_SCRIPT = """\ +import os, json +from policyengine_us_data.utils.data_upload import upload_to_pipeline_repo + +pairs = json.loads(os.environ["_MIRROR_PAIRS"]) +run_id = os.environ["_MIRROR_RUN_ID"] +count = upload_to_pipeline_repo(pairs, run_id) +print(f"Mirrored {count} file(s) to pipeline repo") +""" + +# 10-minute timeout for non-critical archival subprocess calls. +_MIRROR_TIMEOUT_S = 600 + + +def _mirror_to_pipeline_repo( + run_id: str, + files_with_paths: list[list[str]], +) -> None: + """Upload files to the pipeline archival HF repo. + + Non-fatal: logs warnings on failure but does not raise. + Uses environment variables (not f-string interpolation) to + pass data to the subprocess, avoiding injection risks. + + Args: + run_id: Pipeline run identifier. + files_with_paths: List of [local_path, path_within_run] pairs. + """ + if not files_with_paths: + return + + env = os.environ.copy() + env["_MIRROR_PAIRS"] = json.dumps(files_with_paths) + env["_MIRROR_RUN_ID"] = run_id + + try: + result = subprocess.run( + ["uv", "run", "python", "-c", _MIRROR_SCRIPT], + cwd="/root/policyengine-us-data", + capture_output=True, + text=True, + env=env, + timeout=_MIRROR_TIMEOUT_S, + ) + if result.returncode != 0: + print(f" WARNING: Pipeline repo mirror failed: {result.stderr[:500]}") + elif result.stdout.strip(): + print(f" {result.stdout.strip()}") + except subprocess.TimeoutExpired: + print(f" WARNING: Pipeline repo mirror timed out after {_MIRROR_TIMEOUT_S}s") + except (subprocess.SubprocessError, OSError) as e: + print(f" WARNING: Pipeline repo mirror error: {e}") + + +# Intermediate artifacts from Step 1 to archive (not shipped elsewhere). +# Maps local filename -> repo-relative path inside artifacts/. +STEP1_ARTIFACTS: dict[str, str] = { + "acs_2022.h5": "artifacts/acs_2022.h5", + "irs_puf_2015.h5": "artifacts/irs_puf_2015.h5", + "puf_2024.h5": "artifacts/puf_2024.h5", + "extended_cps_2024.h5": "artifacts/extended_cps_2024.h5", + "stratified_extended_cps_2024.h5": "artifacts/stratified_extended_cps_2024.h5", + "build_log.txt": "artifacts/build_log.txt", + "calibration_log.csv": "artifacts/calibration_log_legacy.csv", + "uprating_factors.csv": "artifacts/uprating_factors.csv", +} + +STEP2_ARTIFACTS: dict[str, str] = { + "calibration_package_meta.json": "artifacts/calibration_package_meta.json", +} + + +def _archive_artifacts( + run_id: str, + artifact_names: dict[str, str], +) -> None: + """Archive named artifacts from ARTIFACTS_DIR to the pipeline HF repo. + + Args: + run_id: Pipeline run identifier. + artifact_names: Mapping of {local_filename: repo_relative_path}. + """ + artifacts_dir = Path(ARTIFACTS_DIR) + files: list[list[str]] = [] + for name, repo_path in artifact_names.items(): + path = artifacts_dir / name + if not path.exists(): + continue + try: + size = path.stat().st_size + except OSError: + continue + files.append([str(path), repo_path]) + print(f" Archiving {name} ({size:,} bytes)") + + if files: + print(f" Uploading {len(files)} artifact(s)...") + _mirror_to_pipeline_repo(run_id, files) + else: + print(" No artifacts found to archive") + + # ── Include other Modal apps ───────────────────────────────────── # app.include() merges functions from other apps into this one, # ensuring Modal mounts their files and registers their functions @@ -470,6 +595,8 @@ def _write_validation_diagnostics( diag_dir = Path(RUNS_DIR) / run_id / "diagnostics" diag_dir.mkdir(parents=True, exist_ok=True) + mirror_files: list[list[str]] = [] + # Write regional validation CSV if validation_rows: csv_columns = [ @@ -496,6 +623,7 @@ def _write_validation_diagnostics( for row in validation_rows: writer.writerow({k: row.get(k, "") for k in csv_columns}) print(f" Wrote {len(validation_rows)} rows to {csv_path}") + mirror_files.append([str(csv_path), "diagnostics/validation_results.csv"]) # Compute summary n_sanity_fail = sum( @@ -567,9 +695,14 @@ def _write_validation_diagnostics( with open(nat_path, "w") as f: f.write(national_output) print(f" Wrote national validation to {nat_path}") + mirror_files.append([str(nat_path), "diagnostics/national_validation.txt"]) vol.commit() + # Mirror validation files to pipeline archival HF repo + if mirror_files: + _mirror_to_pipeline_repo(run_id, mirror_files) + # ── Orchestrator ───────────────────────────────────────────────── @@ -706,11 +839,6 @@ def run_pipeline( skip_enhanced_cps=False, ) - # The build_datasets step produces files in its - # own volume. Key outputs (source_imputed, - # policy_data.db) are staged to HF in step 4. - # TODO(#617): When pipeline_artifacts.py lands, - # call mirror_to_pipeline() here for audit trail. _record_step( meta, "build_datasets", @@ -720,6 +848,11 @@ def run_pipeline( print( f" Completed in {meta.step_timings['build_datasets']['duration_s']}s" ) + + # Archive intermediate build artifacts to pipeline HF repo + print(" Archiving intermediate artifacts...") + pipeline_volume.reload() + _archive_artifacts(run_id, STEP1_ARTIFACTS) else: print("\n[Step 1/5] Build datasets (skipped - completed)") @@ -742,6 +875,11 @@ def run_pipeline( pipeline_volume, ) print(f" Completed in {meta.step_timings['build_package']['duration_s']}s") + + # Archive package metadata to pipeline HF repo + print(" Archiving package metadata...") + pipeline_volume.reload() + _archive_artifacts(run_id, STEP2_ARTIFACTS) else: print("\n[Step 2/5] Build package (skipped - completed)") @@ -962,7 +1100,7 @@ def run_pipeline( except Exception as e: meta.status = "failed" meta.error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}" - write_run_meta(meta, pipeline_volume) + write_run_meta(meta, pipeline_volume, mirror=False) print(f"\nPIPELINE FAILED: {e}") print(f"Resume with: --resume-run-id {run_id}") raise diff --git a/policyengine_us_data/utils/data_upload.py b/policyengine_us_data/utils/data_upload.py index 90447ca4..5421135c 100644 --- a/policyengine_us_data/utils/data_upload.py +++ b/policyengine_us_data/utils/data_upload.py @@ -273,44 +273,48 @@ def hf_create_commit_with_retry( ) -def upload_to_staging_hf( - files_with_paths: List[Tuple[Path, str]], - version: str, - hf_repo_name: str = "policyengine/policyengine-us-data", - hf_repo_type: str = "model", +def _batched_hf_upload( + files_with_repo_paths: List[Tuple[str, str]], + repo_id: str, + repo_type: str, + commit_message_prefix: str, batch_size: int = 50, - run_id: str = "", ) -> int: - """ - Upload files to staging/ paths in HuggingFace. + """Upload files to a HuggingFace repo in batches with retry. Args: - files_with_paths: List of (local_path, relative_path) tuples - relative_path is like "states/AL.h5" - version: Version string for commit message - hf_repo_name: HuggingFace repository name - hf_repo_type: Repository type - batch_size: Number of files per commit batch + files_with_repo_paths: List of (local_path, path_in_repo) tuples. + repo_id: HuggingFace repository ID. + repo_type: Repository type. + commit_message_prefix: Prefix for commit messages. + batch_size: Number of files per commit batch. Returns: - Number of files uploaded + Number of files uploaded. + + Raises: + EnvironmentError: If HUGGING_FACE_TOKEN is not set. """ token = os.environ.get("HUGGING_FACE_TOKEN") + if not token: + raise EnvironmentError( + "HUGGING_FACE_TOKEN environment variable is not set. " + "Cannot upload to HuggingFace." + ) api = HfApi() total_uploaded = 0 - for i in range(0, len(files_with_paths), batch_size): - batch = files_with_paths[i : i + batch_size] + for i in range(0, len(files_with_repo_paths), batch_size): + batch = files_with_repo_paths[i : i + batch_size] operations = [] - for local_path, rel_path in batch: + for local_path, repo_path in batch: local_path = Path(local_path) if not local_path.exists(): logging.warning(f"File {local_path} does not exist, skipping.") continue - staging_prefix = f"staging/{run_id}" if run_id else "staging" operations.append( CommitOperationAdd( - path_in_repo=f"{staging_prefix}/{rel_path}", + path_in_repo=repo_path, path_or_fileobj=str(local_path), ) ) @@ -318,23 +322,59 @@ def upload_to_staging_hf( if not operations: continue + batch_num = i // batch_size + 1 hf_create_commit_with_retry( api=api, operations=operations, - repo_id=hf_repo_name, - repo_type=hf_repo_type, + repo_id=repo_id, + repo_type=repo_type, token=token, - commit_message=f"Upload batch {i // batch_size + 1} to staging for version {version}", + commit_message=f"{commit_message_prefix}: batch {batch_num} ({len(operations)} files)", ) total_uploaded += len(operations) logging.info( - f"Uploaded batch {i // batch_size + 1}: {len(operations)} files to staging/" + f"Uploaded batch {batch_num}: {len(operations)} files to {repo_id}" ) - logging.info(f"Total: uploaded {total_uploaded} files to staging/ in HuggingFace") + logging.info(f"Total: uploaded {total_uploaded} files to {repo_id}") return total_uploaded +def upload_to_staging_hf( + files_with_paths: List[Tuple[Path, str]], + version: str, + hf_repo_name: str = "policyengine/policyengine-us-data", + hf_repo_type: str = "model", + batch_size: int = 50, + run_id: str = "", +) -> int: + """ + Upload files to staging/ paths in HuggingFace. + + Args: + files_with_paths: List of (local_path, relative_path) tuples + relative_path is like "states/AL.h5" + version: Version string for commit message + hf_repo_name: HuggingFace repository name + hf_repo_type: Repository type + batch_size: Number of files per commit batch + + Returns: + Number of files uploaded + """ + staging_prefix = f"staging/{run_id}" if run_id else "staging" + repo_paths = [ + (str(local), f"{staging_prefix}/{rel}") for local, rel in files_with_paths + ] + return _batched_hf_upload( + repo_paths, + hf_repo_name, + hf_repo_type, + f"Upload to staging for version {version}", + batch_size, + ) + + def promote_staging_to_production_hf( files: List[str], version: str, @@ -466,6 +506,41 @@ def cleanup_staging_hf( return len(files) +PIPELINE_HF_REPO = "PolicyEngine/policyengine-us-data-pipeline" + + +def upload_to_pipeline_repo( + files_with_paths: List[Tuple[str, str]], + run_id: str, + repo_name: str = PIPELINE_HF_REPO, + repo_type: str = "model", + batch_size: int = 10, +) -> int: + """Upload files to the pipeline archival HF repo. + + Each file is placed under {run_id}/{path_within_run} in the repo. + + Args: + files_with_paths: List of (local_path, path_within_run) tuples. + path_within_run is like "meta.json" or "artifacts/acs_2022.h5". + run_id: Pipeline run identifier. + repo_name: HuggingFace repository name. + repo_type: Repository type. + batch_size: Number of files per commit batch. + + Returns: + Number of files uploaded. + """ + repo_paths = [(local, f"{run_id}/{path}") for local, path in files_with_paths] + return _batched_hf_upload( + repo_paths, + repo_name, + repo_type, + f"Archive run {run_id}", + batch_size, + ) + + def upload_from_hf_staging_to_gcs( rel_paths: List[str], version: str,