Skip to content

Latest commit

 

History

History
950 lines (816 loc) · 28.3 KB

File metadata and controls

950 lines (816 loc) · 28.3 KB

元数据服务技术方案

1. 概述

元数据服务是 DataJump 平台的数据治理核心,负责管理企业数据资产的技术元数据和业务元数据,提供数据血缘追踪、数据字典管理、数据质量监控等能力。

2. 功能架构

┌─────────────────────────────────────────────────────────────────┐
│                      元数据服务 (Metadata Service)              │
├─────────────────────────────────────────────────────────────────┤
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────────────┐ │
│  │   数据血缘    │ │   数据字典    │ │      数据质量         │ │
│  │   Lineage     │ │  Dictionary   │ │      Quality          │ │
│  └───────────────┘ └───────────────┘ └───────────────────────┘ │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────────────┐ │
│  │   数据分类    │ │   数据标签    │ │      数据检索         │ │
│  │  Classification│ │     Tags     │ │       Search          │ │
│  └───────────────┘ └───────────────┘ └───────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│                       元数据存储层                               │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────────────┐ │
│  │    MySQL      │ │ Elasticsearch │ │       Neo4j           │ │
│  │  (结构化数据)  │ │   (全文检索)   │ │    (血缘图谱)         │ │
│  └───────────────┘ └───────────────┘ └───────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│                       元数据采集层                               │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────────────┐ │
│  │ Hive Hook│ │Spark Hook│ │Flink Hook│ │    SQL Parser      │ │
│  └──────────┘ └──────────┘ └──────────┘ └────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

3. 数据血缘

3.1 血缘模型

/**
 * Lineage node — one data entity (table, column, task, file, ...) in the
 * lineage graph. Identity for graph matching is {@code qualifiedName}.
 */
public class LineageNode {
    private String id;            // unique node id
    private NodeType type;        // TABLE, COLUMN, TASK, FILE
    private String name;          // short display name
    private String qualifiedName; // fully qualified, e.g. database.table.column
    private Map<String, Object> attributes;  // free-form extra attributes
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
}

/**
 * Kinds of entities a {@code LineageNode} can represent.
 * Do not reorder constants if ordinals are persisted anywhere.
 */
public enum NodeType {
    DATABASE,
    TABLE,
    COLUMN,
    TASK,        // a processing job (SQL / ETL / Spark)
    FILE,
    KAFKA_TOPIC
}

/**
 * Lineage edge — a directed data-flow relationship from {@code sourceId}
 * to {@code targetId}, recorded together with the process that produced it.
 */
public class LineageEdge {
    private String id;
    private String sourceId;      // upstream node id
    private String targetId;      // downstream node id
    private EdgeType type;
    private String processId;     // id of the task that produced this edge
    private String processType;   // SQL, ETL, SPARK
    private LocalDateTime createTime;
}

/**
 * Granularity/direction of a lineage edge.
 * Do not reorder constants if ordinals are persisted anywhere.
 */
public enum EdgeType {
    TABLE_TO_TABLE,
    COLUMN_TO_COLUMN,
    TASK_TO_TABLE,
    TABLE_TO_TASK
}

3.2 血缘采集

SQL 解析采集

/**
 * Extracts lineage (source tables, target tables, column derivations) from a
 * single SQL statement using JSqlParser.
 *
 * NOTE(review): parseInsert / parseCreateTable / parseColumnExpression are
 * implemented elsewhere in the project; only the SELECT path is shown here.
 */
public class SQLLineageParser {

    /**
     * Parses one SQL statement and returns the lineage it implies.
     * Unsupported or unparseable SQL yields an empty result instead of
     * propagating the parser's checked exception to callers.
     *
     * @param sql raw SQL text
     * @return extracted lineage, never {@code null}
     */
    public LineageResult parse(String sql) {
        Statement statement;
        try {
            // CCJSqlParserUtil.parse throws the checked JSQLParserException;
            // the original snippet neither caught nor declared it.
            statement = CCJSqlParserUtil.parse(sql);
        } catch (JSQLParserException e) {
            return LineageResult.empty();
        }

        if (statement instanceof Select) {
            return parseSelect((Select) statement);
        } else if (statement instanceof Insert) {
            return parseInsert((Insert) statement);
        } else if (statement instanceof CreateTable) {
            return parseCreateTable((CreateTable) statement);
        }

        return LineageResult.empty();
    }

