元数据服务是 DataJump 平台的数据治理核心,负责管理企业数据资产的技术元数据和业务元数据,提供数据血缘追踪、数据字典管理、数据质量监控等能力。
┌─────────────────────────────────────────────────────────────────┐
│ 元数据服务 (Metadata Service) │
├─────────────────────────────────────────────────────────────────┤
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────────────┐ │
│ │ 数据血缘 │ │ 数据字典 │ │ 数据质量 │ │
│ │ Lineage │ │ Dictionary │ │ Quality │ │
│ └───────────────┘ └───────────────┘ └───────────────────────┘ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────────────┐ │
│ │ 数据分类 │ │ 数据标签 │ │ 数据检索 │ │
│ │ Classification│ │ Tags │ │ Search │ │
│ └───────────────┘ └───────────────┘ └───────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 元数据存储层 │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────────────┐ │
│ │ MySQL │ │ Elasticsearch │ │ Neo4j │ │
│ │ (结构化数据) │ │ (全文检索) │ │ (血缘图谱) │ │
│ └───────────────┘ └───────────────┘ └───────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 元数据采集层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────────────┐ │
│ │ Hive Hook│ │Spark Hook│ │Flink Hook│ │ SQL Parser │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
/**
 * Lineage node - represents a single data entity (table, column, task, ...)
 * appearing in the lineage graph.
 */
public class LineageNode {
private String id;                      // globally unique node id
private NodeType type;                  // entity kind; see NodeType
private String name;                    // display name
private String qualifiedName;           // fully qualified, e.g. database.table.column
private Map<String, Object> attributes; // free-form extra metadata
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
/**
 * Kinds of entities that can appear as nodes in the lineage graph.
 */
public enum NodeType {
DATABASE,
TABLE,
COLUMN,
TASK,
FILE,
KAFKA_TOPIC
}
/**
 * Lineage edge - a directed data-flow relationship from a source node to a
 * target node, produced by some process (SQL statement, ETL task, Spark job).
 */
public class LineageEdge {
private String id;
private String sourceId;          // id of the upstream LineageNode
private String targetId;          // id of the downstream LineageNode
private EdgeType type;
private String processId;         // id of the task that produced this lineage
private String processType;       // SQL, ETL, SPARK
private LocalDateTime createTime;
}
/**
 * Kinds of lineage relationships between nodes.
 */
public enum EdgeType {
TABLE_TO_TABLE,
COLUMN_TO_COLUMN,
TASK_TO_TABLE,
TABLE_TO_TASK
}
/**
 * Parses SQL statements to extract data lineage: source tables, target
 * tables, and column-level derivations. Backed by JSqlParser.
 */
public class SQLLineageParser {
    private final JSqlParser sqlParser;

    /**
     * Parses a single SQL statement and returns its lineage.
     * Unsupported statement types, blank input, and unparseable SQL all yield
     * an empty result instead of propagating the parser's checked exception.
     */
    public LineageResult parse(String sql) {
        // Guard: null/blank SQL cannot produce lineage.
        if (sql == null || sql.isBlank()) {
            return LineageResult.empty();
        }
        Statement statement;
        try {
            statement = CCJSqlParserUtil.parse(sql);
        } catch (JSQLParserException e) {
            // FIX: CCJSqlParserUtil.parse throws a checked JSQLParserException
            // that the original neither caught nor declared (would not
            // compile). Treat unparseable SQL as "no lineage".
            return LineageResult.empty();
        }
        if (statement instanceof Select) {
            return parseSelect((Select) statement);
        }
        if (statement instanceof Insert) {
            return parseInsert((Insert) statement);
        }
        if (statement instanceof CreateTable) {
            return parseCreateTable((CreateTable) statement);
        }
        return LineageResult.empty();
    }

