Skip to content

Commit cc01853

Browse files
authored
Merge pull request #214 from CausalInferenceLab/feat/datahub-lineage
feat(catalog): add load_lineage_documents() to DataHubCatalogLoader
2 parents e05ff07 + 60f00d0 commit cc01853

File tree

2 files changed

+111
-3
lines changed

2 files changed

+111
-3
lines changed

src/lang2sql/core/ports.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import Any, Protocol, runtime_checkable
44

5-
from .catalog import TextDocument
5+
from .catalog import CatalogEntry, TextDocument
66

77

88
class LLMPort(Protocol):
@@ -57,3 +57,9 @@ class DocumentLoaderPort(Protocol):
5757
"""Converts a file path or directory to list[TextDocument]."""
5858

5959
def load(self, path: str) -> list[TextDocument]: ...
60+
61+
62+
class CatalogLoaderPort(Protocol):
    """Structural interface for catalog providers.

    An implementation (DataHub, flat file, database, ...) fetches table
    metadata from its backing source and exposes it as ``CatalogEntry``
    objects.
    """

    def load(self) -> list[CatalogEntry]: ...

src/lang2sql/integrations/catalog/datahub_.py

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
from __future__ import annotations
22

3-
from ...core.catalog import CatalogEntry
3+
from ...core.catalog import CatalogEntry, TextDocument
44
from ...core.exceptions import IntegrationMissingError
5+
from ...core.ports import CatalogLoaderPort
56

67
try:
78
import datahub as _datahub # type: ignore[import]
89
except ImportError:
910
_datahub = None # type: ignore[assignment]
1011

1112

12-
class DataHubCatalogLoader:
13+
class DataHubCatalogLoader(CatalogLoaderPort):
1314
"""DataHub URN → list[CatalogEntry] 변환.
1415
1516
DataHub GMS 서버에서 테이블 메타데이터를 조회하여
@@ -59,3 +60,104 @@ def load(self, urns: list[str] | None = None) -> list[CatalogEntry]:
5960
CatalogEntry(name=name, description=description, columns=columns)
6061
)
6162
return entries
63+
64+
def load_lineage_documents(
65+
self,
66+
urns: list[str] | None = None,
67+
max_degree: int = 2,
68+
) -> list[TextDocument]:
69+
"""DataHub lineage 정보를 TextDocument 목록으로 변환한다.
70+
71+
내부적으로 build_table_metadata()를 사용하며, 사이클 안전성은
72+
하위 레이어에서 보장된다:
73+
- get_table_lineage(): GraphQL degree 필터로 depth 상한 적용
74+
- min_degree_lineage(): 테이블별 최소 degree만 유지 (사이클 경로 dedup)
75+
- build_table_metadata(): 자기 자신(table == current_table) 제외
76+
77+
Args:
78+
urns: 조회할 URN 목록. None이면 전체 URN을 조회한다.
79+
max_degree: 포함할 최대 lineage depth. 기본값 2.
80+
81+
Returns:
82+
TextDocument 목록. lineage 없는 테이블은 제외된다.
83+
84+
Usage::
85+
86+
loader = DataHubCatalogLoader(gms_server="http://localhost:8080")
87+
pipeline = EnrichedNL2SQL(
88+
catalog=loader.load(),
89+
documents=loader.load_lineage_documents(),
90+
llm=..., db=..., embedding=...,
91+
)
92+
"""
93+
if urns is None:
94+
urns = list(self._fetcher.get_urns())
95+
96+
return [
97+
doc
98+
for urn in urns
99+
if (doc := self._urn_to_lineage_document(urn, max_degree)) is not None
100+
]
101+
102+
def _urn_to_lineage_document(
103+
self, urn: str, max_degree: int
104+
) -> TextDocument | None:
105+
"""단일 URN의 lineage를 TextDocument로 변환. lineage 없으면 None 반환."""
106+
try:
107+
# build_table_metadata가 upstream/downstream/column lineage를
108+
# 파싱 및 dedup까지 처리해준다.
109+
meta = self._fetcher.build_table_metadata(urn, max_degree=max_degree)
110+
except Exception:
111+
return None
112+
113+
table_name = meta.get("table_name") or ""
114+
lineage = meta.get("lineage", {})
115+
upstream = lineage.get("upstream", [])
116+
downstream = lineage.get("downstream", [])
117+
upstream_columns = lineage.get("upstream_columns", [])
118+
119+
if not upstream and not downstream and not upstream_columns:
120+
return None
121+
122+
return TextDocument(
123+
id=f"lineage__{table_name}",
124+
title=f"{table_name} 리니지",
125+
content=self._format_lineage(
126+
table_name, upstream, downstream, upstream_columns
127+
),
128+
source="datahub",
129+
metadata={"urn": urn, "table_name": table_name},
130+
)
131+
132+
@staticmethod
133+
def _format_lineage(
134+
table_name: str,
135+
upstream: list[dict],
136+
downstream: list[dict],
137+
upstream_columns: list[dict],
138+
) -> str:
139+
"""lineage 데이터를 자연어 텍스트로 포맷한다."""
140+
lines: list[str] = [f"테이블: {table_name}", ""]
141+
142+
if upstream:
143+
lines += ["[Upstream — 이 테이블의 원천 데이터]"]
144+
lines += [f" - {t['table']} (depth: {t['degree']})" for t in upstream]
145+
lines.append("")
146+
147+
if downstream:
148+
lines += ["[Downstream — 이 테이블을 참조하는 테이블]"]
149+
lines += [f" - {t['table']} (depth: {t['degree']})" for t in downstream]
150+
lines.append("")
151+
152+
if upstream_columns:
153+
lines += ["[컬럼 단위 Upstream Lineage]"]
154+
for dataset in upstream_columns:
155+
lines.append(f" {dataset.get('upstream_dataset', '')}:")
156+
lines += [
157+
f" {col['upstream_column']}{col['downstream_column']}"
158+
f" (신뢰도: {col.get('confidence', 1.0):.2f})"
159+
for col in dataset.get("columns", [])
160+
]
161+
lines.append("")
162+
163+
return "\n".join(lines).strip()

0 commit comments

Comments
 (0)