    /**
     * Builds table- and column-level lineage for a SELECT statement.
     * Source tables come from {@link TablesNamesFinder}; each select item is
     * walked to derive column-level lineage.
     */
    private LineageResult parseSelect(Select select) {
        LineageResult result = new LineageResult();

        // All tables referenced anywhere in the query (FROM, JOINs, subqueries).
        TablesNamesFinder tablesFinder = new TablesNamesFinder();
        List<String> sourceTables = tablesFinder.getTableList(select);

        SelectBody selectBody = select.getSelectBody();
        if (selectBody instanceof PlainSelect) {
            PlainSelect plainSelect = (PlainSelect) selectBody;
            for (SelectItem item : plainSelect.getSelectItems()) {
                if (item instanceof SelectExpressionItem) {
                    SelectExpressionItem exprItem = (SelectExpressionItem) item;
                    // Resolve the expression back to its source columns to
                    // establish column-level lineage.
                    ColumnLineage columnLineage = parseColumnExpression(
                        exprItem.getExpression(),
                        sourceTables
                    );
                    result.addColumnLineage(columnLineage);
                }
            }
        }
        // NOTE(review): set operations (UNION => SetOperationList) only get
        // table-level lineage here; column lineage is skipped for them.

        result.setSourceTables(sourceTables);
        return result;
    }
}

Spark Hook 采集

/**
 * Spark listener that extracts and reports lineage when a job finishes.
 *
 * NOTE(review): lineage is read from the shared CacheManager's cached plans,
 * not from the plan of the job that just ended — jobs without cached data are
 * missed and cached plans may be reported repeatedly. Confirm this is the
 * intended collection point (a QueryExecutionListener may be a better fit).
 */
class SparkLineageListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // SparkSession.active throws if no session is active on the calling
    // thread — TODO confirm the listener thread always has one.
    val queryExecution = SparkSession.active.sharedState.cacheManager
    queryExecution.cachedData.foreach { cachedData =>
      val lineage = extractLineage(cachedData.plan)
      LineageReporter.report(lineage)
    }
  }

  /** Collects source table identifiers and an optional insert target from a logical plan. */
  private def extractLineage(plan: LogicalPlan): LineageResult = {
    // Sources: every file-based or Hive relation anywhere in the plan tree.
    val sources = plan.collect {
      case r: LogicalRelation => r.catalogTable.map(_.identifier.toString)
      case h: HiveTableRelation => Some(h.tableMeta.identifier.toString)
    }.flatten

    // Target: only present when the plan root is an insert command.
    val target = plan match {
      case i: InsertIntoHadoopFsRelationCommand =>
        i.catalogTable.map(_.identifier.toString)
      case _ => None
    }

    LineageResult(sources, target)
  }
}

3.3 血缘存储(Neo4j)

// Create a table node.
// NOTE(review): every statement below uses CREATE — re-running them produces
// duplicate nodes/relationships. For idempotent ingestion use MERGE keyed on
// qualifiedName instead.
CREATE (t:Table {
  id: $id,
  name: $name,
  database: $database,
  qualifiedName: $qualifiedName,
  createTime: datetime()
})

// Create a column node.
CREATE (c:Column {
  id: $id,
  name: $name,
  table: $table,
  dataType: $dataType,
  qualifiedName: $qualifiedName
})

// Create a table-level lineage relationship (source feeds target).
MATCH (source:Table {qualifiedName: $sourceTable})
MATCH (target:Table {qualifiedName: $targetTable})
CREATE (source)-[:LINEAGE_TO {
  processId: $processId,
  processType: $processType,
  createTime: datetime()
}]->(target)

// Create a column-level lineage relationship with the deriving expression.
MATCH (sourceCol:Column {qualifiedName: $sourceColumn})
MATCH (targetCol:Column {qualifiedName: $targetColumn})
CREATE (sourceCol)-[:DERIVED_FROM {
  expression: $expression,
  processId: $processId
}]->(targetCol)

3.4 血缘查询

/**
 * Query API over the lineage graph stored in Neo4j.
 */
public interface LineageService {
    /**
     * Returns the upstream lineage of a node, up to {@code depth} hops.
     */
    LineageGraph getUpstreamLineage(String nodeId, int depth);

    /**
     * Returns the downstream lineage of a node, up to {@code depth} hops.
     */
    LineageGraph getDownstreamLineage(String nodeId, int depth);

