数据集成模块负责实现异构数据源之间的数据同步,支持批量同步和实时同步两种模式。采用 DataX + Flink CDC 的统一架构,提供可视化的同步任务配置和监控能力。
┌─────────────────────────────────────────────────────────────────┐
│ 数据集成服务 (Integration Service) │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 数据源管理 │ │ 同步任务管理 │ │ 任务监控 │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 执行引擎层 │
│ ┌───────────────────────┐ ┌───────────────────────────────┐ │
│ │ 批量同步引擎 │ │ 实时同步引擎 │ │
│ │ (DataX Engine) │ │ (Flink CDC Engine) │ │
│ └───────────────────────┘ └───────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 连接器层 (Connector) │
├──────────┬──────────┬──────────┬──────────┬────────────────────┤
│ MySQL │ Oracle │ Kafka │ HDFS │ Hive/Iceberg │
│ PG │ SQLServer│ MQ │ S3 │ ClickHouse │
└──────────┴──────────┴──────────┴──────────┴────────────────────┘
| 组件 | 职责 |
|---|---|
| 数据源管理 | 管理数据源连接信息,支持连接测试、Schema 探测 |
| 同步任务管理 | 配置同步任务,包括字段映射、转换规则、调度配置 |
| 批量同步引擎 | 基于 DataX 实现全量/增量批量同步 |
| 实时同步引擎 | 基于 Flink CDC 实现实时数据同步 |
| 任务监控 | 监控同步进度、数据量、延迟等指标 |
| 分类 | 数据源 | 读取 | 写入 | 实时 |
|---|---|---|---|---|
| 关系型数据库 | MySQL | ✅ | ✅ | ✅ |
| PostgreSQL | ✅ | ✅ | ✅ | |
| Oracle | ✅ | ✅ | ✅ | |
| SQL Server | ✅ | ✅ | ✅ | |
| 消息队列 | Kafka | ✅ | ✅ | ✅ |
| RocketMQ | ✅ | ✅ | ✅ | |
| 文件系统 | HDFS | ✅ | ✅ | - |
| S3/OSS | ✅ | ✅ | - | |
| FTP/SFTP | ✅ | ✅ | - | |
| 数据仓库 | Hive | ✅ | ✅ | - |
| Iceberg | ✅ | ✅ | ✅ | |
| ClickHouse | ✅ | ✅ | - | |
| StarRocks | ✅ | ✅ | - | |
| NoSQL | MongoDB | ✅ | ✅ | ✅ |
| Elasticsearch | ✅ | ✅ | - | |
| Redis | ✅ | ✅ | - |
public class DataSource {
private Long id;
private String name;
private DataSourceType type;
private String description;
private ConnectionConfig connectionConfig;
private SecurityConfig securityConfig;
private Long projectId;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
public class ConnectionConfig {
private String host;
private Integer port;
private String database;
private String username;
private String password; // 加密存储
private Map<String, String> properties;
}
public class SecurityConfig {
private Boolean sslEnabled;
private String sslCertPath;
private String kerberosKeytab;
private String kerberosPrincipal;
}public interface DataSourceConnector {
/**
* 测试连接
*/
ConnectionTestResult testConnection(ConnectionConfig config);
/**
* 获取 Schema 列表
*/
List<String> getSchemas();
/**
* 获取表列表
*/
List<TableInfo> getTables(String schema);
/**
* 获取表结构
*/
TableSchema getTableSchema(String schema, String table);
/**
* 预览数据
*/
DataPreview previewData(String schema, String table, int limit);
}public class BatchSyncJob {
private Long id;
private String name;
private Long sourceId; // 源数据源
private Long targetId; // 目标数据源
private SyncMode syncMode; // FULL, INCREMENT
private ReaderConfig reader;
private WriterConfig writer;
private TransformConfig transform;
private ScheduleConfig schedule;
}
public class ReaderConfig {
private String schema;
private String table;
private List<String> columns;
private String splitKey; // 分片键
private String incrementColumn; // 增量字段
private String whereClause; // 过滤条件
}
public class WriterConfig {
private String schema;
private String table;
private List<ColumnMapping> columnMappings;
private WriteMode writeMode; // INSERT, UPSERT, REPLACE
private Integer batchSize;
private List<String> preSql;
private List<String> postSql;
}基于 DataXJobGenerator 组件,根据任务配置动态生成 DataX JSON:
@Component
public class DataXJobGenerator {
// 支持 MySQL/PostgreSQL/Hive 三类读写插件
// 自动根据 Datasource 的 type 选择 reader/writer 插件名称
// 支持通过 whereClause 参数注入增量过滤条件
public String generate(BatchSyncJob job, Datasource source, Datasource target, String whereClause);
}通过 DockerExecutor 管理 DataX 容器生命周期:
@Component
public class DockerExecutor {
// 创建 DataX 容器,挂载 job.json,执行后收集日志
public ExecutionResult execute(Long executionId, String jobJsonPath);
// 停止运行中的容器
public void stop(Long executionId);
}执行流程:
BatchSyncJobService.trigger()创建SyncExecution记录(PENDING)- 异步调用
executeAsync(),更新状态为 RUNNING DataXJobGenerator生成 job.jsonDockerExecutor启动 DataX 容器执行DataXOutputParser解析日志获取统计指标- 更新执行记录(SUCCESS/FAILED),若增量模式则更新位点
DataXOutputParser 通过正则表达式从 DataX 输出中提取:
- 读出记录总数、读写失败总数
- 读出记录速度、读出/写入字节数
- 错误原因(DataX 智能分析结果)
| 策略 | 说明 | 适用场景 | 实现状态 |
|---|---|---|---|
| 时间戳增量 | 基于 update_time 字段过滤 | 有更新时间字段的表 | ✅ 已实现 |
| 自增 ID 增量 | 基于自增主键过滤 | 只追加不更新的表 | ✅ 已实现 |
| 全量覆盖 | 每次全量同步 | 数据量小、无增量字段 | ✅ 已实现 |
| 分区增量 | 按分区同步 | Hive 分区表 | ⏳ 待实现 |
增量同步通过 SyncCheckpoint 表记录位点,每次执行时构建 WHERE 子句:
// BatchSyncJobService.buildIncrementWhere()
// readerConfig 中配置 incrementColumn 和 incrementType (TIMESTAMP / ID)
// 首次执行无位点时执行全量同步
// 成功后自动更新位点为当前时间戳┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ MySQL │────▶│ Flink CDC │────▶│ Kafka │
│ Binlog │ │ Source │ │ │
└─────────────┘ └──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Flink │ │ Flink │
│ Transform │ │ Sink │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Iceberg │ │ ClickHouse │
│ Table │ │ Table │
└─────────────┘ └─────────────┘
public class RealtimeSyncJob {
private Long id;
private String name;
private Long sourceId;
private Long targetId;
private CDCConfig cdcConfig;
private List<TableMapping> tableMappings;
private TransformConfig transform;
private FlinkConfig flinkConfig;
}
public class CDCConfig {
private String serverId; // MySQL server-id
private StartupMode startupMode; // INITIAL, LATEST, TIMESTAMP
private String startupTimestamp;
private List<String> includeTables;
private List<String> excludeTables;
}
public class FlinkConfig {
private Integer parallelism;
private String checkpointInterval;
private String checkpointMode; // EXACTLY_ONCE, AT_LEAST_ONCE
private Map<String, String> flinkProperties;
}public class FlinkSqlGenerator {
public String generateCDCSource(RealtimeSyncJob job, TableMapping mapping) {
return String.format("""
CREATE TABLE source_%s (
%s,
PRIMARY KEY (%s) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%d',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = '%s',
'server-id' = '%s',
'scan.startup.mode' = '%s'
)
""",
mapping.getSourceTable(),
generateColumns(mapping.getSourceColumns()),
mapping.getPrimaryKeys(),
job.getSourceConfig().getHost(),
job.getSourceConfig().getPort(),
job.getSourceConfig().getUsername(),
job.getSourceConfig().getPassword(),
mapping.getSourceSchema(),
mapping.getSourceTable(),
job.getCdcConfig().getServerId(),
job.getCdcConfig().getStartupMode().getValue()
);
}
public String generateSink(RealtimeSyncJob job, TableMapping mapping) {
// 根据目标类型生成不同的 Sink DDL
return switch (job.getTargetType()) {
case KAFKA -> generateKafkaSink(job, mapping);
case ICEBERG -> generateIcebergSink(job, mapping);
case CLICKHOUSE -> generateClickHouseSink(job, mapping);
default -> throw new UnsupportedOperationException();
};
}
}public class TransformConfig {
private List<FieldTransform> fieldTransforms;
private String filterExpression;
private List<ComputedColumn> computedColumns;
}
public class FieldTransform {
private String sourceField;
private String targetField;
private TransformType type;
private Map<String, Object> params;
}
public enum TransformType {
RENAME, // 字段重命名
TYPE_CAST, // 类型转换
MASK, // 数据脱敏
ENCRYPT, // 加密
EXPRESSION, // 表达式计算
LOOKUP, // 维表关联
SPLIT, // 字段拆分
CONCAT // 字段合并
}-- 数据源表
CREATE TABLE t_datasource (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL,
description TEXT,
connection_config JSON NOT NULL,
security_config JSON,
project_id BIGINT NOT NULL,
status TINYINT DEFAULT 1,
created_by BIGINT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_project (project_id),
INDEX idx_type (type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 批量同步任务表
CREATE TABLE t_batch_sync_job (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
source_id BIGINT NOT NULL,
target_id BIGINT NOT NULL,
sync_mode VARCHAR(20) NOT NULL,
reader_config JSON NOT NULL,
writer_config JSON NOT NULL,
transform_config JSON,
schedule_config JSON,
status TINYINT DEFAULT 0,
project_id BIGINT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (source_id) REFERENCES t_datasource(id),
FOREIGN KEY (target_id) REFERENCES t_datasource(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 实时同步任务表
CREATE TABLE t_realtime_sync_job (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
source_id BIGINT NOT NULL,
target_id BIGINT NOT NULL,
cdc_config JSON NOT NULL,
table_mappings JSON NOT NULL,
transform_config JSON,
flink_config JSON,
flink_job_id VARCHAR(100),
status TINYINT DEFAULT 0,
project_id BIGINT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 同步执行记录表(已实现)
CREATE TABLE t_sync_execution (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_id BIGINT NOT NULL,
start_time DATETIME,
end_time DATETIME,
status TINYINT DEFAULT 0 COMMENT '0-PENDING 1-RUNNING 2-SUCCESS 3-FAILED 4-CANCELLED',
total_records BIGINT DEFAULT 0,
success_records BIGINT DEFAULT 0,
failed_records BIGINT DEFAULT 0,
bytes_read BIGINT DEFAULT 0,
bytes_written BIGINT DEFAULT 0,
speed_records BIGINT DEFAULT 0 COMMENT 'records/s',
error_message TEXT,
container_id VARCHAR(100),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_job_time (job_id, start_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 同步位点表(已实现)
CREATE TABLE t_sync_checkpoint (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_id BIGINT NOT NULL,
checkpoint_value VARCHAR(255) NOT NULL,
sync_time DATETIME,
record_count BIGINT DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE INDEX idx_job (job_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;# 创建数据源
POST /api/v1/datasources
# 获取数据源列表
GET /api/v1/datasources?projectId=1
# 测试连接
POST /api/v1/datasources/test-connection
# 获取表列表(Schema 探测)
GET /api/v1/datasources/{id}/tables
# 获取列信息
GET /api/v1/datasources/{id}/tables/{table}/columns# CRUD
POST /api/v1/batch-sync-jobs # 创建
GET /api/v1/batch-sync-jobs # 列表
GET /api/v1/batch-sync-jobs/{id} # 详情
PUT /api/v1/batch-sync-jobs/{id} # 更新
DELETE /api/v1/batch-sync-jobs/{id} # 删除
# 执行控制
POST /api/v1/batch-sync-jobs/{id}/trigger # 触发执行
POST /api/v1/batch-sync-jobs/{id}/stop # 停止执行
# 执行记录
GET /api/v1/batch-sync-jobs/{id}/executions # 执行历史列表
GET /api/v1/batch-sync-jobs/{id}/executions/{execId} # 单次执行详情
# 同步位点
GET /api/v1/batch-sync-jobs/{id}/checkpoint # 获取当前位点
# Schema 校验(已实现)
GET /api/v1/batch-sync-jobs/validate-schema?sourceId=1&targetId=2&sourceTable=t1&targetTable=t2在创建同步任务前,自动校验源表与目标表的 Schema 兼容性,并提供智能建议。
- 自动识别系统字段:自增主键(AUTO_INCREMENT)、有默认值的时间戳字段(created_at、updated_at)
- 字段名匹配:按字段名(大小写不敏感)自动映射源表和目标表字段
- 类型兼容性检查:检查字段类型是否兼容(如 VARCHAR→VARCHAR、INT→BIGINT)
- 生成推荐配置:自动生成可直接使用的 readerConfig 和 writerConfig
{
"compatible": true,
"status": "OK", // OK / WARNING / ERROR
"message": "可同步 10 个字段,已自动排除 2 个系统字段",
"issues": [
{
"level": "INFO",
"type": "AUTO_INCREMENT",
"column": "id",
"message": "目标表字段 [id] 为自增主键",
"suggestion": "已自动排除,无需同步"
}
],
"suggestedReaderColumns": ["user_id", "username", "phone", ...],
"suggestedWriterColumns": ["user_id", "username", "phone", ...],
"autoMappedColumns": [
{
"sourceColumn": "user_id",
"targetColumn": "user_id",
"sourceType": "varchar(32)",
"targetType": "varchar(32)",
"typeCompatible": true
}
],
"excludedColumns": [
{ "column": "id", "reason": "AUTO_INCREMENT", "table": "TARGET" },
{ "column": "created_at", "reason": "HAS_DEFAULT", "table": "TARGET" }
]
}- 选择源表和目标表后自动触发校验
- 展示校验状态(通过/警告/错误)
- 显示已排除的系统字段
- 创建任务时自动应用推荐的字段配置
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| sync.records.total | 同步总记录数 | - |
| sync.records.success | 成功记录数 | - |
| sync.records.failed | 失败记录数 | > 0 |
| sync.bytes.read | 读取字节数 | - |
| sync.bytes.written | 写入字节数 | - |
| sync.latency | 同步延迟 | > 5min |
| sync.throughput | 吞吐量(records/s) | < 1000 |
| cdc.lag | CDC 延迟 | > 1min |