diff --git a/requirements.txt b/requirements.txt
index 60c28ad..c98d0ea 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,5 +13,5 @@ zarr
 dask
 dask-jobqueue
 gcsfs
-h5netcdf
+h5netcdf[pyfive]
 typer
diff --git a/src/batch_processing/cmd/batch/merge.py b/src/batch_processing/cmd/batch/merge.py
index 137c07c..48350b3 100644
--- a/src/batch_processing/cmd/batch/merge.py
+++ b/src/batch_processing/cmd/batch/merge.py
@@ -19,6 +19,9 @@ def __init__(self, args):
         self.base_batch_dir = Path(self.exacloud_user_dir, args.batches)
         self.result_dir = self.base_batch_dir / "all_merged"
         self.result_dir.mkdir(parents=True, exist_ok=True)
+        # Prefer netcdf4 for writes because h5netcdf can fail on dimension scales
+        # in some environments (H5DSis_scale RuntimeError).
+        self._preferred_write_engine = "netcdf4"
 
     def _get_available_batches(self):
         """Get list of available batch directories, sorted by batch number."""
@@ -29,7 +32,7 @@ def _get_available_batches(self):
         return sorted(batch_dirs, key=lambda x: get_batch_number(x.name))
 
     def _get_available_output_files(self):
-        """Get list of output files from the first available batch."""
+        """Get list of NetCDF output files from the first available batch."""
         batch_dirs = self._get_available_batches()
         if not batch_dirs:
             return []
@@ -38,7 +41,41 @@ def _get_available_output_files(self):
         if not first_batch_output_dir.exists():
             return []
 
-        return [f.name for f in first_batch_output_dir.iterdir() if f.is_file()]
+        files = [f.name for f in first_batch_output_dir.iterdir() if f.is_file()]
+        netcdf_files = [name for name in files if name.lower().endswith(".nc")]
+        skipped_non_netcdf = sorted(set(files) - set(netcdf_files))
+        if skipped_non_netcdf:
+            print(
+                f"Skipping {len(skipped_non_netcdf)} non-NetCDF output files: "
+                f"{', '.join(skipped_non_netcdf)}"
+            )
+            self._print_non_netcdf_batch_numbers(batch_dirs)
+        return netcdf_files
+
+    def _print_non_netcdf_batch_numbers(self, batch_dirs):
+        """Print, per .js/.txt sidecar file name, the batch folders that contain it."""
+        non_netcdf_batches = {}
+
+        for batch_dir in batch_dirs:
+            output_dir = batch_dir / "output"
+            if not output_dir.exists():
+                continue
+
+            for output_file in output_dir.iterdir():
+                if not output_file.is_file():
+                    continue
+
+                lower_name = output_file.name.lower()
+                if lower_name.endswith((".js", ".txt")):
+                    non_netcdf_batches.setdefault(output_file.name, []).append(batch_dir.name)
+
+        if not non_netcdf_batches:
+            return
+
+        print("Batch folders containing .js/.txt files:")
+        for filename in sorted(non_netcdf_batches):
+            batches = sorted(non_netcdf_batches[filename], key=get_batch_number)
+            print(f" - {filename}: {', '.join(batches)}")
 
     def _create_canvas_for_variable(self, output_file, available_batches):
         """Create a canvas dataset for a specific output file using available batches."""
