Skip to content

Latest commit

 

History

History
551 lines (470 loc) · 19.3 KB

File metadata and controls

551 lines (470 loc) · 19.3 KB

数据集成模块技术方案

1. 概述

数据集成模块负责实现异构数据源之间的数据同步,支持批量同步和实时同步两种模式。采用 DataX + Flink CDC 的统一架构,提供可视化的同步任务配置和监控能力。

2. 架构设计

2.1 整体架构

┌─────────────────────────────────────────────────────────────────┐
│                    数据集成服务 (Integration Service)             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │  数据源管理  │  │  同步任务管理 │  │      任务监控          │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
├─────────────────────────────────────────────────────────────────┤
│                       执行引擎层                                 │
│  ┌───────────────────────┐  ┌───────────────────────────────┐  │
│  │    批量同步引擎        │  │       实时同步引擎             │  │
│  │   (DataX Engine)      │  │      (Flink CDC Engine)       │  │
│  └───────────────────────┘  └───────────────────────────────┘  │
├─────────────────────────────────────────────────────────────────┤
│                       连接器层 (Connector)                       │
├──────────┬──────────┬──────────┬──────────┬────────────────────┤
│  MySQL   │  Oracle  │  Kafka   │   HDFS   │   Hive/Iceberg    │
│  PG      │  SQLServer│ MQ      │   S3     │   ClickHouse      │
└──────────┴──────────┴──────────┴──────────┴────────────────────┘

2.2 核心组件

组件 职责
数据源管理 管理数据源连接信息,支持连接测试、Schema 探测
同步任务管理 配置同步任务,包括字段映射、转换规则、调度配置
批量同步引擎 基于 DataX 实现全量/增量批量同步
实时同步引擎 基于 Flink CDC 实现实时数据同步
任务监控 监控同步进度、数据量、延迟等指标

3. 数据源管理

3.1 支持的数据源

分类 数据源 读取 写入 实时
关系型数据库 MySQL
PostgreSQL
Oracle
SQL Server
消息队列 Kafka
RocketMQ
文件系统 HDFS -
S3/OSS -
FTP/SFTP -
数据仓库 Hive -
Iceberg
ClickHouse -
StarRocks -
NoSQL MongoDB
Elasticsearch -
Redis -

3.2 数据源配置

public class DataSource {
    private Long id;
    private String name;
    private DataSourceType type;
    private String description;
    private ConnectionConfig connectionConfig;
    private SecurityConfig securityConfig;
    private Long projectId;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}

public class ConnectionConfig {
    private String host;
    private Integer port;
    private String database;
    private String username;
    private String password;  // 加密存储
    private Map<String, String> properties;
}

public class SecurityConfig {
    private Boolean sslEnabled;
    private String sslCertPath;
    private String kerberosKeytab;
    private String kerberosPrincipal;
}

3.3 连接测试与 Schema 探测

public interface DataSourceConnector {
    /**
     * 测试连接
     */
    ConnectionTestResult testConnection(ConnectionConfig config);

    /**
     * 获取 Schema 列表
     */
    List<String> getSchemas();

    /**
     * 获取表列表
     */
    List<TableInfo> getTables(String schema);

    /**
     * 获取表结构
     */
    TableSchema getTableSchema(String schema, String table);

    /**
     * 预览数据
     */
    DataPreview previewData(String schema, String table, int limit);
}

4. 批量同步(DataX)

4.1 同步任务配置

public class BatchSyncJob {
    private Long id;
    private String name;
    private Long sourceId;        // 源数据源
    private Long targetId;        // 目标数据源
    private SyncMode syncMode;    // FULL, INCREMENT
    private ReaderConfig reader;
    private WriterConfig writer;
    private TransformConfig transform;
    private ScheduleConfig schedule;
}

public class ReaderConfig {
    private String schema;
    private String table;
    private List<String> columns;
    private String splitKey;          // 分片键
    private String incrementColumn;   // 增量字段
    private String whereClause;       // 过滤条件
}