    /**
     * Returns the combined lineage graph in both directions, with
     * independent depth limits per direction.
     */
    LineageGraph getFullLineage(String nodeId, int upstreamDepth, int downstreamDepth);

    /**
     * Impact analysis: all downstream nodes affected by a change to this node.
     */
    List<LineageNode> impactAnalysis(String nodeId);

    /**
     * Root-cause analysis: traces this node's data back to its origins.
     */
    List<LineagePath> rootCauseAnalysis(String nodeId);
}

@Service
public class LineageServiceImpl implements LineageService {
    /** Upper bound on traversal depth to keep queries cheap and predictable. */
    private static final int MAX_DEPTH = 20;

    private final Neo4jTemplate neo4jTemplate;

    /**
     * Returns the upstream lineage of {@code nodeId} up to {@code depth} hops.
     *
     * @param nodeId id of the starting node
     * @param depth  maximum number of hops, 1..{@value #MAX_DEPTH}
     * @throws IllegalArgumentException if depth is out of range
     */
    @Override
    public LineageGraph getUpstreamLineage(String nodeId, int depth) {
        // depth is interpolated into the Cypher text (parameters are not
        // allowed inside a variable-length bound), so validate it first:
        // a negative/huge value would otherwise yield an invalid or
        // unbounded pattern.
        if (depth < 1 || depth > MAX_DEPTH) {
            throw new IllegalArgumentException(
                "depth must be between 1 and " + MAX_DEPTH + ", got: " + depth);
        }

        String cypher = """
            MATCH path = (target {id: $nodeId})<-[:LINEAGE_TO|DERIVED_FROM*1..%d]-(source)
            RETURN path
            """.formatted(depth);

        // nodeId goes through a proper query parameter — no injection risk there.
        return neo4jTemplate.query(cypher, Map.of("nodeId", nodeId))
            .stream()
            .collect(LineageGraphCollector.toGraph());
    }
}

3.5 血缘可视化

/** Props for the lineage graph viewer. */
interface LineageGraphProps {
  nodeId: string;                               // qualified name / id of the center node
  direction: 'upstream' | 'downstream' | 'both'; // initial traversal direction
  depth: number;                                 // initial traversal depth (hops)
}

// Register the custom lineage node once at module load instead of inside
// every render (the original re-registered it on each renderGraph call).
G6.registerNode('lineage-node', {
  draw(cfg, group) {
    const { name, type, highlight } = cfg;
    // Card background; stroke color encodes the entity type.
    const rect = group.addShape('rect', {
      attrs: {
        width: 180,
        height: 60,
        fill: highlight ? '#e6f7ff' : '#fff',
        stroke: getTypeColor(type),
        radius: 4,
      },
    });

    // Type icon on the left of the card.
    group.addShape('image', {
      attrs: {
        x: 10,
        y: 15,
        width: 30,
        height: 30,
        img: getTypeIcon(type),
      },
    });

    // Entity name label.
    group.addShape('text', {
      attrs: {
        x: 50,
        y: 35,
        text: name,
        fontSize: 14,
        fill: '#333',
      },
    });

    return rect;
  },
});

/**
 * Interactive lineage graph viewer.
 *
 * Fixes vs. the original:
 * - `direction` / `depth` were props but were mutated via nonexistent
 *   setDirection/setDepth — they are now local state seeded from props.
 * - `loadLineage` was called but never defined — implemented against the
 *   documented GET /api/v1/lineage/{nodeId} endpoint (TODO confirm path).
 * - The previous G6 instance is destroyed before a new one is created,
 *   avoiding leaked canvases and event handlers on every refresh.
 */