@@ -159,7 +196,13 @@ def _fill_canvas_with_batches(self, canvas, output_file, available_files, batch_
 
         # Concatenate along the specified dimension
         if datasets:
-            combined_ds = xr.concat(datasets, dim=concat_dim)
+            combined_ds = xr.concat(
+                datasets,
+                dim=concat_dim,
+                data_vars="minimal",
+                coords="minimal",
+                compat="override",
+            )
 
             # Fill the canvas with the combined data
             if concat_dim == 'y':
@@ -179,6 +222,34 @@ def _fill_canvas_with_batches(self, canvas, output_file, available_files, batch_
 
         return canvas
 
+    def _write_netcdf_with_fallback(self, dataset, output_file_path):
+        """Write a dataset with the preferred engine, retrying once with netcdf4.
+
+        The retry happens only when the preferred engine is h5netcdf and the
+        failure is the H5DSis_scale RuntimeError; netcdf4 then stays preferred
+        for the remaining files. Any other error is re-raised unchanged.
+        """
+        output_file_path = Path(output_file_path)
+        target = output_file_path.as_posix()
+
+        # Avoid reusing a partially written file from a previous failed attempt.
+        output_file_path.unlink(missing_ok=True)
+
+        try:
+            dataset.to_netcdf(target, engine=self._preferred_write_engine)
+        except RuntimeError as e:
+            if self._preferred_write_engine != "h5netcdf" or "H5DSis_scale" not in str(e):
+                raise
+
+            print(
+                f"h5netcdf failed while writing {output_file_path.name} "
+                f"({e}). Retrying with netcdf4 engine for this and remaining files."
+            )
+            self._preferred_write_engine = "netcdf4"
+            # Remove whatever the failed attempt left behind before retrying.
+            output_file_path.unlink(missing_ok=True)
+            dataset.to_netcdf(target, engine="netcdf4")
+
     def _merge_with_canvas(self, output_file, output_path):
         """Merge output file using canvas approach, handling missing batches gracefully."""
         available_batches = self._get_available_batches()
@@ -205,8 +276,10 @@ def _merge_with_canvas(self, output_file, output_path):
         # Save the merged result
         output_file_path = output_path / output_file
         print(f"Saving merged {output_file} to {output_file_path}")
-        canvas.to_netcdf(output_file_path.as_posix(), engine="h5netcdf")
-        canvas.close()
+        try:
+            self._write_netcdf_with_fallback(canvas, output_file_path)
+        finally:
+            canvas.close()
 
     def _merge_small_dataset(self, output_file, output_path):
         """Original merge method for small datasets - kept for compatibility."""
@@ -268,7 +341,17 @@ def _check_status(self):
             return True  # Allow merging to proceed
 
         try:
-            ds = xr.open_mfdataset(available_status_files, engine="h5netcdf", concat_dim="Y", combine="nested")
+            ds = xr.open_mfdataset(
+                available_status_files,
+                engine="h5netcdf",
+                concat_dim="Y",
+                combine="nested",
+                data_vars="minimal",
+                coords="minimal",
+                compat="override",
+                decode_cf=False,
+                decode_times=False,
+            )
             # Replace nan values with -99 to indicate they were originally nan
             status_values = ds.run_status.values
             status_values = np.where(np.isnan(status_values), -99, status_values)
@@ -377,7 +460,11 @@ def execute(self):
         run_status_file = self.result_dir / "run_status.nc"
         if run_status_file.exists():
             try:
-                ds = xr.open_dataset(run_status_file.as_posix(), engine="h5netcdf")
+                ds = xr.open_dataset(
+                    run_status_file.as_posix(),
+                    engine="h5netcdf",
+                    decode_timedelta=False,
+                )
 
                 # Filter out fill values and get valid runtime values
                 total_runtime_values = ds.total_runtime.values.flatten()
diff --git a/src/batch_processing/cmd/batch/split.py b/src/batch_processing/cmd/batch/split.py
index cdbade6..921ba44 100644
--- a/src/batch_processing/cmd/batch/split.py
+++ b/src/batch_processing/cmd/batch/split.py
@@ -116,7 +116,13 @@ def _run_utils(self, batch_dir, batch_input_dir):
 
     def _configure(self, index: int, batch_dir: Path) -> None:
         config_file = batch_dir / "config" / "config.js"
-        update_config(path=config_file.as_posix(), prefix_value=batch_dir)
+        scenario_continuation = getattr(self._args, "scenario_continuation", False)
+        update_config(
+            path=config_file.as_posix(),
+            prefix_value=batch_dir,
+            scenario_continuation=scenario_continuation,
+        )
+        mpi_ranks = max(1, int(getattr(self._args, "mpi_ranks", 1)))
 
         if self._args.job_name_prefix:
             job_name = f"{self._args.job_name_prefix}-{self.base_batch_dir.name}-batch-{index}"
@@ -124,6 +130,9 @@ def _configure(self, index: int, batch_dir: Path) -> None:
             job_name = f"{self.base_batch_dir.name}-batch-{index}"
 
         additional_flags = "--no-output-cleanup" if getattr(self._args, 'restart_run', False) else ""
+        flags_before_max_output = (
+            "--no-output-cleanup" if scenario_continuation else ""
+        )
 
         substitution_values = {
             "job_name": job_name,
@@ -138,6 +147,8 @@ def _configure(self, index: int, batch_dir: Path) -> None:
             "t": self._args.t,
             "n": self._args.n,
             "additional_flags": additional_flags,
+            "flags_before_max_output": flags_before_max_output,
+            "mpi_ranks": mpi_ranks,
         }
 
         script_path = batch_dir / "slurm_runner.sh"
@@ -236,7 +247,11 @@ def execute(self):
             )
             ds = xr.open_zarr(path)
         else:
-            ds = xr.open_dataset(self.input_path / "run-mask.nc", engine="h5netcdf")
+            ds = xr.open_dataset(
+                self.input_path / "run-mask.nc",
+                engine="h5netcdf",
+                driver_kwds={"backend": "pyfive"},
+            )
 
         X, Y = ds.X.size, ds.Y.size
         print("Dimension size of X:", X)
diff --git a/src/batch_processing/cmd/extract_cell.py b/src/batch_processing/cmd/extract_cell.py
index 4504d12..05e5cd2 100644
--- a/src/batch_processing/cmd/extract_cell.py
+++ b/src/batch_processing/cmd/extract_cell.py
@@ -85,6 +85,9 @@ def _write_slurm_runner(self):
                 "s": self._args.s,
                 "t": self._args.t,
                 "n": self._args.n,
+                "additional_flags": "",
+                "flags_before_max_output": "",
+                "mpi_ranks": 1,
             }
         )
 
