System design and component overview
┌─────────────────────────────────────────────────────────────┐
│ Applications │
│ (Research, Monitoring, Analysis, ML/AI, Visualization) │
└─────────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ env-agents Core │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Request │ │ Response │ │ Semantic │ │
│ │ Spec │ │ Schema │ │ Engine │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ BaseAdapter (Abstract Interface) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────┬───────────┬────────────┬──────────┬──────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Adapter Implementations │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ NASA │ │ USGS │ │ Earth │ │ GBIF │ │
│ │ POWER │ │ NWIS │ │ Engine │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼────────────┼─────────────┼──────────────┼──────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ External Data Sources │
│ NASA API USGS Web Services Google EE GBIF API │
└─────────────────────────────────────────────────────────────┘
RequestSpec
Purpose: Unified interface for specifying data queries
Location: env_agents/core/models.py
Key Fields:
class RequestSpec:
    geometry: Geometry    # Where to fetch data
    time_range: tuple     # When to fetch data
    variables: list       # What variables (optional)
    attributes: dict      # Additional service-specific params

Geometry Types:
- point - Single location (lon, lat)
- bbox - Bounding box (west, south, east, north)
- polygon - Custom shape (list of coordinates)
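The three geometry types can be constructed as shown below. The `Geometry` dataclass here is a minimal hypothetical stand-in that mirrors the `Geometry(type=..., coordinates=...)` call signature used in the fetch example later in this document, not the real class from env_agents/core/models.py.

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Geometry:
    """Hypothetical stand-in for env_agents.core.models.Geometry."""
    type: str         # "point", "bbox", or "polygon"
    coordinates: Any  # shape depends on type

# Single location: (lon, lat)
point = Geometry(type="point", coordinates=[-122.4, 37.8])

# Bounding box: (west, south, east, north)
bbox = Geometry(type="bbox", coordinates=[-123.0, 37.0, -122.0, 38.0])

# Custom shape: closed ring of (lon, lat) vertices
polygon = Geometry(
    type="polygon",
    coordinates=[[-122.5, 37.7], [-122.3, 37.7], [-122.3, 37.9], [-122.5, 37.7]],
)
```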
Response Schema
Purpose: Standardized output format from all adapters
Core Columns:
Identity:
- observation_id - Unique ID for each observation
- dataset - Source service name
- source_url - Data source URL
- source_version - API/dataset version
- license - Data license
- retrieval_timestamp - When fetched
Location:
- geometry_type - point, bbox, or polygon
- latitude, longitude - Point location
- geom_wkt - WKT geometry string
Time:
time- ISO timestamp (YYYY-MM-DD or full datetime)
Measurement:
- variable - Parameter name (prefixed by service)
- value - Numeric measurement
- unit - Units of measurement
Quality & Metadata:
- qc_flag - Quality control status
- attributes - Service-specific metadata (JSON)
- provenance - Processing history
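Because every adapter emits these same columns, downstream code needs no per-service branches. A toy sketch (the power:T2M variable name and all values are illustrative, not real output):

```python
import pandas as pd

# A few of the standard columns, with made-up values
data = pd.DataFrame({
    "observation_id": ["obs-1", "obs-2"],
    "dataset": ["NASA_POWER", "NASA_POWER"],
    "time": ["2021-01-01", "2021-01-02"],
    "variable": ["power:T2M", "power:T2M"],
    "value": [10.5, 11.2],
    "unit": ["degC", "degC"],
    "qc_flag": ["good", "good"],
})

# Generic selection by the standard `variable` column
temps = data[data["variable"] == "power:T2M"]
mean_temp = temps["value"].mean()
```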
BaseAdapter
Purpose: Contract that all data adapters must implement
Location: env_agents/adapters/base.py
Required Methods:
class BaseAdapter(ABC):
    # Class constants
    DATASET: str          # Service identifier
    SOURCE_URL: str       # Base URL
    SOURCE_VERSION: str   # Version string
    LICENSE: str          # Data license

    # Required methods
    @abstractmethod
    def capabilities(self) -> Dict:
        """Return service capabilities and available variables"""

    @abstractmethod
    def _fetch_rows(self, spec: RequestSpec) -> List[Dict]:
        """Fetch data and return list of row dicts"""

    # Provided methods
    def fetch(self, spec: RequestSpec) -> pd.DataFrame:
        """Public method - calls _fetch_rows and returns a DataFrame"""

Pattern:
- Subclasses implement _fetch_rows() with service-specific logic
- The base class provides the fetch() wrapper that handles DataFrame conversion
- capabilities() describes what the service offers
Adapter Implementations
Location: env_agents/adapters/
Current Adapters:
- nasa_power.py - NASA POWER climate data
- usgs_nwis.py - USGS stream gauges
- gbif.py - Species observations
- openaq.py - Air quality sensors
- water_quality_portal.py - Water quality samples
- ssurgo.py - Soil survey data
- osm_overpass.py - OpenStreetMap features
- earth_engine/production_adapter.py - Google Earth Engine
- (and more...)
Common Pattern:
class MyAdapter(BaseAdapter):
    DATASET = "MY_SERVICE"
    SOURCE_URL = "https://api.example.com"

    def capabilities(self):
        return {
            "variables": [...],
            "spatial_coverage": "Global",
            ...
        }

    def _fetch_rows(self, spec: RequestSpec):
        # 1. Parse geometry and time from spec
        # 2. Query upstream API
        # 3. Transform to standard schema
        # 4. Return list of row dicts
        return rows

Example request:

spec = RequestSpec(
    geometry=Geometry(type="point", coordinates=[-122.4, 37.8]),
    time_range=("2021-01-01", "2021-12-31")
)
adapter = NASAPowerAdapter()
data = adapter.fetch(spec)

Internal Steps:
- fetch() validates the spec
- fetch() calls _fetch_rows(spec)
- The adapter queries the upstream API
- The adapter transforms the response to the standard schema
- The adapter returns a list of row dicts
- fetch() converts the rows to a DataFrame and returns it to the user

# DataFrame with 20 standard columns
print(data.shape)    # (365, 20)
print(data.columns)  # ['observation_id', 'dataset', ...]

Why: Unified interface to heterogeneous services
Benefits:
- Consistent API across all services
- Easy to add new services
- Swap implementations without changing client code
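A minimal sketch of what "swap implementations without changing client code" means in practice. The toy adapters below are hypothetical; only the BaseAdapter shape matches the contract described above.

```python
from abc import ABC, abstractmethod

class BaseAdapter(ABC):
    DATASET: str

    @abstractmethod
    def _fetch_rows(self, spec):
        """Service-specific fetch logic."""

    def fetch(self, spec):
        # Provided by the base class; same entry point for every service
        return self._fetch_rows(spec)

class ToyAdapterA(BaseAdapter):
    DATASET = "TOY_A"
    def _fetch_rows(self, spec):
        return [{"dataset": self.DATASET, "value": 1.0}]

class ToyAdapterB(BaseAdapter):
    DATASET = "TOY_B"
    def _fetch_rows(self, spec):
        return [{"dataset": self.DATASET, "value": 2.0}]

def summarize(adapter: BaseAdapter, spec):
    """Client code: has no idea which service it is talking to."""
    rows = adapter.fetch(spec)
    return adapter.DATASET, len(rows)

results = [summarize(a, spec=None) for a in (ToyAdapterA(), ToyAdapterB())]
```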
Why: Analysis-ready data without transformation
Benefits:
- Immediate compatibility with pandas/analysis tools
- Cross-service data fusion
- Consistent metadata structure
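For instance, outputs from two services can be fused directly because the columns line up (toy frames carrying a subset of the standard columns, with made-up values):

```python
import pandas as pd

climate = pd.DataFrame({
    "dataset": ["NASA_POWER"], "time": ["2021-06-01"],
    "variable": ["T2M"], "value": [22.3],
})
streamflow = pd.DataFrame({
    "dataset": ["USGS_NWIS"], "time": ["2021-06-01"],
    "variable": ["discharge"], "value": [14.7],
})

# Identical schemas make cross-service fusion a one-liner
fused = pd.concat([climate, streamflow], ignore_index=True)
counts = fused.groupby("dataset")["value"].count()
```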
Why: Each API has unique characteristics
Per-adapter configuration:
- Rate limiting
- Timeout handling
- Retry strategies
- Caching policies
Core dependencies:
- pandas - DataFrame operations
- requests - HTTP client
- pydantic - Data validation
- shapely - Geometry operations
Optional dependencies:
- earthengine-api - Only if using Earth Engine
- Service-specific libs as needed
Pattern: Adapters + SQLite for persistence
# scripts/acquire_environmental_data.py
for cluster_id in clusters:
    spec = create_spec(cluster_id)
    rows = adapter._fetch_rows(spec)
    # Store in database
    for row in rows:
        db.insert(cluster_id, row)

Schema:
- observations table with 20 standard columns
- cluster_id for spatial grouping
- Indexes on cluster_id, dataset, time
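A sketch of that schema in SQLite (column list abbreviated; the exact DDL in the acquisition script may differ):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # the real script would use a file path
conn.execute("""
    CREATE TABLE IF NOT EXISTS observations (
        cluster_id      TEXT NOT NULL,   -- spatial grouping key
        observation_id  TEXT NOT NULL,
        dataset         TEXT NOT NULL,
        time            TEXT,
        variable        TEXT,
        value           REAL,
        unit            TEXT,
        attributes      TEXT             -- JSON blob of service-specific metadata
        -- ...remaining standard columns elided
    )
""")
# Indexes matching the lookups described above
conn.execute("CREATE INDEX IF NOT EXISTS idx_obs_cluster ON observations (cluster_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_obs_dataset ON observations (dataset)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_obs_time ON observations (time)")
conn.commit()
```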
Per-service configuration:
SERVICE_CONFIG = {
    "NASA_POWER": {"rate_limit": 10.0},  # 10 second wait
    "USGS_NWIS": {"rate_limit": 1.0},    # 1 second wait
}

Implementation: Sleep after each query to respect API limits
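A minimal sketch of that sleep-after-each-query approach; rate_limited_fetch is a hypothetical helper, not an env_agents API:

```python
import time

SERVICE_CONFIG = {
    "NASA_POWER": {"rate_limit": 10.0},
    "USGS_NWIS": {"rate_limit": 1.0},
}

def rate_limited_fetch(service, do_fetch, spec, config=SERVICE_CONFIG):
    """Run one query, then sleep for the service's configured interval."""
    rows = do_fetch(spec)
    time.sleep(config[service]["rate_limit"])
    return rows
```

A real implementation would more likely track the time of the last request per service and sleep only the remaining interval, so slow responses do not add unnecessary waiting.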
Three-tier strategy:
try:
    rows = adapter._fetch_rows(spec)
    status = "success"
except ServiceUnavailableError:
    status = "retry_later"  # Temporary failure
except NoDataError:
    status = "no_data"      # Expected - location has no data
except Exception as e:
    status = "failed"       # Unexpected error
    log_error(e)
Adding a new service:

- Create adapter file: env_agents/adapters/my_service.py

- Implement BaseAdapter:

  from .base import BaseAdapter

  class MyServiceAdapter(BaseAdapter):
      DATASET = "MY_SERVICE"
      SOURCE_URL = "https://api.myservice.com"

      def capabilities(self):
          return {"variables": [...]}

      def _fetch_rows(self, spec):
          # Query API and return rows
          return rows

- Add to canonical services (optional):

  # adapters/__init__.py
  CANONICAL_SERVICES = {
      "MY_SERVICE": MyServiceAdapter,
      ...
  }

- Write tests:

  # tests/test_my_service.py
  def test_my_service_adapter():
      adapter = MyServiceAdapter()
      spec = RequestSpec(...)
      data = adapter.fetch(spec)
      assert len(data) > 0

See EXTENDING_SERVICES.md for the full guide.
env_agents/
├── core/
│ ├── models.py # RequestSpec, Geometry
│ ├── config.py # Configuration management
│ └── __init__.py
│
├── adapters/
│ ├── base.py # BaseAdapter abstract class
│ ├── nasa_power.py # NASA POWER adapter
│ ├── usgs_nwis.py # USGS NWIS adapter
│ ├── gbif.py # GBIF adapter
│ ├── earth_engine/
│ │ └── production_adapter.py
│ └── ...
│
└── __init__.py
Test individual adapters:
def test_nasa_power_capabilities():
    adapter = NASAPowerAdapter()
    caps = adapter.capabilities()
    assert "variables" in caps

Test with real APIs:

def test_nasa_power_fetch():
    adapter = NASAPowerAdapter()
    spec = RequestSpec(...)
    data = adapter.fetch(spec)
    assert len(data) > 0
    assert "value" in data.columns

Verify all adapters follow the schema:

def test_adapter_contract(adapter_class):
    adapter = adapter_class()
    data = adapter.fetch(spec)
    # Check required columns
    required = ["observation_id", "dataset", "time", "variable", "value"]
    assert all(col in data.columns for col in required)

Earth Engine:
- Sequential execution required due to quota limits
- Threading-based timeouts for hanging queries
- Rate limiting: 2-5 seconds between queries
- Temporal fallback: Auto-handle missing dates
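The threading-based timeout can be sketched as below (fetch_with_timeout is a hypothetical helper). Note that Python threads cannot be force-killed: this unblocks the caller, but a hung request may keep running in its daemon thread.

```python
import threading

def fetch_with_timeout(fn, spec, timeout_s=30.0):
    """Run a potentially hanging query in a worker thread; give up after timeout_s."""
    result = {}

    def worker():
        try:
            result["rows"] = fn(spec)
        except Exception as exc:
            result["error"] = exc

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    t.join(timeout_s)
    if t.is_alive():
        raise TimeoutError(f"query still running after {timeout_s}s")
    if "error" in result:
        raise result["error"]
    return result["rows"]
```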
Other services:
- Parallel execution possible (independent rate limits)
- Service-specific tuning: Each has optimal rate
- Retry strategies: Exponential backoff for transient errors
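Exponential backoff for transient errors can be sketched as follows (fetch_with_backoff is a hypothetical helper; the real exception types and delays may differ):

```python
import time

def fetch_with_backoff(fn, spec, retries=4, base_delay=1.0):
    """Retry transient failures with exponentially growing waits (1s, 2s, 4s, ...)."""
    for attempt in range(retries):
        try:
            return fn(spec)
        except ConnectionError:
            if attempt == retries - 1:
                raise  # out of retries: surface the error
            time.sleep(base_delay * (2 ** attempt))
```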
Storage:
- SQLite for local storage (millions of observations)
- Batch inserts for performance
- Indexes on cluster_id, dataset, time
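Batch inserts in sqlite3 amount to one executemany call inside a single transaction, which is far faster than committing row by row (the three-column table here is a toy stand-in for the full observations schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE observations (cluster_id TEXT, variable TEXT, value REAL)")

rows = [("c1", "T2M", 21.0), ("c1", "T2M", 22.5), ("c2", "discharge", 3.1)]

# `with conn` wraps the inserts in one transaction (commit on success)
with conn:
    conn.executemany(
        "INSERT INTO observations (cluster_id, variable, value) VALUES (?, ?, ?)",
        rows,
    )
```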
Future enhancements:
- Async/await patterns for parallel fetching
- Caching layer for repeated queries
- Distributed execution for large-scale workloads
- Streaming responses for huge datasets
- Plugin system for external adapters
Stability guarantees:
- Core schema is stable (20 columns)
- Adapters can add new services without breaking existing code
- attributes field allows service-specific extensions
For implementation details, see: