diff --git a/changelog/610.feature.md b/changelog/610.feature.md new file mode 100644 index 000000000..9b3b14e27 --- /dev/null +++ b/changelog/610.feature.md @@ -0,0 +1 @@ +Added `ref executions reingest` command to re-ingest existing execution results without re-running diagnostics. Creates a new immutable execution record with a timestamped output fragment, leaving the original execution untouched. diff --git a/packages/climate-ref-core/src/climate_ref_core/env.py b/packages/climate-ref-core/src/climate_ref_core/env.py index 75b33d1dd..03edaf838 100644 --- a/packages/climate-ref-core/src/climate_ref_core/env.py +++ b/packages/climate-ref-core/src/climate_ref_core/env.py @@ -27,7 +27,7 @@ def get_env() -> Env: # Load the environment variables from the .env file # This will override any defaults set above - env.read_env(verbose=True) + env.read_env() return env diff --git a/packages/climate-ref/src/climate_ref/cli/executions.py b/packages/climate-ref/src/climate_ref/cli/executions.py index 688949987..32f6d567b 100644 --- a/packages/climate-ref/src/climate_ref/cli/executions.py +++ b/packages/climate-ref/src/climate_ref/cli/executions.py @@ -741,6 +741,140 @@ def stats( pretty_print_df(results_df, console=console) +@app.command() +def reingest( # noqa: PLR0913 + ctx: typer.Context, + group_ids: Annotated[ + list[int] | None, + typer.Argument(help="Execution group IDs to reingest. If omitted, uses filters."), + ] = None, + provider: Annotated[ + list[str] | None, + typer.Option( + help="Filter by provider slug (substring match, case-insensitive). " + "Multiple values can be provided." + ), + ] = None, + diagnostic: Annotated[ + list[str] | None, + typer.Option( + help="Filter by diagnostic slug (substring match, case-insensitive). " + "Multiple values can be provided." 
+ ), + ] = None, + include_failed: Annotated[ + bool, + typer.Option( + "--include-failed", + help="Also attempt reingest on failed executions.", + ), + ] = False, + dry_run: Annotated[ + bool, + typer.Option( + "--dry-run", + help="Show what would be reingested without making changes.", + ), + ] = False, + force: bool = typer.Option(False, help="Skip confirmation prompt"), +) -> None: + """ + Reingest existing executions without re-running diagnostics. + + Re-runs build_execution_result() on existing output files and re-ingests + the results into the database. Useful when new series definitions or + metadata extraction logic has been added. + + A new Execution record is always created under the same ExecutionGroup, + leaving the original execution untouched. Results are treated as immutable. + + The dirty flag is never modified by this command. + """ + import pandas as pd + + from climate_ref.executor.reingest import ( + get_executions_for_reingest, + reingest_execution, + ) + from climate_ref.provider_registry import ProviderRegistry + + config: Config = ctx.obj.config + db = ctx.obj.database + console = ctx.obj.console + + if not any([group_ids, provider, diagnostic]): + logger.error( + "At least one filter is required (group IDs, --provider, or --diagnostic). " + "This prevents accidental reingest of all executions." 
+ ) + raise typer.Exit(code=1) + + provider_registry = ProviderRegistry.build_from_config(config, db) + + results = get_executions_for_reingest( + db, + execution_group_ids=group_ids, + provider_filters=provider, + diagnostic_filters=diagnostic, + include_failed=include_failed, + ) + + if not results: + console.print("No executions found matching the specified criteria.") + return + + preview_df = pd.DataFrame( + [ + { + "group_id": eg.id, + "execution_id": ex.id, + "provider": eg.diagnostic.provider.slug, + "diagnostic": eg.diagnostic.slug, + "key": eg.key, + "successful": ex.successful, + } + for eg, ex in results + ] + ) + + if dry_run: + console.print(f"[bold]Dry run:[/] would reingest {len(results)} execution(s):") + pretty_print_df(preview_df, console=console) + return + + console.print(f"Will reingest {len(results)} execution(s):") + pretty_print_df(preview_df, console=console) + + if not force: + if not typer.confirm("\nProceed with reingest?"): + console.print("Reingest cancelled.") + return + + # Ensure any autobegun transaction from the preview queries is closed + # so each reingest runs in its own top-level transaction (not a savepoint). 
+ if db.session.in_transaction(): + db.session.commit() + + # Process each execution in a separate transaction + success_count = 0 + skip_count = 0 + for eg, ex in results: + with db.session.begin(): + ok = reingest_execution( + config=config, + database=db, + execution=ex, + provider_registry=provider_registry, + ) + + if ok: + success_count += 1 + else: + skip_count += 1 + + console.print(f"\n[green]Reingest complete:[/] {success_count} succeeded, {skip_count} skipped.") + + @app.command() def flag_dirty(ctx: typer.Context, execution_id: int) -> None: """ diff --git a/packages/climate-ref/src/climate_ref/data_catalog.py b/packages/climate-ref/src/climate_ref/data_catalog.py index e64425104..00ba2b1ec 100644 --- a/packages/climate-ref/src/climate_ref/data_catalog.py +++ b/packages/climate-ref/src/climate_ref/data_catalog.py @@ -81,13 +81,13 @@ def finalise(self, subset: pd.DataFrame) -> pd.DataFrame: Finalise unfinalised datasets in the given subset. If the adapter supports finalization (implements FinaliseableDatasetAdapterMixin), - unfinalised datasets in the subset are finalized by opening their files. + unfinalised datasets in the subset are finalised by opening their files. The internal cache and database are updated accordingly. 
Parameters ---------- subset - DataFrame subset to finalize (typically after filter+group_by) + DataFrame subset to finalise (typically after filter+group_by) Returns ------- diff --git a/packages/climate-ref/src/climate_ref/executor/__init__.py b/packages/climate-ref/src/climate_ref/executor/__init__.py index 9dd61acfb..d5bd99ce3 100644 --- a/packages/climate-ref/src/climate_ref/executor/__init__.py +++ b/packages/climate-ref/src/climate_ref/executor/__init__.py @@ -21,4 +21,9 @@ from .result_handling import handle_execution_result from .synchronous import SynchronousExecutor -__all__ = ["HPCExecutor", "LocalExecutor", "SynchronousExecutor", "handle_execution_result"] +__all__ = [ + "HPCExecutor", + "LocalExecutor", + "SynchronousExecutor", + "handle_execution_result", +] diff --git a/packages/climate-ref/src/climate_ref/executor/fragment.py b/packages/climate-ref/src/climate_ref/executor/fragment.py new file mode 100644 index 000000000..86a31965c --- /dev/null +++ b/packages/climate-ref/src/climate_ref/executor/fragment.py @@ -0,0 +1,44 @@ +""" +Helpers for allocating non-colliding output fragment paths. +""" + +import datetime +from pathlib import Path + + +def allocate_output_fragment(base_fragment: str, results_dir: Path) -> str: + """ + Return a unique output fragment by appending a UTC timestamp. + + The returned fragment is ``{base_fragment}_{YYYYMMDDTHHMMSSffffff}``, which is + practically unique without needing any database lookups; an existence check + under *results_dir* guards against the unlikely collision. + Microsecond resolution avoids collisions from rapid successive calls. + + Parameters + ---------- + base_fragment + The natural fragment, e.g. ``provider/diagnostic/dataset_hash`` + results_dir + The results root directory. Used to verify the allocated fragment + does not already exist on disk. 
+ + Returns + ------- + : + A new fragment with a timestamp suffix + + Raises + ------ + FileExistsError + If the computed output directory already exists under *results_dir* + """ + now = datetime.datetime.now(tz=datetime.timezone.utc) + fragment = f"{base_fragment}_{now.strftime('%Y%m%dT%H%M%S%f')}" + + target = results_dir / fragment + if target.exists(): + raise FileExistsError( + f"Output directory already exists: {target}. Cannot allocate fragment '{fragment}'." + ) + + return fragment diff --git a/packages/climate-ref/src/climate_ref/executor/reingest.py b/packages/climate-ref/src/climate_ref/executor/reingest.py new file mode 100644 index 000000000..fbbb98fda --- /dev/null +++ b/packages/climate-ref/src/climate_ref/executor/reingest.py @@ -0,0 +1,378 @@ +""" +Reingest existing execution results without re-running diagnostics. + +This module provides functionality to re-run ``build_execution_result()`` and +re-ingest the results into the database for executions that have already completed. +This is useful when new series definitions or metadata extraction logic has been added +and you want to apply it to existing outputs without re-executing the diagnostics. + +Reingest always creates a new ``Execution`` record under the same ``ExecutionGroup`` +with its own output directory, leaving the original execution untouched. +Results are treated as immutable. 
+""" + +import shutil +from collections import defaultdict +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any + +import pandas as pd +from loguru import logger + +from climate_ref.datasets import get_slug_column +from climate_ref.executor.fragment import allocate_output_fragment +from climate_ref.executor.result_handling import ingest_execution_result +from climate_ref.models.execution import ( + Execution, + ExecutionGroup, + execution_datasets, + get_execution_group_and_latest_filtered, +) +from climate_ref_core.datasets import ( + DatasetCollection, + ExecutionDatasetCollection, + SourceDatasetType, +) +from climate_ref_core.diagnostics import ExecutionDefinition +from climate_ref_core.pycmec.controlled_vocabulary import CV + +if TYPE_CHECKING: + from pathlib import Path + + from climate_ref.config import Config + from climate_ref.database import Database + from climate_ref.models.dataset import Dataset + from climate_ref.provider_registry import ProviderRegistry + from climate_ref_core.diagnostics import Diagnostic, ExecutionResult + + +def reconstruct_execution_definition( + config: "Config", + execution: Execution, + diagnostic: "Diagnostic", +) -> ExecutionDefinition: + """ + Reconstruct an ``ExecutionDefinition`` from database state. + + This rebuilds the definition that was originally used to produce the execution, + using the execution's stored datasets, output fragment, and the live diagnostic + object from the provider registry. 
+ + Parameters + ---------- + config + Application configuration (provides ``paths.results``) + execution + The database ``Execution`` record to reconstruct from + diagnostic + The live ``Diagnostic`` instance resolved from the provider registry + + Returns + ------- + : + A reconstructed ``ExecutionDefinition`` pointing at the scratch directory + """ + execution_group = execution.execution_group + + # Build DatasetCollection per source type from the execution's linked datasets + datasets_by_type: dict[SourceDatasetType, list[Any]] = defaultdict(list) + for dataset in execution.datasets: + datasets_by_type[dataset.dataset_type].append(dataset) + + collection: dict[SourceDatasetType | str, DatasetCollection] = {} + for source_type, ds_list in datasets_by_type.items(): + slug_column = get_slug_column(source_type) + + # Build a DataFrame from the DB dataset records and their files + rows = [] + for dataset in ds_list: + # Get all attributes from the polymorphic dataset model + dataset_attrs = _extract_dataset_attributes(dataset) + for file in dataset.files: + row = { + **dataset_attrs, + "path": file.path, + "start_time": file.start_time, + "end_time": file.end_time, + } + if hasattr(file, "tracking_id"): + row["tracking_id"] = file.tracking_id + rows.append((dataset.id, row)) + + if rows: + index = [r[0] for r in rows] + data = [r[1] for r in rows] + df = pd.DataFrame(data, index=index) + else: + df = pd.DataFrame() + + # Retrieve the selector for this source type from the execution group + selector_key = source_type.value + selector = tuple(tuple(pair) for pair in execution_group.selectors.get(selector_key, [])) + + collection[source_type] = DatasetCollection( + datasets=df, + slug_column=slug_column, + selector=selector, + ) + + # Point at the scratch directory -- the caller is expected to copy results + # to scratch before calling build_execution_result() to avoid mutating + # the live results tree. 
+ output_directory = config.paths.scratch / execution.output_fragment + + return ExecutionDefinition( + diagnostic=diagnostic, + key=execution_group.key, + datasets=ExecutionDatasetCollection(collection), + root_directory=config.paths.scratch, + output_directory=output_directory, + ) + + +def _extract_dataset_attributes(dataset: "Dataset") -> dict[str, object]: + """ + Extract all column values from a polymorphic dataset model as a dict. + + Introspects the SQLAlchemy mapper to get all mapped columns for the concrete + dataset type (e.g. CMIP6Dataset, Obs4MIPsDataset). + """ + attrs = {} + # Get columns from the concrete mapper (handles polymorphic inheritance) + mapper = type(dataset).__mapper__ + for column in mapper.columns: + col_name = column.key + # Skip internal/FK columns + if col_name in ("id", "dataset_type"): + continue + val = getattr(dataset, col_name, None) + if val is not None: + attrs[col_name] = val + return attrs + + +def _validate_path_containment(path: "Path", base: "Path", label: str) -> None: + """ + Check that *path* stays within *base* after resolving symlinks and ``..`` segments. + + Raises + ------ + ValueError + If the resolved *path* escapes *base* + """ + if not path.resolve().is_relative_to(base.resolve()): + msg = f"Computed {label} path {path} escapes {base}." + raise ValueError(msg) + + +def _build_execution_result( + config: "Config", + execution: Execution, + provider_registry: "ProviderRegistry", +) -> "tuple[ExecutionResult, Path] | None": + """ + Resolve the diagnostic, validate paths, and build the execution result. + + Returns the result and scratch directory on success, or None on failure. 
+ """ + execution_group = execution.execution_group + diagnostic_model = execution_group.diagnostic + provider_slug = diagnostic_model.provider.slug + diagnostic_slug = diagnostic_model.slug + + try: + diagnostic = provider_registry.get_metric(provider_slug, diagnostic_slug) + except KeyError: + logger.error( + f"Could not resolve diagnostic {provider_slug}/{diagnostic_slug} " + f"from provider registry. Skipping execution {execution.id}." + ) + return None + + scratch_dir = config.paths.scratch / execution.output_fragment + try: + _validate_path_containment(scratch_dir, config.paths.scratch, "scratch") + except ValueError: + logger.error(f"Skipping execution {execution.id}: scratch path escapes base.") + return None + if not scratch_dir.exists(): + logger.error(f"Scratch directory does not exist: {scratch_dir}. Skipping execution {execution.id}.") + return None + + definition = reconstruct_execution_definition(config, execution, diagnostic) + + try: + result = diagnostic.build_execution_result(definition) + except Exception: + logger.exception( + f"build_execution_result failed for execution {execution.id} " + f"({provider_slug}/{diagnostic_slug}). Skipping." + ) + return None + + if not result.successful or result.metric_bundle_filename is None: + logger.warning( + f"build_execution_result returned unsuccessful result for execution {execution.id}. Skipping." + ) + return None + + return result, scratch_dir + + +def reingest_execution( + config: "Config", + database: "Database", + execution: Execution, + provider_registry: "ProviderRegistry", +) -> bool: + """ + Reingest an existing execution. + + Re-runs ``build_execution_result()`` against the scratch directory + (which contains the raw outputs from the original diagnostic run), + creates a new ``Execution`` record with a unique output fragment, + copies results to the new location, and ingests metrics into the database. + + The original execution is left untouched. 
+ + Parameters + ---------- + config + Application configuration + database + Database instance + execution + The ``Execution`` record to reingest + provider_registry + Registry of active providers (used to resolve the live diagnostic) + + Returns + ------- + : + True if reingest was successful, False if it was skipped due to an error + """ + built = _build_execution_result(config, execution, provider_registry) + if built is None: + return False + result, scratch_dir = built + + execution_group = execution.execution_group + + # Allocate a new output fragment with a timestamp suffix + new_fragment = allocate_output_fragment(execution.output_fragment, config.paths.results) + + # Copy scratch tree to the new results location + dst_dir = config.paths.results / new_fragment + try: + _validate_path_containment(dst_dir, config.paths.results, "results") + except ValueError: + logger.error(f"Skipping execution {execution.id}: results path escapes base.") + return False + dst_dir.parent.mkdir(parents=True, exist_ok=True) + shutil.copytree(scratch_dir, dst_dir) + + cv = CV.load_from_file(config.paths.dimensions_cv) + + try: + with database.session.begin_nested(): + # Create new Execution record + new_execution = Execution( + execution_group=execution_group, + dataset_hash=execution.dataset_hash, + output_fragment=new_fragment, + ) + database.session.add(new_execution) + database.session.flush() + + # Copy dataset links from the original execution + for dataset in execution.datasets: + database.session.execute( + execution_datasets.insert().values( + execution_id=new_execution.id, + dataset_id=dataset.id, + ) + ) + + # Use scratch dir as the output base path since build_execution_result + # may produce absolute paths under scratch in output bundles + ingest_execution_result( + database, + new_execution, + result, + cv, + output_base_path=scratch_dir, + ) + + assert result.metric_bundle_filename is not None + 
new_execution.mark_successful(result.as_relative_path(result.metric_bundle_filename)) + except Exception: + logger.exception(f"Ingestion failed for execution {execution.id}. Rolling back changes.") + # Clean up the copied directory on failure + if dst_dir.exists(): + shutil.rmtree(dst_dir) + return False + + logger.info(f"Successfully reingested execution {execution.id} -> new execution {new_execution.id}") + return True + + +def get_executions_for_reingest( + database: "Database", + *, + execution_group_ids: Sequence[int] | None = None, + provider_filters: list[str] | None = None, + diagnostic_filters: list[str] | None = None, + include_failed: bool = False, +) -> list[tuple[ExecutionGroup, Execution]]: + """ + Query executions eligible for reingest. + + Always selects the **oldest** (original) execution per group so that + reingest uses the execution whose scratch directory actually exists. + Reingested executions only have results directories, not scratch. + + Parameters + ---------- + database + Database instance + execution_group_ids + If provided, only include these execution group IDs + provider_filters + Filter by provider slug (substring, case-insensitive) + diagnostic_filters + Filter by diagnostic slug (substring, case-insensitive) + include_failed + If True, also include failed executions + + Returns + ------- + : + List of (ExecutionGroup, oldest Execution) tuples + """ + # Use the existing filtered query to identify matching execution groups + results = get_execution_group_and_latest_filtered( + database.session, + diagnostic_filters=diagnostic_filters, + provider_filters=provider_filters, + successful=None if include_failed else True, + ) + + # Filter by execution group IDs if provided + if execution_group_ids: + id_set = set(execution_group_ids) + results = [(eg, ex) for eg, ex in results if eg.id in id_set] + + # Filter out entries with no execution, then select the oldest per group. 
+ # ExecutionGroup.executions is ordered by created_at ascending, + # so [0] is the original execution whose scratch directory exists. + seen: set[int] = set() + out: list[tuple[ExecutionGroup, Execution]] = [] + for eg, ex in results: + if ex is None or eg.id in seen: + continue + seen.add(eg.id) + oldest = eg.executions[0] + if not include_failed and not oldest.successful: + continue + out.append((eg, oldest)) + return out diff --git a/packages/climate-ref/src/climate_ref/executor/result_handling.py b/packages/climate-ref/src/climate_ref/executor/result_handling.py index 49279db2b..9f935360b 100644 --- a/packages/climate-ref/src/climate_ref/executor/result_handling.py +++ b/packages/climate-ref/src/climate_ref/executor/result_handling.py @@ -66,77 +66,85 @@ def _copy_file_to_results( shutil.copy(input_directory / filename, output_filename) -def _process_execution_scalar( +def ingest_scalar_values( database: Database, result: "ExecutionResult", execution: Execution, cv: CV, ) -> None: """ - Process the scalar values from the execution result and store them in the database + Load, validate, and bulk-insert scalar metric values. - This also validates the scalar values against the controlled vocabulary + Parameters + ---------- + database + The active database session to use + result + The execution result containing the metric bundle filename + execution + The execution record to associate values with + cv + The controlled vocabulary to validate against + + Notes + ----- + Callers are responsible for transaction boundaries; this function does not + open a nested transaction or catch exceptions. 
""" - # Load the metric bundle from the file cmec_metric_bundle = CMECMetric.load_from_json(result.to_output_path(result.metric_bundle_filename)) - # Check that the diagnostic values conform with the controlled vocabulary try: cv.validate_metrics(cmec_metric_bundle) except (ResultValidationError, AssertionError): # TODO: Remove once we have settled on a controlled vocabulary - logger.exception("Diagnostic values do not conform with the controlled vocabulary") - # execution.mark_failed() + logger.warning( + "Diagnostic scalar values do not conform with the controlled vocabulary", exc_info=True + ) - # Perform a bulk insert of scalar values - # The current implementation will swallow the exception, but display a log message - try: - scalar_values = [ + new_values = [] + for metric_result in cmec_metric_bundle.iter_results(): + new_values.append( { "execution_id": execution.id, - "value": result.value, - "attributes": result.attributes, - **result.dimensions, + "value": metric_result.value, + "attributes": metric_result.attributes, + **metric_result.dimensions, } - for result in cmec_metric_bundle.iter_results() - ] - logger.debug(f"Ingesting {len(scalar_values)} scalar values for execution {execution.id}") - if scalar_values: - # Perform this in a nested transaction to rollback if something goes wrong - # We will lose the metric values for a given execution, but not the whole execution - with database.session.begin_nested(): - database.session.execute( - insert(ScalarMetricValue), - scalar_values, - ) - # This is a broad exception catch to ensure we log any issues - except Exception: - logger.exception("Something went wrong when ingesting diagnostic values") + ) + logger.debug(f"Ingesting {len(new_values)} scalar values for execution {execution.id}") -def _process_execution_series( - config: "Config", + if new_values: + database.session.execute(insert(ScalarMetricValue), new_values) + + +def ingest_series_values( database: Database, result: "ExecutionResult", 
execution: Execution, cv: CV, ) -> None: """ - Process the series values from the execution result and store them in the database + Load, validate, and bulk-insert series metric values. - This also copies the series values file from the scratch directory to the results directory - and validates the series values against the controlled vocabulary. + Parameters + ---------- + database + The active database session to use + result + The execution result containing the series filename + execution + The execution record to associate values with + cv + The controlled vocabulary to validate against + + Notes + ----- + Callers are responsible for transaction boundaries; this function does not + open a nested transaction or catch exceptions. """ assert result.series_filename, "Series filename must be set in the result" - _copy_file_to_results( - config.paths.scratch, - config.paths.results, - execution.output_fragment, - result.series_filename, - ) - - # Load the series values from the file series_values_path = result.to_output_path(result.series_filename) series_values = TSeries.load_from_json(series_values_path) @@ -144,12 +152,13 @@ def _process_execution_series( cv.validate_metrics(series_values) except (ResultValidationError, AssertionError): # TODO: Remove once we have settled on a controlled vocabulary - logger.exception("Diagnostic values do not conform with the controlled vocabulary") - # execution.mark_failed() + logger.warning( + "Diagnostic series values do not conform with the controlled vocabulary", exc_info=True + ) - # Perform a bulk insert of series values - try: - series_values_content = [ + new_values = [] + for series_result in series_values: + new_values.append( { "execution_id": execution.id, "values": series_result.values, @@ -158,19 +167,123 @@ def _process_execution_series( "index_name": series_result.index_name, **series_result.dimensions, } - for series_result in series_values - ] - logger.debug(f"Ingesting {len(series_values)} series values for 
execution {execution.id}") - if series_values: - # Perform this in a nested transaction to rollback if something goes wrong - # We will lose the metric values for a given execution, but not the whole execution - with database.session.begin_nested(): - database.session.execute( - insert(SeriesMetricValue), - series_values_content, - ) - except Exception: - logger.exception("Something went wrong when ingesting diagnostic series values") + ) + + logger.debug(f"Ingesting {len(new_values)} series values for execution {execution.id}") + + if new_values: + database.session.execute(insert(SeriesMetricValue), new_values) + + +def ingest_execution_result( + database: Database, + execution: Execution, + result: "ExecutionResult", + cv: CV, + *, + output_base_path: pathlib.Path, +) -> None: + """ + Ingest a successful execution result into the database. + + Registers output entries and ingests scalar and series metric values. + + Parameters + ---------- + database + The active database session to use + execution + The execution record to associate results with + result + The successful execution result + cv + The controlled vocabulary to validate metrics against + output_base_path + Primary base directory for resolving output filenames + + Notes + ----- + Callers are responsible for: + + * File copying (scratch -> results) + * Transaction boundaries + * Marking the execution as successful (``execution.mark_successful()``) + * Setting the dirty flag on the execution group + """ + if result.output_bundle_filename: + cmec_output_bundle = CMECOutput.load_from_json(result.to_output_path(result.output_bundle_filename)) + for attr, output_type in [ + ("plots", ResultOutputType.Plot), + ("data", ResultOutputType.Data), + ("html", ResultOutputType.HTML), + ]: + register_execution_outputs( + database, + execution, + getattr(cmec_output_bundle, attr), + output_type=output_type, + base_path=output_base_path, + ) + + if result.series_filename: + ingest_series_values( + database=database, + 
result=result, + execution=execution, + cv=cv, + ) + + ingest_scalar_values( + database=database, + result=result, + execution=execution, + cv=cv, + ) + + +def register_execution_outputs( + database: Database, + execution: Execution, + outputs: "dict[str, OutputDict] | None", + output_type: ResultOutputType, + *, + base_path: pathlib.Path, +) -> None: + """ + Register output entries in the database. + + Each entry in ``outputs`` is resolved relative to ``base_path``. + + Parameters + ---------- + database + The active database session to use + execution + The execution record to associate outputs with + outputs + Mapping of short name to ``OutputDict`` (may be None) + output_type + The type of output being registered + base_path + Base directory for resolving relative filenames + + Notes + ----- + Callers are responsible for transaction boundaries. + """ + for key, output_info in (outputs or {}).items(): + filename = ensure_relative_path(output_info.filename, base_path) + database.session.add( + ExecutionOutput.build( + execution_id=execution.id, + output_type=output_type, + filename=str(filename), + description=output_info.description, + short_name=key, + long_name=output_info.long_name, + dimensions=output_info.dimensions or {}, + ) + ) def handle_execution_result( @@ -240,23 +353,33 @@ def handle_execution_result( execution.output_fragment, result.output_bundle_filename, ) - _handle_output_bundle( + _copy_output_bundle_files( config, - database, execution, result.to_output_path(result.output_bundle_filename), ) - cv = CV.load_from_file(config.paths.dimensions_cv) - if result.series_filename: - # Process the series values if they are present - # This will ingest the series values into the database - _process_execution_series(config=config, database=database, result=result, execution=execution, cv=cv) + _copy_file_to_results( + config.paths.scratch, + config.paths.results, + execution.output_fragment, + result.series_filename, + ) - # Process the scalar values - # 
This will ingest the scalar values into the database - _process_execution_scalar(database=database, result=result, execution=execution, cv=cv) + # Ingest outputs and metrics into the database via the shared ingestion path + cv = CV.load_from_file(config.paths.dimensions_cv) + try: + with database.session.begin_nested(): + ingest_execution_result( + database, + execution, + result, + cv, + output_base_path=config.paths.scratch / execution.output_fragment, + ) + except Exception: + logger.exception("Something went wrong when ingesting execution result") # TODO: This should check if the result is the most recent for the execution, # if so then update the dirty fields @@ -267,67 +390,21 @@ def handle_execution_result( execution.mark_successful(result.as_relative_path(result.metric_bundle_filename)) -def _handle_output_bundle( +def _copy_output_bundle_files( config: "Config", - database: Database, execution: Execution, cmec_output_bundle_filename: pathlib.Path, ) -> None: - # Extract the registered outputs - # Copy the content to the output directory - # Track in the db + """Copy output bundle referenced files (plots, data, html) from scratch to results.""" cmec_output_bundle = CMECOutput.load_from_json(cmec_output_bundle_filename) - _handle_outputs( - cmec_output_bundle.plots, - output_type=ResultOutputType.Plot, - config=config, - database=database, - execution=execution, - ) - _handle_outputs( - cmec_output_bundle.data, - output_type=ResultOutputType.Data, - config=config, - database=database, - execution=execution, - ) - _handle_outputs( - cmec_output_bundle.html, - output_type=ResultOutputType.HTML, - config=config, - database=database, - execution=execution, - ) - - -def _handle_outputs( - outputs: dict[str, OutputDict] | None, - output_type: ResultOutputType, - config: "Config", - database: Database, - execution: Execution, -) -> None: - outputs = outputs or {} - - for key, output_info in outputs.items(): - filename = ensure_relative_path( - output_info.filename, 
config.paths.scratch / execution.output_fragment - ) - - _copy_file_to_results( - config.paths.scratch, - config.paths.results, - execution.output_fragment, - filename, - ) - database.session.add( - ExecutionOutput.build( - execution_id=execution.id, - output_type=output_type, - filename=str(filename), - description=output_info.description, - short_name=key, - long_name=output_info.long_name, - dimensions=output_info.dimensions or {}, + scratch_base = config.paths.scratch / execution.output_fragment + + for attr in ("plots", "data", "html"): + for output_info in (getattr(cmec_output_bundle, attr) or {}).values(): + filename = ensure_relative_path(output_info.filename, scratch_base) + _copy_file_to_results( + config.paths.scratch, + config.paths.results, + execution.output_fragment, + filename, ) - ) diff --git a/packages/climate-ref/src/climate_ref/solver.py b/packages/climate-ref/src/climate_ref/solver.py index b92d11dab..f8052843e 100644 --- a/packages/climate-ref/src/climate_ref/solver.py +++ b/packages/climate-ref/src/climate_ref/solver.py @@ -553,7 +553,6 @@ def solve_required_executions( # noqa: PLR0912, PLR0913, PLR0915 total_count = 0 for potential_execution in solver.solve(filters): - # The diagnostic output is first written to the scratch directory definition = potential_execution.build_execution_definition(output_root=config.paths.scratch) logger.debug( @@ -623,6 +622,7 @@ def solve_required_executions( # noqa: PLR0912, PLR0913, PLR0915 logger.info( f"Running new execution for execution group: {potential_execution.execution_slug()!r}" ) + execution = Execution( execution_group=execution_group, dataset_hash=definition.datasets.hash, diff --git a/packages/climate-ref/tests/integration/test_reingest.py b/packages/climate-ref/tests/integration/test_reingest.py new file mode 100644 index 000000000..b0ee862d8 --- /dev/null +++ b/packages/climate-ref/tests/integration/test_reingest.py @@ -0,0 +1,180 @@ +""" +Integration tests for reingest functionality +""" + 
+import pytest + +from climate_ref.executor.reingest import ( + reconstruct_execution_definition, + reingest_execution, +) +from climate_ref.models import ScalarMetricValue, SeriesMetricValue +from climate_ref.models.dataset import CMIP6Dataset +from climate_ref.models.diagnostic import Diagnostic as DiagnosticModel +from climate_ref.models.execution import Execution, ExecutionGroup, ExecutionOutput, execution_datasets +from climate_ref.models.metric_value import MetricValue +from climate_ref.provider_registry import ProviderRegistry +from climate_ref.solver import solve_required_executions +from climate_ref_core.datasets import SourceDatasetType + + +def test_definition_round_trip(config, db_seeded, provider): + with db_seeded.session.begin(): + datasets = db_seeded.session.query(CMIP6Dataset).limit(2).all() + assert len(datasets) >= 1 + + selector = (("source_id", datasets[0].source_id),) + + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup( + key="round-trip-test", + diagnostic_id=diag.id, + selectors={SourceDatasetType.CMIP6.value: [list(pair) for pair in selector]}, + ) + db_seeded.session.add(eg) + db_seeded.session.flush() + + ex = Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="test/round-trip/abc", + dataset_hash="h1", + ) + db_seeded.session.add(ex) + db_seeded.session.flush() + + for dataset in datasets: + db_seeded.session.execute( + execution_datasets.insert().values( + execution_id=ex.id, + dataset_id=dataset.id, + ) + ) + + diagnostic = provider.get("mock") + definition = reconstruct_execution_definition(config, ex, diagnostic) + + assert definition.key == "round-trip-test" + assert definition.diagnostic is diagnostic + assert SourceDatasetType.CMIP6 in definition.datasets + + collection = definition.datasets[SourceDatasetType.CMIP6] + + # Dataset IDs should match what was linked + expected_ids = sorted(d.id for d in datasets) + actual_ids = sorted(collection.datasets.index.unique()) + assert 
expected_ids == actual_ids, f"Dataset IDs: expected {expected_ids}, got {actual_ids}" + + # File paths should be present for all linked datasets + expected_paths = sorted(f.path for d in datasets for f in d.files) + actual_paths = sorted(collection.datasets["path"].tolist()) + assert expected_paths == actual_paths, "File paths not preserved through round-trip" + + # Key facets should survive the round-trip + for facet in ("variable_id", "source_id", "experiment_id"): + assert facet in collection.datasets.columns, f"Missing facet column: {facet}" + + # Selector should match what was stored on the execution group + assert collection.selector == selector + + +def _snapshot_scalars(db, execution): + """Snapshot scalar metrics as a set of (value, dimensions) for comparison.""" + values = db.session.query(ScalarMetricValue).filter_by(execution_id=execution.id).all() + return {(v.value, tuple(sorted(v.dimensions.items()))) for v in values} + + +def _snapshot_series(db, execution): + """Snapshot series metrics as a set of (values_tuple, dimensions) for comparison.""" + values = db.session.query(SeriesMetricValue).filter_by(execution_id=execution.id).all() + return {(tuple(v.values), tuple(sorted(v.dimensions.items()))) for v in values} + + +def _snapshot_outputs(db, execution): + """Snapshot outputs as a set of (short_name, output_type, filename) for comparison.""" + outputs = db.session.query(ExecutionOutput).filter_by(execution_id=execution.id).all() + return {(o.short_name, o.output_type, o.filename) for o in outputs} + + +@pytest.fixture +def _solved_example(config, db_seeded): + """Run the example provider's diagnostics via solve, producing real executions.""" + solve_required_executions(db=db_seeded, config=config, one_per_diagnostic=True) + + # Verify at least one successful execution was produced + successful = db_seeded.session.query(Execution).filter_by(successful=True).all() + assert len(successful) >= 1, "solve_required_executions should produce at least one 
successful execution" + + # Close any lingering transaction so test methods start clean + if db_seeded.session.in_transaction(): + db_seeded.session.commit() + + +@pytest.fixture +def provider_registry(config, db_seeded): + """Build a ProviderRegistry from the seeded database.""" + return ProviderRegistry.build_from_config(config, db_seeded) + + +@pytest.mark.usefixtures("_solved_example") +class TestReingestAfterSolve: + """End-to-end: solve with example provider, then reingest.""" + + def _get_successful_execution(self, db_seeded): + """Get the first successful execution with metric values.""" + executions = db_seeded.session.query(Execution).filter_by(successful=True).all() + for ex in executions: + if db_seeded.session.query(MetricValue).filter_by(execution_id=ex.id).count() > 0: + return ex + pytest.skip("No successful execution with metric values found") + + def test_reingest_creates_equivalent_execution(self, config, db_seeded, provider_registry): + """Reingest should create a new execution with equivalent metrics and outputs.""" + execution = self._get_successful_execution(db_seeded) + + original_scalars = _snapshot_scalars(db_seeded, execution) + original_series = _snapshot_series(db_seeded, execution) + original_outputs = _snapshot_outputs(db_seeded, execution) + + execution_count_before = db_seeded.session.query(Execution).count() + + if db_seeded.session.in_transaction(): + db_seeded.session.commit() + + with db_seeded.session.begin(): + ok = reingest_execution( + config=config, + database=db_seeded, + execution=execution, + provider_registry=provider_registry, + ) + assert ok is True + + # A new execution should exist + execution_count_after = db_seeded.session.query(Execution).count() + assert execution_count_after == execution_count_before + 1 + + # Find the new execution + eg = execution.execution_group + new_execution = ( + db_seeded.session.query(Execution) + .filter( + Execution.execution_group_id == eg.id, + Execution.id != execution.id, + 
Execution.successful.is_(True), + ) + .order_by(Execution.id.desc()) + .first() + ) + assert new_execution is not None + + new_scalars = _snapshot_scalars(db_seeded, new_execution) + new_series = _snapshot_series(db_seeded, new_execution) + new_outputs = _snapshot_outputs(db_seeded, new_execution) + + assert original_scalars == new_scalars, "Reingested scalars should match original" + assert original_series == new_series, "Reingested series should match original" + assert original_outputs == new_outputs, "Reingested outputs should match original" + + # Original execution should be untouched + assert _snapshot_scalars(db_seeded, execution) == original_scalars diff --git a/packages/climate-ref/tests/unit/cli/test_executions.py b/packages/climate-ref/tests/unit/cli/test_executions.py index 177be0344..b714f89dc 100644 --- a/packages/climate-ref/tests/unit/cli/test_executions.py +++ b/packages/climate-ref/tests/unit/cli/test_executions.py @@ -1038,3 +1038,119 @@ def test_fail_running_with_confirmation(self, db_with_running, invoke_cli): assert result.exit_code == 0 assert "Successfully marked 2 execution(s) as failed" in result.stdout + + +class TestReingestCLI: + """Tests for the `executions reingest` CLI command.""" + + @pytest.fixture + def db_with_executions(self, db_seeded, config): + """Create a DB with execution groups that have output directories on disk.""" + with db_seeded.session.begin(): + _register_provider(db_seeded, pmp_provider) + _register_provider(db_seeded, esmvaltool_provider) + + diag_pmp = db_seeded.session.query(Diagnostic).filter_by(slug="enso_tel").first() + diag_esm = db_seeded.session.query(Diagnostic).filter_by(slug="enso-characteristics").first() + + eg1 = ExecutionGroup( + key="reingest-1", + diagnostic_id=diag_pmp.id, + selectors={"cmip6": [["source_id", "GFDL-ESM4"]]}, + ) + eg2 = ExecutionGroup( + key="reingest-2", + diagnostic_id=diag_esm.id, + selectors={"cmip6": [["source_id", "ACCESS-ESM1-5"]]}, + ) + db_seeded.session.add_all([eg1, 
eg2]) + db_seeded.session.flush() + + ex1 = Execution( + execution_group_id=eg1.id, + successful=True, + output_fragment="pmp/enso_tel/out1", + dataset_hash="h1", + ) + ex2 = Execution( + execution_group_id=eg2.id, + successful=False, + output_fragment="esmvaltool/enso-char/out2", + dataset_hash="h2", + ) + db_seeded.session.add_all([ex1, ex2]) + + db_seeded.session.commit() + return db_seeded + + def test_reingest_no_filters_error(self, invoke_cli, db_seeded): + """Calling reingest with no filters should exit with code 1.""" + result = invoke_cli(["executions", "reingest"], expected_exit_code=1) + assert "At least one filter is required" in result.stderr + + def test_reingest_no_results(self, db_with_executions, invoke_cli): + """When filters match nothing, should print a message and exit cleanly.""" + result = invoke_cli(["executions", "reingest", "--provider", "nonexistent", "--force"]) + assert result.exit_code == 0 + assert "No executions found" in result.stdout + + def test_reingest_dry_run(self, db_with_executions, invoke_cli): + """Dry run should show preview without making changes.""" + result = invoke_cli(["executions", "reingest", "--provider", "pmp", "--dry-run", "--force"]) + assert result.exit_code == 0 + assert "Dry run" in result.stdout + assert "enso_tel" in result.stdout + + def test_reingest_cancellation(self, db_with_executions, invoke_cli): + """User declining confirmation should cancel reingest.""" + with patch("climate_ref.cli.executions.typer.confirm", return_value=False): + result = invoke_cli(["executions", "reingest", "--provider", "pmp"]) + assert result.exit_code == 0 + assert "Reingest cancelled" in result.stdout + + def test_reingest_force_runs(self, db_with_executions, invoke_cli, config): + """Force mode should skip confirmation. 
Even if reingest_execution returns False + (no output dirs), we exercise the CLI loop and get skip counts.""" + result = invoke_cli(["executions", "reingest", "--provider", "pmp", "--force"]) + assert result.exit_code == 0 + assert "Reingest complete" in result.stdout + # Output dir doesn't exist so reingest_execution returns False -> skipped + assert "skipped" in result.stdout + + def test_reingest_by_group_ids(self, db_with_executions, invoke_cli): + """Passing group IDs directly should work.""" + eg = db_with_executions.session.query(ExecutionGroup).filter_by(key="reingest-1").first() + result = invoke_cli(["executions", "reingest", str(eg.id), "--dry-run"]) + assert result.exit_code == 0 + assert "Dry run" in result.stdout + assert "reingest-1" in result.stdout + + def test_reingest_include_failed(self, db_with_executions, invoke_cli): + """--include-failed should include failed executions in preview.""" + result = invoke_cli( + [ + "executions", + "reingest", + "--provider", + "esmvaltool", + "--include-failed", + "--dry-run", + ] + ) + assert result.exit_code == 0 + assert "enso-characteristics" in result.stdout + + def test_reingest_success_path(self, db_with_executions, invoke_cli, config): + """When reingest_execution succeeds, success_count should increment.""" + # Create output directory so reingest_execution can find it + eg = db_with_executions.session.query(ExecutionGroup).filter_by(key="reingest-1").first() + ex = eg.executions[0] + results_dir = config.paths.results / ex.output_fragment + results_dir.mkdir(parents=True, exist_ok=True) + + # Mock reingest_execution to return True (success) + with patch("climate_ref.executor.reingest.reingest_execution", return_value=True): + result = invoke_cli(["executions", "reingest", str(eg.id), "--force"]) + assert result.exit_code == 0 + assert "1 succeeded" in result.stdout + assert "0 skipped" in result.stdout diff --git a/packages/climate-ref/tests/unit/executor/test_reingest.py 
b/packages/climate-ref/tests/unit/executor/test_reingest.py new file mode 100644 index 000000000..e01cc0572 --- /dev/null +++ b/packages/climate-ref/tests/unit/executor/test_reingest.py @@ -0,0 +1,1399 @@ +"""Tests for the reingest module and allocate_output_fragment helper.""" + +import datetime +import json +import pathlib +from unittest.mock import patch + +import pytest +from climate_ref_esmvaltool import provider as esmvaltool_provider +from climate_ref_pmp import provider as pmp_provider + +from climate_ref.executor.fragment import allocate_output_fragment +from climate_ref.executor.reingest import ( + _extract_dataset_attributes, + get_executions_for_reingest, + reconstruct_execution_definition, + reingest_execution, +) +from climate_ref.executor.result_handling import ( + ingest_execution_result, + ingest_scalar_values, + ingest_series_values, + register_execution_outputs, +) +from climate_ref.models import ScalarMetricValue, SeriesMetricValue +from climate_ref.models.dataset import CMIP6Dataset +from climate_ref.models.diagnostic import Diagnostic as DiagnosticModel +from climate_ref.models.execution import ( + Execution, + ExecutionGroup, + ExecutionOutput, + ResultOutputType, + execution_datasets, +) +from climate_ref.models.provider import Provider as ProviderModel +from climate_ref.provider_registry import ProviderRegistry, _register_provider +from climate_ref_core.datasets import SourceDatasetType +from climate_ref_core.diagnostics import ExecutionResult +from climate_ref_core.metric_values import SeriesMetricValue as TSeries +from climate_ref_core.pycmec.controlled_vocabulary import CV +from climate_ref_core.pycmec.metric import CMECMetric +from climate_ref_core.pycmec.output import CMECOutput + + +@pytest.fixture +def reingest_db(db, config): + """Set up a database with an execution group, execution, and result files on disk.""" + with db.session.begin(): + provider_model = ProviderModel(name="mock_provider", slug="mock_provider", version="v0.1.0") 
+ db.session.add(provider_model) + db.session.flush() + + diag_model = DiagnosticModel( + name="mock", + slug="mock", + provider_id=provider_model.id, + ) + db.session.add(diag_model) + db.session.flush() + + eg = ExecutionGroup( + key="test-key", + diagnostic_id=diag_model.id, + selectors={"cmip6": [["source_id", "ACCESS-ESM1-5"], ["variable_id", "tas"]]}, + dirty=False, + ) + db.session.add(eg) + db.session.flush() + + execution = Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="mock_provider/mock/abc123", + dataset_hash="hash1", + ) + db.session.add(execution) + db.session.flush() + + return db + + +@pytest.fixture +def reingest_execution_obj(reingest_db): + """Get the execution object from the reingest_db fixture.""" + return reingest_db.session.query(Execution).one() + + +@pytest.fixture +def mock_provider_registry(provider): + """Create a mock ProviderRegistry that returns the test provider.""" + return ProviderRegistry(providers=[provider]) + + +SAMPLE_SERIES = [ + TSeries( + dimensions={"source_id": "test-model"}, + values=[1.0, 2.0, 3.0], + index=[0, 1, 2], + index_name="time", + attributes={"units": "K"}, + ) +] + + +def _create_scratch_dir(config, execution): + """Create and return the scratch directory for an execution.""" + scratch_dir = config.paths.scratch / execution.output_fragment + scratch_dir.mkdir(parents=True, exist_ok=True) + return scratch_dir + + +@pytest.fixture +def scratch_dir_with_results(config, reingest_execution_obj): + """Create a scratch directory with empty CMEC template files (simulates raw diagnostic output).""" + scratch_dir = _create_scratch_dir(config, reingest_execution_obj) + + CMECMetric(**CMECMetric.create_template()).dump_to_json(scratch_dir / "diagnostic.json") + CMECOutput(**CMECOutput.create_template()).dump_to_json(scratch_dir / "output.json") + TSeries.dump_to_json(scratch_dir / "series.json", SAMPLE_SERIES) + + return scratch_dir + + +@pytest.fixture +def scratch_dir_with_data(config, 
reingest_execution_obj): + """Create a scratch directory with CMEC files containing actual metric/output data.""" + scratch_dir = _create_scratch_dir(config, reingest_execution_obj) + + (scratch_dir / "diagnostic.json").write_text( + json.dumps( + { + "DIMENSIONS": { + "json_structure": ["source_id", "metric"], + "source_id": {"test-model": {}}, + "metric": {"rmse": {}}, + }, + "RESULTS": {"test-model": {"rmse": 42.0}}, + } + ) + ) + + (scratch_dir / "plot.png").write_bytes(b"fake png") + (scratch_dir / "output.json").write_text( + json.dumps( + { + "index": "index.html", + "provenance": {"environment": {}, "modeldata": [], "obsdata": {}, "log": "cmec_output.log"}, + "data": {}, + "plots": {"test_plot": {"filename": "plot.png", "long_name": "Test Plot", "description": ""}}, + "html": {}, + "metrics": None, + "diagnostics": {}, + } + ) + ) + + TSeries.dump_to_json(scratch_dir / "series.json", SAMPLE_SERIES) + + return scratch_dir + + +@pytest.fixture +def mock_result_factory(mocker): + """Factory to create mock ExecutionResult objects with sensible defaults. + + Accepts an output_dir and optional overrides for output_bundle_filename + and series_filename. 
+ """ + + def _create( + output_dir, + *, + output_bundle_filename=pathlib.Path("output.json"), + series_filename=pathlib.Path("series.json"), + ): + mock_result = mocker.Mock(spec=ExecutionResult) + mock_result.successful = True + mock_result.metric_bundle_filename = pathlib.Path("diagnostic.json") + mock_result.output_bundle_filename = output_bundle_filename + mock_result.series_filename = series_filename + mock_result.retryable = False + mock_result.to_output_path = lambda f: output_dir / f if f else output_dir + mock_result.as_relative_path = pathlib.Path + return mock_result + + return _create + + +def _patch_build_result(mocker, registry, mock_result): + """Patch build_execution_result on the mock diagnostic to return mock_result.""" + diagnostic = registry.get_metric("mock_provider", "mock") + mocker.patch.object(diagnostic, "build_execution_result", return_value=mock_result) + return diagnostic + + +# --- allocate_output_fragment tests --- + + +class TestAllocateOutputFragment: + def test_appends_timestamp_suffix(self, tmp_path): + """Should append a UTC timestamp suffix to the base fragment.""" + result = allocate_output_fragment("provider/diag/abc123", tmp_path) + assert result.startswith("provider/diag/abc123_") + # Suffix should be a valid timestamp: YYYYMMDDTHHMMSS followed by 6 microsecond digits + suffix = result.split("_", 1)[1] + assert len(suffix) == 21 # 8 date + T + 6 time + 6 microseconds + assert "T" in suffix + + def test_preserves_base_fragment(self, tmp_path): + """The original fragment should be a prefix of the result.""" + base = "my_provider/my_diag/hash123" + result = allocate_output_fragment(base, tmp_path) + assert result.startswith(base + "_") + + def test_different_calls_produce_different_fragments(self, tmp_path): + """Two rapid calls should produce different fragments (microsecond resolution).""" + result1 = allocate_output_fragment("provider/diag/abc123", tmp_path) + result2 = allocate_output_fragment("provider/diag/abc123", 
tmp_path) + assert result1 != result2 + + def test_raises_if_directory_already_exists(self, tmp_path): + """Should raise FileExistsError when the target directory already exists.""" + fixed_time = datetime.datetime(2026, 1, 1, 12, 0, 0, 0, tzinfo=datetime.timezone.utc) + with patch("climate_ref.executor.fragment.datetime") as mock_dt: + mock_dt.datetime.now.return_value = fixed_time + mock_dt.timezone = datetime.timezone + # First call succeeds + fragment = allocate_output_fragment("provider/diag/abc123", tmp_path) + # Create the directory so a second call with the same timestamp collides + (tmp_path / fragment).mkdir(parents=True) + with pytest.raises(FileExistsError, match="Output directory already exists"): + allocate_output_fragment("provider/diag/abc123", tmp_path) + + +# --- extract dataset attributes tests --- + + +class TestExtractDatasetAttributes: + def test_extracts_cmip6_attributes(self, db_seeded): + dataset = db_seeded.session.query(CMIP6Dataset).first() + attrs = _extract_dataset_attributes(dataset) + + assert "variable_id" in attrs + assert "source_id" in attrs + assert "instance_id" in attrs + assert "id" not in attrs + assert "dataset_type" not in attrs + + +# --- reconstruct_execution_definition tests --- + + +class TestReconstructExecutionDefinition: + def test_reconstruct_basic(self, config, reingest_db, reingest_execution_obj, provider): + """Test that a basic execution definition can be reconstructed.""" + diagnostic = provider.get("mock") + definition = reconstruct_execution_definition(config, reingest_execution_obj, diagnostic) + + assert definition.key == "test-key" + assert definition.diagnostic is diagnostic + # Definition points at scratch (not results) so build_execution_result + # writes to a safe location. 
+ assert definition.output_directory == config.paths.scratch / "mock_provider/mock/abc123" + + def test_reconstruct_output_directory_under_scratch( + self, config, reingest_db, reingest_execution_obj, provider + ): + """Output directory should be under scratch for safe re-extraction.""" + diagnostic = provider.get("mock") + definition = reconstruct_execution_definition(config, reingest_execution_obj, diagnostic) + + assert str(definition.output_directory).startswith(str(config.paths.scratch)) + + +# --- reingest_execution tests --- + + +class TestReingestExecution: + def test_reingest_missing_output_dir( + self, config, reingest_db, reingest_execution_obj, mock_provider_registry + ): + """Should return False when scratch directory doesn't exist.""" + result = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + assert result is False + + def test_reingest_unresolvable_diagnostic(self, config, reingest_db, reingest_execution_obj): + """Should return False when provider registry can't resolve diagnostic.""" + empty_registry = ProviderRegistry(providers=[]) + result = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=empty_registry, + ) + assert result is False + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_creates_new_execution( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """Reingest should always create a new Execution record.""" + original_id = reingest_execution_obj.id + original_count = reingest_db.session.query(Execution).count() + + mock_result = mock_result_factory(scratch_dir_with_results) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + 
execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + new_count = reingest_db.session.query(Execution).count() + assert new_count == original_count + 1 + + # Original execution should be untouched + original = reingest_db.session.get(Execution, original_id) + assert original is not None + assert original.successful is True + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_creates_unique_fragment( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """New execution should have a different output_fragment from the original.""" + mock_result = mock_result_factory(scratch_dir_with_results) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + all_executions = reingest_db.session.query(Execution).all() + fragments = [e.output_fragment for e in all_executions] + assert len(set(fragments)) == len(fragments), f"Expected unique fragments, got: {fragments}" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_twice_creates_distinct_fragments( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """Running reingest twice should create distinct output fragments.""" + mock_result = mock_result_factory(scratch_dir_with_results) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + # Mock datetime.datetime.now to return different timestamps for each call + t1 = datetime.datetime(2026, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + t2 = datetime.datetime(2026, 1, 1, 12, 
0, 1, tzinfo=datetime.timezone.utc) + mocker.patch( + "climate_ref.executor.fragment.datetime.datetime", + **{"now.side_effect": [t1, t2]}, + ) + + ok1 = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok1 is True + + reingest_db.session.refresh(reingest_execution_obj) + ok2 = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok2 is True + + # Should have 3 executions total: original + 2 reingested + all_executions = reingest_db.session.query(Execution).all() + assert len(all_executions) == 3 + + fragments = [e.output_fragment for e in all_executions] + assert len(set(fragments)) == 3, f"Expected unique fragments, got: {fragments}" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_copies_results_to_new_directory( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """Reingest should copy scratch tree to a new results directory.""" + mock_result = mock_result_factory(scratch_dir_with_results) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + new_execution = ( + reingest_db.session.query(Execution).filter(Execution.id != reingest_execution_obj.id).one() + ) + + results_dir = config.paths.results / new_execution.output_fragment + assert results_dir.exists(), "Results should be copied to new directory" + assert (results_dir / "diagnostic.json").exists() + + def test_reingest_build_execution_result_failure( + self, + config, + 
reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mocker, + ): + """Should return False and skip when build_execution_result raises.""" + mock_diagnostic = mock_provider_registry.get_metric("mock_provider", "mock") + mocker.patch.object( + mock_diagnostic, + "build_execution_result", + side_effect=RuntimeError("Extraction failed"), + ) + + result = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + assert result is False + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_does_not_touch_dirty_flag( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """The dirty flag should remain unchanged after reingest.""" + eg = reingest_execution_obj.execution_group + eg.dirty = True + reingest_db.session.commit() + + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + reingest_db.session.refresh(eg) + assert eg.dirty is True + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_preserves_original_execution( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """Original execution should remain untouched after reingest.""" + execution = reingest_execution_obj + + reingest_db.session.add( + ScalarMetricValue( + execution_id=execution.id, + value=99.0, + attributes={"original": True}, + ) + ) + reingest_db.session.commit() + + 
original_count = ( + reingest_db.session.query(ScalarMetricValue).filter_by(execution_id=execution.id).count() + ) + + mock_result = mock_result_factory(scratch_dir_with_results) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=execution, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + # Original execution's values should be unchanged + preserved_count = ( + reingest_db.session.query(ScalarMetricValue).filter_by(execution_id=execution.id).count() + ) + assert preserved_count == original_count, "Original execution values should be untouched" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_ingestion_failure_rolls_back( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """If ingestion fails, no new execution should be created and results dir cleaned up.""" + original_count = reingest_db.session.query(Execution).count() + + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + # Make the scalar ingestion fail by corrupting the metric bundle + (scratch_dir_with_results / "diagnostic.json").write_text("not valid json") + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is False + + # No new execution should have been created + new_count = reingest_db.session.query(Execution).count() + assert new_count == original_count, "Failed reingest should not create new execution" + + def test_scratch_directory_preserved_after_success( + self, + config, + reingest_db, + reingest_execution_obj, + 
mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """Scratch directory should be preserved after reingest (contains raw outputs).""" + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + scratch_dir = config.paths.scratch / reingest_execution_obj.output_fragment + + reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + + assert scratch_dir.exists(), "Scratch directory should be preserved after reingest" + + def test_scratch_directory_preserved_after_failure( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mocker, + ): + """Scratch directory should be preserved even when reingest fails.""" + mock_diagnostic = mock_provider_registry.get_metric("mock_provider", "mock") + mocker.patch.object( + mock_diagnostic, + "build_execution_result", + side_effect=RuntimeError("Extraction failed"), + ) + + scratch_dir = config.paths.scratch / reingest_execution_obj.output_fragment + + result = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + + assert result is False + assert scratch_dir.exists(), "Scratch directory should be preserved after failure" + + +# --- register_execution_outputs tests --- + + +class TestRegisterExecutionOutputs: + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_registers_outputs_in_db( + self, config, reingest_db, reingest_execution_obj, scratch_dir_with_data + ): + """Should register output entries from the bundle into the database.""" + execution = reingest_execution_obj + bundle_path = scratch_dir_with_data / "output.json" + cmec_output_bundle = CMECOutput.load_from_json(bundle_path) + 
+ register_execution_outputs( + reingest_db, + execution, + cmec_output_bundle.plots, + output_type=ResultOutputType.Plot, + base_path=scratch_dir_with_data, + ) + reingest_db.session.commit() + + outputs = reingest_db.session.query(ExecutionOutput).filter_by(execution_id=execution.id).all() + assert len(outputs) >= 1 + assert any(o.short_name == "test_plot" for o in outputs) + + +# --- ingest metrics tests --- + + +class TestIngestMetrics: + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_ingest_scalar_values( + self, config, reingest_db, reingest_execution_obj, scratch_dir_with_data, mock_result_factory + ): + """Should ingest scalar metric values from a real CMEC bundle.""" + mock_result = mock_result_factory(scratch_dir_with_data) + cv = CV.load_from_file(config.paths.dimensions_cv) + + ingest_scalar_values( + database=reingest_db, result=mock_result, execution=reingest_execution_obj, cv=cv + ) + reingest_db.session.commit() + + scalars = ( + reingest_db.session.query(ScalarMetricValue) + .filter_by(execution_id=reingest_execution_obj.id) + .all() + ) + assert len(scalars) >= 1 + assert scalars[0].value == 42.0 + + +class TestIngestMetricsWithSeries: + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_ingest_series_values( + self, config, reingest_db, reingest_execution_obj, scratch_dir_with_data, mock_result_factory + ): + """Should ingest series metric values from a real series file.""" + mock_result = mock_result_factory(scratch_dir_with_data) + cv = CV.load_from_file(config.paths.dimensions_cv) + + ingest_series_values( + database=reingest_db, result=mock_result, execution=reingest_execution_obj, cv=cv + ) + reingest_db.session.commit() + + series = ( + reingest_db.session.query(SeriesMetricValue) + .filter_by(execution_id=reingest_execution_obj.id) + .all() + ) + assert len(series) >= 1 + + +# --- reconstruct with datasets tests --- + + +class TestReconstructEmptyDataset: + def 
test_reconstruct_dataset_with_no_files(self, config, db_seeded, provider): + """Datasets with no files should produce an empty DataFrame.""" + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup( + key="empty-files", + diagnostic_id=diag.id, + selectors={"cmip6": [["source_id", "NONEXISTENT"]]}, + ) + db_seeded.session.add(eg) + db_seeded.session.flush() + + # Link a dataset but it has no files + dataset = db_seeded.session.query(CMIP6Dataset).first() + ex = Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="test/empty-files/abc", + dataset_hash="h1", + ) + db_seeded.session.add(ex) + db_seeded.session.flush() + + if dataset: + db_seeded.session.execute( + execution_datasets.insert().values( + execution_id=ex.id, + dataset_id=dataset.id, + ) + ) + + diagnostic = provider.get("mock") + definition = reconstruct_execution_definition(config, ex, diagnostic) + assert definition.key == "empty-files" + + +class TestReconstructWithDatasets: + def test_reconstruct_with_linked_datasets(self, config, db_seeded, provider): + """reconstruct_execution_definition should build dataset collections from linked datasets.""" + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup( + key="recon-test", + diagnostic_id=diag.id, + selectors={"cmip6": [["source_id", "ACCESS-ESM1-5"]]}, + ) + db_seeded.session.add(eg) + db_seeded.session.flush() + + ex = Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="test/recon/abc", + dataset_hash="h1", + ) + db_seeded.session.add(ex) + db_seeded.session.flush() + + # Link a dataset to the execution + dataset = db_seeded.session.query(CMIP6Dataset).first() + if dataset: + db_seeded.session.execute( + execution_datasets.insert().values( + execution_id=ex.id, + dataset_id=dataset.id, + ) + ) + + diagnostic = provider.get("mock") + definition = reconstruct_execution_definition(config, ex, 
diagnostic) + + assert definition.key == "recon-test" + if dataset: + assert SourceDatasetType.CMIP6 in definition.datasets + + +# --- unsuccessful result tests --- + + +class TestReingestUnsuccessfulResult: + def test_reingest_unsuccessful_build_result( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mocker, + ): + """Should return False when build_execution_result returns unsuccessful.""" + mock_diagnostic = mock_provider_registry.get_metric("mock_provider", "mock") + mock_result = mocker.Mock(spec=ExecutionResult) + mock_result.successful = False + mock_result.metric_bundle_filename = None + mocker.patch.object(mock_diagnostic, "build_execution_result", return_value=mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + assert ok is False + + def test_reingest_successful_but_no_metric_bundle( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mocker, + ): + """Should return False when result is successful but metric_bundle_filename is None.""" + mock_diagnostic = mock_provider_registry.get_metric("mock_provider", "mock") + mock_result = mocker.Mock(spec=ExecutionResult) + mock_result.successful = True + mock_result.metric_bundle_filename = None + mocker.patch.object(mock_diagnostic, "build_execution_result", return_value=mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + assert ok is False + + +# --- get_executions_for_reingest tests --- + + +class TestGetExecutionsForReingest: + @pytest.fixture(autouse=True) + def _register_providers(self, db_seeded): + """Register providers once for all tests in this class.""" + with db_seeded.session.begin(): + _register_provider(db_seeded, pmp_provider) + 
_register_provider(db_seeded, esmvaltool_provider) + + def test_filters_by_success(self, db_seeded): + """By default should only return successful executions.""" + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup(key="test-filter", diagnostic_id=diag.id, selectors={}) + db_seeded.session.add(eg) + db_seeded.session.flush() + + db_seeded.session.add( + Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="out-s", + dataset_hash="h1", + ) + ) + + results = get_executions_for_reingest(db_seeded, execution_group_ids=[eg.id], include_failed=False) + assert len(results) == 1 + assert results[0][1].successful is True + + def test_include_failed(self, db_seeded): + """With include_failed=True, should also return failed executions.""" + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup(key="test-failed", diagnostic_id=diag.id, selectors={}) + db_seeded.session.add(eg) + db_seeded.session.flush() + + db_seeded.session.add( + Execution( + execution_group_id=eg.id, + successful=False, + output_fragment="out-f", + dataset_hash="h2", + ) + ) + + results = get_executions_for_reingest(db_seeded, execution_group_ids=[eg.id], include_failed=True) + assert len(results) >= 1 + + def test_filters_by_group_ids(self, db_seeded): + """Should only return executions for specified group IDs.""" + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg1 = ExecutionGroup(key="group-a", diagnostic_id=diag.id, selectors={}) + eg2 = ExecutionGroup(key="group-b", diagnostic_id=diag.id, selectors={}) + db_seeded.session.add_all([eg1, eg2]) + db_seeded.session.flush() + + db_seeded.session.add( + Execution( + execution_group_id=eg1.id, + successful=True, + output_fragment="out-a", + dataset_hash="ha", + ) + ) + db_seeded.session.add( + Execution( + execution_group_id=eg2.id, + successful=True, + 
output_fragment="out-b", + dataset_hash="hb", + ) + ) + + results = get_executions_for_reingest(db_seeded, execution_group_ids=[eg1.id]) + assert len(results) == 1 + assert results[0][0].id == eg1.id + + def test_filters_by_provider(self, db_seeded): + """Should filter executions by provider slug.""" + with db_seeded.session.begin(): + diag_pmp = db_seeded.session.query(DiagnosticModel).filter_by(slug="enso_tel").first() + diag_esm = db_seeded.session.query(DiagnosticModel).filter_by(slug="enso-characteristics").first() + + eg1 = ExecutionGroup(key="prov-pmp", diagnostic_id=diag_pmp.id, selectors={}) + eg2 = ExecutionGroup(key="prov-esm", diagnostic_id=diag_esm.id, selectors={}) + db_seeded.session.add_all([eg1, eg2]) + db_seeded.session.flush() + + db_seeded.session.add( + Execution( + execution_group_id=eg1.id, + successful=True, + output_fragment="out-pmp", + dataset_hash="hp", + ) + ) + db_seeded.session.add( + Execution( + execution_group_id=eg2.id, + successful=True, + output_fragment="out-esm", + dataset_hash="he", + ) + ) + + results = get_executions_for_reingest(db_seeded, provider_filters=["pmp"]) + provider_slugs = {eg.diagnostic.provider.slug for eg, _ in results} + assert "pmp" in provider_slugs + assert "esmvaltool" not in provider_slugs + + def test_filters_by_diagnostic(self, db_seeded): + """Should filter executions by diagnostic slug.""" + with db_seeded.session.begin(): + diag_pmp = db_seeded.session.query(DiagnosticModel).filter_by(slug="enso_tel").first() + diag_esm = db_seeded.session.query(DiagnosticModel).filter_by(slug="enso-characteristics").first() + + eg1 = ExecutionGroup(key="diag-pmp", diagnostic_id=diag_pmp.id, selectors={}) + eg2 = ExecutionGroup(key="diag-esm", diagnostic_id=diag_esm.id, selectors={}) + db_seeded.session.add_all([eg1, eg2]) + db_seeded.session.flush() + + db_seeded.session.add( + Execution( + execution_group_id=eg1.id, + successful=True, + output_fragment="out-d-pmp", + dataset_hash="hdp", + ) + ) + 
db_seeded.session.add( + Execution( + execution_group_id=eg2.id, + successful=True, + output_fragment="out-d-esm", + dataset_hash="hde", + ) + ) + + results = get_executions_for_reingest(db_seeded, diagnostic_filters=["enso_tel"]) + diag_slugs = {eg.diagnostic.slug for eg, _ in results} + assert "enso_tel" in diag_slugs + assert "enso-characteristics" not in diag_slugs + + def test_filters_out_none_executions(self, db_seeded): + """Should filter out execution groups that have no executions.""" + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup(key="no-exec", diagnostic_id=diag.id, selectors={}) + db_seeded.session.add(eg) + + results = get_executions_for_reingest(db_seeded, execution_group_ids=[eg.id]) + assert len(results) == 0 + + def test_selects_oldest_execution(self, db_seeded): + """Should return the oldest (original) execution, not the latest. + + Reingested executions only have results directories, not scratch. + Selecting the oldest ensures we always reingest from the execution + whose scratch directory actually exists. 
+ """ + with db_seeded.session.begin(): + diag = db_seeded.session.query(DiagnosticModel).first() + eg = ExecutionGroup(key="multi-exec", diagnostic_id=diag.id, selectors={}) + db_seeded.session.add(eg) + db_seeded.session.flush() + + original = Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="original_fragment", + dataset_hash="h-orig", + ) + db_seeded.session.add(original) + db_seeded.session.flush() + + reingested = Execution( + execution_group_id=eg.id, + successful=True, + output_fragment="original_fragment_20260405T120000000000", + dataset_hash="h-orig", + ) + db_seeded.session.add(reingested) + + results = get_executions_for_reingest(db_seeded, execution_group_ids=[eg.id]) + assert len(results) == 1 + _, selected_execution = results[0] + assert selected_execution.output_fragment == "original_fragment" + assert selected_execution.id == original.id + + +# --- equivalence tests --- + + +def _snapshot_scalars(db, execution): + """Snapshot scalar metrics as a set of (value, dimensions) for comparison.""" + values = db.session.query(ScalarMetricValue).filter_by(execution_id=execution.id).all() + return {(v.value, tuple(sorted(v.dimensions.items()))) for v in values} + + +def _snapshot_series(db, execution): + """Snapshot series metrics as a set of (values_tuple, dimensions) for comparison.""" + values = db.session.query(SeriesMetricValue).filter_by(execution_id=execution.id).all() + return {(tuple(v.values), tuple(sorted(v.dimensions.items()))) for v in values} + + +def _snapshot_outputs(db, execution): + """Snapshot outputs as a set of (short_name, output_type, filename) for comparison.""" + outputs = db.session.query(ExecutionOutput).filter_by(execution_id=execution.id).all() + return {(o.short_name, o.output_type, o.filename) for o in outputs} + + +class TestReingestEquivalence: + """Reingest should produce the same DB state as fresh ingestion.""" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def 
test_reingest_matches_original( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_data, + mock_result_factory, + mocker, + ): + """Reingest should produce equivalent metrics and outputs to the original.""" + execution = reingest_execution_obj + mock_result = mock_result_factory(scratch_dir_with_data) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + # Original ingestion via the shared path + cv = CV.load_from_file(config.paths.dimensions_cv) + ingest_execution_result( + reingest_db, + execution, + mock_result, + cv, + output_base_path=config.paths.scratch / execution.output_fragment, + ) + execution.mark_successful(mock_result.as_relative_path(mock_result.metric_bundle_filename)) + reingest_db.session.commit() + + # Snapshot original DB state + original_scalars = _snapshot_scalars(reingest_db, execution) + original_series = _snapshot_series(reingest_db, execution) + original_outputs = _snapshot_outputs(reingest_db, execution) + + assert original_scalars, "Original ingestion should produce scalar values" + + # Reingest creates a new execution from the same data + ok = reingest_execution( + config=config, + database=reingest_db, + execution=execution, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + # Find the new execution created by reingest + new_execution = reingest_db.session.query(Execution).filter(Execution.id != execution.id).one() + + # Snapshot reingest DB state + reingest_scalars = _snapshot_scalars(reingest_db, new_execution) + reingest_series = _snapshot_series(reingest_db, new_execution) + reingest_outputs = _snapshot_outputs(reingest_db, new_execution) + + # Both paths go through ingest_execution_result, so results must match + assert original_scalars == reingest_scalars, ( + f"Scalar values differ: original={original_scalars}, reingest={reingest_scalars}" + ) + assert original_series == reingest_series, ( + f"Series values 
differ: original={original_series}, reingest={reingest_series}" + ) + assert original_outputs == reingest_outputs, ( + f"Output entries differ: original={original_outputs}, reingest={reingest_outputs}" + ) + + +class TestReingestDatasetLinks: + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_copies_dataset_links( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """New execution should have the same dataset links as the original.""" + execution = reingest_execution_obj + + # Create a dataset to link to the execution + dataset = CMIP6Dataset( + slug="test-dataset.tas.gn", + instance_id="CMIP6.test.tas", + variable_id="tas", + source_id="ACCESS-ESM1-5", + experiment_id="historical", + table_id="Amon", + grid_label="gn", + member_id="r1i1p1f1", + variant_label="r1i1p1f1", + version="v20200101", + activity_id="CMIP", + institution_id="CSIRO", + ) + reingest_db.session.add(dataset) + reingest_db.session.flush() + + reingest_db.session.execute( + execution_datasets.insert().values( + execution_id=execution.id, + dataset_id=dataset.id, + ) + ) + reingest_db.session.commit() + + original_dataset_ids = sorted(d.id for d in execution.datasets) + assert len(original_dataset_ids) >= 1 + + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=execution, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + new_execution = reingest_db.session.query(Execution).filter(Execution.id != execution.id).one() + new_dataset_ids = sorted(d.id for d in new_execution.datasets) + assert original_dataset_ids == new_dataset_ids, ( + f"Dataset links should be copied: 
original={original_dataset_ids}, new={new_dataset_ids}" + ) + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_reingest_with_no_datasets( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """Reingest should succeed even when original execution has no dataset links.""" + assert len(reingest_execution_obj.datasets) == 0 + + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + new_execution = ( + reingest_db.session.query(Execution).filter(Execution.id != reingest_execution_obj.id).one() + ) + assert len(new_execution.datasets) == 0 + + +class TestReingestExecutionState: + """Verify the state of the new execution record after reingest.""" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_new_execution_marked_successful_with_correct_path( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """New execution should be marked successful with the correct metric bundle path.""" + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + new_execution = ( + reingest_db.session.query(Execution).filter(Execution.id != 
reingest_execution_obj.id).one() + ) + assert new_execution.successful is True + assert new_execution.path is not None + assert "diagnostic.json" in new_execution.path + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_new_execution_belongs_to_same_group( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """New execution should belong to the same execution group as the original.""" + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + ok = reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is True + + new_execution = ( + reingest_db.session.query(Execution).filter(Execution.id != reingest_execution_obj.id).one() + ) + assert new_execution.execution_group_id == reingest_execution_obj.execution_group_id + assert new_execution.dataset_hash == reingest_execution_obj.dataset_hash + + +class TestReingestFailureCleanup: + """Verify that failed reingest cleans up the results directory.""" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_results_dir_cleaned_on_ingestion_failure( + self, + config, + reingest_db, + reingest_execution_obj, + mock_provider_registry, + scratch_dir_with_results, + mock_result_factory, + mocker, + ): + """If ingestion fails, the copied results directory should be removed.""" + mock_result = mock_result_factory( + scratch_dir_with_results, output_bundle_filename=None, series_filename=None + ) + _patch_build_result(mocker, mock_provider_registry, mock_result) + + # Corrupt the metric bundle so ingestion fails + (scratch_dir_with_results / "diagnostic.json").write_text("not valid json") + + ok = 
reingest_execution( + config=config, + database=reingest_db, + execution=reingest_execution_obj, + provider_registry=mock_provider_registry, + ) + reingest_db.session.commit() + assert ok is False + + # No versioned results directory should remain + # The original fragment dir may or may not exist, but no _v2 dir should exist + versioned_dir = config.paths.results / (reingest_execution_obj.output_fragment + "_v2") + assert not versioned_dir.exists(), "Failed reingest should clean up results directory" + + +# --- ingest_execution_result standalone tests --- + + +class TestIngestExecutionResultStandalone: + """Test ingest_execution_result with the simplified signature.""" + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def test_ingest_with_all_outputs( + self, config, reingest_db, reingest_execution_obj, scratch_dir_with_data, mock_result_factory + ): + """Should ingest scalars, series, and register outputs in one call.""" + mock_result = mock_result_factory(scratch_dir_with_data) + cv = CV.load_from_file(config.paths.dimensions_cv) + + ingest_execution_result( + reingest_db, + reingest_execution_obj, + mock_result, + cv, + output_base_path=scratch_dir_with_data, + ) + reingest_db.session.commit() + + execution_id = reingest_execution_obj.id + + scalars = reingest_db.session.query(ScalarMetricValue).filter_by(execution_id=execution_id).all() + assert len(scalars) >= 1, "Should have ingested scalar values" + assert scalars[0].value == 42.0 + + series = reingest_db.session.query(SeriesMetricValue).filter_by(execution_id=execution_id).all() + assert len(series) >= 1, "Should have ingested series values" + + outputs = reingest_db.session.query(ExecutionOutput).filter_by(execution_id=execution_id).all() + assert len(outputs) >= 1, "Should have registered outputs" + assert any(o.short_name == "test_plot" for o in outputs) + + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") + def 
test_ingest_without_optional_outputs( + self, config, reingest_db, reingest_execution_obj, scratch_dir_with_data, mock_result_factory + ): + """Should work with no output_bundle and no series.""" + mock_result = mock_result_factory( + scratch_dir_with_data, output_bundle_filename=None, series_filename=None + ) + cv = CV.load_from_file(config.paths.dimensions_cv) + + ingest_execution_result( + reingest_db, + reingest_execution_obj, + mock_result, + cv, + output_base_path=scratch_dir_with_data, + ) + reingest_db.session.commit() + + execution_id = reingest_execution_obj.id + + scalars = reingest_db.session.query(ScalarMetricValue).filter_by(execution_id=execution_id).all() + assert len(scalars) >= 1, "Should still ingest scalar values" + + series = reingest_db.session.query(SeriesMetricValue).filter_by(execution_id=execution_id).all() + assert len(series) == 0, "Should have no series values" + + outputs = reingest_db.session.query(ExecutionOutput).filter_by(execution_id=execution_id).all() + assert len(outputs) == 0, "Should have no registered outputs" diff --git a/stubs/environs/__init__.pyi b/stubs/environs/__init__.pyi deleted file mode 100644 index 42d589ebe..000000000 --- a/stubs/environs/__init__.pyi +++ /dev/null @@ -1,17 +0,0 @@ -from pathlib import Path - -class Env: - def __init__(self, expand_vars: bool) -> None: ... - def read_env( - self, - path: str | None = None, - recurse: bool = True, - verbose: bool = False, - override: bool = False, - ) -> bool: ... - def path(self, name: str, default: Path | str | None = None) -> Path: ... - def bool(self, name: str, default: bool | None = None) -> bool: ... - def list(self, name: str, default: list[str] | None) -> list[str]: ... - # Ordering matters here and I don't know why - # This `str` method must be declared last to avoid a conflict with the `str` types above - def str(self, name: str, default: str | None = None) -> str: ...