const LineageGraph: React.FC<LineageGraphProps> = (props) => {
  const { nodeId } = props;
  // Toolbar-editable view settings, initialised from the incoming props.
  const [direction, setDirection] = useState(props.direction);
  const [depth, setDepth] = useState(props.depth);
  const [graph, setGraph] = useState<LineageData>();
  const graphRef = useRef<HTMLDivElement>(null);
  const g6Ref = useRef<any>(null); // current G6.Graph instance, if any

  useEffect(() => {
    loadLineage();
  }, [nodeId, direction, depth]);

  useEffect(() => {
    if (graph && graphRef.current) {
      renderGraph();
    }
  }, [graph]);

  // Fetch lineage data for the current node/direction/depth.
  const loadLineage = async () => {
    const res = await fetch(
      `/api/v1/lineage/${nodeId}?direction=${direction}&depth=${depth}`
    );
    const body = await res.json();
    setGraph(body.data);
  };

  const renderGraph = () => {
    // Dispose the previous instance; a G6 graph owns a canvas and listeners.
    g6Ref.current?.destroy();

    const g6Graph = new G6.Graph({
      container: graphRef.current!,
      width: 1200,
      height: 800,
      layout: {
        type: 'dagre',
        rankdir: 'LR', // left-to-right data flow
        nodesep: 50,
        ranksep: 100,
      },
      defaultNode: {
        type: 'lineage-node',
        size: [180, 60],
      },
      defaultEdge: {
        type: 'polyline',
        style: {
          stroke: '#91d5ff',
          lineWidth: 2,
          endArrow: true,
        },
      },
      modes: {
        default: ['drag-canvas', 'zoom-canvas', 'drag-node'],
      },
    });
    g6Ref.current = g6Graph;

    g6Graph.data(transformToG6Data(graph));
    g6Graph.render();
  };

  return (
    <div className="lineage-container">
      <div className="toolbar">
        <Select value={direction} onChange={setDirection}>
          <Option value="upstream">上游血缘</Option>
          <Option value="downstream">下游血缘</Option>
          <Option value="both">完整血缘</Option>
        </Select>
        <InputNumber
          min={1}
          max={10}
          value={depth}
          onChange={setDepth}
          addonBefore="深度"
        />
      </div>
      <div ref={graphRef} className="graph-canvas" />
    </div>
  );
};

4. 数据字典

4.1 数据模型

/**
 * Data asset — a table, uniquely identified by {@code qualifiedName}
 * ({@code database.tableName}).
 */
public class TableAsset {
    private Long id;
    private String database;
    private String tableName;
    private String qualifiedName;
    private String tableType;         // MANAGED, EXTERNAL, VIEW
    private String description;
    private String owner;
    private List<ColumnAsset> columns;          // includes partition columns
    private Map<String, String> properties;     // raw table properties
    private StorageInfo storageInfo;            // location / input / output formats
    private Statistics statistics;
    private LocalDateTime createTime;
    private LocalDateTime lastAccessTime;
}

/**
 * Column asset — one column of a {@link TableAsset}, linked via {@code tableId}.
 */
public class ColumnAsset {
    private Long id;
    private Long tableId;             // owning TableAsset id
    private String columnName;
    private String dataType;
    private String description;
    private Boolean isPrimaryKey;
    private Boolean isPartitionKey;   // true for partition-key columns
    private Boolean isNullable;
    private String defaultValue;
    private ColumnStatistics statistics;
}

/**
 * Business glossary term — business-side metadata that can be linked to
 * technical assets via {@code relatedAssetIds}.
 */
public class BusinessTerm {
    private Long id;
    private String name;
    private String definition;
    private String category;
    private List<String> synonyms;
    private List<Long> relatedAssetIds;  // linked technical asset ids
    private String owner;
    private String status;  // DRAFT, APPROVED, DEPRECATED
}

4.2 元数据同步

@Service
public class MetadataSyncService {
    private final HiveMetaStoreClient hiveClient;
    private final TableAssetRepository tableRepo;

    /**
     * 从 Hive MetaStore 同步元数据
     */
    @Scheduled(cron = "0 0 * * * ?")  // 每小时同步
    public void syncFromHive() {
        List<String> databases = hiveClient.getAllDatabases();

        for (String database : databases) {
            List<String> tables = hiveClient.getAllTables(database);

            for (String tableName : tables) {
                try {
                    Table hiveTable = hiveClient.getTable(database, tableName);
                    TableAsset asset = convertToAsset(hiveTable);
                    tableRepo.upsert(asset);
                } catch (Exception e) {
                    log.error("同步表元数据失败: {}.{}", database, tableName, e);
                }
            }
        }
    }