    /**
     * Extracts table- and column-level lineage from a SELECT statement.
     * Only plain (non-set-operation) selects contribute column lineage.
     */
    private LineageResult parseSelect(Select select) {
        LineageResult result = new LineageResult();
        // Collect every table referenced anywhere in the query
        // (FROM / JOIN / subqueries).
        TablesNamesFinder tablesFinder = new TablesNamesFinder();
        List<String> sourceTables = tablesFinder.getTableList(select);
        SelectBody selectBody = select.getSelectBody();
        if (selectBody instanceof PlainSelect) {
            PlainSelect plainSelect = (PlainSelect) selectBody;
            for (SelectItem item : plainSelect.getSelectItems()) {
                if (item instanceof SelectExpressionItem) {
                    SelectExpressionItem exprItem = (SelectExpressionItem) item;
                    // Map each output expression back to its source columns
                    // to build column-level lineage.
                    ColumnLineage columnLineage = parseColumnExpression(
                        exprItem.getExpression(),
                        sourceTables
                    );
                    result.addColumnLineage(columnLineage);
                }
            }
        }
        result.setSourceTables(sourceTables);
        return result;
    }
}
// Spark listener that reports lineage when a job ends.
// NOTE(review): this reads SparkSession.active.sharedState.cacheManager and
// iterates *cached* query plans, not the plan of the job that just finished —
// lineage is only reported for plans that happen to be in the cache, and may
// be re-reported on every job end. Verify intent; a QueryExecutionListener is
// the usual hook for per-query lineage capture.
class SparkLineageListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
val queryExecution = SparkSession.active.sharedState.cacheManager
queryExecution.cachedData.foreach { cachedData =>
val lineage = extractLineage(cachedData.plan)
LineageReporter.report(lineage)
}
}
// Walks a logical plan, collecting source tables (catalog-backed and Hive
// relations) and the target table of an InsertIntoHadoopFsRelationCommand,
// if the plan root is one.
private def extractLineage(plan: LogicalPlan): LineageResult = {
val sources = plan.collect {
case r: LogicalRelation => r.catalogTable.map(_.identifier.toString)
case h: HiveTableRelation => Some(h.tableMeta.identifier.toString)
}.flatten
val target = plan match {
case i: InsertIntoHadoopFsRelationCommand =>
i.catalogTable.map(_.identifier.toString)
case _ => None
}
LineageResult(sources, target)
}
}
// Create a table node
CREATE (t:Table {
id: $id,
name: $name,
database: $database,
qualifiedName: $qualifiedName,
createTime: datetime()
})
// Create a column node
CREATE (c:Column {
id: $id,
name: $name,
table: $table,
dataType: $dataType,
qualifiedName: $qualifiedName
})
// Create table-level lineage (source table feeds target table via a process)
MATCH (source:Table {qualifiedName: $sourceTable})
MATCH (target:Table {qualifiedName: $targetTable})
CREATE (source)-[:LINEAGE_TO {
processId: $processId,
processType: $processType,
createTime: datetime()
}]->(target)
// Create column-level lineage (target column derived from source column)
MATCH (sourceCol:Column {qualifiedName: $sourceColumn})
MATCH (targetCol:Column {qualifiedName: $targetColumn})
CREATE (sourceCol)-[:DERIVED_FROM {
expression: $expression,
processId: $processId
}]->(targetCol)
public interface LineageService {
/**
 * Upstream lineage: all sources feeding the given node, up to {@code depth} hops.
 */
LineageGraph getUpstreamLineage(String nodeId, int depth);
/**
 * Downstream lineage: all consumers of the given node, up to {@code depth} hops.
 */
LineageGraph getDownstreamLineage(String nodeId, int depth);
/**
 * Full lineage graph around the node, with independent depth limits in each
 * direction.
 */
LineageGraph getFullLineage(String nodeId, int upstreamDepth, int downstreamDepth);
/**
 * Impact analysis: every downstream node affected by a change to this node.
 */
List<LineageNode> impactAnalysis(String nodeId);
/**
 * Root-cause analysis: paths tracing this node's data back to its origins.
 */
List<LineagePath> rootCauseAnalysis(String nodeId);
}
@Service
public class LineageServiceImpl implements LineageService {
    private final Neo4jTemplate neo4jTemplate;