public class WriterConfig {
    private String schema;
    private String table;
    private List<ColumnMapping> columnMappings;
    private WriteMode writeMode;      // INSERT, UPSERT, REPLACE
    private Integer batchSize;
    private List<String> preSql;
    private List<String> postSql;
}

4.2 DataX Job 生成(已实现)

基于 DataXJobGenerator 组件,根据任务配置动态生成 DataX JSON:

@Component
public class DataXJobGenerator {
    // 支持 MySQL/PostgreSQL/Hive 三类读写插件
    // 自动根据 Datasource 的 type 选择 reader/writer 插件名称
    // 支持通过 whereClause 参数注入增量过滤条件
    public String generate(BatchSyncJob job, Datasource source, Datasource target, String whereClause);
}

4.3 Docker 容器执行引擎(已实现)

通过 DockerExecutor 管理 DataX 容器生命周期:

@Component
public class DockerExecutor {
    // 创建 DataX 容器,挂载 job.json,执行后收集日志
    public ExecutionResult execute(Long executionId, String jobJsonPath);
    // 停止运行中的容器
    public void stop(Long executionId);
}

执行流程:

  1. BatchSyncJobService.trigger() 创建 SyncExecution 记录(PENDING)
  2. 异步调用 executeAsync(),更新状态为 RUNNING
  3. DataXJobGenerator 生成 job.json
  4. DockerExecutor 启动 DataX 容器执行
  5. DataXOutputParser 解析日志获取统计指标
  6. 更新执行记录(SUCCESS/FAILED),若增量模式则更新位点

4.4 DataX 日志解析(已实现)

DataXOutputParser 通过正则表达式从 DataX 输出中提取:

  • 读出记录总数、读写失败总数
  • 读出记录速度、读出/写入字节数
  • 错误原因(DataX 智能分析结果)

4.5 增量同步策略(已实现)

策略 说明 适用场景 实现状态
时间戳增量 基于 update_time 字段过滤 有更新时间字段的表 ✅ 已实现
自增 ID 增量 基于自增主键过滤 只追加不更新的表 ✅ 已实现
全量覆盖 每次全量同步 数据量小、无增量字段 ✅ 已实现
分区增量 按分区同步 Hive 分区表 ⏳ 待实现

增量同步通过 SyncCheckpoint 表记录位点,每次执行时构建 WHERE 子句:

// BatchSyncJobService.buildIncrementWhere()
// readerConfig 中配置 incrementColumn 和 incrementType (TIMESTAMP / ID)
// 首次执行无位点时执行全量同步
// 成功后自动更新位点为当前时间戳

5. 实时同步(Flink CDC)

5.1 CDC 架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   MySQL     │────▶│ Flink CDC   │────▶│   Kafka     │
│  Binlog     │     │   Source    │     │             │
└─────────────┘     └──────┬──────┘     └──────┬──────┘
                          │                    │
                          ▼                    ▼
                   ┌─────────────┐     ┌─────────────┐
                   │   Flink     │     │   Flink     │
                   │  Transform  │     │   Sink      │
                   └──────┬──────┘     └──────┬──────┘
                          │                    │
                          ▼                    ▼
                   ┌─────────────┐     ┌─────────────┐
                   │  Iceberg    │     │ ClickHouse  │
                   │   Table     │     │   Table     │
                   └─────────────┘     └─────────────┘

5.2 实时同步任务配置

public class RealtimeSyncJob {
    private Long id;
    private String name;
    private Long sourceId;
    private Long targetId;
    private CDCConfig cdcConfig;
    private List<TableMapping> tableMappings;
    private TransformConfig transform;
    private FlinkConfig flinkConfig;
}

public class CDCConfig {
    private String serverId;           // MySQL server-id
    private StartupMode startupMode;   // INITIAL, LATEST, TIMESTAMP
    private String startupTimestamp;
    private List<String> includeTables;
    private List<String> excludeTables;
}

public class FlinkConfig {
    private Integer parallelism;
    private String checkpointInterval;
    private String checkpointMode;     // EXACTLY_ONCE, AT_LEAST_ONCE
    private Map<String, String> flinkProperties;
}

5.3 Flink SQL 生成

