diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index a566132..075d1f1 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -12,6 +12,7 @@ "RSAPI_ALTPORT": "TRUE", "RME_DATA_BUCKET": "riverscapes-athena", "RME_ATHENA_OUTPUT_BUCKET": "riverscapes-athena-output", + "AWS_DEFAULT_REGION": "us-west-2", // mod_spatialite installed via apt (libsqlite3-mod-spatialite) "SPATIALITE_LIB": "/usr/lib/x86_64-linux-gnu/mod_spatialite.so", "DATA_ROOT": "/workspaces/data" diff --git a/.vscode/launch.json b/.vscode/launch.json index 858e6ec..49b6e75 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -497,7 +497,7 @@ // }, // This is the new version that pulls aux from Google Postgres { - "name": "Python: huc10_athena", + "name": "Deprecated Python: huc10_athena", "type": "debugpy", "request": "launch", "program": "${workspaceFolder}/pipelines/rscontext_to_athena/huc10_athena.py", @@ -516,6 +516,20 @@ // "--delete" ], }, + { + "name": "🆕 Scrape rs_context to Athena", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/pipelines/rscontext_to_athena/rscontext_to_athena.py", + "console": "integratedTerminal", + "env": { + "PYTHONPATH": "${workspaceFolder}" + }, + "args": [ + "${input:environment}", + "{env:DATA_ROOT}/huc10_athena", + ], + }, { "name": "Add ChaMP Aux Measurements to Topo - SQLite", "type": "debugpy", diff --git a/README.md b/README.md index b0d150c..218b93e 100644 --- a/README.md +++ b/README.md @@ -151,93 +151,89 @@ We've loaded it as a requirement in the toml: `https://github.com/Riverscapes/R Partition hierarchy (three levels): 1. `authority` – repository root name (e.g. `data-exchange-scripts`). Derived automatically from the git repo folder name. -2. `authority_name` – the tool / package authority publishing the layer definitions (from JSON). +2. `tool_schema_name` – the tool / package name publishing the layer definitions (from JSON). 3. `tool_schema_version` – semantic version of the tool's layer definition schema (from JSON). Output pattern: ```text +dist/index.json dist/metadata/ - authority=/authority_name=/tool_schema_version=/layer_metadata.parquet - index.json + authority=/tool_schema_name=/tool_schema_version=/layer_metadata.parquet ``` Default behavior: - Output format: Parquet (use `--format csv` for CSV). -- Partition columns (`authority`, `authority_name`, `tool_schema_version`) are NOT inside the Parquet/CSV files unless `--include-partition-cols` is passed. -- A `commit_sha` (current HEAD) is written into every row and stored again in `index.json` with a run timestamp. -- Schema validation is enforced; any validation error causes a loud failure (non-zero exit code). An `index.json` with `status: validation_failed` and the collected `validation_errors` is still written for diagnostics, but no partition files are produced. +- Partition columns (`authority`, `tool_schema_name`, `tool_schema_version`) are always written inside the Parquet/CSV files (the Athena table is not partitioned). +- The `dist/` directory is cleaned before each run. +- A `commit_sha` (current HEAD) is written into every row and stored again in `dist/index.json` with a run timestamp. +- Schema validation is enforced; any validation error causes a loud failure (non-zero exit code). A `dist/index.json` with `status: validation_failed` and the collected `validation_errors` is still written for diagnostics, but no partition files are produced. Run locally: ```bash -python scripts/metadata/export_layer_definitions_for_s3.py +uv run export-layer-definitions-for-s3 --root . ``` Optional flags: ```bash -python scripts/metadata/export_layer_definitions_for_s3.py --format csv # CSV instead of Parquet -python scripts/metadata/export_layer_definitions_for_s3.py --include-partition-cols # Embed partition columns in each file +uv run export-layer-definitions-for-s3 --root . --format csv # CSV instead of Parquet +uv run export-layer-definitions-for-s3 --root . --dry-run # Validate only, no files written ``` -### Athena External Table +Validate only (no output files): + +```bash +uv run validate-metadata --root . +``` -We publish to: `s3://riverscapes-athena/metadata/layer_definitions/` +### Athena External Table -This gets turned into the athena table default.layer_definitions. +We publish to: `s3://riverscapes-athena/riverscapes_metadata/layer_definitions_raw/X.X/` where XX is riverscapes metadata schema version number. -Add new partitions (after upload): +Although the folder is structure is set up as if for partitions, the Athena table is not partitioned. -```sql --- auto-discover -MSCK REPAIR TABLE layer_definitions; --- OR manual: -ALTER TABLE layer_definitions -ADD IF NOT EXISTS PARTITION ( - authority='data-exchange-scripts', - authority_name='rme_to_athena', - tool_schema_version='1.0.0' -) -LOCATION 's3://riverscapes-athena/metadata/layer_definitions/authority=data-exchange-scripts/authority_name=rme_to_athena/tool_schema_version=1.0.0/'; -``` +This gets turned into the athena table default.layer_definitions and the view `layer_definitions_latest`. ### GitHub Actions Workflow Workflow file: `.github/workflows/metadata-catalog.yml` -Steps performed: +The workflow has two jobs: + +**`validate`** (runs on all branches and PRs): +1. Checkout code. +2. Install Python 3.12 + `uv sync`. +3. Run `export-layer-definitions-for-s3 --root .` → validates and produces partitioned Parquet under `dist/`. +4. Uploads `dist/` as a build artifact. +**`build-and-publish`** (runs on `main` branch only, after `validate`): 1. Checkout code. -2. Assume AWS IAM role via OIDC (secret `METADATA_PUBLISH_ROLE_ARN`). -3. Install dependencies (Python 3.12 + `uv sync`). -4. Run flatten script -> partitioned Parquet. -5. Sync `dist/metadata` to S3 bucket prefix. -6. Run `MSCK REPAIR TABLE` to load partitions. -7. Perform sample queries (partition listing / row count). +2. Configure AWS credentials via OIDC (secret `METADATA_PUBLISH_ROLE_ARN`). +3. Install Python 3.12 + `uv sync`. +4. Download the `dist/` artifact from the `validate` job. +5. Run `publish-metadata-to-s3 --root .` → uploads Parquet files to S3 and runs a row-count verification query on Athena. ### IAM Role (Least Privilege Summary) The role must allow: -- S3 List/Get/Put/Delete under `metadata/layer_definitions/` and query result prefix. +- S3 List/Get/Put/Delete under `riverscapes_metadata/layer_definitions_raw/` and query result prefix. - Athena: StartQueryExecution, GetQueryExecution, GetQueryResults. - Glue: Get/Create/Update table & partitions for the database/table. ### Future Enhancements - Validate layer schemas (dtype whitelist, required fields & semantic checks). -- Explicit partition adds instead of MSCK for faster updates. - Historical snapshots (extra partition like `snapshot_date`). -- Glue Catalog integration (automated table & partition registration without MSCK). - Data quality profile summary (row counts, distinct key coverage) in `index.json`. ## Troubleshooting Metadata | Symptom | Likely Cause | Fix | |---------|--------------|-----| -| Empty Athena table | Partitions not loaded | Run `MSCK REPAIR TABLE` or add partitions manually | | Wrong data types | Created table before column rename | Drop & recreate external table with new DDL | | Missing new version | Workflow didn’t run or lacked perms | Check Actions logs & IAM role policies | | Zero rows for authority | Upload sync failed | Inspect S3 prefix & re-run workflow | diff --git a/pipelines/rme_to_athena/rme_to_athena_parquet.py b/pipelines/rme_to_athena/rme_to_athena_parquet.py index cafc63c..6df887b 100644 --- a/pipelines/rme_to_athena/rme_to_athena_parquet.py +++ b/pipelines/rme_to_athena/rme_to_athena_parquet.py @@ -38,6 +38,7 @@ DATA_BUCKET_ENV_VAR = "RME_DATA_BUCKET" OUTPUT_BUCKET_ENV_VAR = "RME_ATHENA_OUTPUT_BUCKET" DEFAULT_DATA_BUCKET = "riverscapes-athena" +BASE_S3_KEY = "data_exchange/rs_metric_engine2" DATA_BUCKET = os.getenv(DATA_BUCKET_ENV_VAR, DEFAULT_DATA_BUCKET) ATHENA_OUTPUT_BUCKET = os.getenv(OUTPUT_BUCKET_ENV_VAR, DATA_BUCKET) # fallback to data bucket if not set @@ -57,7 +58,7 @@ huc_projects_scraped as (select substr(huc12, 1, 10) as huc10, raw_rme_pq2.rme_date_created_ts - from raw_rme_pq2) + from rs_raw.raw_rs_metric_engine2 raw_rme_pq2) select distinct project_id, huc, created_on, rme_date_created_ts from huc_projects_dex dex left join huc_projects_scraped scr on dex.huc = scr.huc10 @@ -314,7 +315,7 @@ def scrape_rme( rme_pq_filepath = huc_dir / f'rme_{project.huc}.parquet' data_gdf.to_parquet(rme_pq_filepath) # do not use os.path.join because this is aws os, not system os - s3_key = f'data_exchange/riverscape_metrics/{rme_pq_filepath.name}' + s3_key = f'{BASE_S3_KEY}/{rme_pq_filepath.name}' upload_to_s3(rme_pq_filepath, data_bucket, s3_key) if delete_downloads_when_done: @@ -323,7 +324,7 @@ def scrape_rme( prg.update(count) except Exception as e: log.error(f'Error scraping HUC {project.huc}: {e}') - raise + # raise prg.finish() diff --git a/pipelines/rscontext_to_athena/huc10_athena.py b/pipelines/rscontext_to_athena/huc10_athena.py index d4738f6..f1e837a 100644 --- a/pipelines/rscontext_to_athena/huc10_athena.py +++ b/pipelines/rscontext_to_athena/huc10_athena.py @@ -1,5 +1,6 @@ """ Scrape HUC10 information and load it to Athena. +DEPRECATED - USE rscontext_to_athena instead. Philip Bailey 5 Oct 2025 diff --git a/pipelines/rscontext_to_athena/rscontext_to_athena.py b/pipelines/rscontext_to_athena/rscontext_to_athena.py new file mode 100644 index 0000000..55a2eed --- /dev/null +++ b/pipelines/rscontext_to_athena/rscontext_to_athena.py @@ -0,0 +1,205 @@ +"""Scrape rs_context projects (HUC10) from Data Exchange and load the data S3 for Athena +This version queries the Athena index of Data Exchange projects instead of using graphql API + +Downloads specific files and uses geo to bin rasters. +Requires geo extras +`uv sync --extra geo` + +Lorin 2026-March-23 + +""" + +import argparse +import json +import logging +import shutil +import sys +import traceback +from pathlib import Path, PurePosixPath + +import boto3 +from rsxml import Logger, ProgressBar, dotenv +from rsxml.util import safe_makedirs + +from pydex import RiverscapesAPI, RiverscapesProject +from pydex.lib.athena import query_to_dataframe +from pydex.lib.raster import Raster + +# RegEx for finding DEM, Vegetation and Metrics files +REGEXES = {"DEM_REGEX": r'.*\/dem\.tif$', "METRICS_REGEX": r'.*rscontext_metrics\.json$', "VEG_REGEX": r'.*\/existing_veg\.tif$'} +S3_BUCKET = 'riverscapes-athena' +S3_BASE_PATH = 'data_exchange/rs-context' + +# Number of decimal places to truncate floats +FLOAT_DEC_PLACES = 4 + +MAJOR = 1000000 +MINOR = 1000 + +missing_projects_query = """ +-- rscontext projects that we have not scraped into Athena (missing or newer versions of HUCs previously loaded) +select p.project_id AS project_id, + p.huc, + p.created_on, + s.huc AS scraped_huc, + s.project_id AS scraped_project_id, + sp.created_on AS scraped_project_created_on +from conus_projects p + left join default.rs_context_huc10 s on p.huc = s.huc + left join conus_projects sp on s.project_id = sp.project_id +where p.project_type_id = 'rscontext' + and (s.huc is null or sp.created_on < p.created_on) +""" + + +def join_s3_key(*parts: str) -> str: + """Build an S3 key with forward slashes regardless of OS.""" + return str(PurePosixPath(*parts)) + + +def scrape_rscontext_project(s3, rs_api: RiverscapesAPI, project: RiverscapesProject, download_dir: Path, skip_overwrite: bool): + """Scrape (download, transform, upload) a single project""" + DOWNLOAD_RETRIES = 3 + log = Logger("Scrape RSContext project") + # S3 key for upload + s3_key = join_s3_key(S3_BASE_PATH, f'{project.huc}.json') + if project.huc is None or project.huc == '': + log.warning(f'Project {project.id} does not have a HUC. Skipping.') + return + + if project.model_version is None: + log.warning(f'Project {project.id} does not have a model version. Skipping.') + return + + try: + if skip_overwrite is True: + try: + # head is the cheapest way to check if a file exists on S3 + s3.head_object(Bucket=S3_BUCKET, Key=s3_key) + log.info(f'File s3://{S3_BUCKET}/{s3_key} already exists. Skipping project {project.id}.') + return + except s3.exceptions.ClientError as e: + if e.response['Error']['Code'] == '404': + pass + else: + raise e + + # download the files we need + huc_dir = download_dir / str(project.huc) + safe_makedirs(str(huc_dir)) + + retry = 0 + complete = False + while retry < DOWNLOAD_RETRIES and complete is False: + try: + rs_api.download_files(project_id=project.id, download_dir=str(huc_dir), re_filter=list(REGEXES.values())) + complete = True + break + except Exception as e: + log.error(f'Error downloading files for project {project.id}: {e}') + traceback.print_exc(file=sys.stdout) + retry += 1 + continue + + dem_tif_path = huc_dir / 'topography' / 'dem.tif' + if not dem_tif_path.exists(): + raise FileNotFoundError(f'Could not find DEM file for project {project.id}') + veg_tif_path = huc_dir / 'vegetation' / 'existing_veg.tif' + if not veg_tif_path.exists(): + raise FileNotFoundError(f'Could not find vegetation file for project {project.id}') + metrics_json_path = huc_dir / 'rscontext_metrics.json' + try: + metrics = json.loads(open(metrics_json_path, 'r', encoding='utf-8').read()) + except Exception as e: + log.warning(f'Could not find or read metrics JSON for project {project.id}: {e}') + metrics = {} + + huc10_json_path = huc_dir / f'huc10_{project.huc}.json' + dem_raster = Raster(str(dem_tif_path)) + dem_bins = dem_raster.bin_raster(100) + veg_raster = Raster(str(veg_tif_path)) + veg_bins = veg_raster.bin_raster_categorical() + if 'rs_context' not in metrics: + metrics['rs_context'] = {} + metrics['rs_context']['dem_bins'] = dem_bins + metrics['rs_context']['existing_veg_bins'] = veg_bins + + # Add the project ID to the metrics so we can trace this back to its source + metrics['rs_context']['project_id'] = project.id + metrics['rs_context']['model_version'] = str(project.model_version) + + log.info(f'Writing HUC10 metrics to {huc10_json_path}') + # Write the JSON back to `huc10_{huc}.json` (just for debugging purposes really) + with open(huc10_json_path, 'w', encoding='utf-8') as f: + json.dump(metrics, f, indent=2) + + # Now use boto3 to upload the file to S3 + log.info(f'Uploading metrics to s3://{S3_BUCKET}/{s3_key}') + + s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=json.dumps(metrics['rs_context'])) + + except Exception as e: + log.error(f'Error scraping HUC {project.huc}: {e}') + traceback.print_exc(file=sys.stdout) + + +def scrape_rsprojects(rs_api: RiverscapesAPI, download_dir: Path, delete_downloads: bool, skip_overwrite: bool): + """Scrape all projects matching criteria""" + log = Logger('Scrape RSContext') + projects_to_add_df = query_to_dataframe(missing_projects_query, 'identify new projects') + if projects_to_add_df.empty: + log.info("Query to identify projects to scrape returned no results.") + return + count = 0 + prg = ProgressBar(projects_to_add_df.shape[0], text="Scrape Progress") + s3 = boto3.client('s3') + for project_id in projects_to_add_df['project_id']: + project = rs_api.get_project_full(project_id) + if project.huc is None or project.huc == '': + log.warning(f'Project {project.id} does not have a HUC. Skipping.') + continue + scrape_rscontext_project(s3, rs_api, project, download_dir, skip_overwrite) + count += 1 + prg.update(count) + + if delete_downloads is True and download_dir.is_dir(): + try: + log.info(f'Deleting download directory {download_dir}') + shutil.rmtree(download_dir) + except Exception as e: + log.error(f'Error deleting download directory {download_dir}: {e}') + + +def main(): + """ + Parse arguments and call function to run the scrape + """ + parser = argparse.ArgumentParser() + parser.add_argument('stage', help='Environment: staging or production', type=str) + parser.add_argument('working_folder', help='top level folder for downloads and output', type=str) + parser.add_argument('--delete', help='Delete downloaded files after processing', action='store_true', default=False) + parser.add_argument('--skip-overwrite', help='Whether or not to skip overwriting existing S3 files', action='store_true', default=False) + + args = dotenv.parse_args_env(parser) + + # Set up some reasonable folders to store things + working_folder = Path(args.working_folder) + download_folder = working_folder / 'downloads' + safe_makedirs(str(working_folder)) + + log = Logger('Setup') + log.setup(log_path=working_folder / 'rscontext_to_athena.log', log_level=logging.DEBUG) + try: + with RiverscapesAPI(stage=args.stage) as rs_api: + scrape_rsprojects(rs_api, download_folder, args.delete, args.skip_overwrite) + + except Exception as e: + log.error(e) + traceback.print_exc(file=sys.stdout) + sys.exit(1) + + log.info('Process complete') + + +if __name__ == '__main__': + main() diff --git a/thirdpartydata/climate_engine/layer_definitions.json b/thirdpartydata/climate_engine/layer_definitions.json new file mode 100644 index 0000000..0b567f1 --- /dev/null +++ b/thirdpartydata/climate_engine/layer_definitions.json @@ -0,0 +1,62 @@ +{ + "$schema": "https://xml.riverscapes.net/riverscapes_metadata/schema/layer_definitions.schema.json", + "tool_schema_name": "climate_engine_api", + "tool_schema_version": "1.0.0", + "layers": [ + { + "layer_id": "RAP_COVER", + "layer_name": "RAP Cover - 30m - Yearly", + "description": "Rangeland Analysis Platform data provided via Climate Engine API. Columns defined here represent Variables that can be returned via the API. The return labels include units, unlike the request labels.", + "source_url": "https://support.climateengine.org/article/81-rap", + "source_title": "Climate Engine - RAP 30m Annual Vegetation Cover and Production", + "theme": "vegetation", + "columns": [ + { + "name": "Date" + }, + { + "name": "AFG", + "friendly_name": "Annual Forb and Grass Cover", + "dtype": "FLOAT", + "data_unit": "%", + "description": "Fractional cover of annual forbs and grasses." + }, + { + "name": "PFG", + "friendly_name": "Perennial Forb and Grass Cover", + "dtype": "FLOAT", + "data_unit": "%", + "description": "Fractional cover of perennial forbs and grasses." + }, + { + "name": "SHR", + "friendly_name": "Shrub Cover", + "dtype": "FLOAT", + "data_unit": "%", + "description": "Fractional cover of shrubs." + }, + { + "name": "TRE", + "friendly_name": "Tree Cover", + "dtype": "FLOAT", + "data_unit": "%", + "description": "Fractional cover of trees." + }, + { + "name": "BGR", + "friendly_name": "Bare Ground Cover", + "dtype": "FLOAT", + "data_unit": "%", + "description": "Fractional cover of bare ground." + }, + { + "name": "LTR", + "friendly_name": "Litter Cover", + "dtype": "FLOAT", + "data_unit": "%", + "description": "Fractional cover of litter." + } + ] + } + ] +} \ No newline at end of file