    /**
     * Upstream lineage: follows incoming LINEAGE_TO / DERIVED_FROM edges from
     * the node, up to {@code depth} hops.
     *
     * @throws IllegalArgumentException if {@code depth} is not positive
     */
    @Override
    public LineageGraph getUpstreamLineage(String nodeId, int depth) {
        // FIX: Cypher does not allow a parameter in a variable-length bound,
        // so depth must be spliced into the query text. Validate it first so
        // a non-positive value cannot produce malformed Cypher ("*1..0").
        if (depth < 1) {
            throw new IllegalArgumentException("depth must be >= 1, got " + depth);
        }
        String cypher = """
            MATCH path = (target {id: $nodeId})<-[:LINEAGE_TO|DERIVED_FROM*1..%d]-(source)
            RETURN path
            """.formatted(depth);
        // nodeId is bound as a query parameter, so it cannot inject.
        return neo4jTemplate.query(cypher, Map.of("nodeId", nodeId))
            .stream()
            .collect(LineageGraphCollector.toGraph());
    }
}
/** Props for the lineage graph viewer component. */
interface LineageGraphProps {
nodeId: string;                                // id of the focal node
direction: 'upstream' | 'downstream' | 'both'; // traversal direction
depth: number;                                 // max hops to traverse
}
const LineageGraph: React.FC<LineageGraphProps> = ({ nodeId, direction, depth }) => {
const [graph, setGraph] = useState<LineageData>();
const graphRef = useRef<HTMLDivElement>(null);
useEffect(() => {
loadLineage();
}, [nodeId, direction, depth]);
useEffect(() => {
if (graph && graphRef.current) {
renderGraph();
}
}, [graph]);
const renderGraph = () => {
const g6Graph = new G6.Graph({
container: graphRef.current!,
width: 1200,
height: 800,
layout: {
type: 'dagre',
rankdir: 'LR',
nodesep: 50,
ranksep: 100,
},
defaultNode: {
type: 'lineage-node',
size: [180, 60],
},
defaultEdge: {
type: 'polyline',
style: {
stroke: '#91d5ff',
lineWidth: 2,
endArrow: true,
},
},
modes: {
default: ['drag-canvas', 'zoom-canvas', 'drag-node'],
},
});
// 注册自定义节点
G6.registerNode('lineage-node', {
draw(cfg, group) {
const { name, type, highlight } = cfg;
const rect = group.addShape('rect', {
attrs: {
width: 180,
height: 60,
fill: highlight ? '#e6f7ff' : '#fff',
stroke: getTypeColor(type),
radius: 4,
},
});
group.addShape('image', {
attrs: {
x: 10,
y: 15,
width: 30,
height: 30,
img: getTypeIcon(type),
},
});
group.addShape('text', {
attrs: {
x: 50,
y: 35,
text: name,
fontSize: 14,
fill: '#333',
},
});
return rect;
},
});
g6Graph.data(transformToG6Data(graph));
g6Graph.render();
};
return (
<div className="lineage-container">
<div className="toolbar">
<Select value={direction} onChange={setDirection}>
<Option value="upstream">上游血缘</Option>
<Option value="downstream">下游血缘</Option>
<Option value="both">完整血缘</Option>
</Select>
<InputNumber
min={1}
max={10}
value={depth}
onChange={setDepth}
addonBefore="深度"
/>
</div>
<div ref={graphRef} className="graph-canvas" />
</div>
);
};
/**
 * Data asset: a table synced from an external catalog (e.g. Hive MetaStore),
 * with its columns, storage descriptor, and statistics.
 */
public class TableAsset {
private Long id;
private String database;
private String tableName;
private String qualifiedName;      // database.tableName
private String tableType; // MANAGED, EXTERNAL, VIEW
private String description;
private String owner;
private List<ColumnAsset> columns; // regular + partition columns
private Map<String, String> properties;
private StorageInfo storageInfo;
private Statistics statistics;
private LocalDateTime createTime;
private LocalDateTime lastAccessTime;
}
/**
 * Column asset: one column of a {@link TableAsset}.
 */
