Data Change Intelligence Agent
Built for the Google Cloud Rapid Agent Hackathon 2026 (Fivetran Track)
🌐 Live Demo: https://atlas-fivetran.streamlit.app/
Data pipelines break constantly because upstream schema changes (like dropping a column or changing a data type) are made without understanding the downstream impact. Data engineers waste hours manually tracing lineage across dbt, Looker, and machine learning platforms, often discovering breakages only after executive dashboards fail.
Atlas is an AI agent powered by Gemini 3 and Fivetran's Model Context Protocol (MCP). It proactively analyzes the impact of proposed schema changes before they happen.
Instead of just answering questions, Atlas takes action:
- It validates the current schema state directly via Fivetran.
- It traces the lineage of the specific column across all downstream assets (dbt, Tableau, Looker, ML features).
- It determines the business criticality of the change.
- It formulates a deprecation plan and drafts custom communications for affected stakeholders.
- Upon user approval, it uses Fivetran to automatically soft-deprecate the column and triggers a verification sync.
Atlas isn't a chatbot; it's an agentic workflow. When given a complex prompt (e.g., "Drop forecast_category from Salesforce and lead_source_legacy from HubSpot"), Gemini 3's advanced reasoning dynamically parallelizes its tasks:
- Verify connection health and schema status via Fivetran MCP (
salesforce,hubspot). - Retrieve lineage maps for both targets.
- Synthesize a combined impact report across multiple data domains.
- Execute multiple
modify_connection_column_configtool calls. - Trigger multiple
sync_connectioncommands to push changes to production.
flowchart TD
User(["👤 User: Drop customer_segment from stripe.customers"])
UI["🖥️ Streamlit UI"]
Gemini{{"🧠 Gemini 3 — reasoning & planning"}}
subgraph Tools["Agent Tools"]
Fivetran["🔌 Fivetran MCP<br/>connections · schema · sync"]
Lineage["🕸️ Lineage Engine<br/>downstream impact · owners"]
Ranker["⚙️ Semantic Ranker<br/>deterministic severity"]
end
Gate{"🛡️ Approval Gate<br/>human in the loop"}
Exec["✅ Execute: soft-deprecate column<br/>+ trigger verification sync"]
Log["📋 Change Log"]
User --> UI --> Gemini
Gemini -->|tool calls| Tools
Tools -->|results| Gemini
Gemini -->|impact report + severity| Gate
Gate -->|reject| UI
Gate -->|approve| Exec --> Fivetran
Exec --> Log --> UI
Atlas talks to Fivetran through a tool layer (fivetran_tools.py) that implements the same tool names and response shapes as the official fivetran/fivetran-mcp server. It exposes six tools:
| Tool | Real Fivetran endpoint |
|---|---|
list_connections |
GET /v1/connections |
get_connection_details |
GET /v1/connections/{id} |
get_connection_state |
GET /v1/connections/{id}/state |
get_connection_schema_config |
GET /v1/connections/{id}/schemas |
modify_connection_column_config |
PATCH /v1/connections/{id}/schemas/{schema}/tables/{table}/columns/{column} |
sync_connection |
POST /v1/connections/{id}/sync |
For the demo these run against a curated in-memory fixture so judges can exercise the full lifecycle without live credentials. Because the interface matches the official MCP server exactly, it's a drop-in replacement: point the tool layer at a real Fivetran account (with API credentials) and Atlas operates on production connections unchanged.
Two layers keep the demo alive even under free-tier API limits:
- Smart model fallback (
gemini_client.py) — every Gemini call routes throughsmart_generate(), which walks an ordered model chain and transparently falls back on429(rate limit),503(overload), or404(unavailable model). The chain ends with high-quota models (gemini-1.5-flash, 1500 RPD) as a safety net. - Demo cache (
demo_cache.py) — the three rehearsed scenarios (dropcustomer_segment, droplead_source_legacy, the not-founddiscount_code) have pre-baked reports. If API quota is fully exhausted, Atlas serves the cached analysis and execution with zero API calls, so the live demo never fails. The severity badge, PII flag, and stakeholder cards are still rebuilt deterministically from the lineage layer, so a cached run renders identically to a live one.
Atlas welcome screen with Fivetran MCP and multi-channel integrations
Impact analysis with CRITICAL severity badge and downstream asset mapping
Stakeholder notifications — send directly to Slack, Telegram, or Email
Real-time Telegram notification delivered by Atlas
Human-in-the-loop approval gate, execution result, and change log
- Gemini 3 - Advanced reasoning, planning, and multi-tool orchestration
- Fivetran MCP Server - Direct integration with Fivetran's configuration API
- Streamlit - Custom glassmorphic UI with dynamic state management
- Python - Core logic and API integration
- Clone the repository
- Install dependencies:
pip install -r requirements.txt - Add your Gemini API key to a
.envfile:GEMINI_API_KEY="your-key" - Run the app:
streamlit run app.py
This project is licensed under the MIT License - see the LICENSE file for details.