From 0479fa5a99a351095586f32575d389213927cb78 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 26 Mar 2026 23:27:08 +0100 Subject: [PATCH 1/3] Archive pipeline runs and intermediate artifacts to HF Publish full run records (metadata, diagnostics, intermediate build artifacts) to a dedicated HF model repo (PolicyEngine/policyengine-us-data-pipeline) so run history survives Modal volume deletion. All existing uploads to the main data repo are unchanged. - Add upload_to_pipeline_repo() utility with retry and batching - Mirror meta.json, diagnostics, and validation files on every write - Archive Step 1 intermediate artifacts (acs, puf, extended_cps, etc.) - Archive Step 2 calibration_package_meta.json Co-Authored-By: Claude Opus 4.6 --- modal_app/pipeline.py | 145 ++++++++++++++++++++-- policyengine_us_data/utils/data_upload.py | 68 ++++++++++ 2 files changed, 206 insertions(+), 7 deletions(-) diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index f5fbe361..bd6a56bc 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -108,7 +108,7 @@ def write_run_meta( meta: RunMetadata, vol: modal.Volume, ) -> None: - """Write run metadata to the pipeline volume.""" + """Write run metadata to the pipeline volume and mirror to HF.""" run_dir = Path(RUNS_DIR) / meta.run_id run_dir.mkdir(parents=True, exist_ok=True) meta_path = run_dir / "meta.json" @@ -116,6 +116,11 @@ def write_run_meta( json.dump(meta.to_dict(), f, indent=2) vol.commit() + _mirror_to_pipeline_repo( + meta.run_id, + [[str(meta_path), "meta.json"]], + ) + def read_run_meta( run_id: str, @@ -170,7 +175,7 @@ def archive_diagnostics( vol: modal.Volume, prefix: str = "", ) -> None: - """Archive calibration diagnostics to the run directory.""" + """Archive calibration diagnostics to the run directory and mirror to HF.""" diag_dir = Path(RUNS_DIR) / run_id / "diagnostics" diag_dir.mkdir(parents=True, exist_ok=True) @@ -180,6 +185,7 @@ def archive_diagnostics( "config": f"{prefix}unified_run_config.json", } + written_files = [] for key, filename in file_map.items(): data = result_bytes.get(key) if data: @@ -187,9 +193,13 @@ def archive_diagnostics( with open(path, "wb") as f: f.write(data) print(f" Archived {filename} ({len(data):,} bytes)") + written_files.append([str(path), f"diagnostics/{filename}"]) vol.commit() + if written_files: + _mirror_to_pipeline_repo(run_id, written_files) + def _step_completed(meta: RunMetadata, step: str) -> bool: """Check if a step is marked completed in metadata.""" @@ -248,6 +258,111 @@ def _record_step( write_run_meta(meta, vol) +# ── Pipeline repo archival ──────────────────────────────────────── + + +def _mirror_to_pipeline_repo( + run_id: str, + files_with_paths: list, +) -> None: + """Upload files to the pipeline archival HF repo. + + Non-fatal: logs warnings on failure but does not raise. + + Args: + run_id: Pipeline run identifier. + files_with_paths: List of [local_path, path_within_run] pairs. + """ + if not files_with_paths: + return + + existing = [[p, r] for p, r in files_with_paths if Path(p).exists()] + if not existing: + return + + import json as _json + + pairs_json = _json.dumps(existing) + + try: + result = subprocess.run( + [ + "uv", + "run", + "python", + "-c", + f""" +import json +from policyengine_us_data.utils.data_upload import upload_to_pipeline_repo + +pairs = json.loads('''{pairs_json}''') +count = upload_to_pipeline_repo(pairs, "{run_id}") +print(f"Mirrored {{count}} file(s) to pipeline repo") +""", + ], + cwd="/root/policyengine-us-data", + capture_output=True, + text=True, + env=os.environ.copy(), + ) + if result.returncode != 0: + print(f" WARNING: Pipeline repo mirror failed: {result.stderr[:500]}") + else: + if result.stdout.strip(): + print(f" {result.stdout.strip()}") + except Exception as e: + print(f" WARNING: Pipeline repo mirror error: {e}") + + +# Intermediate artifacts from Step 1 to archive (not shipped elsewhere). +INTERMEDIATE_ARTIFACTS = [ + "acs_2022.h5", + "irs_puf_2015.h5", + "puf_2024.h5", + "extended_cps_2024.h5", + "stratified_extended_cps_2024.h5", + "build_log.txt", + "calibration_log.csv", + "uprating_factors.csv", +] + + +def _archive_build_artifacts(run_id: str) -> None: + """Archive intermediate build artifacts to the pipeline HF repo.""" + artifacts_dir = Path(ARTIFACTS_DIR) + files = [] + for name in INTERMEDIATE_ARTIFACTS: + path = artifacts_dir / name + if path.exists(): + repo_name = name + if name == "calibration_log.csv": + repo_name = "calibration_log_legacy.csv" + files.append([str(path), f"artifacts/{repo_name}"]) + print(f" Archiving {name} ({path.stat().st_size:,} bytes)") + + if files: + print(f" Uploading {len(files)} intermediate artifact(s)...") + _mirror_to_pipeline_repo(run_id, files) + else: + print(" No intermediate artifacts found to archive") + + +def _archive_package_artifacts(run_id: str) -> None: + """Archive calibration package metadata to the pipeline HF repo.""" + meta_path = Path(ARTIFACTS_DIR) / "calibration_package_meta.json" + if meta_path.exists(): + print( + f" Archiving calibration_package_meta.json " + f"({meta_path.stat().st_size:,} bytes)" + ) + _mirror_to_pipeline_repo( + run_id, + [[str(meta_path), "artifacts/calibration_package_meta.json"]], + ) + else: + print(" No calibration_package_meta.json found") + + # ── Include other Modal apps ───────────────────────────────────── # app.include() merges functions from other apps into this one, # ensuring Modal mounts their files and registers their functions @@ -570,6 +685,17 @@ def _write_validation_diagnostics( vol.commit() + # Mirror validation files to pipeline archival HF repo + mirror_files = [] + csv_path = diag_dir / "validation_results.csv" + if csv_path.exists(): + mirror_files.append([str(csv_path), "diagnostics/validation_results.csv"]) + nat_path = diag_dir / "national_validation.txt" + if nat_path.exists(): + mirror_files.append([str(nat_path), "diagnostics/national_validation.txt"]) + if mirror_files: + _mirror_to_pipeline_repo(run_id, mirror_files) + # ── Orchestrator ───────────────────────────────────────────────── @@ -706,11 +832,6 @@ def run_pipeline( skip_enhanced_cps=False, ) - # The build_datasets step produces files in its - # own volume. Key outputs (source_imputed, - # policy_data.db) are staged to HF in step 4. - # TODO(#617): When pipeline_artifacts.py lands, - # call mirror_to_pipeline() here for audit trail. _record_step( meta, "build_datasets", @@ -720,6 +841,11 @@ def run_pipeline( print( f" Completed in {meta.step_timings['build_datasets']['duration_s']}s" ) + + # Archive intermediate build artifacts to pipeline HF repo + print(" Archiving intermediate artifacts...") + pipeline_volume.reload() + _archive_build_artifacts(run_id) else: print("\n[Step 1/5] Build datasets (skipped - completed)") @@ -742,6 +868,11 @@ def run_pipeline( pipeline_volume, ) print(f" Completed in {meta.step_timings['build_package']['duration_s']}s") + + # Archive package metadata to pipeline HF repo + print(" Archiving package metadata...") + pipeline_volume.reload() + _archive_package_artifacts(run_id) else: print("\n[Step 2/5] Build package (skipped - completed)") diff --git a/policyengine_us_data/utils/data_upload.py b/policyengine_us_data/utils/data_upload.py index 90447ca4..9349413e 100644 --- a/policyengine_us_data/utils/data_upload.py +++ b/policyengine_us_data/utils/data_upload.py @@ -466,6 +466,74 @@ def cleanup_staging_hf( return len(files) +PIPELINE_HF_REPO = "PolicyEngine/policyengine-us-data-pipeline" + + +def upload_to_pipeline_repo( + files_with_paths: List[Tuple[str, str]], + run_id: str, + repo_name: str = PIPELINE_HF_REPO, + repo_type: str = "model", + batch_size: int = 10, +) -> int: + """Upload files to the pipeline archival HF repo. + + Each file is placed under {run_id}/{path_within_run} in the repo. + + Args: + files_with_paths: List of (local_path, path_within_run) tuples. + path_within_run is like "meta.json" or "artifacts/acs_2022.h5". + run_id: Pipeline run identifier. + repo_name: HuggingFace repository name. + repo_type: Repository type. + batch_size: Number of files per commit batch. + + Returns: + Number of files uploaded. + """ + token = os.environ.get("HUGGING_FACE_TOKEN") + api = HfApi() + + total_uploaded = 0 + for i in range(0, len(files_with_paths), batch_size): + batch = files_with_paths[i : i + batch_size] + operations = [] + for local_path, path_within_run in batch: + local_path = Path(local_path) + if not local_path.exists(): + logging.warning(f"File {local_path} does not exist, skipping.") + continue + operations.append( + CommitOperationAdd( + path_in_repo=f"{run_id}/{path_within_run}", + path_or_fileobj=str(local_path), + ) + ) + + if not operations: + continue + + hf_create_commit_with_retry( + api=api, + operations=operations, + repo_id=repo_name, + repo_type=repo_type, + token=token, + commit_message=( + f"Archive run {run_id}: batch {i // batch_size + 1} " + f"({len(operations)} files)" + ), + ) + total_uploaded += len(operations) + logging.info( + f"Uploaded batch {i // batch_size + 1}: " + f"{len(operations)} files to {repo_name}/{run_id}/" + ) + + logging.info(f"Total: uploaded {total_uploaded} files to {repo_name}/{run_id}/") + return total_uploaded + + def upload_from_hf_staging_to_gcs( rel_paths: List[str], version: str, From f57da6f757da9a549eea95cc3f66e3667052de20 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 26 Mar 2026 23:48:57 +0100 Subject: [PATCH 2/3] Address review findings: safety, simplification, error handling - Add 10-minute subprocess timeout to _mirror_to_pipeline_repo - Pass data via env vars instead of f-string code injection - Narrow exception handling to SubprocessError/OSError - Add mirror=False param to write_run_meta for error handlers - Validate HUGGING_FACE_TOKEN before attempting uploads - Extract _batched_hf_upload shared helper (dedup ~30 lines) - Consolidate _archive_build/package_artifacts into _archive_artifacts - Fix TOCTOU race between exists() and stat() with try/except - Build validation mirror list inline instead of re-probing filesystem - Add proper type hints to new functions Co-Authored-By: Claude Opus 4.6 --- modal_app/pipeline.py | 169 +++++++++++----------- policyengine_us_data/utils/data_upload.py | 139 +++++++++--------- 2 files changed, 161 insertions(+), 147 deletions(-) diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index bd6a56bc..3c92c39c 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -107,8 +107,17 @@ def generate_run_id(version: str, sha: str) -> str: def write_run_meta( meta: RunMetadata, vol: modal.Volume, + mirror: bool = True, ) -> None: - """Write run metadata to the pipeline volume and mirror to HF.""" + """Write run metadata to the pipeline volume and optionally mirror to HF. + + Args: + meta: Run metadata to persist. + vol: Modal volume to write to. + mirror: If True, also upload meta.json to the pipeline + archival HF repo. Set to False in error handlers to + avoid network calls that could hang. + """ run_dir = Path(RUNS_DIR) / meta.run_id run_dir.mkdir(parents=True, exist_ok=True) meta_path = run_dir / "meta.json" @@ -116,10 +125,11 @@ def write_run_meta( json.dump(meta.to_dict(), f, indent=2) vol.commit() - _mirror_to_pipeline_repo( - meta.run_id, - [[str(meta_path), "meta.json"]], - ) + if mirror: + _mirror_to_pipeline_repo( + meta.run_id, + [[str(meta_path), "meta.json"]], + ) def read_run_meta( @@ -261,13 +271,29 @@ def _record_step( # ── Pipeline repo archival ──────────────────────────────────────── +_MIRROR_SCRIPT = """\ +import os, json +from policyengine_us_data.utils.data_upload import upload_to_pipeline_repo + +pairs = json.loads(os.environ["_MIRROR_PAIRS"]) +run_id = os.environ["_MIRROR_RUN_ID"] +count = upload_to_pipeline_repo(pairs, run_id) +print(f"Mirrored {count} file(s) to pipeline repo") +""" + +# 10-minute timeout for non-critical archival subprocess calls. +_MIRROR_TIMEOUT_S = 600 + + def _mirror_to_pipeline_repo( run_id: str, - files_with_paths: list, + files_with_paths: list[list[str]], ) -> None: """Upload files to the pipeline archival HF repo. Non-fatal: logs warnings on failure but does not raise. + Uses environment variables (not f-string interpolation) to + pass data to the subprocess, avoiding injection risks. Args: run_id: Pipeline run identifier. @@ -276,91 +302,75 @@ def _mirror_to_pipeline_repo( if not files_with_paths: return - existing = [[p, r] for p, r in files_with_paths if Path(p).exists()] - if not existing: - return - - import json as _json - - pairs_json = _json.dumps(existing) + env = os.environ.copy() + env["_MIRROR_PAIRS"] = json.dumps(files_with_paths) + env["_MIRROR_RUN_ID"] = run_id try: result = subprocess.run( - [ - "uv", - "run", - "python", - "-c", - f""" -import json -from policyengine_us_data.utils.data_upload import upload_to_pipeline_repo - -pairs = json.loads('''{pairs_json}''') -count = upload_to_pipeline_repo(pairs, "{run_id}") -print(f"Mirrored {{count}} file(s) to pipeline repo") -""", - ], + ["uv", "run", "python", "-c", _MIRROR_SCRIPT], cwd="/root/policyengine-us-data", capture_output=True, text=True, - env=os.environ.copy(), + env=env, + timeout=_MIRROR_TIMEOUT_S, ) if result.returncode != 0: print(f" WARNING: Pipeline repo mirror failed: {result.stderr[:500]}") - else: - if result.stdout.strip(): - print(f" {result.stdout.strip()}") - except Exception as e: + elif result.stdout.strip(): + print(f" {result.stdout.strip()}") + except subprocess.TimeoutExpired: + print(f" WARNING: Pipeline repo mirror timed out after {_MIRROR_TIMEOUT_S}s") + except (subprocess.SubprocessError, OSError) as e: print(f" WARNING: Pipeline repo mirror error: {e}") # Intermediate artifacts from Step 1 to archive (not shipped elsewhere). -INTERMEDIATE_ARTIFACTS = [ - "acs_2022.h5", - "irs_puf_2015.h5", - "puf_2024.h5", - "extended_cps_2024.h5", - "stratified_extended_cps_2024.h5", - "build_log.txt", - "calibration_log.csv", - "uprating_factors.csv", -] - +# Maps local filename -> repo-relative path inside artifacts/. +STEP1_ARTIFACTS: dict[str, str] = { + "acs_2022.h5": "artifacts/acs_2022.h5", + "irs_puf_2015.h5": "artifacts/irs_puf_2015.h5", + "puf_2024.h5": "artifacts/puf_2024.h5", + "extended_cps_2024.h5": "artifacts/extended_cps_2024.h5", + "stratified_extended_cps_2024.h5": "artifacts/stratified_extended_cps_2024.h5", + "build_log.txt": "artifacts/build_log.txt", + "calibration_log.csv": "artifacts/calibration_log_legacy.csv", + "uprating_factors.csv": "artifacts/uprating_factors.csv", +} + +STEP2_ARTIFACTS: dict[str, str] = { + "calibration_package_meta.json": "artifacts/calibration_package_meta.json", +} + + +def _archive_artifacts( + run_id: str, + artifact_names: dict[str, str], +) -> None: + """Archive named artifacts from ARTIFACTS_DIR to the pipeline HF repo. -def _archive_build_artifacts(run_id: str) -> None: - """Archive intermediate build artifacts to the pipeline HF repo.""" + Args: + run_id: Pipeline run identifier. + artifact_names: Mapping of {local_filename: repo_relative_path}. + """ artifacts_dir = Path(ARTIFACTS_DIR) - files = [] - for name in INTERMEDIATE_ARTIFACTS: + files: list[list[str]] = [] + for name, repo_path in artifact_names.items(): path = artifacts_dir / name - if path.exists(): - repo_name = name - if name == "calibration_log.csv": - repo_name = "calibration_log_legacy.csv" - files.append([str(path), f"artifacts/{repo_name}"]) - print(f" Archiving {name} ({path.stat().st_size:,} bytes)") + if not path.exists(): + continue + try: + size = path.stat().st_size + except OSError: + continue + files.append([str(path), repo_path]) + print(f" Archiving {name} ({size:,} bytes)") if files: - print(f" Uploading {len(files)} intermediate artifact(s)...") + print(f" Uploading {len(files)} artifact(s)...") _mirror_to_pipeline_repo(run_id, files) else: - print(" No intermediate artifacts found to archive") - - -def _archive_package_artifacts(run_id: str) -> None: - """Archive calibration package metadata to the pipeline HF repo.""" - meta_path = Path(ARTIFACTS_DIR) / "calibration_package_meta.json" - if meta_path.exists(): - print( - f" Archiving calibration_package_meta.json " - f"({meta_path.stat().st_size:,} bytes)" - ) - _mirror_to_pipeline_repo( - run_id, - [[str(meta_path), "artifacts/calibration_package_meta.json"]], - ) - else: - print(" No calibration_package_meta.json found") + print(" No artifacts found to archive") # ── Include other Modal apps ───────────────────────────────────── @@ -585,6 +595,8 @@ def _write_validation_diagnostics( diag_dir = Path(RUNS_DIR) / run_id / "diagnostics" diag_dir.mkdir(parents=True, exist_ok=True) + mirror_files: list[list[str]] = [] + # Write regional validation CSV if validation_rows: csv_columns = [ @@ -611,6 +623,7 @@ def _write_validation_diagnostics( for row in validation_rows: writer.writerow({k: row.get(k, "") for k in csv_columns}) print(f" Wrote {len(validation_rows)} rows to {csv_path}") + mirror_files.append([str(csv_path), "diagnostics/validation_results.csv"]) # Compute summary n_sanity_fail = sum( @@ -682,17 +695,11 @@ def _write_validation_diagnostics( with open(nat_path, "w") as f: f.write(national_output) print(f" Wrote national validation to {nat_path}") + mirror_files.append([str(nat_path), "diagnostics/national_validation.txt"]) vol.commit() # Mirror validation files to pipeline archival HF repo - mirror_files = [] - csv_path = diag_dir / "validation_results.csv" - if csv_path.exists(): - mirror_files.append([str(csv_path), "diagnostics/validation_results.csv"]) - nat_path = diag_dir / "national_validation.txt" - if nat_path.exists(): - mirror_files.append([str(nat_path), "diagnostics/national_validation.txt"]) if mirror_files: _mirror_to_pipeline_repo(run_id, mirror_files) @@ -845,7 +852,7 @@ def run_pipeline( # Archive intermediate build artifacts to pipeline HF repo print(" Archiving intermediate artifacts...") pipeline_volume.reload() - _archive_build_artifacts(run_id) + _archive_artifacts(run_id, STEP1_ARTIFACTS) else: print("\n[Step 1/5] Build datasets (skipped - completed)") @@ -872,7 +879,7 @@ def run_pipeline( # Archive package metadata to pipeline HF repo print(" Archiving package metadata...") pipeline_volume.reload() - _archive_package_artifacts(run_id) + _archive_artifacts(run_id, STEP2_ARTIFACTS) else: print("\n[Step 2/5] Build package (skipped - completed)") @@ -1093,7 +1100,7 @@ def run_pipeline( except Exception as e: meta.status = "failed" meta.error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}" - write_run_meta(meta, pipeline_volume) + write_run_meta(meta, pipeline_volume, mirror=False) print(f"\nPIPELINE FAILED: {e}") print(f"Resume with: --resume-run-id {run_id}") raise diff --git a/policyengine_us_data/utils/data_upload.py b/policyengine_us_data/utils/data_upload.py index 9349413e..5421135c 100644 --- a/policyengine_us_data/utils/data_upload.py +++ b/policyengine_us_data/utils/data_upload.py @@ -273,44 +273,48 @@ def hf_create_commit_with_retry( ) -def upload_to_staging_hf( - files_with_paths: List[Tuple[Path, str]], - version: str, - hf_repo_name: str = "policyengine/policyengine-us-data", - hf_repo_type: str = "model", +def _batched_hf_upload( + files_with_repo_paths: List[Tuple[str, str]], + repo_id: str, + repo_type: str, + commit_message_prefix: str, batch_size: int = 50, - run_id: str = "", ) -> int: - """ - Upload files to staging/ paths in HuggingFace. + """Upload files to a HuggingFace repo in batches with retry. Args: - files_with_paths: List of (local_path, relative_path) tuples - relative_path is like "states/AL.h5" - version: Version string for commit message - hf_repo_name: HuggingFace repository name - hf_repo_type: Repository type - batch_size: Number of files per commit batch + files_with_repo_paths: List of (local_path, path_in_repo) tuples. + repo_id: HuggingFace repository ID. + repo_type: Repository type. + commit_message_prefix: Prefix for commit messages. + batch_size: Number of files per commit batch. Returns: - Number of files uploaded + Number of files uploaded. + + Raises: + EnvironmentError: If HUGGING_FACE_TOKEN is not set. """ token = os.environ.get("HUGGING_FACE_TOKEN") + if not token: + raise EnvironmentError( + "HUGGING_FACE_TOKEN environment variable is not set. " + "Cannot upload to HuggingFace." + ) api = HfApi() total_uploaded = 0 - for i in range(0, len(files_with_paths), batch_size): - batch = files_with_paths[i : i + batch_size] + for i in range(0, len(files_with_repo_paths), batch_size): + batch = files_with_repo_paths[i : i + batch_size] operations = [] - for local_path, rel_path in batch: + for local_path, repo_path in batch: local_path = Path(local_path) if not local_path.exists(): logging.warning(f"File {local_path} does not exist, skipping.") continue - staging_prefix = f"staging/{run_id}" if run_id else "staging" operations.append( CommitOperationAdd( - path_in_repo=f"{staging_prefix}/{rel_path}", + path_in_repo=repo_path, path_or_fileobj=str(local_path), ) ) @@ -318,23 +322,59 @@ def upload_to_staging_hf( if not operations: continue + batch_num = i // batch_size + 1 hf_create_commit_with_retry( api=api, operations=operations, - repo_id=hf_repo_name, - repo_type=hf_repo_type, + repo_id=repo_id, + repo_type=repo_type, token=token, - commit_message=f"Upload batch {i // batch_size + 1} to staging for version {version}", + commit_message=f"{commit_message_prefix}: batch {batch_num} ({len(operations)} files)", ) total_uploaded += len(operations) logging.info( - f"Uploaded batch {i // batch_size + 1}: {len(operations)} files to staging/" + f"Uploaded batch {batch_num}: {len(operations)} files to {repo_id}" ) - logging.info(f"Total: uploaded {total_uploaded} files to staging/ in HuggingFace") + logging.info(f"Total: uploaded {total_uploaded} files to {repo_id}") return total_uploaded +def upload_to_staging_hf( + files_with_paths: List[Tuple[Path, str]], + version: str, + hf_repo_name: str = "policyengine/policyengine-us-data", + hf_repo_type: str = "model", + batch_size: int = 50, + run_id: str = "", +) -> int: + """ + Upload files to staging/ paths in HuggingFace. + + Args: + files_with_paths: List of (local_path, relative_path) tuples + relative_path is like "states/AL.h5" + version: Version string for commit message + hf_repo_name: HuggingFace repository name + hf_repo_type: Repository type + batch_size: Number of files per commit batch + + Returns: + Number of files uploaded + """ + staging_prefix = f"staging/{run_id}" if run_id else "staging" + repo_paths = [ + (str(local), f"{staging_prefix}/{rel}") for local, rel in files_with_paths + ] + return _batched_hf_upload( + repo_paths, + hf_repo_name, + hf_repo_type, + f"Upload to staging for version {version}", + batch_size, + ) + + def promote_staging_to_production_hf( files: List[str], version: str, @@ -491,47 +531,14 @@ def upload_to_pipeline_repo( Returns: Number of files uploaded. """ - token = os.environ.get("HUGGING_FACE_TOKEN") - api = HfApi() - - total_uploaded = 0 - for i in range(0, len(files_with_paths), batch_size): - batch = files_with_paths[i : i + batch_size] - operations = [] - for local_path, path_within_run in batch: - local_path = Path(local_path) - if not local_path.exists(): - logging.warning(f"File {local_path} does not exist, skipping.") - continue - operations.append( - CommitOperationAdd( - path_in_repo=f"{run_id}/{path_within_run}", - path_or_fileobj=str(local_path), - ) - ) - - if not operations: - continue - - hf_create_commit_with_retry( - api=api, - operations=operations, - repo_id=repo_name, - repo_type=repo_type, - token=token, - commit_message=( - f"Archive run {run_id}: batch {i // batch_size + 1} " - f"({len(operations)} files)" - ), - ) - total_uploaded += len(operations) - logging.info( - f"Uploaded batch {i // batch_size + 1}: " - f"{len(operations)} files to {repo_name}/{run_id}/" - ) - - logging.info(f"Total: uploaded {total_uploaded} files to {repo_name}/{run_id}/") - return total_uploaded + repo_paths = [(local, f"{run_id}/{path}") for local, path in files_with_paths] + return _batched_hf_upload( + repo_paths, + repo_name, + repo_type, + f"Archive run {run_id}", + batch_size, + ) def upload_from_hf_staging_to_gcs( From 16032facb8f4093839083c3c8491d0d4a96c1486 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 26 Mar 2026 23:53:18 +0100 Subject: [PATCH 3/3] Add changelog entry for pipeline HF archival Co-Authored-By: Claude Opus 4.6 --- changelog.d/pipeline-hf-archival.added.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/pipeline-hf-archival.added.md diff --git a/changelog.d/pipeline-hf-archival.added.md b/changelog.d/pipeline-hf-archival.added.md new file mode 100644 index 00000000..7ec9947f --- /dev/null +++ b/changelog.d/pipeline-hf-archival.added.md @@ -0,0 +1 @@ +Archive pipeline runs, diagnostics, and intermediate build artifacts to a dedicated HuggingFace repo (`PolicyEngine/policyengine-us-data-pipeline`) for durable run history that survives Modal volume deletion.