public class ColumnAsset {
private Long id;
private Long tableId;   // id of the owning table asset
private String columnName;
private String dataType;
private String description;
private Boolean isPrimaryKey;
private Boolean isPartitionKey;
private Boolean isNullable;
private String defaultValue;
private ColumnStatistics statistics;
}
/**
 * Business glossary term: a business-facing definition that can be linked to
 * technical assets.
 */
public class BusinessTerm {
private Long id;
private String name;
private String definition;
private String category;
private List<String> synonyms;
private List<Long> relatedAssetIds; // ids of linked assets
private String owner;
private String status; // DRAFT, APPROVED, DEPRECATED
}
@Service
public class MetadataSyncService {
    private final HiveMetaStoreClient hiveClient;
    private final TableAssetRepository tableRepo;

    /**
     * Hourly sync from the Hive MetaStore: walks every database/table and
     * upserts each as a table asset. A failure on one table is logged and
     * does not abort the rest of the sync.
     */
    @Scheduled(cron = "0 0 * * * ?")
    public void syncFromHive() {
        List<String> databases = hiveClient.getAllDatabases();
        for (String database : databases) {
            List<String> tables = hiveClient.getAllTables(database);
            for (String tableName : tables) {
                try {
                    Table hiveTable = hiveClient.getTable(database, tableName);
                    TableAsset asset = convertToAsset(hiveTable);
                    tableRepo.upsert(asset);
                } catch (Exception e) {
                    log.error("同步表元数据失败: {}.{}", database, tableName, e);
                }
            }
        }
    }

    /**
     * Maps a Hive MetaStore Table to a TableAsset, including regular and
     * partition columns plus storage descriptor info.
     */
    private TableAsset convertToAsset(Table hiveTable) {
        TableAsset asset = new TableAsset();
        asset.setDatabase(hiveTable.getDbName());
        asset.setTableName(hiveTable.getTableName());
        asset.setQualifiedName(hiveTable.getDbName() + "." + hiveTable.getTableName());
        asset.setTableType(hiveTable.getTableType().name());
        // FIX: guard against a null parameters map so the "comment" lookup
        // cannot NPE. NOTE(review): assumes table comments live in the
        // parameters map under key "comment" — TODO confirm for your client.
        asset.setDescription(hiveTable.getParameters() == null
            ? null
            : hiveTable.getParameters().get("comment"));
        asset.setOwner(hiveTable.getOwner());
        // Regular columns from the storage descriptor.
        List<ColumnAsset> columns = hiveTable.getSd().getCols().stream()
            .map(this::convertToColumnAsset)
            .collect(Collectors.toList());
        // Partition key columns, flagged as such.
        List<ColumnAsset> partitionColumns = hiveTable.getPartitionKeys().stream()
            .map(col -> {
                ColumnAsset colAsset = convertToColumnAsset(col);
                colAsset.setIsPartitionKey(true);
                return colAsset;
            })
            .collect(Collectors.toList());
        // FIX: assemble the complete column list *before* attaching it to the
        // asset; the original mutated the list after setColumns(), which only
        // worked by accident of sharing the same list reference.
        columns.addAll(partitionColumns);
        asset.setColumns(columns);
        // Storage descriptor info (location + IO formats).
        StorageInfo storageInfo = new StorageInfo();
        storageInfo.setLocation(hiveTable.getSd().getLocation());
        storageInfo.setInputFormat(hiveTable.getSd().getInputFormat());
        storageInfo.setOutputFormat(hiveTable.getSd().getOutputFormat());
        asset.setStorageInfo(storageInfo);
        return asset;
    }
}
@Service
public class MetadataSearchService {
    private final ElasticsearchClient esClient;