    private TableAsset convertToAsset(Table hiveTable) {
        TableAsset asset = new TableAsset();
        asset.setDatabase(hiveTable.getDbName());
        asset.setTableName(hiveTable.getTableName());
        asset.setQualifiedName(hiveTable.getDbName() + "." + hiveTable.getTableName());
        asset.setTableType(hiveTable.getTableType().name());
        asset.setDescription(hiveTable.getParameters().get("comment"));
        asset.setOwner(hiveTable.getOwner());

        // 同步列信息
        List<ColumnAsset> columns = hiveTable.getSd().getCols().stream()
            .map(this::convertToColumnAsset)
            .collect(Collectors.toList());
        asset.setColumns(columns);

        // 同步分区列
        List<ColumnAsset> partitionColumns = hiveTable.getPartitionKeys().stream()
            .map(col -> {
                ColumnAsset colAsset = convertToColumnAsset(col);
                colAsset.setIsPartitionKey(true);
                return colAsset;
            })
            .collect(Collectors.toList());
        columns.addAll(partitionColumns);

        // 存储信息
        StorageInfo storageInfo = new StorageInfo();
        storageInfo.setLocation(hiveTable.getSd().getLocation());
        storageInfo.setInputFormat(hiveTable.getSd().getInputFormat());
        storageInfo.setOutputFormat(hiveTable.getSd().getOutputFormat());
        asset.setStorageInfo(storageInfo);

        return asset;
    }
}

4.3 数据检索

@Service
public class MetadataSearchService {
    private final ElasticsearchClient esClient;

    /**
     * Full-text search over table assets in the {@code metadata_table} index.
     *
     * Fix vs. the original: in an ES bool query, once any {@code filter}
     * clause is present, {@code minimum_should_match} defaults to 0 and every
     * {@code should} clause becomes optional — so searching a keyword within
     * a database returned ALL tables of that database. We now force the
     * keyword clause to match with {@code minimumShouldMatch("1")}.
     *
     * @param request keyword plus optional database/tags filters and paging
     * @return one page of matching assets with the total hit count
     */
    public SearchResult<TableAsset> search(SearchRequest request) {
        BoolQuery.Builder boolQuery = new BoolQuery.Builder();

        // Keyword search across names and descriptions; table name boosted.
        if (StringUtils.hasText(request.getKeyword())) {
            boolQuery.should(
                MultiMatchQuery.of(m -> m
                    .query(request.getKeyword())
                    .fields("tableName^3", "description^2", "columns.columnName", "columns.description")
                    .fuzziness("AUTO")
                )._toQuery()
            );
            // Required so the keyword stays mandatory when filters are added.
            boolQuery.minimumShouldMatch("1");
        }

        // Exact-match filters (non-scoring).
        if (StringUtils.hasText(request.getDatabase())) {
            boolQuery.filter(TermQuery.of(t -> t.field("database").value(request.getDatabase()))._toQuery());
        }

        if (request.getTags() != null && !request.getTags().isEmpty()) {
            boolQuery.filter(TermsQuery.of(t -> t.field("tags").terms(
                TermsQueryField.of(f -> f.value(request.getTags().stream()
                    .map(FieldValue::of)
                    .collect(Collectors.toList())))
            ))._toQuery());
        }

        co.elastic.clients.elasticsearch.core.SearchRequest esRequest =
            co.elastic.clients.elasticsearch.core.SearchRequest.of(s -> s
                .index("metadata_table")
                .query(boolQuery.build()._toQuery())
                .from(request.getPage() * request.getSize())
                .size(request.getSize())
                .highlight(h -> h
                    .fields("tableName", f -> f)
                    .fields("description", f -> f)
                )
            );

        // NOTE(review): ElasticsearchClient#search declares IOException; the
        // surrounding project presumably handles/wraps it — confirm the
        // exception policy for this service.
        SearchResponse<TableAsset> response = esClient.search(esRequest, TableAsset.class);

        // hits().total() is null when track_total_hits is disabled.
        TotalHits totalHits = response.hits().total();
        return SearchResult.<TableAsset>builder()
            .total(totalHits == null ? 0L : totalHits.value())
            .items(response.hits().hits().stream()
                .map(Hit::source)
                .collect(Collectors.toList()))
            .build();
    }
}

5. 数据质量

5.1 质量规则

/**
 * A configurable data-quality rule bound to a table (and optionally a column).
 * Type-specific settings live in {@code params} (e.g. FRESHNESS uses
 * timeColumn / maxDelayMinutes).
 */
public class QualityRule {
    private Long id;
    private String name;
    private String description;
    private RuleType type;
    private String targetTable;       // table the rule runs against
    private String targetColumn;      // optional, for column-level rules
    private String expression;        // used by CUSTOM_SQL rules
    private Map<String, Object> params;  // type-specific parameters
    private QualityLevel level;       // CRITICAL, WARNING, INFO
    private Boolean enabled;
}

/**
 * Supported quality-rule categories.
 * Do not reorder constants if ordinals are persisted anywhere.
 */