public class FlinkSqlGenerator {
    public String generateCDCSource(RealtimeSyncJob job, TableMapping mapping) {
        return String.format("""
            CREATE TABLE source_%s (
                %s,
                PRIMARY KEY (%s) NOT ENFORCED
            ) WITH (
                'connector' = 'mysql-cdc',
                'hostname' = '%s',
                'port' = '%d',
                'username' = '%s',
                'password' = '%s',
                'database-name' = '%s',
                'table-name' = '%s',
                'server-id' = '%s',
                'scan.startup.mode' = '%s'
            )
            """,
            mapping.getSourceTable(),
            generateColumns(mapping.getSourceColumns()),
            mapping.getPrimaryKeys(),
            job.getSourceConfig().getHost(),
            job.getSourceConfig().getPort(),
            job.getSourceConfig().getUsername(),
            job.getSourceConfig().getPassword(),
            mapping.getSourceSchema(),
            mapping.getSourceTable(),
            job.getCdcConfig().getServerId(),
            job.getCdcConfig().getStartupMode().getValue()
        );
    }

    public String generateSink(RealtimeSyncJob job, TableMapping mapping) {
        // 根据目标类型生成不同的 Sink DDL
        return switch (job.getTargetType()) {
            case KAFKA -> generateKafkaSink(job, mapping);
            case ICEBERG -> generateIcebergSink(job, mapping);
            case CLICKHOUSE -> generateClickHouseSink(job, mapping);
            default -> throw new UnsupportedOperationException();
        };
    }
}

5.4 数据转换

public class TransformConfig {
    private List<FieldTransform> fieldTransforms;
    private String filterExpression;
    private List<ComputedColumn> computedColumns;
}

public class FieldTransform {
    private String sourceField;
    private String targetField;
    private TransformType type;
    private Map<String, Object> params;
}

public enum TransformType {
    RENAME,           // 字段重命名
    TYPE_CAST,        // 类型转换
    MASK,             // 数据脱敏
    ENCRYPT,          // 加密
    EXPRESSION,       // 表达式计算
    LOOKUP,           // 维表关联
    SPLIT,            // 字段拆分
    CONCAT            // 字段合并
}

6. 数据库设计