    /**
     * Full-text search over table assets in the "metadata_table" index.
     * The keyword matches table name (boost 3), description (boost 2), and
     * column name/description with fuzzy matching; database and tags are
     * exact (non-scoring) filters. Results are paged and highlighted on
     * tableName/description.
     *
     * @throws UncheckedIOException if the Elasticsearch request fails
     */
    public SearchResult<TableAsset> search(SearchRequest request) {
        BoolQuery.Builder boolQuery = new BoolQuery.Builder();
        if (StringUtils.hasText(request.getKeyword())) {
            // FIX: the keyword was a lone `should` clause; combined with
            // `filter` clauses, minimum_should_match defaults to 0, so the
            // keyword never actually restricted results. `must` makes it
            // a required, scored clause.
            boolQuery.must(
                MultiMatchQuery.of(m -> m
                    .query(request.getKeyword())
                    .fields("tableName^3", "description^2", "columns.columnName", "columns.description")
                    .fuzziness("AUTO")
                )._toQuery()
            );
        }
        // Exact-match filters (unscored, cacheable).
        if (StringUtils.hasText(request.getDatabase())) {
            boolQuery.filter(TermQuery.of(t -> t.field("database").value(request.getDatabase()))._toQuery());
        }
        if (request.getTags() != null && !request.getTags().isEmpty()) {
            boolQuery.filter(TermsQuery.of(t -> t.field("tags").terms(
                TermsQueryField.of(f -> f.value(request.getTags().stream()
                    .map(FieldValue::of)
                    .collect(Collectors.toList())))
            ))._toQuery());
        }
        co.elastic.clients.elasticsearch.core.SearchRequest esRequest =
            co.elastic.clients.elasticsearch.core.SearchRequest.of(s -> s
                .index("metadata_table")
                .query(boolQuery.build()._toQuery())
                .from(request.getPage() * request.getSize())
                .size(request.getSize())
                .highlight(h -> h
                    .fields("tableName", f -> f)
                    .fields("description", f -> f)
                )
            );
        try {
            SearchResponse<TableAsset> response = esClient.search(esRequest, TableAsset.class);
            // FIX: total() is nullable (e.g. track_total_hits disabled); the
            // original would NPE on .value().
            long total = response.hits().total() == null
                ? 0L
                : response.hits().total().value();
            return SearchResult.<TableAsset>builder()
                .total(total)
                .items(response.hits().hits().stream()
                    .map(Hit::source)
                    .collect(Collectors.toList()))
                .build();
        } catch (IOException e) {
            // FIX: esClient.search declares a checked IOException that the
            // original neither caught nor declared (would not compile).
            // Rethrow unchecked so the method signature stays unchanged.
            throw new UncheckedIOException("Elasticsearch search failed", e);
        }
    }
}
/**
 * A configurable data-quality rule bound to a table (and optionally a column).
 */
public class QualityRule {
private Long id;
private String name;
private String description;
private RuleType type;
private String targetTable;
private String targetColumn;
private String expression;          // rule expression / custom SQL text
private Map<String, Object> params; // type-specific parameters
private QualityLevel level; // CRITICAL, WARNING, INFO
private Boolean enabled;
}
/**
 * Supported data-quality rule types.
 */
public enum RuleType {
NOT_NULL, // null check
UNIQUE, // uniqueness check
RANGE, // value-range check
REGEX, // regular-expression match
ENUM, // enumerated-values check
FOREIGN_KEY, // referential-integrity check
CUSTOM_SQL, // user-defined SQL
FRESHNESS, // data freshness
VOLUME, // row-count check
SCHEMA_CHANGE // schema-change detection
}
/**
 * Executes data-quality rules by generating and running SQL checks against
 * the rule's target table. Dispatch is by rule type.
 */
@Service
public class QualityCheckExecutor {

    /**
     * Runs a single quality rule and returns its pass/fail result.
     *
     * @throws UnsupportedOperationException for rule types without an executor
     */
    public QualityCheckResult execute(QualityRule rule) {
        return switch (rule.getType()) {
            case NOT_NULL -> checkNotNull(rule);
            case UNIQUE -> checkUnique(rule);
            case RANGE -> checkRange(rule);
            case CUSTOM_SQL -> checkCustomSql(rule);
            case FRESHNESS -> checkFreshness(rule);
            case VOLUME -> checkVolume(rule);
            default -> throw new UnsupportedOperationException(
                "No executor for rule type: " + rule.getType());
        };
    }