public enum RuleType {
    NOT_NULL,           // column must not contain NULLs
    UNIQUE,             // column values must be unique
    RANGE,              // values must fall within a numeric range
    REGEX,              // values must match a regular expression
    ENUM,               // values must be from an allowed set
    FOREIGN_KEY,        // values must exist in a referenced table
    CUSTOM_SQL,         // user-supplied SQL check
    FRESHNESS,          // data must be recent enough
    VOLUME,             // row count must be within bounds
    SCHEMA_CHANGE       // detect schema drift
}

5.2 质量检测执行

@Service
public class QualityCheckExecutor {
    // Declared here: the original snippet used jdbcTemplate without a field.
    private final JdbcTemplate jdbcTemplate;

    public QualityCheckExecutor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Dispatches a quality rule to its concrete check implementation.
     *
     * @throws UnsupportedOperationException for rule types with no executor
     */
    public QualityCheckResult execute(QualityRule rule) {
        return switch (rule.getType()) {
            case NOT_NULL -> checkNotNull(rule);
            case UNIQUE -> checkUnique(rule);
            case RANGE -> checkRange(rule);
            case CUSTOM_SQL -> checkCustomSql(rule);
            case FRESHNESS -> checkFreshness(rule);
            case VOLUME -> checkVolume(rule);
            default -> throw new UnsupportedOperationException(
                "不支持的规则类型: " + rule.getType());
        };
    }

    /**
     * NOT_NULL check: counts NULLs in the target column; passes when zero.
     * Table/column names are interpolated into SQL and rules arrive via the
     * public rules API, so the identifiers are validated first.
     */
    private QualityCheckResult checkNotNull(QualityRule rule) {
        requireSafeIdentifier(rule.getTargetColumn());
        requireSafeIdentifier(rule.getTargetTable());

        String sql = String.format("""
            SELECT
                COUNT(*) as total,
                COUNT(%s) as non_null_count,
                COUNT(*) - COUNT(%s) as null_count
            FROM %s
            """,
            rule.getTargetColumn(),
            rule.getTargetColumn(),
            rule.getTargetTable()
        );

        Map<String, Object> result = jdbcTemplate.queryForMap(sql);
        long nullCount = ((Number) result.get("null_count")).longValue();

        return QualityCheckResult.builder()
            .ruleId(rule.getId())
            .passed(nullCount == 0)
            .actualValue(nullCount)
            .expectedValue(0L)
            .message(nullCount == 0 ? "通过" : String.format("发现 %d 条空值", nullCount))
            .build();
    }

    /**
     * FRESHNESS check: MAX(timeColumn) must be no older than maxDelayMinutes.
     * An empty table (MAX(...) IS NULL) is reported as a failed check instead
     * of throwing a NullPointerException as the original did.
     */
    private QualityCheckResult checkFreshness(QualityRule rule) {
        String timeColumn = (String) rule.getParams().get("timeColumn");
        int maxDelayMinutes = (Integer) rule.getParams().get("maxDelayMinutes");
        requireSafeIdentifier(timeColumn);
        requireSafeIdentifier(rule.getTargetTable());

        String sql = String.format("""
            SELECT MAX(%s) as latest_time
            FROM %s
            """, timeColumn, rule.getTargetTable());

        LocalDateTime latestTime = jdbcTemplate.queryForObject(sql, LocalDateTime.class);
        if (latestTime == null) {
            // Empty table: freshness cannot possibly be satisfied.
            return QualityCheckResult.builder()
                .ruleId(rule.getId())
                .passed(false)
                .expectedValue((long) maxDelayMinutes)
                .message("表无数据,无法满足新鲜度要求")
                .build();
        }

        long delayMinutes = ChronoUnit.MINUTES.between(latestTime, LocalDateTime.now());

        return QualityCheckResult.builder()
            .ruleId(rule.getId())
            .passed(delayMinutes <= maxDelayMinutes)
            .actualValue(delayMinutes)
            .expectedValue((long) maxDelayMinutes)
            .message(delayMinutes <= maxDelayMinutes ?
                "数据新鲜度正常" :
                String.format("数据延迟 %d 分钟,超过阈值 %d 分钟", delayMinutes, maxDelayMinutes))
            .build();
    }

    /** Rejects identifiers that could smuggle SQL: only word chars and dots allowed. */
    private static void requireSafeIdentifier(String identifier) {
        if (identifier == null || !identifier.matches("[A-Za-z0-9_.]+")) {
            throw new IllegalArgumentException("非法的表名或列名: " + identifier);
        }
    }
}

