From d1c528a403e35afe84da1174c99b46d03a6c73a4 Mon Sep 17 00:00:00 2001 From: Ken Lippold Date: Thu, 5 Mar 2026 16:54:17 -0800 Subject: [PATCH 1/3] Moved bulk of ETL logic and error handling to hydroserverpy ETL package --- domains/etl/aggregation.py | 291 --------- domains/etl/etl_errors.py | 551 ---------------- domains/etl/internal.py | 243 +++++++ domains/etl/loader.py | 163 ----- domains/etl/models/run.py | 10 - domains/etl/models/task.py | 2 +- domains/etl/run_result_normalizer.py | 125 ---- domains/etl/services/run.py | 14 - domains/etl/services/task.py | 3 +- domains/etl/tasks.py | 939 +++------------------------ 10 files changed, 345 insertions(+), 1996 deletions(-) delete mode 100644 domains/etl/aggregation.py delete mode 100644 domains/etl/etl_errors.py create mode 100644 domains/etl/internal.py delete mode 100644 domains/etl/loader.py delete mode 100644 domains/etl/run_result_normalizer.py diff --git a/domains/etl/aggregation.py b/domains/etl/aggregation.py deleted file mode 100644 index 2c85b893..00000000 --- a/domains/etl/aggregation.py +++ /dev/null @@ -1,291 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from datetime import datetime, time, timedelta, timezone as dt_timezone, tzinfo -import math -import re -from bisect import bisect_left -from typing import Iterable -from zoneinfo import ZoneInfo - - -AGGREGATION_STATISTICS = { - "simple_mean", - "time_weighted_daily_mean", - "last_value_of_day", -} -AGGREGATION_TIMEZONE_MODES = {"fixedOffset", "daylightSavings"} -_FIXED_OFFSET_RE = re.compile(r"^([+-])(\d{2})(\d{2})$") - - -@dataclass(frozen=True) -class AggregationTransformation: - aggregation_statistic: str - timezone_mode: str - timezone: str - - -def _first_non_empty(mapping: dict, keys: Iterable[str]) -> str | None: - for key in keys: - value = mapping.get(key) - if value is None: - continue - if isinstance(value, str): - value = value.strip() - if not value: - continue - return value - return None - - -def 
_parse_fixed_offset(offset: str) -> tzinfo: - match = _FIXED_OFFSET_RE.fullmatch(offset) - if not match: - raise ValueError("fixedOffset timezone must match +/-HHMM") - - sign, hours_raw, minutes_raw = match.groups() - hours = int(hours_raw) - minutes = int(minutes_raw) - if minutes >= 60: - raise ValueError("fixedOffset timezone minutes must be between 00 and 59") - - offset_delta = timedelta(hours=hours, minutes=minutes) - if sign == "-": - offset_delta = -offset_delta - - return dt_timezone(offset_delta) - - -def timezone_info_for_transformation(transform: AggregationTransformation) -> tzinfo: - if transform.timezone_mode == "fixedOffset": - return _parse_fixed_offset(transform.timezone) - - if transform.timezone_mode == "daylightSavings": - try: - return ZoneInfo(transform.timezone) - except Exception as exc: # pragma: no cover - platform-specific internals - raise ValueError( - "daylightSavings timezone must be a valid IANA timezone" - ) from exc - - raise ValueError(f"Unsupported timezoneMode: {transform.timezone_mode}") - - -def normalize_aggregation_transformation(raw: dict) -> dict: - if not isinstance(raw, dict): - raise ValueError("Aggregation transformation must be an object") - - transform_type = raw.get("type") - if transform_type != "aggregation": - raise ValueError("Aggregation transformation must set type='aggregation'") - - aggregation_statistic = _first_non_empty( - raw, ("aggregationStatistic", "aggregation_statistic") - ) - if not isinstance(aggregation_statistic, str) or aggregation_statistic not in AGGREGATION_STATISTICS: - allowed = ", ".join(sorted(AGGREGATION_STATISTICS)) - raise ValueError(f"aggregationStatistic must be one of: {allowed}") - - timezone_mode = _first_non_empty(raw, ("timezoneMode", "timezone_mode")) - if not isinstance(timezone_mode, str) or timezone_mode not in AGGREGATION_TIMEZONE_MODES: - allowed = ", ".join(sorted(AGGREGATION_TIMEZONE_MODES)) - raise ValueError(f"timezoneMode must be one of: {allowed}") - - 
timezone_value = _first_non_empty(raw, ("timezone",)) - if not isinstance(timezone_value, str): - raise ValueError("timezone is required for aggregation transformations") - - normalized = { - "type": "aggregation", - "aggregationStatistic": aggregation_statistic, - "timezoneMode": timezone_mode, - "timezone": timezone_value, - } - - # Validate timezone now so malformed configs fail early. - timezone_info_for_transformation( - AggregationTransformation( - aggregation_statistic=aggregation_statistic, - timezone_mode=timezone_mode, - timezone=timezone_value, - ) - ) - - return normalized - - -def parse_aggregation_transformation(raw: dict) -> AggregationTransformation: - normalized = normalize_aggregation_transformation(raw) - return AggregationTransformation( - aggregation_statistic=normalized["aggregationStatistic"], - timezone_mode=normalized["timezoneMode"], - timezone=normalized["timezone"], - ) - - -def _local_midnight(timestamp_utc: datetime, tz: tzinfo) -> datetime: - local = timestamp_utc.astimezone(tz) - return datetime.combine(local.date(), time.min, tzinfo=tz) - - -def closed_window_end_utc(source_end_utc: datetime, transform: AggregationTransformation) -> datetime: - tz = timezone_info_for_transformation(transform) - return _local_midnight(source_end_utc, tz).astimezone(dt_timezone.utc) - - -def first_window_start_utc(source_begin_utc: datetime, transform: AggregationTransformation) -> datetime: - tz = timezone_info_for_transformation(transform) - return _local_midnight(source_begin_utc, tz).astimezone(dt_timezone.utc) - - -def next_window_start_utc(destination_end_utc: datetime, transform: AggregationTransformation) -> datetime: - tz = timezone_info_for_transformation(transform) - destination_local = destination_end_utc.astimezone(tz) - next_date = destination_local.date() + timedelta(days=1) - local_midnight = datetime.combine(next_date, time.min, tzinfo=tz) - return local_midnight.astimezone(dt_timezone.utc) - - -def iter_daily_windows_utc( - 
start_utc: datetime, - end_utc: datetime, - transform: AggregationTransformation, -): - tz = timezone_info_for_transformation(transform) - - current_local = _local_midnight(start_utc, tz) - end_local = _local_midnight(end_utc, tz) - - while current_local < end_local: - next_local = datetime.combine( - current_local.date() + timedelta(days=1), - time.min, - tzinfo=tz, - ) - yield ( - current_local.astimezone(dt_timezone.utc), - next_local.astimezone(dt_timezone.utc), - current_local.date(), - ) - current_local = next_local - - -def _boundary_value( - target: datetime, - timestamps: list[datetime], - values: list[float], - prev_idx: int | None, - next_idx: int | None, -) -> float | None: - prev = None - nxt = None - - if prev_idx is not None and 0 <= prev_idx < len(timestamps): - prev = (timestamps[prev_idx], values[prev_idx]) - if next_idx is not None and 0 <= next_idx < len(timestamps): - nxt = (timestamps[next_idx], values[next_idx]) - - if prev and prev[0] == target: - return prev[1] - if nxt and nxt[0] == target: - return nxt[1] - - if prev and nxt: - t0, v0 = prev - t1, v1 = nxt - span = (t1 - t0).total_seconds() - if span <= 0: - return v1 - ratio = (target - t0).total_seconds() / span - return v0 + ratio * (v1 - v0) - - if prev: - return prev[1] - if nxt: - return nxt[1] - - return None - - -def aggregate_daily_window( - timestamps: list[datetime], - values: list[float], - window_start_utc: datetime, - window_end_utc: datetime, - statistic: str, -) -> float | None: - if statistic not in AGGREGATION_STATISTICS: - raise ValueError(f"Unsupported aggregationStatistic '{statistic}'") - - if not timestamps or len(timestamps) != len(values): - return None - - if window_end_utc <= window_start_utc: - return None - - left = bisect_left(timestamps, window_start_utc) - right = bisect_left(timestamps, window_end_utc) - - # No observations in this day -> skip writing this day. 
- if left == right: - return None - - window_values = values[left:right] - - if statistic == "simple_mean": - return sum(window_values) / len(window_values) - - if statistic == "last_value_of_day": - return window_values[-1] - - # Time-weighted daily mean using trapezoidal integration over the daily window. - start_value = _boundary_value( - target=window_start_utc, - timestamps=timestamps, - values=values, - prev_idx=(left - 1) if left > 0 else None, - next_idx=left, - ) - end_value = _boundary_value( - target=window_end_utc, - timestamps=timestamps, - values=values, - prev_idx=(right - 1) if right > 0 else None, - next_idx=right if right < len(timestamps) else None, - ) - - if start_value is None or end_value is None: - return None - - area_points: list[tuple[datetime, float]] = [(window_start_utc, start_value)] - for idx in range(left, right): - ts = timestamps[idx] - val = values[idx] - if ts == window_start_utc: - area_points[0] = (ts, val) - continue - area_points.append((ts, val)) - - if area_points[-1][0] == window_end_utc: - area_points[-1] = (window_end_utc, end_value) - else: - area_points.append((window_end_utc, end_value)) - - total_area = 0.0 - for idx in range(1, len(area_points)): - t0, v0 = area_points[idx - 1] - t1, v1 = area_points[idx] - span = (t1 - t0).total_seconds() - if span <= 0: - continue - total_area += (v0 + v1) * 0.5 * span - - duration = (window_end_utc - window_start_utc).total_seconds() - if duration <= 0: - return None - - result = total_area / duration - if math.isnan(result) or math.isinf(result): - return None - - return result diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py deleted file mode 100644 index cfe78394..00000000 --- a/domains/etl/etl_errors.py +++ /dev/null @@ -1,551 +0,0 @@ -from __future__ import annotations - -import ast -import json -import re -from typing import Any, Iterable, Optional - -from pydantic import ValidationError - - -class EtlUserFacingError(Exception): - """ - Exception intended 
to be shown to end users (TaskDetails "run message"). - - Keep this as a single readable string. Avoid structured payloads. - """ - - -_EXTRACTOR_ALIAS_MAP: dict[str, str] = { - "source_uri": "sourceUri", - "placeholder_variables": "placeholderVariables", - "run_time_value": "runTimeValue", -} - -_TRANSFORMER_ALIAS_MAP: dict[str, str] = { - "header_row": "headerRow", - "data_start_row": "dataStartRow", - "identifier_type": "identifierType", - "custom_format": "customFormat", - "timezone_mode": "timezoneMode", - "run_time_value": "runTimeValue", - "jmespath": "JMESPath", - "target_identifier": "targetIdentifier", - "source_identifier": "sourceIdentifier", - "data_transformations": "dataTransformations", - "rating_curve_url": "ratingCurveUrl", -} - - -def _alias(component: str, field: str) -> str: - if component == "extractor": - return _EXTRACTOR_ALIAS_MAP.get(field, field) - if component == "transformer": - return _TRANSFORMER_ALIAS_MAP.get(field, field) - return field - - -def _format_loc(component: str, loc: Iterable[Any]) -> str: - loc_list = list(loc) - # Strip pydantic union branch model names from the front. 
- if ( - component == "extractor" - and loc_list - and loc_list[0] - in ( - "HTTPExtractor", - "LocalFileExtractor", - ) - ): - loc_list = loc_list[1:] - if ( - component == "transformer" - and loc_list - and loc_list[0] - in ( - "JSONTransformer", - "CSVTransformer", - ) - ): - loc_list = loc_list[1:] - - parts: list[str] = [] - for item in loc_list: - if isinstance(item, int): - if not parts: - parts.append(f"[{item}]") - else: - parts[-1] = f"{parts[-1]}[{item}]" - continue - if isinstance(item, str): - parts.append(_alias(component, item)) - continue - parts.append(str(item)) - - if not parts: - return component - return ".".join([component] + parts) - - -def _jsonish(value: Any) -> str: - if value is None: - return "null" - if isinstance(value, str): - if value == "": - return '""' - return repr(value) - return repr(value) - - -def user_facing_error_from_validation_error( - component: str, - exc: ValidationError, - *, - raw: Optional[dict[str, Any]] = None, -) -> EtlUserFacingError: - """ - Convert pydantic's ValidationError into one readable, actionable sentence. - """ - errs = exc.errors(include_url=False) - - # Unions emit errors for every branch. Filter to the selected type when possible. 
- if raw and component in ("extractor", "transformer"): - raw_type = raw.get("type") - type_to_model = { - "extractor": {"HTTP": "HTTPExtractor", "local": "LocalFileExtractor"}, - "transformer": {"JSON": "JSONTransformer", "CSV": "CSVTransformer"}, - } - selected_model = type_to_model.get(component, {}).get(raw_type) - if selected_model: - errs = [ - e for e in errs if not e.get("loc") or e["loc"][0] == selected_model - ] or errs - - if not errs: - return EtlUserFacingError(f"Invalid {component} configuration.") - - first = errs[0] - loc = first.get("loc") or () - msg = first.get("msg") or "Invalid value" - inp = first.get("input", None) - - path = _format_loc(component, loc) - if component == "transformer" and isinstance(raw, dict): - ts = raw.get("timestamp") - if isinstance(ts, dict): - tz_mode = ts.get("timezoneMode") or ts.get("timezone_mode") - tz_val = ts.get("timezone") - if ( - path.endswith("transformer.timestamp.timezone") - and str(tz_mode) == "daylightSavings" - ): - if tz_val is None or str(tz_val).strip() == "": - return EtlUserFacingError( - "Timezone information is required when daylight savings mode is enabled. " - "Select a valid timezone such as America/Denver and try again." - ) - if "Invalid timezone" in str(msg): - return EtlUserFacingError( - "The configured timezone is not recognized. " - "Use a valid IANA timezone such as America/Denver and run the job again." - ) - - message = ( - f"Invalid {component} configuration at {path}: {msg} (got {_jsonish(inp)}). " - f"Update the Data Connection {component} settings." - ) - return EtlUserFacingError(message) - - -_MISSING_PER_TASK_VAR_RE = re.compile(r"Missing per-task variable '([^']+)'") -_MISSING_PLACEHOLDER_VAR_RE = re.compile(r"Missing placeholder variable: (.+)$") -_TIMESTAMP_COL_NOT_FOUND_RE = re.compile( - r"Timestamp column '([^']*)' not found in data\." 
-) - -_MISSING_REQUIRED_TASK_VAR_RE = re.compile( - r"Missing required per-task extractor variable '([^']+)'" -) -_MISSING_URI_PLACEHOLDER_RE = re.compile( - r"Extractor source URI contains a placeholder '([^']+)', but it was not provided" -) -_SOURCE_INDEX_OOR_RE = re.compile( - r"Source index (\d+) is out of range for extracted data\." -) -_SOURCE_COL_NOT_FOUND_RE = re.compile( - r"Source column '([^']+)' not found in extracted data\." -) -_USECOLS_NOT_FOUND_RE = re.compile( - r"columns expected but not found:\s*(\[[^\]]*\])", - re.IGNORECASE, -) - - -def _iter_exception_chain(exc: Exception) -> Iterable[Exception]: - seen: set[int] = set() - current: Optional[Exception] = exc - while current is not None and id(current) not in seen: - seen.add(id(current)) - yield current - next_exc = current.__cause__ or current.__context__ - current = next_exc if isinstance(next_exc, Exception) else None - - -def _extract_missing_usecols(exc: Exception) -> list[str]: - for err in _iter_exception_chain(exc): - msg = str(err) - match = _USECOLS_NOT_FOUND_RE.search(msg) - if not match: - continue - - raw_list = match.group(1) - try: - parsed = ast.literal_eval(raw_list) - if isinstance(parsed, (list, tuple, set)): - cols = [str(c).strip() for c in parsed if str(c).strip()] - if cols: - return cols - except Exception: - pass - - inner = raw_list.strip()[1:-1] - if inner: - cols = [part.strip().strip("'\"") for part in inner.split(",")] - cols = [c for c in cols if c] - if cols: - return cols - return [] - - -def _format_cols(cols: list[str], max_cols: int = 4) -> str: - shown = [f"'{c}'" for c in cols[:max_cols]] - if len(cols) > max_cols: - shown.append(f"+{len(cols) - max_cols} more") - return ", ".join(shown) - - -def user_facing_error_from_exception( - exc: Exception, - *, - transformer_raw: Optional[dict[str, Any]] = None, -) -> Optional[EtlUserFacingError]: - """ - Map common ETL/hydroserverpy exceptions to a single readable message. 
- """ - if isinstance(exc, EtlUserFacingError): - return exc - - if isinstance(exc, ValidationError): - return None - - if isinstance(exc, KeyError): - msg = exc.args[0] if exc.args and isinstance(exc.args[0], str) else str(exc) - m = _MISSING_PER_TASK_VAR_RE.search(msg) - if m: - name = m.group(1) - return EtlUserFacingError( - f"A required task variable named '{name}' was not provided. " - "Add a value for it in the task configuration and run the job again." - ) - m = _MISSING_PLACEHOLDER_VAR_RE.search(msg) - if m: - name = m.group(1).strip() - return EtlUserFacingError( - f"The extractor URL includes a placeholder '{name}', but no value was supplied. " - "Provide the missing value in the task variables." - ) - - msg_str = str(exc) - - if isinstance(exc, TypeError) and "JSONTransformer received None" in msg_str: - return EtlUserFacingError( - "The transformer did not receive any extracted data to parse. " - "Confirm the extractor is returning a valid JSON payload." - ) - - if isinstance(exc, TypeError) and "CSVTransformer received None" in msg_str: - return EtlUserFacingError( - "The transformer did not receive any extracted data to parse. " - "Confirm the extractor is returning a valid CSV payload." - ) - - if ( - ("NoneType" in msg_str or "nonetype" in msg_str.lower()) - and "string" in msg_str.lower() - and "assign" in msg_str.lower() - ): - return EtlUserFacingError( - "A required configuration value is null where a string is expected. " - "Provide the missing value in your ETL configuration JSON." - ) - - # django-ninja HttpError (avoid importing ninja here to keep module import-safe) - status = getattr(exc, "status_code", None) - if status is not None and exc.__class__.__name__ == "HttpError": - message = getattr(exc, "message", None) or msg_str - if "Datastream does not exist" in message: - return EtlUserFacingError( - "One or more destination datastream identifiers could not be found in HydroServer. 
" - "Update the task mappings to use valid datastream IDs." - ) - if status in (401, 403): - return EtlUserFacingError( - "HydroServer rejected the load due to authorization. " - "Confirm the target datastream(s) belong to this workspace and the job has permission to write." - ) - if status >= 400: - return EtlUserFacingError( - "HydroServer rejected some or all of the data. " - "Verify the transformed timestamps/values are valid and the target datastream mappings are correct." - ) - - if isinstance(exc, ValueError): - # Extractor placeholder/variable resolution - m = _MISSING_REQUIRED_TASK_VAR_RE.search(msg_str) - if m: - name = m.group(1) - return EtlUserFacingError( - f"A required task variable named '{name}' was not provided. " - "Add a value for it in the task configuration and run the job again." - ) - m = _MISSING_URI_PLACEHOLDER_RE.search(msg_str) - if m: - name = m.group(1) - return EtlUserFacingError( - f"The extractor URL includes a placeholder '{name}', but no value was supplied. " - "Provide the missing value in the task variables." - ) - - if "identifierType='index' requires timestamp.key" in msg_str: - return EtlUserFacingError( - "The timestamp column is set incorrectly. Index mode expects a 1-based column number (1 for the first column). " - "Update the timestamp setting to a valid column index." - ) - - if msg_str.startswith( - "One or more timestamps could not be read with the current settings" - ): - return EtlUserFacingError( - "One or more timestamps could not be read using the current format and timezone settings. " - "Confirm how dates appear in the source file and update the transformer configuration to match." - ) - - if ( - msg_str - == "One or more configured CSV columns were not found in the header row." - ): - missing_cols = _extract_missing_usecols(exc) - if len(missing_cols) > 1: - return EtlUserFacingError( - f"Configured CSV columns were not found in the file header ({_format_cols(missing_cols)}). 
" - "This often means the delimiter or headerRow setting is incorrect. " - "Verify the delimiter and headerRow settings, then run the job again." - ) - if len(missing_cols) == 1 and isinstance(transformer_raw, dict): - ts_cfg = transformer_raw.get("timestamp") - ts_key = ts_cfg.get("key") if isinstance(ts_cfg, dict) else None - if ts_key is not None and str(missing_cols[0]) == str(ts_key): - col = missing_cols[0] - return EtlUserFacingError( - f"The configured timestamp column '{col}' was not found in the file header. " - "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." - ) - return EtlUserFacingError( - "A required column was not found in the file header. " - "The source file may have changed or the header row may be set incorrectly. " - "Confirm the file layout and update the column mappings if needed." - ) - if ( - msg_str - == "The header row contained unexpected values and could not be processed." - ): - return EtlUserFacingError( - "A required column was not found in the file header. " - "The source file may have changed or the header row may be set incorrectly. " - "Confirm the file layout and update the column mappings if needed." - ) - if ( - msg_str - == "One or more data rows contained unexpected values and could not be processed." - ): - return EtlUserFacingError( - "A required column was not found in the file header. " - "The source file may have changed or the header row may be set incorrectly. " - "Confirm the file layout and update the column mappings if needed." - ) - - if msg_str.startswith("Rating curve transformation is missing ratingCurveUrl"): - return EtlUserFacingError( - "A rating curve transformation is missing the rating curve URL. " - "Set a ratingCurveUrl for the mapping and run the job again." - ) - - if msg_str.startswith("Rating curve file not found at"): - return EtlUserFacingError( - "The configured rating curve file could not be found. 
" - "Verify the ratingCurveUrl is correct and points to an existing file." - ) - - if msg_str.startswith("Timed out while retrieving rating curve from"): - return EtlUserFacingError( - "The task timed out while trying to load the rating curve file. " - "Verify the file URL is reachable and run the job again." - ) - - if ( - msg_str.startswith("Could not connect to rating curve URL") - or msg_str.startswith("Failed to retrieve rating curve from") - or msg_str.startswith("Rating curve request to") - ): - return EtlUserFacingError( - "The task could not download the configured rating curve file. " - "Verify the file exists and can be reached, then run the job again." - ) - - if msg_str.startswith("Authentication failed while retrieving rating curve from"): - return EtlUserFacingError( - "The task is not authorized to access the configured rating curve file. " - "Use a rating curve file stored in HydroServer and run the job again." - ) - - if msg_str.startswith("Unable to read rating curve file from"): - return EtlUserFacingError( - "The configured rating curve file could not be read. " - "Replace it with a valid CSV file and run the job again." - ) - - if msg_str.startswith("Rating curve at") and "is empty" in msg_str: - return EtlUserFacingError( - "The configured rating curve file is empty. " - "Upload a valid rating curve file and run the job again." - ) - - if msg_str.startswith("Rating curve at") and "not a valid CSV file" in msg_str: - return EtlUserFacingError( - "The configured rating curve file is not a valid CSV file. " - "Upload a valid rating curve CSV using the standardized two-column format." - ) - - if msg_str.startswith("Rating curve at") and "at least two columns" in msg_str: - return EtlUserFacingError( - "The rating curve CSV must include two columns (input on the left, output on the right). " - "Update the file and run the job again." 
- ) - - if msg_str.startswith("Rating curve at") and "at least two numeric rows" in msg_str: - return EtlUserFacingError( - "The rating curve CSV must include at least two numeric rows. " - "Update the file and run the job again." - ) - - # JSON transformer common configuration errors - if msg_str == "The payload's expected fields were not found.": - return EtlUserFacingError( - "Failed to find the timestamp or value using the current JSON query. " - "Confirm the JMESPath expression matches the structure returned by the source." - ) - if ( - msg_str - == "The timestamp or value key could not be found with the specified query." - ): - return EtlUserFacingError( - "Failed to find the timestamp or value using the current JSON query. " - "Confirm the JMESPath expression matches the structure returned by the source." - ) - - m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg_str) - if m: - col = m.group(1) - return EtlUserFacingError( - f"The configured timestamp column '{col}' was not found in the file header. " - "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." - ) - - m = _SOURCE_INDEX_OOR_RE.search(msg_str) - if m: - idx = m.group(1) - return EtlUserFacingError( - f"A mapping source index ({idx}) is out of range for the extracted data. " - "Update task.mappings sourceIdentifier values (or switch identifierType) to match the extracted columns." - ) - - m = _SOURCE_COL_NOT_FOUND_RE.search(msg_str) - if m: - col = m.group(1) - return EtlUserFacingError( - f"A mapped field named '{col}' was not found in the extracted data. " - "Update the task mapping so the source identifier matches the JSON." - ) - - # JSON decode failures (usually extractor returned HTML/text instead of JSON) - if isinstance(exc, json.JSONDecodeError): - return EtlUserFacingError( - "The source did not return valid JSON. " - "Verify the URL points to a JSON endpoint." 
- ) - - if msg_str == "Could not connect to the source system.": - return EtlUserFacingError( - "Failed to connect to the source system. This may be temporary; try again shortly. " - "If it persists, the source system may be offline." - ) - - if msg_str == "The requested data could not be found on the source system.": - return EtlUserFacingError( - "The requested data could not be found on the source system. " - "Verify the URL is correct and that the file or endpoint still exists." - ) - - if msg_str.startswith("Authentication with the source system failed."): - return EtlUserFacingError( - "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " - "Update the credentials and try again." - ) - - if msg_str in ( - "The connection to the source worked but no observations were returned.", - ): - return EtlUserFacingError( - "No observations were returned from the source system. " - "Confirm the configured source system has observations available for the requested time range." - ) - - # Backward-compatible mappings for older hydroserverpy strings. - if msg_str == "The requested payload was not found on the source system.": - return EtlUserFacingError( - "The requested data could not be found on the source system. " - "Verify the URL is correct and that the file or endpoint still exists." - ) - - if msg_str == "The source system returned no data.": - return EtlUserFacingError( - "No observations were returned from the source system. " - "Confirm the configured source system has observations available for the requested time range." - ) - - if ( - msg_str - == "Authentication with the source system failed; credentials may be invalid or expired." - ): - return EtlUserFacingError( - "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " - "Update the credentials and try again." 
- ) - - if "jmespath.exceptions" in msg_str or "Parse error at column" in msg_str: - return EtlUserFacingError( - "The JSON query used to extract timestamps or values is invalid or returned unexpected data. " - "Review and correct the JMESPath expression." - ) - - if msg_str in ( - "The target datastream could not be found.", - "The target data series (datastream) could not be found.", - "The target datastream was not found.", - ): - return EtlUserFacingError( - "One or more destination datastream identifiers could not be found in HydroServer. " - "Update the task mappings to use valid datastream IDs." - ) - - return None diff --git a/domains/etl/internal.py b/domains/etl/internal.py new file mode 100644 index 00000000..4e316847 --- /dev/null +++ b/domains/etl/internal.py @@ -0,0 +1,243 @@ +import uuid +import logging +import traceback +import pandas as pd +from datetime import datetime +from io import BytesIO +from typing import Union, Optional, TextIO +from pydantic import ConfigDict +from django.db.models import Min +from domains.sta.models import Datastream, Observation +from domains.sta.services import ObservationService +from interfaces.api.schemas.observation import ObservationBulkPostBody +from hydroserverpy.etl import extractors, transformers +from hydroserverpy.etl.transformers import ETLDataMapping +from hydroserverpy.etl.loaders import Loader, ETLLoaderResult, ETLTargetResult +from hydroserverpy.etl.exceptions import ETLError + + +logger = logging.getLogger(__name__) +observation_service = ObservationService() + + +class HydroServerInternalExtractor(extractors.Extractor): + def extract( + self, + **kwargs + ) -> Union[str, TextIO, BytesIO]: + """ + HydroServer's internal transformer will query data directly from the ORM to build a DataFrame. + Since there's nothing to extract in this step, just return an empty string. 
class HydroServerInternalTransformer(transformers.Transformer):
    """Transformer that sources observations from the Observation ORM instead of a payload."""

    def transform(
        self,
        payload: Union[str, TextIO, BytesIO],
        data_mappings: list[ETLDataMapping],
        **kwargs
    ) -> pd.DataFrame:
        """
        Load datastream observations from the ORM into a DataFrame.

        The payload is ignored; data is sourced directly from the Observation ORM.
        For each data mapping, the source datastream is queried once and the resulting
        DataFrame is fanned out to each target path. Per-target phenomenon_end_time
        filtering is applied in pandas after the query to avoid re-querying the
        observations for one-to-many mappings.

        Returns a DataFrame with ``timestamp``, ``value`` and ``target_id`` columns.
        When nothing new is found an empty frame with those columns is returned
        directly; otherwise the concatenated frame is passed through
        ``self.standardize_dataframe`` together with ``data_mappings``.
        """

        result_dfs = []

        for data_mapping in data_mappings:
            source_datastream_id = uuid.UUID(str(data_mapping.source_identifier))

            # Resolve each target's current end time once, up front. ``None`` means the
            # target has no end time recorded (or was not found) and must not be filtered.
            target_end_times = {
                target_path.target_identifier: (
                    Datastream.objects.filter(id=target_path.target_identifier)
                    .values_list("phenomenon_end_time", flat=True)
                    .first()
                )
                for target_path in data_mapping.target_paths
            }

            # The query-level cutoff is only a safe optimisation when *every* target
            # already has an end time. SQL MIN() skips NULLs, so using it unconditionally
            # would silently drop the older history that an empty target still needs.
            known_end_times = list(target_end_times.values())
            if known_end_times and all(
                end_time is not None for end_time in known_end_times
            ):
                earliest_end_time = min(known_end_times)
            else:
                earliest_end_time = None

            queryset = Observation.objects.filter(datastream_id=source_datastream_id)
            if earliest_end_time is not None:
                queryset = queryset.filter(phenomenon_time__gt=earliest_end_time)

            source_df = pd.DataFrame(list(queryset.values("phenomenon_time", "result")))

            if source_df.empty:
                continue

            source_df = source_df.rename(
                columns={"phenomenon_time": "timestamp", "result": "value"}
            )

            for target_path in data_mapping.target_paths:
                target_id = str(target_path.target_identifier)
                phenomenon_end_time = target_end_times[target_path.target_identifier]

                if phenomenon_end_time is not None:
                    target_df = source_df[source_df["timestamp"] > phenomenon_end_time]
                else:
                    target_df = source_df

                if target_df.empty:
                    continue

                # Copy before adding the column so the shared source frame is untouched.
                target_df = target_df.copy()
                target_df["target_id"] = target_id
                result_dfs.append(target_df)

        if not result_dfs:
            return pd.DataFrame(columns=["timestamp", "value", "target_id"])

        return self.standardize_dataframe(
            pd.concat(result_dfs, ignore_index=True),
            data_mappings,
        )


class HydroServerInternalLoader(Loader):
    """Loader that writes observations to datastreams through ``observation_service``."""

    # Maximum number of observations sent per bulk_create call.
    chunk_size: int = 5000

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def load(
        self,
        payload: pd.DataFrame,
        **kwargs
    ) -> ETLLoaderResult:
        """
        Load observations from a DataFrame to corresponding HydroServer datastreams.

        ``payload`` must provide ``timestamp``, ``value`` and ``target_id`` columns.
        ``kwargs["task_instance"]`` must be the task being run; its workspace owner is
        used as the principal for the bulk insert.

        Rows at or before a datastream's ``phenomenon_end_time`` and rows with a null
        ``value`` are dropped. Remaining rows are uploaded in ``chunk_size`` chunks in
        append mode. An upload error marks that datastream as failed (recording the
        error text and traceback) and stops further chunks for it; other datastreams
        are still processed.

        Raises:
            ETLError: if a target ID cannot be resolved to a datastream.
        """

        target_ids = payload["target_id"].unique()

        logger.debug("Resolving %s destination datastream(s).", len(target_ids))

        datastreams = {}
        missing_datastreams = []
        task = kwargs["task_instance"]

        for target_id in target_ids:
            try:
                datastreams[target_id] = Datastream.objects.get(pk=uuid.UUID(target_id))
            except Datastream.DoesNotExist:
                missing_datastreams.append(target_id)
            except Exception as e:
                raise ETLError(
                    f"Encountered an unexpected error "
                    f"while loading destination datastream with ID: '{target_id}'. "
                    f"Ensure the datastream UUID is formatted correctly."
                ) from e

        if missing_datastreams:
            raise ETLError(
                f"One or more destination datastreams do not exist on this HydroServer instance. "
                f"Ensure the datastream IDs are correct. "
                f"Missing datastream IDs: {', '.join(sorted(missing_datastreams))}."
            )

        # Payload-wide cutoff: only valid when every datastream already has an end time.
        # A datastream without one needs the full history, so skip the shortcut and rely
        # on the per-datastream filter below.
        end_times = [
            datastream.phenomenon_end_time for datastream in datastreams.values()
        ]
        if end_times and all(end_time is not None for end_time in end_times):
            payload = payload[payload["timestamp"] > min(end_times)]

        etl_results = ETLLoaderResult()

        for target_id, datastream in datastreams.items():
            etl_results.target_results[target_id] = ETLTargetResult(
                target_identifier=target_id,
            )
            target_result = etl_results.target_results[target_id]

            datastream_df = (
                payload[payload["target_id"] == target_id][["timestamp", "value"]]
                .dropna(subset=["value"])
                .copy()
            )

            if datastream.phenomenon_end_time is not None:
                datastream_df = datastream_df.loc[
                    datastream_df["timestamp"] > datastream.phenomenon_end_time
                ]

            if datastream_df.empty:
                etl_results.skipped_count += 1
                target_result.status = "skipped"
                continue

            datastream_observations_to_load = len(datastream_df)

            logger.info(
                "Uploading %s observation(s) to datastream %s (%s chunk(s), chunk_size=%s)",
                datastream_observations_to_load,
                target_id,
                (datastream_observations_to_load + self.chunk_size - 1) // self.chunk_size,
                self.chunk_size,
            )

            upload_failed = False

            for start_idx in range(0, datastream_observations_to_load, self.chunk_size):
                end_idx = min(start_idx + self.chunk_size, datastream_observations_to_load)
                chunk = datastream_df.iloc[start_idx:end_idx]

                try:
                    observation_service.bulk_create(
                        principal=task.workspace.owner,
                        data=ObservationBulkPostBody(
                            fields=["phenomenonTime", "result"],
                            data=chunk.values.tolist(),
                        ),
                        datastream_id=datastream.pk,
                        mode="append",
                    )
                    target_result.values_loaded += len(chunk)
                except Exception as e:
                    upload_failed = True
                    target_result.status = "failed"
                    target_result.error = str(e)
                    target_result.traceback = traceback.format_exc()
                    break

            # A failure on the very first chunk leaves values_loaded at 0; it must stay
            # "failed" rather than being downgraded to "skipped".
            if not upload_failed and not target_result.values_loaded > 0:
                target_result.status = "skipped"

            if target_result.status not in ["skipped", "failed"]:
                target_result.status = "success"

        etl_results.aggregate_results()

        return etl_results

    def target_loaded_through(
        self,
        target_identifier: Union[str, int]
    ) -> Optional[datetime]:
        """
        Retrieve the timestamp through which data is loaded for a given HydroServer datastream.

        Returns the datastream's ``phenomenon_end_time`` (``None`` when it is unset).

        Raises:
            ETLError: if the datastream does not exist, or the identifier cannot be
                resolved (for example, it is not a valid UUID).
        """

        try:
            # str() keeps non-string identifiers on the "badly formatted UUID" path.
            datastream = Datastream.objects.get(pk=uuid.UUID(str(target_identifier)))
        except Datastream.DoesNotExist as e:
            raise ETLError(
                f"The destination datastream with ID '{target_identifier}' "
                f"does not exist on this HydroServer instance. "
                f"Ensure the datastream ID is correct."
            ) from e
        except Exception as e:
            raise ETLError(
                f"Encountered an unexpected error "
                f"while loading destination datastream with ID: '{target_identifier}'. "
                f"Ensure the datastream UUID is formatted correctly."
            ) from e

        return datastream.phenomenon_end_time
- """ - begin_date = self.earliest_begin_date(task) - new_data = data[data["timestamp"] > begin_date] - - cutoff_value = ( - begin_date.isoformat() - if hasattr(begin_date, "isoformat") - else str(begin_date) - ) - timestamps_total = len(data) - timestamps_after_cutoff = len(new_data) - observations_available = 0 - observations_loaded = 0 - datastreams_loaded = 0 - - for col in new_data.columns.difference(["timestamp"]): - df = ( - new_data[["timestamp", col]] - .rename(columns={col: "value"}) - .dropna(subset=["value"]) - ) - available = len(df) - observations_available += available - if available == 0: - logging.warning( - "No new observations for %s after filtering; skipping.", col - ) - continue - - df = df.rename(columns={"timestamp": "phenomenonTime", "value": "result"}) - - loaded = 0 - # Chunked upload - CHUNK_SIZE = 5000 - total = len(df) - chunks = (total + CHUNK_SIZE - 1) // CHUNK_SIZE - logging.info( - "Uploading %s observation(s) to datastream %s (%s chunk(s), chunk_size=%s)", - total, - col, - chunks, - CHUNK_SIZE, - ) - for start in range(0, total, CHUNK_SIZE): - end = min(start + CHUNK_SIZE, total) - chunk = df.iloc[start:end] - logging.debug( - "Uploading chunk to datastream %s: rows %s-%s (%s rows)", - col, - start, - end - 1, - len(chunk), - ) - - chunk_data = ObservationBulkPostBody( - fields=["phenomenonTime", "result"], - data=chunk.values.tolist(), - ) - - try: - observation_service.bulk_create( - principal=self.task.data_connection.workspace.owner, - data=chunk_data, - datastream_id=UUID(col), - mode="append", - ) - loaded += len(chunk) - except Exception as e: - status = getattr(e, "status_code", None) or getattr( - getattr(e, "response", None), "status_code", None - ) - if status == 409 or "409" in str(e) or "Conflict" in str(e): - logging.info( - "409 Conflict for datastream %s on rows %s-%s; skipping remainder for this stream.", - col, - start, - end - 1, - ) - break - raise - - if loaded > 0: - datastreams_loaded += 1 - 
observations_loaded += loaded - - return LoadSummary( - cutoff=cutoff_value, - timestamps_total=timestamps_total, - timestamps_after_cutoff=timestamps_after_cutoff, - observations_available=observations_available, - observations_loaded=observations_loaded, - datastreams_loaded=datastreams_loaded, - ) - - @staticmethod - def _fetch_earliest_begin(task: Task) -> pd.Timestamp: - logging.info( - "Checking HydroServer for the most recent data already stored (so we only extract new observations)..." - ) - - return Datastream.objects.filter( - id__in={ - path.target_identifier - for mapping in task.mappings.all() - for path in mapping.paths.all() - } - ).aggregate( - earliest_end=Coalesce( - Min("phenomenon_end_time"), Value(datetime(1880, 1, 1, tzinfo=dt_timezone.utc)) - ) - )[ - "earliest_end" - ] - - def earliest_begin_date(self, task: Task) -> pd.Timestamp: - """ - Return earliest begin date for a payload, or compute+cache it on first call. - """ - key = task.name - if key not in self._begin_cache: - self._begin_cache[key] = self._fetch_earliest_begin(task) - return self._begin_cache[key] diff --git a/domains/etl/models/run.py b/domains/etl/models/run.py index 3136d10d..8928fad2 100644 --- a/domains/etl/models/run.py +++ b/domains/etl/models/run.py @@ -1,7 +1,6 @@ import uuid6 from django.db import models from .task import Task -from domains.etl.run_result_normalizer import normalize_task_run_result, task_transformer_raw class TaskRun(models.Model): @@ -11,12 +10,3 @@ class TaskRun(models.Model): started_at = models.DateTimeField(auto_now_add=True) finished_at = models.DateTimeField(null=True, blank=True) result = models.JSONField(blank=True, null=True) - - def save(self, *args, **kwargs): - transformer_raw = task_transformer_raw(self.task) if self.task_id else None - self.result = normalize_task_run_result( - status=self.status, - result=self.result, - transformer_raw=transformer_raw, - ) - super().save(*args, **kwargs) diff --git a/domains/etl/models/task.py 
b/domains/etl/models/task.py index ea173e18..23a1bb23 100644 --- a/domains/etl/models/task.py +++ b/domains/etl/models/task.py @@ -115,7 +115,7 @@ class TaskMappingPath(models.Model): id = models.UUIDField(primary_key=True, default=uuid6.uuid7, editable=False) task_mapping = models.ForeignKey(TaskMapping, on_delete=models.CASCADE, related_name="paths") target_identifier = models.CharField(max_length=255) - data_transformations = models.JSONField(default=dict) + data_transformations = models.JSONField(default=list, blank=True) @receiver(pre_delete, sender=Task) diff --git a/domains/etl/run_result_normalizer.py b/domains/etl/run_result_normalizer.py deleted file mode 100644 index f9f525af..00000000 --- a/domains/etl/run_result_normalizer.py +++ /dev/null @@ -1,125 +0,0 @@ -from __future__ import annotations - -import re -from typing import Any, Optional - -from .etl_errors import user_facing_error_from_exception - - -_SUCCESS_LOAD_RE = re.compile( - r"^Load complete\.\s*(\d+)\s+rows were added to\s+(\d+)\s+datastreams?\.$" -) -_SUCCESS_LOADED_RE = re.compile( - r"^Loaded\s+(\d+)\s+total observations\s+(?:into|across)\s+(\d+)\s+datastream(?:s|\(s\))\.$" -) -_FAILURE_STAGE_PREFIX_RE = re.compile( - r"^(?:Setup failed|Failed during [^:]+):\s*", - re.IGNORECASE, -) - - -def task_transformer_raw(task: Any) -> Optional[dict[str, Any]]: - if task is None: - return None - - data_connection = getattr(task, "data_connection", None) - if data_connection is None: - return None - - raw_settings = getattr(data_connection, "transformer_settings", None) or {} - if not isinstance(raw_settings, dict): - return None - - raw: dict[str, Any] = dict(raw_settings) - transformer_type = getattr(data_connection, "transformer_type", None) - if transformer_type and "type" not in raw: - raw["type"] = transformer_type - return raw - - -def _format_loaded_success_message(loaded: int, ds_count: int) -> str: - preposition = "into" if ds_count == 1 else "across" - ds_word = "datastream" if ds_count 
== 1 else "datastreams" - return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." - - -def _extract_message(result: dict[str, Any]) -> Optional[str]: - for key in ("message", "summary", "error", "detail"): - val = result.get(key) - if isinstance(val, str) and val.strip(): - return val.strip() - return None - - -def _normalize_success_message(message: str) -> str: - m = _SUCCESS_LOAD_RE.match(message) - if m: - return _format_loaded_success_message(int(m.group(1)), int(m.group(2))) - - m = _SUCCESS_LOADED_RE.match(message) - if m: - return _format_loaded_success_message(int(m.group(1)), int(m.group(2))) - - if ( - message - == "Already up to date. No new observations were loaded because all timestamps in the source are older than what is already stored." - ): - return "Already up to date. No new observations were loaded." - - return message - - -def _normalize_failure_message( - message: str, - *, - transformer_raw: Optional[dict[str, Any]] = None, -) -> str: - candidate = _FAILURE_STAGE_PREFIX_RE.sub("", message).strip() - - if candidate.startswith("Error reading CSV data:"): - candidate = candidate.split("Error reading CSV data:", 1)[1].strip() - - mapped = user_facing_error_from_exception( - ValueError(candidate), - transformer_raw=transformer_raw, - ) - if mapped: - return str(mapped) - return candidate or message - - -def normalize_task_run_result( - *, - status: str, - result: Any, - transformer_raw: Optional[dict[str, Any]] = None, -) -> Any: - if result is None: - return None - - normalized: dict[str, Any] - if isinstance(result, dict): - normalized = dict(result) - else: - normalized = {"message": str(result)} - - message = _extract_message(normalized) - if not message: - return normalized - - if status == "SUCCESS": - normalized_message = _normalize_success_message(message) - elif status == "FAILURE": - normalized_message = _normalize_failure_message( - message, - transformer_raw=transformer_raw, - ) - else: - normalized_message = 
message - - normalized["message"] = normalized_message - summary = normalized.get("summary") - if not isinstance(summary, str) or not summary.strip() or summary.strip() == message: - normalized["summary"] = normalized_message - - return normalized diff --git a/domains/etl/services/run.py b/domains/etl/services/run.py index 757864a2..b0ce8684 100644 --- a/domains/etl/services/run.py +++ b/domains/etl/services/run.py @@ -6,10 +6,6 @@ from django.contrib.auth import get_user_model from domains.iam.models import APIKey from domains.etl.models import TaskRun -from domains.etl.run_result_normalizer import ( - normalize_task_run_result, - task_transformer_raw, -) from interfaces.api.schemas import TaskRunFields, TaskRunPostBody, TaskRunPatchBody, TaskRunOrderByFields from interfaces.api.service import ServiceUtils from .task import TaskService @@ -87,11 +83,6 @@ def create( ) task_run_data = data.dict(include=set(TaskRunFields.model_fields.keys())) - task_run_data["result"] = normalize_task_run_result( - status=task_run_data["status"], - result=task_run_data.get("result"), - transformer_raw=task_transformer_raw(task), - ) try: task_run = TaskRun.objects.create( @@ -132,11 +123,6 @@ def update( for field, value in task_run_data.items(): setattr(task_run, field, value) - task_run.result = normalize_task_run_result( - status=task_run.status, - result=task_run.result, - transformer_raw=task_transformer_raw(task), - ) task_run.save() return self.get( diff --git a/domains/etl/services/task.py b/domains/etl/services/task.py index 46a0951f..31ea4d0d 100644 --- a/domains/etl/services/task.py +++ b/domains/etl/services/task.py @@ -19,7 +19,6 @@ TaskRun, ) from domains.sta.models import ThingFileAttachment, Datastream -from domains.etl.aggregation import normalize_aggregation_transformation from interfaces.api.schemas import ( TaskFields, TaskPostBody, @@ -641,7 +640,7 @@ def _validate_aggregation_mapping_constraints( try: path["data_transformations"] = [ - 
normalize_aggregation_transformation(transformations[0]) + transformations[0] ] except ValueError as exc: raise HttpError(400, str(exc)) from exc diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 4a51bbe4..1c1bbc9a 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -1,824 +1,118 @@ -import logging -from contextlib import contextmanager -from dataclasses import dataclass, field -from datetime import datetime, timedelta, timezone as dt_timezone -import os -from typing import Any, Optional from uuid import UUID -import pandas as pd +from datetime import timedelta from celery import shared_task -from pydantic import TypeAdapter, ValidationError from celery.signals import task_prerun, task_success, task_failure, task_postrun from django.utils import timezone from django.db.utils import IntegrityError from django.core.management import call_command from domains.etl.models import Task, TaskRun -from domains.sta.models import Datastream, Observation -from domains.sta.services import ObservationService -from interfaces.api.schemas.observation import ObservationBulkPostBody -from .loader import HydroServerInternalLoader, LoadSummary -from .etl_errors import ( - EtlUserFacingError, - user_facing_error_from_exception, - user_facing_error_from_validation_error, -) -from .run_result_normalizer import normalize_task_run_result, task_transformer_raw -from .aggregation import ( - AggregationTransformation, - aggregate_daily_window, - closed_window_end_utc, - first_window_start_utc, - iter_daily_windows_utc, - next_window_start_utc, - parse_aggregation_transformation, -) -from hydroserverpy.etl.factories import extractor_factory, transformer_factory -from hydroserverpy.etl.etl_configuration import ( - ExtractorConfig, - TransformerConfig, - SourceTargetMapping, - MappingPath, -) - - -@dataclass -class TaskRunContext: - stage: str = "setup" - runtime_source_uri: Optional[str] = None - log_handler: Optional["TaskLogHandler"] = None - task_meta: dict[str, Any] 
= field(default_factory=dict) - - -@dataclass(frozen=True) -class AggregationMapping: - source_datastream_id: UUID - target_datastream_id: UUID - transformation: AggregationTransformation - - -class TaskLogFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - path = (record.pathname or "").replace("\\", "/") - return "/hydroserverpy/" in path or "/domains/etl/" in path - - -class TaskLogHandler(logging.Handler): - def __init__(self, context: TaskRunContext): - super().__init__(level=logging.INFO) - self.context = context - self.lines: list[str] = [] - self.entries: list[dict[str, Any]] = [] - self._formatter = logging.Formatter() - - def emit(self, record: logging.LogRecord) -> None: - if not self.filter(record): - return - - message = record.getMessage() - - timestamp = datetime.fromtimestamp( - record.created, tz=dt_timezone.utc - ).isoformat() - line = f"{timestamp} {record.levelname:<8} {message}" - if record.exc_info: - line = f"{line}\n{self._formatter.formatException(record.exc_info)}" - self.lines.append(line) - - entry: dict[str, Any] = { - "timestamp": timestamp, - "level": record.levelname, - "logger": record.name, - "message": message, - "pathname": record.pathname, - "lineno": record.lineno, - } - if record.exc_info: - entry["exception"] = self._formatter.formatException(record.exc_info) - self.entries.append(entry) - - self._capture_runtime_uri(message) - - def _capture_runtime_uri(self, message: str) -> None: - if self.context.runtime_source_uri: - return - if "Resolved runtime source URI:" in message: - self.context.runtime_source_uri = message.split( - "Resolved runtime source URI:", 1 - )[1].strip() - return - if "Requesting data from" in message: - if "→" in message: - self.context.runtime_source_uri = message.split("→", 1)[1].strip() - return - if "from" in message: - self.context.runtime_source_uri = message.split("from", 1)[1].strip() - - def as_text(self) -> str: - return "\n".join(self.lines).strip() - - 
-TASK_RUN_CONTEXT: dict[str, TaskRunContext] = {} -observation_service = ObservationService() - - -@contextmanager -def capture_task_logs(context: TaskRunContext): - logger = logging.getLogger() - handler = TaskLogHandler(context) - handler.addFilter(TaskLogFilter()) - context.log_handler = handler - - previous_level = logger.level - if previous_level > logging.INFO: - logger.setLevel(logging.INFO) - - logger.addHandler(handler) - try: - yield handler - finally: - logger.removeHandler(handler) - if previous_level > logging.INFO: - logger.setLevel(previous_level) - - -def _is_empty(data: Any) -> bool: - if data is None: - return True - if isinstance(data, pd.DataFrame) and data.empty: - return True - return False - - -def _describe_payload(data: Any) -> dict[str, Any]: - if isinstance(data, pd.DataFrame): - return { - "type": "DataFrame", - "rows": len(data), - "columns": len(data.columns), - } - info: dict[str, Any] = {"type": type(data).__name__} - if isinstance(data, (bytes, bytearray)): - info["bytes"] = len(data) - return info - # BytesIO and similar - try: - buf = getattr(data, "getbuffer", None) - if callable(buf): - info["bytes"] = len(data.getbuffer()) - return info - except Exception: - pass - # Real file handles - try: - fileno = getattr(data, "fileno", None) - if callable(fileno): - info["bytes"] = os.fstat(data.fileno()).st_size - return info - except Exception: - pass - return info - - -def _describe_transformed_data(data: Any) -> dict[str, Any]: - if not isinstance(data, pd.DataFrame): - return {"type": type(data).__name__} - datastreams = [col for col in data.columns if col != "timestamp"] - return { - "type": "DataFrame", - "rows": len(data), - "columns": len(data.columns), - "datastreams": len(datastreams), - } - +from hydroserverpy.etl.hydroserver import build_hydroserver_pipeline +from hydroserverpy.etl.exceptions import ETLError +from .internal import HydroServerInternalExtractor, HydroServerInternalTransformer, HydroServerInternalLoader -def 
_success_message(load: Optional[LoadSummary]) -> str: - if not load: - return "Load complete." - - loaded = load.observations_loaded - if loaded == 0: - if load.timestamps_total and load.timestamps_after_cutoff == 0: - return "Already up to date. No new observations were loaded." - # Otherwise, we don't have strong evidence for why nothing loaded beyond "no new observations". - return "No new observations were loaded." - - ds_count = load.datastreams_loaded or 0 - preposition = "into" if ds_count == 1 else "across" - ds_word = "datastream" if ds_count == 1 else "datastreams" - return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." - - -def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: - result.setdefault("runtime_source_uri", runtime_source_uri) - result.setdefault("runtimeSourceUri", runtime_source_uri) - result.setdefault("runtime_url", runtime_source_uri) - result.setdefault("runtimeUrl", runtime_source_uri) - - -def _apply_log_aliases(result: dict[str, Any]) -> None: - if "log_entries" in result and "logEntries" not in result: - result["logEntries"] = result["log_entries"] - - -def _merge_result_with_context( - result: dict[str, Any], context: Optional[TaskRunContext] -) -> dict[str, Any]: - if "summary" not in result and "message" in result: - result["summary"] = result["message"] - - if context: - if context.runtime_source_uri and not ( - result.get("runtime_source_uri") - or result.get("runtimeSourceUri") - or result.get("runtime_url") - or result.get("runtimeUrl") - ): - _apply_runtime_uri_aliases(result, context.runtime_source_uri) - - if context.log_handler: - if "logs" not in result: - logs_text = context.log_handler.as_text() - if logs_text: - result["logs"] = logs_text - if "log_entries" not in result and context.log_handler.entries: - result["log_entries"] = context.log_handler.entries - - _apply_log_aliases(result) - return result - - -def _build_task_result( - message: str, - context: 
Optional[TaskRunContext] = None, - *, - stage: Optional[str] = None, - traceback: Optional[str] = None, -) -> dict[str, Any]: - result: dict[str, Any] = {"message": message, "summary": message} - if stage: - result["stage"] = stage - if traceback: - result["traceback"] = traceback - - if context and context.runtime_source_uri: - _apply_runtime_uri_aliases(result, context.runtime_source_uri) - - if context and context.task_meta and "task" not in result: - result["task"] = context.task_meta - - if context and context.log_handler: - logs_text = context.log_handler.as_text() - if logs_text: - result["logs"] = logs_text - if context.log_handler.entries: - result["log_entries"] = context.log_handler.entries - - _apply_log_aliases(result) - return result - - -def _last_logged_error(context: Optional[TaskRunContext]) -> Optional[str]: - if not context or not context.log_handler or not context.log_handler.entries: - return None - for entry in reversed(context.log_handler.entries): - if entry.get("level") == "ERROR": - msg = entry.get("message") - if msg: - return msg - return None - - -def _mapped_csv_error_from_log(last_err: str) -> Optional[str]: - prefix = "Error reading CSV data:" - if not last_err.startswith(prefix): - return None - - detail = last_err[len(prefix) :].strip() - if detail == "One or more configured CSV columns were not found in the header row.": - return ( - "Configured CSV columns were not found in the file header. " - "This often means the delimiter or headerRow setting is incorrect. " - "Verify the delimiter and headerRow settings, then run the job again." - ) - if ( - detail - == "The header row contained unexpected values and could not be processed." - ): - return ( - "A required column was not found in the file header. " - "The source file may have changed or the header row may be set incorrectly. " - "Confirm the file layout and update the column mappings if needed." 
@shared_task(bind=True, expires=10)
def run_etl_task(self, task_id: str):
    """
    Runs a HydroServer ETL task based on the task configuration provided.

    The task is looked up by ``task_id``, a pipeline is built with
    ``build_hydroserver_pipeline`` and then run with ``raise_on_error=False``.
    Tasks whose ``task_type`` is "Aggregation" use the internal extractor,
    transformer and loader; all other tasks only override the loader.

    Returns:
        A dict with a human-readable ``message``, the ``runtime_variables`` from the
        pipeline context, and the fields of ``context.results.dict()``.

    Raises:
        ETLError: if the task cannot be loaded, the pipeline cannot be configured,
            or the pipeline run fails. When the run itself reports an exception,
            the run results are attached to the raised error.
    """

    try:
        task = Task.objects.select_related(
            "data_connection"
        ).prefetch_related(
            "mappings", "mappings__paths"
        ).get(pk=UUID(task_id))
    except Task.DoesNotExist as e:
        raise ETLError(
            f"ETL task with ID '{task_id}' does not exist. "
            "The task record may have been deleted before this run could execute."
        ) from e
    except Exception as e:
        raise ETLError(
            "Encountered an unexpected error while setting up the ETL task run. "
            "See task logs for additional details."
        ) from e

    # TODO: HydroServer stored settings and hydroserverpy interface should be better reconciled once automated QA/QC
    #  design is finalized.

    try:
        if task.task_type == "Aggregation":
            etl_classes = {
                "extractor_cls": HydroServerInternalExtractor,
                "transformer_cls": HydroServerInternalTransformer,
                "loader_cls": HydroServerInternalLoader
            }
        else:
            etl_classes = {
                "loader_cls": HydroServerInternalLoader
            }

        etl_pipeline, etl_data_mappings, runtime_variables = build_hydroserver_pipeline(
            task=task,
            data_connection=task.data_connection,
            data_mappings=[
                {
                    "sourceIdentifier": mapping.source_identifier,
                    "paths": [
                        {
                            "targetIdentifier": path.target_identifier,
                            "dataTransformations": path.data_transformations,
                        } for path in mapping.paths.all()
                    ]
                } for mapping in task.mappings.all()
            ],
            **etl_classes
        )
    except Exception as e:
        raise ETLError(
            "Encountered an unexpected ETL configuration error. "
            "See task logs for additional details."
        ) from e

    try:
        context = etl_pipeline.run(
            task=task,
            data_mappings=etl_data_mappings,
            task_instance=task,
            raise_on_error=False,
            **runtime_variables
        )
    except Exception as e:
        raise ETLError(
            "Encountered an unexpected ETL execution error. "
            "See task logs for additional details."
        ) from e

    if context.exception:
        if isinstance(context.exception, ETLError):
            # The results object is serialised with .dict() before unpacking, as in
            # the other uses below; unpacking the object itself with ** would raise
            # TypeError and mask the real ETL error.
            # NOTE(review): this branch stores the payload on `.result` while the
            # branch below uses `.results` — confirm which name the failure handler reads.
            context.exception.result = {
                "runtime_variables": context.runtime_variables,
                **context.results.dict()
            }
            raise context.exception
        else:
            error = ETLError(
                "Encountered an unexpected ETL execution error. "
                "See task logs for additional details."
            )
            error.results = context.results.dict()
            raise error from context.exception

    if context.results.values_loaded_total == 0:
        message = "Already up-to-date. No new observations were loaded."
    else:
        message = (
            f"Loaded {context.results.values_loaded_total} total observation(s) "
            f"into {context.results.success_count} datastream(s)."
        )

    return {
        "message": message,
        "runtime_variables": context.runtime_variables,
        **context.results.dict(),
    }
- """ - - task_run_id = self.request.id - context = TaskRunContext() - TASK_RUN_CONTEXT[task_run_id] = context - - with capture_task_logs(context): - try: - task = ( - Task.objects.select_related("data_connection", "workspace") - .prefetch_related("mappings", "mappings__paths") - .get(pk=UUID(task_id)) - ) - - context.task_meta = { - "id": str(task.id), - "name": task.name, - "type": task.task_type, - } - if task.data_connection_id: - context.task_meta["data_connection_id"] = str(task.data_connection_id) - context.task_meta["data_connection_name"] = task.data_connection.name - - context.stage = "setup" - if task.task_type == "Aggregation": - logging.info("Starting aggregation task") - return _run_aggregation_task(task, context) - - if not task.data_connection: - raise EtlUserFacingError("ETL tasks require a data connection.") - - extractor_raw = { - "type": task.data_connection.extractor_type, - **(task.data_connection.extractor_settings or {}), - } - transformer_raw = { - "type": task.data_connection.transformer_type, - **(task.data_connection.transformer_settings or {}), - } - - timestamp_cfg = transformer_raw.get("timestamp") or {} - if isinstance(timestamp_cfg, dict): - tz_mode = timestamp_cfg.get("timezoneMode") - tz_value = timestamp_cfg.get("timezone") - if tz_mode == "daylightSavings" and not tz_value: - raise EtlUserFacingError( - "Timezone information is required when daylight savings mode is enabled. " - "Select a valid timezone such as America/Denver and try again." 
- ) - - extractor_cfg = _validate_component_config( - "extractor", TypeAdapter(ExtractorConfig), extractor_raw - ) - transformer_cfg = _validate_component_config( - "transformer", TypeAdapter(TransformerConfig), transformer_raw - ) - - extractor_cls = extractor_factory(extractor_cfg) - transformer_cls = transformer_factory(transformer_cfg) - loader_cls = HydroServerInternalLoader(task) - - task_mappings = [ - SourceTargetMapping( - source_identifier=task_mapping.source_identifier, - paths=[ - MappingPath( - target_identifier=task_mapping_path.target_identifier, - data_transformations=task_mapping_path.data_transformations, - ) - for task_mapping_path in task_mapping.paths.all() - ], - ) - for task_mapping in task.mappings.all() - ] - - context.stage = "extract" - logging.info("Starting extract") - data = extractor_cls.extract(task, loader_cls) - context.runtime_source_uri = ( - getattr(extractor_cls, "runtime_source_uri", None) - or context.runtime_source_uri - ) - extract_summary = _describe_payload(data) - logging.info("Extractor returned payload: %s", extract_summary) - if _is_empty(data): - if task.data_connection.extractor_type == "HTTP": - return _build_task_result( - "No observations were returned from the source system. " - "Confirm the configured source system has observations available for the requested time range.", - context, - stage=context.stage, - ) - return _build_task_result( - "The extractor returned no observations. Nothing to load.", - context, - stage=context.stage, - ) - - context.stage = "transform" - logging.info("Starting transform") - data = transformer_cls.transform(data, task_mappings) - transform_summary = _describe_transformed_data(data) - logging.info("Transform result: %s", transform_summary) - if isinstance(data, pd.DataFrame) and "timestamp" in data.columns: - bad = data["timestamp"].isna().sum() - if bad: - raise EtlUserFacingError( - "One or more timestamps could not be read using the current format and timezone settings. 
" - "Confirm how dates appear in the source file and update the transformer configuration to match." - ) - if _is_empty(data): - # hydroserverpy's CSVTransformer returns None on read errors (but logs ERROR). - # Treat that as a failure to avoid misleading "produced no rows" messaging. - last_err = _last_logged_error(context) - if last_err and last_err.startswith("Error reading CSV data:"): - mapped_csv_error = _mapped_csv_error_from_log(last_err) - if mapped_csv_error: - raise EtlUserFacingError(mapped_csv_error) - raise EtlUserFacingError( - f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType settings " - "and confirm the upstream CSV columns match your task mappings." - ) - return _build_task_result( - "Transform produced no rows. Nothing to load.", - context, - stage=context.stage, - ) - - context.stage = "load" - logging.info("Starting load") - load_summary = loader_cls.load(data, task) - logging.info( - "Load result: loaded=%s available=%s cutoff=%s", - getattr(load_summary, "observations_loaded", None), - getattr(load_summary, "observations_available", None), - getattr(load_summary, "cutoff", None), - ) - - return _build_task_result( - _success_message(load_summary), - context, - stage=context.stage, - ) - except Exception as e: - mapped = user_facing_error_from_exception( - e, transformer_raw=locals().get("transformer_raw") - ) - if mapped: - logging.error("%s", str(mapped)) - if mapped is e: - raise - raise mapped from e - logging.exception("ETL task failed during %s", context.stage) - raise @task_prerun.connect @@ -847,7 +141,9 @@ def update_next_run(sender, task_id, kwargs, **extra): return try: - task = Task.objects.select_related("periodic_task").get(pk=kwargs["task_id"]) + task = Task.objects.select_related("periodic_task").get( + pk=kwargs["task_id"] + ) except Task.DoesNotExist: return @@ -874,27 +170,11 @@ def mark_etl_task_success(sender, result, **extra): if sender != run_etl_task: return - context = 
TASK_RUN_CONTEXT.pop(sender.request.id, None) - try: task_run = TaskRun.objects.get(id=sender.request.id) except TaskRun.DoesNotExist: return - if not isinstance(result, dict): - result = {"message": str(result)} - - result = _merge_result_with_context(result, context) - if context and context.stage and "stage" not in result: - result["stage"] = context.stage - - transformer_raw = task_transformer_raw(task_run.task) - result = normalize_task_run_result( - status="SUCCESS", - result=result, - transformer_raw=transformer_raw, - ) - task_run.status = "SUCCESS" task_run.finished_at = timezone.now() task_run.result = result @@ -911,42 +191,23 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): if sender != run_etl_task: return - context = TASK_RUN_CONTEXT.pop(task_id, None) - try: task_run = TaskRun.objects.get(id=task_id) except TaskRun.DoesNotExist: return - stage = context.stage if context else None - mapped = user_facing_error_from_exception(exception) - if mapped: - # User-facing errors are already stage-aware and readable; don't prepend robotic prefixes. 
- message = str(mapped) - else: - if stage and stage.lower() == "setup": - message = f"Setup failed: {exception}" - else: - message = f"Failed during {stage}: {exception}" if stage else f"{exception}" - task_run.status = "FAILURE" task_run.finished_at = timezone.now() - transformer_raw = task_transformer_raw(task_run.task) - task_run.result = normalize_task_run_result( - status="FAILURE", - result=_build_task_result( - message, - context, - stage=stage, - traceback=einfo.traceback, - ), - transformer_raw=transformer_raw, - ) + task_run.result = { + "error": str(exception), + "traceback": einfo.traceback, + **(getattr(exception, "results", None) or {}), + } task_run.save(update_fields=["status", "finished_at", "result"]) -@shared_task(bind=True, expires=10, name="etl.tasks.cleanup_etl_task_runs") +@shared_task(bind=True, expires=10) def cleanup_etl_task_runs(self, days=14): """ Celery task to run the cleanup_etl_task_runs management command. From 7c841caa934023157e67229c3144397238c6b196 Mon Sep 17 00:00:00 2001 From: Kenneth Lippold Date: Fri, 6 Mar 2026 11:04:41 -0800 Subject: [PATCH 2/3] Update hydroserverpy version to 1.9.0b2 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index d06cc294..20bf769d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ gunicorn==23.0.0 celery==5.5.3 redis==7.0.1 hydroserver-sensorthings==0.4.2 -hydroserverpy==1.9.0b1 +hydroserverpy==1.9.0b2 pandas==2.2.3 orjson==3.10.15 uuid6==2024.7.10 From 3d599459239e8328f6f1e2bc30ed723f158694b2 Mon Sep 17 00:00:00 2001 From: Ken Lippold Date: Fri, 6 Mar 2026 14:29:23 -0800 Subject: [PATCH 3/3] Fixed Celery beat task ref and issue with error reporting. 
--- ..._task_type_and_nullable_data_connection.py | 5 +++ .../migrations/0005_fix_celery_task_names.py | 35 +++++++++++++++++++ domains/etl/services/task.py | 2 +- domains/etl/tasks.py | 4 ++- hydroserver/settings.py | 4 +-- requirements.txt | 2 +- tests/fixtures/test_etl_tasks.yaml | 2 +- 7 files changed, 48 insertions(+), 6 deletions(-) create mode 100644 domains/etl/migrations/0005_fix_celery_task_names.py diff --git a/domains/etl/migrations/0004_task_task_type_and_nullable_data_connection.py b/domains/etl/migrations/0004_task_task_type_and_nullable_data_connection.py index b36f0111..66d1c0de 100644 --- a/domains/etl/migrations/0004_task_task_type_and_nullable_data_connection.py +++ b/domains/etl/migrations/0004_task_task_type_and_nullable_data_connection.py @@ -25,4 +25,9 @@ class Migration(migrations.Migration): to="etl.dataconnection", ), ), + migrations.AlterField( + model_name='taskmappingpath', + name='data_transformations', + field=models.JSONField(blank=True, default=list), + ), ] diff --git a/domains/etl/migrations/0005_fix_celery_task_names.py b/domains/etl/migrations/0005_fix_celery_task_names.py new file mode 100644 index 00000000..a0076d59 --- /dev/null +++ b/domains/etl/migrations/0005_fix_celery_task_names.py @@ -0,0 +1,35 @@ +# Generated by Django 5.2.2 on 2026-03-06 21:11 + +from django.db import migrations + + +def fix_celery_task_names(apps, schema_editor): + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + task_name_map = { + "etl.tasks.run_etl_task": "domains.etl.tasks.run_etl_task", + "etl.tasks.cleanup_etl_task_runs": "domains.etl.tasks.cleanup_etl_task_runs", + } + for old_name, new_name in task_name_map.items(): + PeriodicTask.objects.filter(task=old_name).update(task=new_name) + + +def reverse_celery_task_names(apps, schema_editor): + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + task_name_map = { + "domains.etl.tasks.run_etl_task": "etl.tasks.run_etl_task", + 
"domains.etl.tasks.cleanup_etl_task_runs": "etl.tasks.cleanup_etl_task_runs", + } + for old_name, new_name in task_name_map.items(): + PeriodicTask.objects.filter(task=old_name).update(task=new_name) + + +class Migration(migrations.Migration): + + dependencies = [ + ("django_celery_beat", "0019_alter_periodictasks_options"), + ('etl', '0004_task_task_type_and_nullable_data_connection'), + ] + + operations = [ + migrations.RunPython(fix_celery_task_names, reverse_celery_task_names), + ] \ No newline at end of file diff --git a/domains/etl/services/task.py b/domains/etl/services/task.py index 31ea4d0d..3bb61ad9 100644 --- a/domains/etl/services/task.py +++ b/domains/etl/services/task.py @@ -540,7 +540,7 @@ def update_scheduling(task: Task, schedule_data: dict | None = None): if not task.periodic_task: task.periodic_task = PeriodicTask.objects.create( name=f"{task.name} — {task.id}", - task="etl.tasks.run_etl_task", + task="domains.etl.tasks.run_etl_task", kwargs=f'{{"task_id": "{str(task.id)}"}}', enabled=True, date_changed=timezone.now(), diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 1c1bbc9a..f037e688 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -65,6 +65,8 @@ def run_etl_task(self, task_id: str): ], **etl_classes ) + except ETLError as e: + raise e except Exception as e: raise ETLError( "Encountered an unexpected ETL configuration error. 
" @@ -89,7 +91,7 @@ def run_etl_task(self, task_id: str): if isinstance(context.exception, ETLError): context.exception.result = { "runtime_variables": context.runtime_variables, - **context.results + **(context.results.dict() if context.results is not None else {}) } raise context.exception else: diff --git a/hydroserver/settings.py b/hydroserver/settings.py index 1e8e3e00..62aba63d 100644 --- a/hydroserver/settings.py +++ b/hydroserver/settings.py @@ -80,9 +80,9 @@ CELERY_BEAT_SCHEDULE = { "cleanup_task_runs": { - "task": "etl.tasks.cleanup_etl_task_runs", + "task": "domains.etl.tasks.cleanup_etl_task_runs", "schedule": crontab(hour=3, minute=0), - "args": (14,), + "args": (7,), }, } diff --git a/requirements.txt b/requirements.txt index 20bf769d..55221b9b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ gunicorn==23.0.0 celery==5.5.3 redis==7.0.1 hydroserver-sensorthings==0.4.2 -hydroserverpy==1.9.0b2 +hydroserverpy==1.9.0b3 pandas==2.2.3 orjson==3.10.15 uuid6==2024.7.10 diff --git a/tests/fixtures/test_etl_tasks.yaml b/tests/fixtures/test_etl_tasks.yaml index 3f423891..bccf14de 100644 --- a/tests/fixtures/test_etl_tasks.yaml +++ b/tests/fixtures/test_etl_tasks.yaml @@ -10,7 +10,7 @@ pk: 1000000 fields: name: Test ETL Data Connection — 019adb4f-856c-745a-a996-708a2b2d33bd - task: etl.tasks.run_etl_task + task: domains.etl.tasks.run_etl_task args: [] kwargs: {"task_id": "019adb4f-856c-745a-a996-708a2b2d33bd"} enabled: False