    /**
     * Rejects table/column names that are not plain (optionally dot-qualified)
     * identifiers. Rule configuration is user-supplied and is spliced into SQL
     * text below (identifiers cannot be bound as statement parameters), so
     * this is the injection guard.
     */
    private static String safeIdentifier(String identifier) {
        if (identifier == null
                || !identifier.matches("[A-Za-z0-9_]+(\\.[A-Za-z0-9_]+)*")) {
            throw new IllegalArgumentException("Unsafe SQL identifier: " + identifier);
        }
        return identifier;
    }

    /** NOT_NULL check: counts NULLs in the target column; passes when zero. */
    private QualityCheckResult checkNotNull(QualityRule rule) {
        // SECURITY FIX: validate identifiers before formatting into SQL.
        String column = safeIdentifier(rule.getTargetColumn());
        String table = safeIdentifier(rule.getTargetTable());
        String sql = String.format("""
            SELECT
            COUNT(*) as total,
            COUNT(%s) as non_null_count,
            COUNT(*) - COUNT(%s) as null_count
            FROM %s
            """,
            column,
            column,
            table
        );
        Map<String, Object> result = jdbcTemplate.queryForMap(sql);
        long nullCount = ((Number) result.get("null_count")).longValue();
        return QualityCheckResult.builder()
            .ruleId(rule.getId())
            .passed(nullCount == 0)
            .actualValue(nullCount)
            .expectedValue(0L)
            .message(nullCount == 0 ? "通过" : String.format("发现 %d 条空值", nullCount))
            .build();
    }

    /**
     * FRESHNESS check: the latest value of the configured time column must be
     * no older than maxDelayMinutes.
     */
    private QualityCheckResult checkFreshness(QualityRule rule) {
        // NOTE(review): assumes params always contain "timeColumn" and
        // "maxDelayMinutes"; a missing key NPEs on unboxing — confirm rules
        // are validated at creation time.
        String timeColumn = safeIdentifier((String) rule.getParams().get("timeColumn"));
        int maxDelayMinutes = (Integer) rule.getParams().get("maxDelayMinutes");
        String sql = String.format("""
            SELECT MAX(%s) as latest_time
            FROM %s
            """, timeColumn, safeIdentifier(rule.getTargetTable()));
        LocalDateTime latestTime = jdbcTemplate.queryForObject(sql, LocalDateTime.class);
        // FIX: MAX() over an empty table yields NULL; the original then NPE'd
        // in ChronoUnit.between. Report an explicit failure instead.
        if (latestTime == null) {
            return QualityCheckResult.builder()
                .ruleId(rule.getId())
                .passed(false)
                .actualValue(null)
                .expectedValue((long) maxDelayMinutes)
                .message("表中无数据,无法计算新鲜度")
                .build();
        }
        long delayMinutes = ChronoUnit.MINUTES.between(latestTime, LocalDateTime.now());
        return QualityCheckResult.builder()
            .ruleId(rule.getId())
            .passed(delayMinutes <= maxDelayMinutes)
            .actualValue(delayMinutes)
            .expectedValue((long) maxDelayMinutes)
            .message(delayMinutes <= maxDelayMinutes ?
                "数据新鲜度正常" :
                String.format("数据延迟 %d 分钟,超过阈值 %d 分钟", delayMinutes, maxDelayMinutes))
            .build();
    }
}
/**
 * Quality report for one table at one check run.
 */
public class QualityReport {
private Long tableId;
private String tableName;
private LocalDateTime checkTime;
private QualityScore score;                   // aggregate score
private List<DimensionScore> dimensionScores; // per-dimension breakdown
private List<QualityCheckResult> details;     // individual rule results
}
/**
 * Aggregate quality score across all executed rules.
 */
public class QualityScore {
private Double overallScore; // 0-100
private Integer totalRules;
private Integer passedRules;
private Integer failedRules;
}
/**
 * Quality score for a single dimension.
 */
