-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
1873 lines (1641 loc) · 78.4 KB
/
app.py
File metadata and controls
1873 lines (1641 loc) · 78.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
app.py - Atlas Web Interface
A Streamlit app that wraps the Atlas agent in a clean UI.
Judges see: request input -> live analysis -> approval button -> execution -> change log.
Run: streamlit run app.py
"""
import streamlit as st
import os
import json
import csv
import io
import requests as http_requests
from datetime import datetime
from pathlib import Path
from urllib.parse import quote
from google import genai
from google.genai import types
from dotenv import load_dotenv
from lineage import summarize_impact, load_default, load_graph, calculate_semantic_risk
from gemini_client import smart_generate
from demo_cache import check_analysis_cache, check_execution_cache, check_inference_cache
from lineage_viz import build_lineage_graph
from db_scanner import scan_sqlite, build_discovery_report
from lineage_inference import (
infer_lineage_from_schema,
validate_inferred_lineage,
TEAM_DIRECTORY,
CRITICALITY_LEVELS,
)
from fivetran_tools import (
list_connections,
get_connection_details,
get_connection_state,
get_connection_schema_config,
modify_connection_column_config,
rollback_column_config,
sync_connection,
get_change_log,
)
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
# Load variables from a local .env file (if any) before the API key lookup below.
load_dotenv()
# Support Streamlit Community Cloud secrets AND local .env
def _get_api_key() -> str:
# 1. Streamlit secrets (used when deployed to Streamlit Community Cloud)
try:
key = st.secrets.get("GEMINI_API_KEY", "")
if key:
return key
except Exception:
pass
# 2. Environment variable / .env file (used locally)
return os.getenv("GEMINI_API_KEY", "")
# Resolve the key at module level; the Gemini client below cannot be built without it.
API_KEY = _get_api_key()
if not API_KEY:
    # Surface the problem in the UI and stop this script run.
    st.error("GEMINI_API_KEY is not set. Add it to .env (local) or Streamlit secrets (cloud).")
    st.stop()
# Gemini client shared by the agent loop and lineage inference in this module.
client = genai.Client(api_key=API_KEY)
# ---------------------------------------------------------------------------
# Tool declarations (same as atlas.py)
# ---------------------------------------------------------------------------
def _connection_tool_decl(name: str, description: str) -> dict:
    """Build a declaration for a tool whose only argument is ``connection_id``.

    A fresh dict is created on every call so declarations never share
    nested objects.
    """
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": {
                "connection_id": {"type": "string", "description": "The connection ID."},
            },
            "required": ["connection_id"],
        },
    }


# Read-only tools offered to the model while it analyses a requested change.
ANALYSIS_TOOL_DECLS = [
    {
        "name": "list_connections",
        "description": "List all Fivetran connections in the account. Use first to find the connection_id.",
        "parameters": {"type": "object", "properties": {}, "required": []},
    },
    _connection_tool_decl(
        "get_connection_details",
        "Get sync status, schedule, and health for a Fivetran connection.",
    ),
    _connection_tool_decl(
        "get_connection_state",
        "Get current sync state for a connection.",
    ),
    _connection_tool_decl(
        "get_connection_schema_config",
        "Get schema config - which tables and columns are synced. Use to confirm a column exists.",
    ),
    {
        "name": "summarize_impact",
        "description": "Look up downstream impact of changing a column. Returns dashboards, models, reports, owners, and deprecation policy.",
        "parameters": {
            "type": "object",
            "properties": {
                "table": {"type": "string", "description": "Table name, e.g. 'stripe.customers'."},
                "column": {"type": "string", "description": "Column name, e.g. 'customer_segment'."},
            },
            "required": ["table", "column"],
        },
    },
]
# Mutating tools, offered to the model by the execution prompt flow.
EXECUTION_TOOL_DECLS = [
    {
        "name": "modify_connection_column_config",
        "description": "Soft-deprecate a column by setting enabled=false.",
        "parameters": {
            "type": "object",
            "properties": {
                # The four identifying arguments are plain strings.
                **{
                    field: {"type": "string"}
                    for field in ("connection_id", "schema_name", "table_name", "column_name")
                },
                "enabled": {"type": "boolean"},
            },
            "required": ["connection_id", "schema_name", "table_name", "column_name", "enabled"],
        },
    },
    {
        "name": "sync_connection",
        "description": "Trigger a verification sync after changes.",
        "parameters": {
            "type": "object",
            "properties": {"connection_id": {"type": "string"}},
            "required": ["connection_id"],
        },
    },
]
# Dispatch table from tool name to a callable; run_agent looks tools up here
# by the name the model requested and invokes them with keyword arguments.
ALL_TOOL_FUNCTIONS = {
    # list_connections takes no arguments, so any keywords are discarded.
    "list_connections": lambda **kw: list_connections(),
    "get_connection_details": lambda **kw: get_connection_details(**kw),
    "get_connection_state": lambda **kw: get_connection_state(**kw),
    "get_connection_schema_config": lambda **kw: get_connection_schema_config(**kw),
    "summarize_impact": lambda **kw: summarize_impact(**kw),
    "modify_connection_column_config": lambda **kw: modify_connection_column_config(**kw),
    "sync_connection": lambda **kw: sync_connection(**kw),
}
# ---------------------------------------------------------------------------
# System prompts
# ---------------------------------------------------------------------------
# System instruction for the analysis phase: chat by default, and run the
# read-only tool sequence plus a structured report only for a clear table+column.
ANALYSIS_PROMPT = """
You are Atlas, a data change intelligence agent built by the coded-devs team.
You help data engineers and data platform teams safely manage schema changes by analyzing downstream impact across Fivetran pipelines.
## Behaviour Rules
### Rule 1 — Conversational mode (default)
If the user's message is a general question, greeting, or anything that does NOT clearly specify a table and column to change, respond conversationally WITHOUT calling any tools.
- Answer questions about what you can do, how you work, what Fivetran is, etc.
- If the user seems to want a change but hasn't given you a table and column, ask a follow-up question to get the specifics.
- Examples of conversational messages: "what can you do?", "how does this work?", "what is Fivetran?", "tell me about data lineage", "hi", "what tables do you support?"
### Rule 2 — Analysis mode (only when you have a clear target)
If the user clearly specifies a table + column they want to drop, deprecate, remove, or evaluate, THEN call tools in this exact order:
1. `list_connections` — find the relevant connector.
2. `get_connection_schema_config` — confirm the column exists in Fivetran.
→ IF NOT FOUND: Stop. Tell the user politely the column/table was not found.
3. `get_connection_details` — check connector health.
4. `summarize_impact` — get downstream dependencies.
Then write a structured report with these sections:
## Connection Info
One line: connector name, service, status, last sync.
## Column Status
Confirm the column exists and is currently synced.
## Impact Summary
One paragraph: what breaks, how many assets are affected, what is the highest criticality tier.
## Affected Assets
Bullet list: **name** (type) — owned by [lead], team: [team], tier: [tier].
If none, write: "No downstream dependencies found — this column is safe to drop immediately."
## Recommended Deprecation Plan
Numbered steps with day offsets based on the highest tier asset found.
If no dependencies, simply say: "Safe for immediate removal."
## Stakeholder Messages
For each unique team affected, a 3-5 sentence Slack message.
Technical language for engineering teams, plain business language for exec/finance teams.
Skip this section entirely if there are no downstream dependencies.
Be direct, clear, and professional. No filler text.
"""
# System instruction for the execution phase: disable the column, trigger a
# verification sync, then give a short confirmation.
EXECUTION_PROMPT = """
You are Atlas, executing an approved plan.
1. Call modify_connection_column_config with enabled=false.
2. Call sync_connection to verify.
3. Confirm what was done in 3-4 lines. Be brief.
"""
# ---------------------------------------------------------------------------
# Agent loop (adapted for Streamlit)
# ---------------------------------------------------------------------------
def run_agent(contents, tool_decls, system_prompt, status_container, max_steps=8):
    """Run the model/tool loop, reporting progress to a Streamlit status container.

    Args:
        contents: Mutable conversation history. Model turns that request
            tools and the matching tool responses are appended in place.
        tool_decls: Function declarations exposed to the model for this run.
            Only tools named here are executed, so a phase that declares
            read-only tools can never trigger a mutating one.
        system_prompt: System instruction passed to the model.
        status_container: Object whose ``write`` method receives progress
            messages, tool-call notices, and interim model text.
        max_steps: Maximum number of model round-trips (8 by default).

    Returns:
        A ``(final_text, tool_log)`` tuple. ``final_text`` is the model's
        closing text, an ``"API Error: ..."`` / ``"Error: ..."`` message on
        failure, or ``""`` if the step limit is reached while the model is
        still requesting tools. ``tool_log`` lists every requested call as
        ``{"tool": name, "args": args}``.
    """
    tools = types.Tool(function_declarations=tool_decls)
    config = types.GenerateContentConfig(
        system_instruction=system_prompt,
        tools=[tools],
    )
    # Names the caller actually exposed; accepts dict or object declarations.
    declared_tools = {
        decl.get("name") if isinstance(decl, dict) else getattr(decl, "name", None)
        for decl in (tool_decls or [])
    }
    tool_log = []
    final_text = ""
    for _ in range(max_steps):
        try:
            response = smart_generate(
                client,
                contents,
                config,
                on_status=status_container.write,
            )
        except Exception as e:
            return f"API Error: {e}", tool_log
        if not response.candidates:
            return "Error: No response from Gemini.", tool_log
        candidate = response.candidates[0]
        content = candidate.content
        # A candidate may carry no content at all; treat that as "no parts".
        parts = (content.parts if content is not None else None) or []
        text_parts = [t for t in (getattr(p, "text", None) for p in parts) if t]
        fc_list = [fc for fc in (getattr(p, "function_call", None) for p in parts) if fc]
        if not fc_list:
            # No tool requests: this is the final answer.
            final_text = "\n".join(text_parts).strip()
            break
        contents.append(content)
        for fc in fc_list:
            name = fc.name
            args = dict(fc.args) if fc.args else {}
            # default=str keeps the progress line from failing on values
            # that are not JSON-serialisable.
            status_container.write(f"Calling `{name}({json.dumps(args, default=str)})`")
            tool_log.append({"tool": name, "args": args})
            func = ALL_TOOL_FUNCTIONS.get(name)
            if not func:
                result = {"error": f"Unknown tool: {name}"}
            elif name not in declared_tools:
                # Known tool, but not offered in this phase: refuse to run it.
                result = {"error": f"Tool not available in this phase: {name}"}
            else:
                try:
                    result = func(**args)
                except Exception as e:
                    # Report tool failures back to the model instead of aborting.
                    result = {"error": str(e)}
            contents.append(types.Content(
                role="user",
                parts=[types.Part.from_function_response(
                    name=name,
                    response={"result": result},
                )],
            ))
        # Interim text that accompanied the tool requests.
        for t in text_parts:
            status_container.write(t)
    return final_text, tool_log
# ---------------------------------------------------------------------------
# Custom data source loading
# ---------------------------------------------------------------------------
# Standard deprecation policy tiers, reused when a custom upload doesn't
# define its own criticality_levels (CSV uploads never do).
_DEFAULT_CRITICALITY_LEVELS = {
    "tier_1": {"description": "Business-critical. Used by execs or revenue-impacting systems. Requires 2-week deprecation notice minimum.", "deprecation_notice_days": 14},
    "tier_2": {"description": "Important but recoverable. Team-level analytics. Requires 1-week notice.", "deprecation_notice_days": 7},
    "tier_3": {"description": "Internal exploration. Minimal notice required.", "deprecation_notice_days": 2},
}
# Rank of each tier; a lower number is stricter, so min() over these ranks
# selects the strictest tier present.
_TIER_PRIORITY = {"tier_1": 1, "tier_2": 2, "tier_3": 3}
# Header columns a lineage CSV upload must contain (checked by build_graph_from_csv).
CSV_COLUMNS = [
    "table", "column", "description", "is_pii",
    "downstream_name", "downstream_type", "downstream_tool",
    "owner_team", "owner_lead", "owner_email", "owner_slack", "criticality",
]
def _to_bool(value: str) -> bool:
return str(value).strip().lower() in ("true", "1", "yes", "y", "t")
def build_graph_from_csv(text: str) -> dict:
    """Turn a flat CSV (one row per downstream dependency) into a lineage graph.

    The result has the nested shape ``lineage.json`` uses: ``tables``,
    ``owners`` and ``criticality_levels``. Rows are grouped by table +
    column; each row contributes at most one downstream asset and, when
    ``owner_team`` is set, registers that team once (first row wins).

    Args:
        text: CSV content with a header row containing every name in
            ``CSV_COLUMNS``. Header cells may be padded with whitespace.

    Returns:
        The lineage dict. ``criticality_levels`` is a copy of the default
        tiers, so callers may mutate it safely.

    Raises:
        ValueError: If any required column is missing from the header.
    """
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames:
        # Tolerate headers written as "table, column, ...".
        reader.fieldnames = [name.strip() for name in reader.fieldnames]
    header = reader.fieldnames or []
    missing = [c for c in CSV_COLUMNS if c not in header]
    if missing:
        raise ValueError(f"CSV is missing required columns: {', '.join(missing)}")

    def cell(row: dict, key: str) -> str:
        # Short rows yield None for trailing cells; treat those as blank.
        return (row.get(key) or "").strip()

    tables: dict = {}
    owners: dict = {}
    for row in reader:
        table = cell(row, "table")
        column = cell(row, "column")
        if not table or not column:
            continue
        table_entry = tables.setdefault(
            table, {"criticality": None, "team_owner": "data-platform", "columns": {}}
        )
        # Description and PII flag come from the first row seen for a column.
        col_entry = table_entry["columns"].setdefault(
            column,
            {
                "description": cell(row, "description"),
                "is_pii": _to_bool(row.get("is_pii")),
                "downstream": [],
            },
        )
        downstream_name = cell(row, "downstream_name")
        if downstream_name:
            # Only the default tiers have a policy entry, so anything else
            # (blank, mis-cased, unknown) is normalised or falls to tier_3.
            tier = cell(row, "criticality").lower()
            if tier not in _TIER_PRIORITY:
                tier = "tier_3"
            asset = {
                "type": cell(row, "downstream_type") or "unknown",
                "name": downstream_name,
                "owner": cell(row, "owner_team"),
                "criticality": tier,
            }
            tool = cell(row, "downstream_tool")
            if tool:
                asset["tool"] = tool
            col_entry["downstream"].append(asset)
        team = cell(row, "owner_team")
        if team and team not in owners:
            owners[team] = {
                "slack": cell(row, "owner_slack"),
                "email": cell(row, "owner_email"),
                "lead": cell(row, "owner_lead"),
            }

    # Derive each table's criticality from the strictest tier among its assets.
    for table_entry in tables.values():
        tiers = [
            asset["criticality"]
            for col in table_entry["columns"].values()
            for asset in col["downstream"]
        ]
        if tiers:
            table_entry["criticality"] = min(
                tiers, key=lambda t: _TIER_PRIORITY.get(t, 99)
            )
        else:
            table_entry["criticality"] = "tier_3"

    return {
        "tables": tables,
        "owners": owners,
        # Copy so the module-level defaults are never shared with a graph.
        "criticality_levels": {
            tier: dict(policy) for tier, policy in _DEFAULT_CRITICALITY_LEVELS.items()
        },
    }
def _render_db_scan(scan: dict, source_label: str):
    """Render the outcome of a database scan.

    A scan containing an ``"error"`` key is shown as an error and nothing
    else is drawn. Otherwise a success banner is followed by an expanded
    section with the discovery report and, per table, its columns (with
    PK / NOT NULL markers) and foreign keys.
    """
    if "error" in scan:
        st.error(f"Could not scan database: {scan['error']}")
        return

    banner = (
        f"Database scanned! {scan['table_count']} tables, "
        f"{scan['column_count']} columns found."
    )
    st.success(banner)

    with st.expander("🗄️ Discovered Schema", expanded=True):
        st.caption(f"Source: {source_label}")
        st.markdown(build_discovery_report(scan))
        for table_name, table_info in scan["tables"].items():
            st.markdown(f"**`{table_name}`**")
            bullet_lines = []
            for column in table_info["columns"]:
                markers = [
                    label
                    for key, label in (("primary_key", "PK"), ("notnull", "NOT NULL"))
                    if column[key]
                ]
                suffix = f" · {', '.join(markers)}" if markers else ""
                bullet_lines.append(
                    f"- `{column['name']}` · {column['type'] or 'ANY'}" + suffix
                )
            st.markdown("\n".join(bullet_lines))
            for fk in table_info["foreign_keys"]:
                st.caption(
                    f"↳ FK: `{fk['from_column']}` → "
                    f"`{fk['to_table']}.{fk['to_column']}`"
                )
def lineage_to_rows(lineage: dict) -> list:
    """Flatten a lineage dict into editable rows, one per downstream asset.

    A column without downstream assets still yields one row whose asset
    fields are blank, so the column is not lost when the rows are edited
    and turned back into a lineage dict.
    """
    flat_rows = []
    for table_name, table_info in lineage.get("tables", {}).items():
        for column_name, column_info in table_info.get("columns", {}).items():
            # Fields shared by every row of this column.
            shared = {
                "table": table_name,
                "column": column_name,
                "description": column_info.get("description", ""),
                "is_pii": bool(column_info.get("is_pii", False)),
            }
            assets = column_info.get("downstream", [])
            if assets:
                flat_rows.extend(
                    {
                        **shared,
                        "asset_type": asset.get("type", ""),
                        "asset_name": asset.get("name", ""),
                        "owner": asset.get("owner", ""),
                        "criticality": asset.get("criticality", ""),
                    }
                    for asset in assets
                )
            else:
                flat_rows.append({
                    **shared,
                    "asset_type": "",
                    "asset_name": "",
                    "owner": "",
                    "criticality": "",
                })
    return flat_rows
def rows_to_lineage(rows: list) -> dict:
    """Rebuild a lineage dict from edited rows (inverse of ``lineage_to_rows``).

    Rows without both a table and a column are skipped. For each asset row,
    an owner not present in ``TEAM_DIRECTORY`` becomes ``"analytics"``, an
    unknown tier becomes ``"tier_3"``, and a blank type becomes
    ``"dbt_model"``. Description and PII flag are taken from the first row
    seen for a column.

    Args:
        rows: Mappings with the keys produced by ``lineage_to_rows``.

    Returns:
        A dict with ``tables``, ``owners`` (the ``TEAM_DIRECTORY`` entries
        of the teams actually used) and ``criticality_levels``.
    """
    tables = {}
    used_teams = set()
    for r in rows:
        table = str(r.get("table") or "").strip()
        column = str(r.get("column") or "").strip()
        if not table or not column:
            continue
        t = tables.setdefault(
            table, {"criticality": "tier_3", "team_owner": "data-platform", "columns": {}}
        )
        # bool("false") is True, so non-bool flags are parsed as text instead.
        raw_pii = r.get("is_pii")
        is_pii = raw_pii if isinstance(raw_pii, bool) else _to_bool(raw_pii)
        c = t["columns"].setdefault(column, {
            "description": str(r.get("description") or "").strip(),
            "is_pii": is_pii,
            "downstream": [],
        })
        name = str(r.get("asset_name") or "").strip()
        if name:
            owner = str(r.get("owner") or "analytics").strip()
            if owner not in TEAM_DIRECTORY:
                owner = "analytics"
            tier = str(r.get("criticality") or "tier_3").strip()
            if tier not in _TIER_PRIORITY:
                tier = "tier_3"
            atype = str(r.get("asset_type") or "dbt_model").strip() or "dbt_model"
            c["downstream"].append(
                {"type": atype, "name": name, "owner": owner, "criticality": tier}
            )
            used_teams.add(owner)
    # A table is as critical as its strictest downstream asset.
    for t in tables.values():
        tiers = [a["criticality"] for col in t["columns"].values() for a in col["downstream"]]
        t["criticality"] = (
            min(tiers, key=lambda x: _TIER_PRIORITY.get(x, 99)) if tiers else "tier_3"
        )
    return {
        "tables": tables,
        "owners": {team: TEAM_DIRECTORY[team] for team in sorted(used_teams)},
        "criticality_levels": CRITICALITY_LEVELS,
    }
def run_inference(scan: dict, feedback: str = ""):
    """Run lineage inference and record the outcome in session state.

    Without feedback, a demo-cache hit is used directly to save API quota.
    When feedback is supplied the cache is skipped, because a cached graph
    cannot reflect the user's corrections.

    On success ``inferred_lineage`` holds the graph and ``inferred_pending``
    is set to True. On failure (an ``"error"`` key in the result, an
    exception, or a non-dict result) ``inference_error`` holds the message
    and the other two keys are cleared.

    Args:
        scan: Database scan passed to the cache lookup and the inference call.
        feedback: Optional reviewer feedback forwarded to the inference call.
    """
    if not feedback:
        cached = check_inference_cache(scan)
        if cached:
            st.session_state.inferred_lineage = cached
            st.session_state.inferred_pending = True
            st.session_state.inference_error = None
            return
    if scan.get("table_count", 0) > 50:
        st.warning(
            f"Large schema ({scan['table_count']} tables) — inference may be "
            "slow and incomplete. Consider scanning a subset."
        )
    try:
        with st.spinner("Atlas is inferring downstream dependencies..."):
            result = infer_lineage_from_schema(scan, client, feedback=feedback)
    except Exception as exc:
        # Route failures to the stored-error path instead of crashing the page.
        result = {"error": str(exc) or exc.__class__.__name__}
    if not isinstance(result, dict):
        result = {"error": "Inference returned no lineage."}
    if "error" in result:
        st.session_state.inference_error = result["error"]
        st.session_state.inferred_lineage = None
        st.session_state.inferred_pending = False
    else:
        st.session_state.inferred_lineage = result
        st.session_state.inferred_pending = True
        st.session_state.inference_error = None
def configure_database_source():
    """Render the SQLite upload / demo-database controls in the sidebar.

    The chosen database is scanned and the scan is kept in
    ``st.session_state.db_scan``. AI inference is triggered from here and
    stored in session state by ``run_inference``. Until an accepted inferred
    graph exists in ``active_inferred_lineage``, the bundled demo lineage is
    loaded so the app stays usable.

    An uploaded file remains in the uploader across reruns, so it is scanned
    (and earlier inference results reset) only when it differs from the
    upload scanned last. Otherwise the stored scan is reused; resetting on
    every rerun would discard an inference result immediately after it was
    produced.
    """
    # Once a user accepts an AI-inferred graph, keep using it across reruns
    # instead of resetting to the demo lineage.
    if st.session_state.get("active_inferred_lineage"):
        load_graph(st.session_state.active_inferred_lineage)
    else:
        load_default()

    uploaded = st.file_uploader(
        "Upload a SQLite database", type=["db", "sqlite"]
    )
    demo_db_path = Path(__file__).parent / "demo_warehouse.db"
    use_demo = st.button(
        "Or use demo database", use_container_width=True,
        disabled=not demo_db_path.exists(),
    )

    if uploaded is not None:
        # Identify the upload; fall back to name + size when no file id exists.
        upload_id = getattr(uploaded, "file_id", None) or f"{uploaded.name}:{uploaded.size}"
        already_scanned = (
            st.session_state.get("db_scan_upload_id") == upload_id
            and st.session_state.get("db_scan")
        )
        if already_scanned:
            scan = st.session_state.db_scan
        else:
            scan = scan_sqlite(uploaded.getvalue())
            st.session_state.db_scan = scan
            st.session_state.db_scan_upload_id = upload_id
            st.session_state.db_scan_label = uploaded.name
            # A new upload invalidates any prior inference.
            st.session_state.inferred_pending = False
            st.session_state.inferred_lineage = None
    elif use_demo:
        scan = scan_sqlite(demo_db_path.read_bytes())
        st.session_state.db_scan = scan
        st.session_state.db_scan_upload_id = None
        st.session_state.db_scan_label = "demo_warehouse.db"
        st.session_state.inferred_pending = False
        st.session_state.inferred_lineage = None
    else:
        scan = st.session_state.get("db_scan")

    if not scan:
        st.info("Upload a SQLite file or use the demo database to discover its schema.")
        return

    # Label the scan with the source it actually came from.
    source_label = st.session_state.get("db_scan_label") or (
        uploaded.name if uploaded is not None else "demo_warehouse.db"
    )
    _render_db_scan(scan, source_label)
    if "error" in scan:
        return

    # --- AI lineage inference trigger ---
    if st.button("✨ Auto-Discover Lineage with AI", type="primary", use_container_width=True):
        run_inference(scan)
        st.rerun()
    if st.session_state.get("inference_error"):
        st.error(f"Inference failed: {st.session_state.inference_error}")
        if st.button("🔁 Retry inference", use_container_width=True):
            st.session_state.inference_error = None
            run_inference(scan)
            st.rerun()
    if st.session_state.get("active_inferred_lineage"):
        _n_tables = len(st.session_state.active_inferred_lineage["tables"])
        st.success(f"🎯 Atlas is using AI-inferred lineage for {_n_tables} table(s).")
def _load_json_lineage_upload(uploaded) -> None:
    """Load an uploaded lineage JSON file, falling back to demo data on error."""
    try:
        # utf-8-sig also accepts plain UTF-8 and strips a BOM if one is present.
        data = json.loads(uploaded.getvalue().decode("utf-8-sig"))
        if not isinstance(data, dict) or "tables" not in data:
            raise ValueError("JSON must contain a top-level 'tables' key.")
        if not isinstance(data["tables"], dict):
            raise ValueError("'tables' must be an object keyed by table name.")
        data.setdefault("owners", {})
        if "criticality_levels" not in data:
            # Copy so uploaded data never shares the module-level defaults.
            data["criticality_levels"] = {
                tier: dict(policy)
                for tier, policy in _DEFAULT_CRITICALITY_LEVELS.items()
            }
        load_graph(data)
        st.success(
            f"Loaded {len(data['tables'])} table(s) from "
            f"`{uploaded.name}`."
        )
    except Exception as e:
        load_default()
        st.error(f"Could not parse JSON: {e}")


def _load_csv_lineage_upload(uploaded) -> None:
    """Load an uploaded lineage CSV file, falling back to demo data on error."""
    try:
        text = uploaded.getvalue().decode("utf-8-sig")
        graph = build_graph_from_csv(text)
        if not graph["tables"]:
            raise ValueError("No valid rows found (need table + column values).")
        load_graph(graph)
        st.success(
            f"Loaded {len(graph['tables'])} table(s) and "
            f"{len(graph['owners'])} team(s) from `{uploaded.name}`."
        )
    except Exception as e:
        load_default()
        st.error(f"Could not parse CSV: {e}")


def configure_data_source():
    """Render the data-source picker and load the chosen lineage graph.

    Every path through this function loads a graph into the lineage module:
    the selected upload when it parses, otherwise the bundled demo data, so
    the active graph follows the current selection. The "Upload Database"
    choice is delegated to ``configure_database_source``.
    """
    st.header("Data source")
    source = st.radio(
        "Lineage data",
        ("Demo data", "Upload JSON", "Upload CSV", "Upload Database"),
        label_visibility="collapsed",
    )
    with st.expander("Expected format"):
        st.markdown(
            "**Demo data** — the bundled `lineage.json` sample "
            "(stripe, hubspot connectors).\n\n"
            "**Upload JSON** — a file matching `lineage.json`: top-level "
            "`tables`, `owners`, and `criticality_levels` keys.\n\n"
            "**Upload CSV** — one row per downstream dependency, with columns:"
        )
        st.code(", ".join(CSV_COLUMNS), language=None)
        st.caption(
            "`is_pii` accepts true/false. Multiple rows sharing the same "
            "table + column are grouped into one column with several "
            "downstream assets."
        )
        # The previous text said inference "comes next"; the database flow
        # already offers it, so the help now says so.
        st.markdown(
            "**Upload Database** — a SQLite `.db` / `.sqlite` file. Atlas "
            "auto-discovers every table, column, type, and foreign key, "
            "and can then infer lineage with AI for your review."
        )

    if source == "Demo data":
        load_default()
        return
    if source == "Upload Database":
        configure_database_source()
        return

    if source == "Upload JSON":
        uploaded = st.file_uploader("Upload a lineage JSON file", type=["json"])
        if uploaded is None:
            load_default()  # keep the app usable until a file arrives
            st.info("Using demo data until a JSON file is uploaded.")
            return
        _load_json_lineage_upload(uploaded)
        return

    # Upload CSV
    uploaded = st.file_uploader("Upload a lineage CSV file", type=["csv"])
    if uploaded is None:
        load_default()
        st.info("Using demo data until a CSV file is uploaded.")
        return
    _load_csv_lineage_upload(uploaded)
# ---------------------------------------------------------------------------
# Page config
# ---------------------------------------------------------------------------
# Browser tab title and icon, and a full-width page layout.
st.set_page_config(
    page_title="Atlas - Data Change Intelligence",
    page_icon="🔍",
    layout="wide",
)
# Inject one global <style> block that re-skins the app: it imports the
# "Outfit" font from Google Fonts, forces a dark background, and restyles the
# sidebar, primary/secondary buttons, status widget, text areas, code blocks,
# file-uploader dropzone and alert banners.
# The stylesheet is passed as raw HTML, hence unsafe_allow_html=True.
# NOTE(review): the `data-testid` / `.st*` selectors target markup generated by
# Streamlit itself and presumably can change between Streamlit releases —
# verify the theme after upgrading.
st.markdown("""
<style>
@import url('https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;600&display=swap');
html, body, [class*="css"] {
font-family: 'Outfit', sans-serif !important;
}
/* Base Dark Theme Overrides */
.stApp {
background-color: #0b0f19 !important;
color: #e2e8f0 !important;
}
/* Sidebar styling (Glassmorphism) */
[data-testid="stSidebar"] {
background: rgba(15, 23, 42, 0.4) !important;
backdrop-filter: blur(12px) !important;
border-right: 1px solid rgba(255,255,255,0.05);
}
/* Primary Button with Gradient and Micro-animation */
button[kind="primary"] {
background: linear-gradient(135deg, #6366f1 0%, #a855f7 100%) !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
transition: all 0.3s ease !important;
box-shadow: 0 4px 14px 0 rgba(99, 102, 241, 0.39) !important;
}
button[kind="primary"]:hover {
transform: translateY(-2px) !important;
box-shadow: 0 6px 20px rgba(99, 102, 241, 0.6) !important;
}
/* Secondary Buttons */
button[kind="secondary"] {
background-color: rgba(255, 255, 255, 0.05) !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
border-radius: 8px !important;
transition: all 0.3s ease !important;
color: #e2e8f0 !important;
}
button[kind="secondary"]:hover {
background-color: rgba(255, 255, 255, 0.1) !important;
transform: translateY(-1px) !important;
}
/* Status container styling */
[data-testid="stStatusWidget"] {
border-radius: 12px;
background: rgba(30, 41, 59, 0.5) !important;
border: 1px solid rgba(255,255,255,0.05) !important;
box-shadow: 0 8px 32px 0 rgba(0, 0, 0, 0.2);
backdrop-filter: blur(4px);
}
/* Text area styling */
.stTextArea textarea {
background-color: #1e293b !important;
color: #f8fafc !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
border-radius: 8px !important;
}
.stTextArea textarea:focus {
border-color: #a855f7 !important;
box-shadow: 0 0 0 1px #a855f7 !important;
}
/* Code blocks (JSON, Tool calls) */
.stCodeBlock {
border-radius: 8px !important;
border: 1px solid rgba(255, 255, 255, 0.05) !important;
}
/* File uploader */
[data-testid="stFileUploadDropzone"] {
background-color: rgba(255,255,255, 0.02) !important;
border: 2px dashed rgba(255,255,255, 0.1) !important;
border-radius: 12px !important;
}
/* Warning & Info banners */
.stAlert {
border-radius: 8px !important;
border: none !important;
}
</style>
""", unsafe_allow_html=True)
# ---------------------------------------------------------------------------
# Session state
# ---------------------------------------------------------------------------
# Seed every session key with its initial value, but only when the key is
# absent, so values stored by an earlier rerun are never overwritten.
# The dict is rebuilt on each script run, so the list defaults are fresh
# objects rather than shared ones.
_session_defaults = {
    "analysis_report": None,
    "analysis_contents": None,
    "execution_done": False,
    "execution_result": None,
    "tool_log": [],
    "user_request": "",
    "severity": None,
    "followup_history": [],
    "rollback_done": False,
    "db_scan": None,
    "inferred_lineage": None,
    "inferred_pending": False,
    "active_inferred_lineage": None,
    "inference_error": None,
}
for _state_key, _initial_value in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# ---------------------------------------------------------------------------
# UI Layout
# ---------------------------------------------------------------------------
# Hero header: a gradient-filled "Atlas" title, a tagline naming Gemini and
# Fivetran MCP, and a thin horizontal rule separating it from the page body.
# The markup is passed as raw HTML, hence unsafe_allow_html=True.
st.markdown("""
<div style="
padding: 2rem 0 1rem 0;
text-align: left;
">
<h1 style="
font-size: 3rem;
font-weight: 600;
background: linear-gradient(135deg, #a78bfa 0%, #818cf8 40%, #6366f1 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin-bottom: 0.3rem;
letter-spacing: -0.02em;
">Atlas</h1>
<p style="
color: #94a3b8;
font-size: 1.05rem;
margin: 0;
letter-spacing: 0.02em;
">Data Change Intelligence Agent — Powered by <strong style="color:#a78bfa;">Gemini</strong> + <strong style="color:#818cf8;">Fivetran MCP</strong></p>
</div>
<hr style="border: none; border-top: 1px solid rgba(255,255,255,0.06); margin: 0.5rem 0 1.5rem 0;">
""", unsafe_allow_html=True)
# Sidebar
with st.sidebar:
st.markdown("""
<div style="text-align:center; padding: 1rem 0 0.5rem 0;">
<span style="font-size: 2rem;">🔍</span>
<h2 style="
font-size: 1.4rem;
font-weight: 600;
background: linear-gradient(135deg, #a78bfa, #6366f1);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin: 0.3rem 0 0 0;
">Atlas</h2>
<p style="color: #64748b; font-size: 0.8rem; margin-top:0.2rem;">v1.0 • Hackathon Edition</p>
</div>
""", unsafe_allow_html=True)
st.divider()
configure_data_source()
st.divider()
st.header("Integrations")
slack_webhook = st.text_input(
"Slack Webhook URL",
type="password",
placeholder="https://hooks.slack.com/services/...",
help="Paste an Incoming Webhook URL to send notifications directly to Slack. Create one at https://api.slack.com/apps",
)
with st.expander("📖 How to connect Slack (2 minutes)", expanded=False):
st.markdown("""
**What you need:** A Slack workspace (free tier works)
**Step 1:** Go to [api.slack.com/apps](https://api.slack.com/apps) and click **"Create New App"** → **"From scratch"**
**Step 2:** Give it a name (e.g. "Atlas Notifications") and select your Slack workspace. Click **Create App**.
**Step 3:** In the left sidebar, click **"Incoming Webhooks"**. Toggle the switch to **ON**.
**Step 4:** Scroll down and click **"Add New Webhook to Workspace"**. Choose which channel should receive Atlas notifications (e.g. #general or create a new #data-alerts channel). Click **Allow**.
**Step 5:** You'll see a Webhook URL that looks like:
`https://hooks.slack.com/services/T00.../B00.../xxx...`
Copy it and paste it in the field above.
**That's it!** When Atlas analyzes a schema change, you can click "Send to Slack" next to any stakeholder message and it will appear in your chosen channel instantly.
**Don't have a Slack workspace?** Create one free at [slack.com/create](https://slack.com/create) — takes 1 minute.
""")
st.markdown("<br>", unsafe_allow_html=True)
telegram_bot_token = st.text_input(
"Telegram Bot Token",
type="password",
placeholder="123456789:ABCdefGHI...",
help="Create a bot with BotFather on Telegram and paste the token here.",
)
telegram_chat_id = st.text_input(
"Telegram Chat ID",
placeholder="-1001234567890",
help="The chat ID of the group or user to send the notification to.",
)
with st.expander("📖 How to connect Telegram (3 minutes)", expanded=False):
st.markdown("""
**What you need:** A Telegram account (the app on your phone or desktop)
**Step 1: Create a bot**
Open Telegram and search for **@BotFather** (it has a blue verified checkmark). Start a chat and send:
`/newbot`
BotFather will ask you for a display name (e.g. "Atlas Notifications") and a username (must end in "bot", e.g. "atlas_notify_bot").
**Step 2: Copy the token**
BotFather will reply with a long token that looks like:
`1234567890:ABCdefGHIjklMNOpqrsTUVwxyz`
Copy this and paste it in the **"Telegram Bot Token"** field above.
**Step 3: Get your Chat ID**
This tells Atlas WHERE to send messages.
Option A — Send to yourself:
1. Open a chat with your new bot in Telegram (tap the t.me/your_bot_name link BotFather gave you)
2. Press **Start**, then send any message like "hello"
3. Open this URL in your browser (replace YOUR_TOKEN with your actual token):
`https://api.telegram.org/botYOUR_TOKEN/getUpdates`
4. Look for `"chat":{"id":` followed by a number like `7916615222`
5. Copy that number and paste it in the **"Telegram Chat ID"** field above
Option B — Send to a group:
1. Create a new Telegram group
2. Add your bot to the group
3. Send any message in the group
4. Check the same URL above — the chat ID will be a negative number like `-1001234567890`
**That's it!** Click "Send to Telegram" next to any stakeholder message and it will arrive in your Telegram chat instantly.
""")
with st.expander("📖 About email notifications", expanded=False):
st.markdown("""
**No setup needed.** When you click "Send Email" next to a stakeholder message, Atlas opens your default email app (Gmail, Outlook, Apple Mail, etc.) with a pre-written message including:
- **To:** The team's email address
- **Subject:** Schema Change Notice with the table and column name
- **Body:** The full deprecation notice customized for that team
Just review the email and click Send in your mail app.
**Tip:** If clicking the button doesn't open your email, make sure you have a default email app set in your browser. For Gmail users in Chrome: go to gmail.com, click the diamond/handler icon in the address bar, and select "Always allow gmail.com to open mailto links".
""")
st.divider()
st.markdown("""
<div style="padding: 0.5rem 0;">
<h4 style="color: #cbd5e1; margin-bottom: 0.8rem;">⚡ How it works</h4>
<div style="display: flex; align-items: flex-start; margin-bottom: 0.7rem;">
<span style="
background: linear-gradient(135deg, #6366f1, #a855f7);
color: white; font-weight: 600; font-size: 0.75rem;
min-width: 22px; height: 22px; border-radius: 50%;
display: flex; align-items: center; justify-content: center;
margin-right: 0.6rem; margin-top: 2px;
">1</span>
<div><strong style="color:#e2e8f0;">Describe your change</strong><br><span style="color:#94a3b8; font-size:0.85rem;">Tell Atlas what schema change you want to make.</span></div>
</div>
<div style="display: flex; align-items: flex-start; margin-bottom: 0.7rem;">
<span style="
background: linear-gradient(135deg, #6366f1, #a855f7);
color: white; font-weight: 600; font-size: 0.75rem;
min-width: 22px; height: 22px; border-radius: 50%;
display: flex; align-items: center; justify-content: center;
margin-right: 0.6rem; margin-top: 2px;
">2</span>
<div><strong style="color:#e2e8f0;">Review the analysis</strong><br><span style="color:#94a3b8; font-size:0.85rem;">Atlas discovers connectors, confirms columns, checks downstream impact.</span></div>
</div>
<div style="display: flex; align-items: flex-start; margin-bottom: 0.7rem;">
<span style="
background: linear-gradient(135deg, #6366f1, #a855f7);
color: white; font-weight: 600; font-size: 0.75rem;
min-width: 22px; height: 22px; border-radius: 50%;
display: flex; align-items: center; justify-content: center;
margin-right: 0.6rem; margin-top: 2px;
">3</span>
<div><strong style="color:#e2e8f0;">Approve or reject</strong><br><span style="color:#94a3b8; font-size:0.85rem;">You stay in control. Atlas only executes after your explicit approval.</span></div>
</div>
<div style="display: flex; align-items: flex-start;">
<span style="
background: linear-gradient(135deg, #6366f1, #a855f7);