-- 数据源表
CREATE TABLE t_datasource (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL,
    description TEXT,
    connection_config JSON NOT NULL,
    security_config JSON,
    project_id BIGINT NOT NULL,
    status TINYINT DEFAULT 1,
    created_by BIGINT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_project (project_id),
    INDEX idx_type (type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 批量同步任务表
CREATE TABLE t_batch_sync_job (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    source_id BIGINT NOT NULL,
    target_id BIGINT NOT NULL,
    sync_mode VARCHAR(20) NOT NULL,
    reader_config JSON NOT NULL,
    writer_config JSON NOT NULL,
    transform_config JSON,
    schedule_config JSON,
    status TINYINT DEFAULT 0,
    project_id BIGINT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (source_id) REFERENCES t_datasource(id),
    FOREIGN KEY (target_id) REFERENCES t_datasource(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 实时同步任务表
CREATE TABLE t_realtime_sync_job (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    source_id BIGINT NOT NULL,
    target_id BIGINT NOT NULL,
    cdc_config JSON NOT NULL,
    table_mappings JSON NOT NULL,
    transform_config JSON,
    flink_config JSON,
    flink_job_id VARCHAR(100),
    status TINYINT DEFAULT 0,
    project_id BIGINT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 同步执行记录表(已实现)
CREATE TABLE t_sync_execution (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    job_id BIGINT NOT NULL,
    start_time DATETIME,
    end_time DATETIME,
    status TINYINT DEFAULT 0 COMMENT '0-PENDING 1-RUNNING 2-SUCCESS 3-FAILED 4-CANCELLED',
    total_records BIGINT DEFAULT 0,
    success_records BIGINT DEFAULT 0,
    failed_records BIGINT DEFAULT 0,
    bytes_read BIGINT DEFAULT 0,
    bytes_written BIGINT DEFAULT 0,
    speed_records BIGINT DEFAULT 0 COMMENT 'records/s',
    error_message TEXT,
    container_id VARCHAR(100),
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_job_time (job_id, start_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 同步位点表(已实现)
CREATE TABLE t_sync_checkpoint (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    job_id BIGINT NOT NULL,
    checkpoint_value VARCHAR(255) NOT NULL,
    sync_time DATETIME,
    record_count BIGINT DEFAULT 0,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE INDEX idx_job (job_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

7. API 设计

7.1 数据源管理(已实现)

# 创建数据源
POST /api/v1/datasources

# 获取数据源列表
GET /api/v1/datasources?projectId=1

# 测试连接
POST /api/v1/datasources/test-connection

# 获取表列表(Schema 探测)
GET /api/v1/datasources/{id}/tables

# 获取列信息
GET /api/v1/datasources/{id}/tables/{table}/columns

7.2 批量同步任务管理(已实现)

# CRUD
POST   /api/v1/batch-sync-jobs            # 创建
GET    /api/v1/batch-sync-jobs             # 列表
GET    /api/v1/batch-sync-jobs/{id}        # 详情
PUT    /api/v1/batch-sync-jobs/{id}        # 更新
DELETE /api/v1/batch-sync-jobs/{id}        # 删除

# 执行控制
POST /api/v1/batch-sync-jobs/{id}/trigger  # 触发执行
POST /api/v1/batch-sync-jobs/{id}/stop     # 停止执行

# 执行记录
GET /api/v1/batch-sync-jobs/{id}/executions          # 执行历史列表
GET /api/v1/batch-sync-jobs/{id}/executions/{execId} # 单次执行详情

# 同步位点
GET /api/v1/batch-sync-jobs/{id}/checkpoint  # 获取当前位点

# Schema 校验(已实现)
GET /api/v1/batch-sync-jobs/validate-schema?sourceId=1&targetId=2&sourceTable=t1&targetTable=t2

7.3 Schema 校验服务(已实现)

在创建同步任务前,自动校验源表与目标表的 Schema 兼容性,并提供智能建议。

校验逻辑

  1. 自动识别系统字段:自增主键(AUTO_INCREMENT)、有默认值的时间戳字段(created_at、updated_at)
  2. 字段名匹配:按字段名(大小写不敏感)自动映射源表和目标表字段
  3. 类型兼容性检查:检查字段类型是否兼容(如 VARCHAR→VARCHAR、INT→BIGINT)
  4. 生成推荐配置:自动生成可直接使用的 readerConfig 和 writerConfig

返回结果结构

{
  "compatible": true,
  "status": "OK",                    // OK / WARNING / ERROR
  "message": "可同步 10 个字段,已自动排除 2 个系统字段",
  "issues": [
    {
      "level": "INFO",
      "type": "AUTO_INCREMENT",
      "column": "id",
      "message": "目标表字段 [id] 为自增主键",
      "suggestion": "已自动排除,无需同步"
    }
  ],
  "suggestedReaderColumns": ["user_id", "username", "phone", ...],
  "suggestedWriterColumns": ["user_id", "username", "phone", ...],
  "autoMappedColumns": [
    {
      "sourceColumn": "user_id",
      "targetColumn": "user_id",
      "sourceType": "varchar(32)",
      "targetType": "varchar(32)",
      "typeCompatible": true
    }
  ],
  "excludedColumns": [
    { "column": "id", "reason": "AUTO_INCREMENT", "table": "TARGET" },
    { "column": "created_at", "reason": "HAS_DEFAULT", "table": "TARGET" }
  ]
}

前端集成

  • 选择源表和目标表后自动触发校验
  • 展示校验状态(通过/警告/错误)
  • 显示已排除的系统字段
  • 创建任务时自动应用推荐的字段配置

8. 监控指标

指标 说明 告警阈值
sync.records.total 同步总记录数 -
sync.records.success 成功记录数 -
sync.records.failed 失败记录数 > 0
sync.bytes.read 读取字节数 -
sync.bytes.written 写入字节数 -
sync.latency 同步延迟 > 5min
sync.throughput 吞吐量(records/s) < 1000
cdc.lag CDC 延迟 > 1min