public class DimensionScore {
private String dimension; // completeness, accuracy, consistency, timeliness
private Double score;
private Integer ruleCount;
private Integer passedCount;
}
-- Table assets: one row per synced table (storage descriptor and statistics
-- stored as JSON documents).
CREATE TABLE t_table_asset (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
database_name VARCHAR(255) NOT NULL,
table_name VARCHAR(255) NOT NULL,
-- Globally unique "database.table" identifier. The UNIQUE constraint already
-- creates an index on this column.
qualified_name VARCHAR(500) NOT NULL UNIQUE,
table_type VARCHAR(50),  -- MANAGED / EXTERNAL / VIEW
description TEXT,
owner VARCHAR(100),
properties JSON,
storage_info JSON,
statistics JSON,
create_time DATETIME,
last_access_time DATETIME,
sync_time DATETIME DEFAULT CURRENT_TIMESTAMP,  -- last metadata sync
INDEX idx_database (database_name)
-- FIX: dropped idx_qualified_name — it duplicated the index implied by the
-- UNIQUE constraint on qualified_name.
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Column assets: one row per column, owned by a t_table_asset row.
CREATE TABLE t_column_asset (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
table_id BIGINT NOT NULL,
column_name VARCHAR(255) NOT NULL,
data_type VARCHAR(100),
description TEXT,
is_primary_key BOOLEAN DEFAULT FALSE,
is_partition_key BOOLEAN DEFAULT FALSE,
is_nullable BOOLEAN DEFAULT TRUE,
default_value VARCHAR(500),
statistics JSON,
FOREIGN KEY (table_id) REFERENCES t_table_asset(id),
INDEX idx_table (table_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Business glossary terms.
CREATE TABLE t_business_term (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
definition TEXT,
category VARCHAR(100),
synonyms JSON,
owner VARCHAR(100),
status VARCHAR(20) DEFAULT 'DRAFT',  -- DRAFT / APPROVED / DEPRECATED
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_name (name),
INDEX idx_category (category)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Term-to-asset links; (asset_type, asset_id) is a polymorphic reference.
CREATE TABLE t_term_asset_relation (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
term_id BIGINT NOT NULL,
asset_type VARCHAR(50) NOT NULL,
asset_id BIGINT NOT NULL,
FOREIGN KEY (term_id) REFERENCES t_business_term(id),
INDEX idx_term (term_id),
INDEX idx_asset (asset_type, asset_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Data tags (flat tag dictionary).
CREATE TABLE t_data_tag (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
color VARCHAR(20),
description TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Asset-to-tag links; (asset_type, asset_id) is a polymorphic reference.
CREATE TABLE t_asset_tag_relation (
asset_type VARCHAR(50) NOT NULL,
asset_id BIGINT NOT NULL,
tag_id BIGINT NOT NULL,
PRIMARY KEY (asset_type, asset_id, tag_id),
FOREIGN KEY (tag_id) REFERENCES t_data_tag(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Quality rule definitions (params hold type-specific settings as JSON).
CREATE TABLE t_quality_rule (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
description TEXT,
rule_type VARCHAR(50) NOT NULL,
target_table VARCHAR(255),
target_column VARCHAR(255),
expression TEXT,
params JSON,
level VARCHAR(20) DEFAULT 'WARNING',  -- CRITICAL / WARNING / INFO
enabled BOOLEAN DEFAULT TRUE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Quality check results: one row per rule execution.
CREATE TABLE t_quality_check_result (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
rule_id BIGINT NOT NULL,
check_time DATETIME NOT NULL,
passed BOOLEAN,
actual_value VARCHAR(255),
expected_value VARCHAR(255),
message TEXT,
FOREIGN KEY (rule_id) REFERENCES t_quality_rule(id),
INDEX idx_rule_time (rule_id, check_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
# 血缘查询
GET /api/v1/lineage/{nodeId}
Query:
direction: upstream | downstream | both
depth: number
# 影响分析
GET /api/v1/lineage/{nodeId}/impact
# 搜索数据资产
GET /api/v1/metadata/search
Query:
keyword: string
database: string
tags: string[]
page: number
size: number
# 获取表详情
GET /api/v1/metadata/tables/{tableId}
# 获取表的列信息
GET /api/v1/metadata/tables/{tableId}/columns
# 业务术语
POST /api/v1/metadata/terms
GET /api/v1/metadata/terms
PUT /api/v1/metadata/terms/{termId}
# 数据标签
POST /api/v1/metadata/tags
GET /api/v1/metadata/tags
POST /api/v1/metadata/assets/{assetType}/{assetId}/tags
# 质量规则
POST /api/v1/quality/rules
GET /api/v1/quality/rules
PUT /api/v1/quality/rules/{ruleId}
# 执行质量检查
POST /api/v1/quality/check
Request:
tableId: number
ruleIds: number[]
# 获取质量报告
GET /api/v1/quality/reports/{tableId}
| 功能 | 状态 | 说明 |
| --- | --- | --- |
| 表/列元数据管理 | ✅ 已完成 | MySQL 存储,支持 CRUD |
| 数据源同步 | ✅ 已完成 | 支持从 MySQL/PostgreSQL 数据源同步元数据 |
| SQL 血缘解析 | ✅ 已完成 | 基于 JSqlParser,支持 INSERT/SELECT/UPDATE/MERGE |
| Neo4j 血缘存储 | ✅ 已完成 | 图数据库存储表级血缘关系 |
| 血缘查询 API | ✅ 已完成 | 上游/下游/完整血缘查询,支持深度控制 |
| G6 血缘可视化 | ✅ 已完成 | DAG 布局,支持拖拽缩放 |
| Elasticsearch 搜索 | ✅ 已完成 | 全文搜索,带 MySQL LIKE 降级 |
| 血缘自动建表 | ✅ 已完成 | 解析血缘时自动在 MySQL 创建表元数据 |
| 功能 | 优先级 | 说明 |
| --- | --- | --- |
| Hive MetaStore 同步 | P1 | 定时从 Hive 同步元数据 |
| 列级血缘 | P1 | 解析 SQL 提取列级血缘 |
| Spark/Flink Hook | P2 | 运行时采集血缘 |
| 数据质量规则 | P2 | 质量检测引擎 |
| 业务术语管理 | P3 | 业务元数据管理 |
| 数据标签 | P3 | 标签分类体系 |
POST /api/lineage/parse
Content-Type: application/json
{
"sql": "INSERT INTO dwd.user_orders SELECT * FROM ods.orders o JOIN ods.users u ON o.user_id = u.id",
"taskType": "ETL",
"taskName": "sync_user_orders"
}
# 响应
{
"code": 0,
"data": {
"sourceTables": ["ods.orders", "ods.users"],
"targetTables": ["dwd.user_orders"],
"columnLineages": []
}
}
# 上游血缘
GET /api/lineage/upstream/{qualifiedName}?depth=3
# 下游血缘
GET /api/lineage/downstream/{qualifiedName}?depth=3
# 完整血缘
GET /api/lineage/full/{qualifiedName}?upstreamDepth=3&downstreamDepth=3
# 响应格式(G6 兼容)
{
"code": 0,
"data": {
"nodes": [
{"id": "ods.orders", "label": "orders", "database": "ods", "isCenter": false},
{"id": "dwd.user_orders", "label": "user_orders", "database": "dwd", "isCenter": true}
],
"edges": [
{"source": "ods.orders", "target": "dwd.user_orders", "taskType": "ETL"}
]
}
}
# 按关键词搜索(支持表名、库名、描述)
GET /api/v1/metadata/tables?keyword=user
# 全文搜索(ES/MySQL 降级)
GET /api/v1/metadata/search?keyword=订单&database=dwd&page=0&size=20
# application.yml
spring:
neo4j:
uri: bolt://localhost:7687
authentication:
username: neo4j
password: datajump123
elasticsearch:
host: localhost
port: 9200
enabled: true # 设为 false 时降级到 MySQL LIKE 查询
| 服务 | 端口 | 用途 | 必需 |
| --- | --- | --- | --- |
| MySQL | 3306 | 表/列元数据存储 | 是 |
| Neo4j | 7687 | 血缘图存储 | 是 |
| Elasticsearch | 9200 | 全文搜索 | 否(有 MySQL 降级) |