5.3 质量报告

/**
 * Quality report for one table at one point in time: an overall score,
 * per-dimension scores, and the individual check results behind them.
 */
public class QualityReport {
    private Long tableId;
    private String tableName;
    private LocalDateTime checkTime;
    private QualityScore score;                  // aggregate pass/fail summary
    private List<DimensionScore> dimensionScores;
    private List<QualityCheckResult> details;    // one entry per executed rule
}

/** Aggregate quality score for a table. */
public class QualityScore {
    private Double overallScore;      // 0-100
    private Integer totalRules;
    private Integer passedRules;
    private Integer failedRules;
}

/** Score for one quality dimension of a table. */
public class DimensionScore {
    private String dimension;         // completeness, accuracy, consistency, timeliness
    private Double score;
    private Integer ruleCount;
    private Integer passedCount;
}

6. 数据库设计

-- Table assets (one row per synced table).
CREATE TABLE t_table_asset (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    database_name VARCHAR(255) NOT NULL,
    table_name VARCHAR(255) NOT NULL,
    qualified_name VARCHAR(500) NOT NULL UNIQUE,
    table_type VARCHAR(50),
    description TEXT,
    owner VARCHAR(100),
    properties JSON,
    storage_info JSON,
    statistics JSON,
    create_time DATETIME,
    last_access_time DATETIME,
    sync_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_database (database_name),
    -- NOTE(review): redundant — the UNIQUE constraint on qualified_name
    -- already creates an index; consider dropping idx_qualified_name.
    INDEX idx_qualified_name (qualified_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Column assets (child rows of t_table_asset).
CREATE TABLE t_column_asset (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    table_id BIGINT NOT NULL,
    column_name VARCHAR(255) NOT NULL,
    data_type VARCHAR(100),
    description TEXT,
    is_primary_key BOOLEAN DEFAULT FALSE,
    is_partition_key BOOLEAN DEFAULT FALSE,
    is_nullable BOOLEAN DEFAULT TRUE,
    default_value VARCHAR(500),
    statistics JSON,
    FOREIGN KEY (table_id) REFERENCES t_table_asset(id),
    INDEX idx_table (table_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Business glossary terms.
CREATE TABLE t_business_term (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    definition TEXT,
    category VARCHAR(100),
    synonyms JSON,
    owner VARCHAR(100),
    status VARCHAR(20) DEFAULT 'DRAFT',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_name (name),
    INDEX idx_category (category)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Term-to-asset links (asset_type + asset_id is a polymorphic reference,
-- so no FK to the asset table is possible).
CREATE TABLE t_term_asset_relation (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    term_id BIGINT NOT NULL,
    asset_type VARCHAR(50) NOT NULL,
    asset_id BIGINT NOT NULL,
    FOREIGN KEY (term_id) REFERENCES t_business_term(id),
    INDEX idx_term (term_id),
    INDEX idx_asset (asset_type, asset_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Data tags.
CREATE TABLE t_data_tag (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    color VARCHAR(20),
    description TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Asset-to-tag links (polymorphic asset reference, see above).
CREATE TABLE t_asset_tag_relation (
    asset_type VARCHAR(50) NOT NULL,
    asset_id BIGINT NOT NULL,
    tag_id BIGINT NOT NULL,
    PRIMARY KEY (asset_type, asset_id, tag_id),
    FOREIGN KEY (tag_id) REFERENCES t_data_tag(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Quality rules; type-specific settings live in the params JSON column.
CREATE TABLE t_quality_rule (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    rule_type VARCHAR(50) NOT NULL,
    target_table VARCHAR(255),
    target_column VARCHAR(255),
    expression TEXT,
    params JSON,
    level VARCHAR(20) DEFAULT 'WARNING',
    enabled BOOLEAN DEFAULT TRUE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Quality check execution results (append-only history per rule).
CREATE TABLE t_quality_check_result (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    rule_id BIGINT NOT NULL,
    check_time DATETIME NOT NULL,
    passed BOOLEAN,
    actual_value VARCHAR(255),
    expected_value VARCHAR(255),
    message TEXT,
    FOREIGN KEY (rule_id) REFERENCES t_quality_rule(id),
    INDEX idx_rule_time (rule_id, check_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

7. API 设计

# 血缘查询
GET /api/v1/lineage/{nodeId}
Query:
  direction: upstream | downstream | both
  depth: number

# 影响分析
GET /api/v1/lineage/{nodeId}/impact

# 搜索数据资产
GET /api/v1/metadata/search
Query:
  keyword: string
  database: string
  tags: string[]
  page: number
  size: number

# 获取表详情
GET /api/v1/metadata/tables/{tableId}

# 获取表的列信息
GET /api/v1/metadata/tables/{tableId}/columns

# 业务术语
POST /api/v1/metadata/terms
GET /api/v1/metadata/terms
PUT /api/v1/metadata/terms/{termId}

# 数据标签
POST /api/v1/metadata/tags
GET /api/v1/metadata/tags
POST /api/v1/metadata/assets/{assetType}/{assetId}/tags

# 质量规则
POST /api/v1/quality/rules
GET /api/v1/quality/rules
PUT /api/v1/quality/rules/{ruleId}

# 执行质量检查
POST /api/v1/quality/check
Request:
  tableId: number
  ruleIds: number[]

# 获取质量报告
GET /api/v1/quality/reports/{tableId}

8. 实现状态

8.1 已实现功能

功能 状态 说明
表/列元数据管理 ✅ 已完成 MySQL 存储,支持 CRUD
数据源同步 ✅ 已完成 支持从 MySQL/PostgreSQL 数据源同步元数据
SQL 血缘解析 ✅ 已完成 基于 JSqlParser,支持 INSERT/SELECT/UPDATE/MERGE
Neo4j 血缘存储 ✅ 已完成 图数据库存储表级血缘关系
血缘查询 API ✅ 已完成 上游/下游/完整血缘查询,支持深度控制
G6 血缘可视化 ✅ 已完成 DAG 布局,支持拖拽缩放
Elasticsearch 搜索 ✅ 已完成 全文搜索,带 MySQL LIKE 降级
血缘自动建表 ✅ 已完成 解析血缘时自动在 MySQL 创建表元数据

8.2 待实现功能

功能 优先级 说明
Hive MetaStore 同步 P1 定时从 Hive 同步元数据
列级血缘 P1 解析 SQL 提取列级血缘
Spark/Flink Hook P2 运行时采集血缘
数据质量规则 P2 质量检测引擎
业务术语管理 P3 业务元数据管理
数据标签 P3 标签分类体系

8.3 核心 API

血缘解析

POST /api/lineage/parse
Content-Type: application/json

{
  "sql": "INSERT INTO dwd.user_orders SELECT * FROM ods.orders o JOIN ods.users u ON o.user_id = u.id",
  "taskType": "ETL",
  "taskName": "sync_user_orders"
}

# 响应
{
  "code": 0,
  "data": {
    "sourceTables": ["ods.orders", "ods.users"],
    "targetTables": ["dwd.user_orders"],
    "columnLineages": []
  }
}

血缘查询

# 上游血缘
GET /api/lineage/upstream/{qualifiedName}?depth=3

# 下游血缘
GET /api/lineage/downstream/{qualifiedName}?depth=3

# 完整血缘
GET /api/lineage/full/{qualifiedName}?upstreamDepth=3&downstreamDepth=3

# 响应格式(G6 兼容)
{
  "code": 0,
  "data": {
    "nodes": [
      {"id": "ods.orders", "label": "orders", "database": "ods", "isCenter": false},
      {"id": "dwd.user_orders", "label": "user_orders", "database": "dwd", "isCenter": true}
    ],
    "edges": [
      {"source": "ods.orders", "target": "dwd.user_orders", "taskType": "ETL"}
    ]
  }
}

元数据搜索

# 按关键词搜索(支持表名、库名、描述)
GET /api/v1/metadata/tables?keyword=user

# 全文搜索(ES/MySQL 降级)
GET /api/v1/metadata/search?keyword=订单&database=dwd&page=0&size=20

8.4 配置说明

# application.yml
spring:
  neo4j:
    uri: bolt://localhost:7687
    authentication:
      username: neo4j
      # NOTE(review): plaintext credential in documentation — use an
      # environment variable or secret manager in real deployments.
      password: datajump123

elasticsearch:
  host: localhost
  port: 9200
  enabled: true  # when false, search falls back to MySQL LIKE queries

8.5 依赖服务

服务 端口 用途 必需
MySQL 3306 表/列元数据存储 是
Neo4j 7687 血缘图存储 是
Elasticsearch 9200 全文搜索 否(有 MySQL 降级)