diff --git a/src/batch_processing/cmd/init.py b/src/batch_processing/cmd/init.py
index 31a3d14..134be77 100644
--- a/src/batch_processing/cmd/init.py
+++ b/src/batch_processing/cmd/init.py
@@ -1,3 +1,4 @@
 import json
+import shlex
 import subprocess
 from pathlib import Path
@@ -14,6 +15,7 @@ def __init__(self, args):
         super().__init__(basedir=basedir)
         self._args = args
         self._compile = getattr(args, "compile", False)
+        self._branch = getattr(args, "branch", None)
 
     def execute(self):
         if self.user == "root":
@@ -28,13 +30,14 @@ def execute(self):
         if self._compile:
             # Clone from GitHub and compile
             print(f"[bold blue]Cloning dvm-dos-tem to {self.dvmdostem_path} directory...[/bold blue]")
+            branch_opt = f"-b {shlex.quote(self._branch)} " if self._branch else ""
             subprocess.run(
-                f"git clone https://github.com/uaf-arctic-eco-modeling/dvm-dos-tem.git {self.dvmdostem_path}",
+                f"git clone {branch_opt}https://github.com/uaf-arctic-eco-modeling/dvm-dos-tem.git {shlex.quote(str(self.dvmdostem_path))}",
                 shell=True,
                 check=True,
                 executable="/bin/bash",
             )
             print(f"[bold green]dvm-dos-tem is cloned to {self.dvmdostem_path}[/bold green]")
 
             print("[bold blue]Compiling dvmdostem binary...[/bold blue]")
             command = f"""
@@ -55,14 +58,10 @@ def execute(self):
             print(f"[bold blue]Copying dvm-dos-tem to {self.dvmdostem_path} directory...[/bold blue]")
             download_directory("gcp-slurm", "dvm-dos-tem/", basedir)
             print(f"[bold green]dvm-dos-tem is copied to {self.dvmdostem_path}[/bold green]")
+            if self._branch:
+                print("[bold yellow]--branch is ignored unless --compile is specified.[/bold yellow]")
+
         subprocess.run([f"chmod +x {self.dvmdostem_bin_path}"], shell=True, check=True)
-
-        # Make all Python scripts in scripts directory executable (recursively)
-        subprocess.run(
-            f"find {self.dvmdostem_scripts_path} -name '*.py' -exec chmod +x {{}} \\;",
-            shell=True,
-            check=True
-        )
 
         if Path(self.output_spec_path).exists():
             print("[bold yellow]output_spec.csv already exists, using current file...[/bold yellow]")
diff --git a/src/batch_processing/main.py b/src/batch_processing/main.py
index 17d1f39..95bc66c 100644
--- a/src/batch_processing/main.py
+++ b/src/batch_processing/main.py
@@ -1,8 +1,17 @@
+import logging
+import os
 import typer
 from typing import Optional
 from enum import Enum
 import textwrap
 
+# Force h5netcdf to use pyfive backend to avoid H5DSget_num_scales errors with
+# NetCDF4 files that have complex dimension scale metadata (e.g. from DVMDOSTEM)
+os.environ.setdefault("H5NETCDF_READ_BACKEND", "pyfive")
+
+# Suppress pyfive's verbose INFO-level logging (file access messages)
+logging.getLogger("pyfive").setLevel(logging.WARNING)
+
 import lazy_import
 
 from batch_processing.utils.utils import get_email_from_username
@@ -96,9 +105,14 @@ def init(
         "--compile",
         help="Clone dvm-dos-tem from GitHub and compile it instead of copying pre-built version from bucket",
     ),
+    branch: Optional[str] = typer.Option(
+        None,
+        "--branch",
+        help="Git branch of dvm-dos-tem to clone (used only with --compile)",
+    ),
 ):
     """Initialize the environment for running the simulation."""
-    args = type("Args", (), {"basedir": basedir, "compile": compile})()
+    args = type("Args", (), {"basedir": basedir, "compile": compile, "branch": branch})()
     InitCommand(args).execute()
 
 
@@ -238,6 +252,24 @@ def batch_split(
     restart_run: bool = typer.Option(
         False, "--restart-run", help="Add --no-output-cleanup flag to mpirun command"
     ),
+    scenario_continuation: bool = typer.Option(
+        False,
+        "-sc",
+        "--scenario-continuation",
+        help=(
+            "Set restart_from to output/restart-tr.nc and add --no-output-cleanup "
+            "before --max-output-volume in slurm runner"
+        ),
+    ),
+    mpi_ranks: int = typer.Option(
+        1,
+        "--mpi-ranks",
+        min=1,
+        help=(
+            "Number of MPI ranks per batch job. "
+            "Default is 1 to avoid Lustre NetCDF restart write failures."
+        ),
+    ),
 ):
     """Split the given input data into smaller batches."""
     # Create args object for compatibility with command class
@@ -254,6 +286,8 @@ def batch_split(
         "log_level": log_level.value,
         "job_name_prefix": job_name_prefix,
         "restart_run": restart_run,
+        "scenario_continuation": scenario_continuation,
+        "mpi_ranks": mpi_ranks,
     }
     args = type("Args", (), all_args)()
     BatchSplitCommand(args).execute()
diff --git a/src/batch_processing/templates/slurm_runner.sh b/src/batch_processing/templates/slurm_runner.sh
index cf616c6..2fa8acd 100644
--- a/src/batch_processing/templates/slurm_runner.sh
+++ b/src/batch_processing/templates/slurm_runner.sh
@@ -11,8 +11,19 @@
 ulimit -s unlimited
 ulimit -l unlimited
 
-. /dependencies/setup-env.sh
-. /etc/profile.d/z00_lmod.sh
-module load openmpi
+source /etc/profile.d/z00_lmod.sh
+module purge
+module use /mnt/exacloud/lustre/modulefiles
 
-mpirun --use-hwthread-cpus $dvmdostem_binary -f $config_path -l $log_level --max-output-volume=-1 $additional_flags -p $p -e $e -s $s -t $t -n $n
\ No newline at end of file
+module load openmpi/v4.1.x
+module load dvmdostem-deps/2026-02
+
+# Suppress PMIx compression library warning (optional, cosmetic)
+export PMIX_MCA_pcompress_base_silence_warning=1
+
+# Lustre: disable HDF5 file locking (incompatible with Lustre without flock)
+export HDF5_USE_FILE_LOCKING=FALSE
+
+# OpenMPI 4.1.x: use ROMIO instead of buggy OMPIO for NetCDF/HDF5 parallel I/O
+# -np launches exactly the rank count chosen with `batch split --mpi-ranks`
+mpirun -np $mpi_ranks -x HDF5_USE_FILE_LOCKING -x PMIX_MCA_pcompress_base_silence_warning --use-hwthread-cpus --mca io ^ompio $dvmdostem_binary -f $config_path -l $log_level $flags_before_max_output --max-output-volume=-1 $additional_flags -p $p -e $e -s $s -t $t -n $n
diff --git a/src/batch_processing/utils/utils.py b/src/batch_processing/utils/utils.py
index c56367c..e40d099 100644
--- a/src/batch_processing/utils/utils.py
+++ b/src/batch_processing/utils/utils.py
@@ -64,6 +64,7 @@
     "fri_fire_file": "input/fri-fire.nc",
     "hist_exp_fire_file": "input/historic-explicit-fire.nc",
     "proj_exp_fire_file": "input/projected-explicit-fire.nc",
+    "restart_from": "output/",
 }
 
 
@@ -543,7 +544,9 @@ def submit_job(path: str) -> CompletedProcess:
     return subprocess.run(command, text=True, capture_output=True)
 
 
-def update_config(path: str, prefix_value: str) -> None:
+def update_config(
+    path: str, prefix_value: str, scenario_continuation: bool = False
+) -> None:
     """Updates the 'IO' section of config.js with new paths.
 
     This function reads the JSON configuration file, modifies the 'IO' section
@@ -553,13 +556,17 @@ def update_config(path: str, prefix_value: str) -> None:
     Args:
         path (str): The file system path to the JSON configuration file to be updated.
         prefix_value (str): The new prefix to be added to the paths in the 'IO' section.
+        scenario_continuation (bool): If True, set restart_from to output/restart-tr.nc.
 
     Returns:
         None
     """
     config_data = read_json_file(path)
     for key, val in IO_PATHS.items():
-        config_data["IO"][key] = f"{prefix_value}/{val}"
+        if key == "restart_from" and scenario_continuation:
+            config_data["IO"][key] = f"{prefix_value}/output/restart-tr.nc"
+        else:
+            config_data["IO"][key] = f"{prefix_value}/{val}"
 
     write_json_file(path, config_data)
 