From ec80a17c21176b753b68717f39740875ea179b88 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Thu, 26 Feb 2026 19:37:06 -0500 Subject: [PATCH] [flink] Implement FLIP-314 LineageVertexProvider for source and sink connectors --- .../flink/PaimonDataStreamScanProvider.java | 20 +++++- .../flink/PaimonDataStreamSinkProvider.java | 20 +++++- .../flink/sink/FlinkFormatTableSink.java | 5 +- .../paimon/flink/sink/FlinkTableSinkBase.java | 11 +++- .../flink/source/BaseDataTableSource.java | 4 +- .../flink/source/SystemTableSource.java | 4 +- .../paimon/flink/lineage/LineageUtils.java | 39 ++++++++++++ .../paimon/flink/lineage/LineageUtils.java | 61 +++++++++++++++++++ .../flink/lineage/PaimonLineageDataset.java | 55 +++++++++++++++++ .../lineage/PaimonSinkLineageVertex.java | 39 ++++++++++++ .../lineage/PaimonSourceLineageVertex.java | 50 +++++++++++++++ 11 files changed, 299 insertions(+), 9 deletions(-) create mode 100644 paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java create mode 100644 paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index af9df5b9338c..11951fd147cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -18,8 +18,12 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.lineage.LineageUtils; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.data.RowData; @@ -27,15 +31,22 @@ import java.util.function.Function; /** Paimon {@link DataStreamScanProvider}. */ -public class PaimonDataStreamScanProvider implements DataStreamScanProvider { +public class PaimonDataStreamScanProvider implements DataStreamScanProvider, LineageVertexProvider { private final boolean isBounded; private final Function> producer; + private final String name; + private final String namespace; public PaimonDataStreamScanProvider( - boolean isBounded, Function> producer) { + boolean isBounded, + Function> producer, + String name, + String namespace) { this.isBounded = isBounded; this.producer = producer; + this.name = name; + this.namespace = namespace; } @Override @@ -48,4 +59,9 @@ public DataStream produceDataStream( public boolean isBounded() { return isBounded; } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sourceLineageVertex(name, namespace, isBounded); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java index 05eaacf5ab14..70a9f2136bd9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java @@ -18,8 +18,12 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.lineage.LineageUtils; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.data.RowData; @@ -27,12 +31,19 @@ import java.util.function.Function; /** Paimon {@link DataStreamSinkProvider}. */ -public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider { +public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider, LineageVertexProvider { private final Function, DataStreamSink> producer; + private final String name; + private final String namespace; - public PaimonDataStreamSinkProvider(Function, DataStreamSink> producer) { + public PaimonDataStreamSinkProvider( + Function, DataStreamSink> producer, + String name, + String namespace) { this.producer = producer; + this.name = name; + this.namespace = namespace; } @Override @@ -40,4 +51,9 @@ public DataStreamSink consumeDataStream( ProviderContext providerContext, DataStream dataStream) { return producer.apply(dataStream); } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(name, namespace); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java index 361323f016ff..e70ca6ee6ed9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.PaimonDataStreamSinkProvider; import org.apache.paimon.table.FormatTable; @@ -60,7 +61,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions) - .sinkFrom(dataStream)); + .sinkFrom(dataStream), + tableIdentifier.asSummaryString(), + CoreOptions.path(table.options()).toString()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index b7c37d525eff..c36fe33093c1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -104,13 +104,18 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { throw new UnsupportedOperationException( "Paimon doesn't support streaming INSERT OVERWRITE."); } + String name = tableIdentifier.asSummaryString(); + String namespace = CoreOptions.path(table.options()).toString(); + if (table instanceof FormatTable) { FormatTable formatTable = (FormatTable) table; return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkFormatTableDataStreamSink( formatTable, overwrite, staticPartitions) - .sinkFrom(dataStream)); + .sinkFrom(dataStream), + name, + namespace); } Options conf = Options.fromMap(table.options()); @@ -134,7 +139,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism); return builder.build(); - }); + }, + name, + namespace); } protected FlinkSinkBuilder createSinkBuilder() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 8e1ecfc9556e..3f1cc8ef89c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -202,7 +202,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { sourceBuilder .sourceParallelism(inferSourceParallelism(env)) .env(env) - .build()); + .build(), + tableIdentifier.asSummaryString(), + CoreOptions.path(table.options()).toString()); } private ScanRuntimeProvider createCountStarScan() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index e99f265d03b4..c50e6383f661 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -130,7 +130,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { dataStreamSource.setParallelism(parallelism); } return dataStreamSource; - }); + }, + tableIdentifier.asSummaryString(), + CoreOptions.path(table.options()).toString()); } @Override diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java new file mode 100644 index 000000000000..1d6989f970d3 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +/** + * Stub implementation of lineage utilities for Flink 1.x. Even though {@code LineageVertexProvider} + * exists in Flink 1.x, Flink's lineage graph construction only became functional in Flink 2.0+, so + * these methods return {@code null}. + */ +public class LineageUtils { + + public static SourceLineageVertex sourceLineageVertex( + String name, String namespace, boolean isBounded) { + return null; + } + + public static LineageVertex sinkLineageVertex(String name, String namespace) { + return null; + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java new file mode 100644 index 000000000000..dc1cc48405d9 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.Collections; + +/** + * Lineage utilities for Flink 2.0+. Builds {@link SourceLineageVertex} and {@link LineageVertex} + * from a Paimon table name and its physical warehouse path (namespace). + */ +public class LineageUtils { + + /** + * Creates a {@link SourceLineageVertex} for a Paimon source table. + * + * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"} + * @param namespace physical warehouse path, e.g. {@code + * "s3://my-bucket/warehouse/mydb/mytable"} + * @param isBounded whether the source is bounded (batch) or unbounded (streaming) + */ + public static SourceLineageVertex sourceLineageVertex( + String name, String namespace, boolean isBounded) { + LineageDataset dataset = new PaimonLineageDataset(name, namespace); + Boundedness boundedness = + isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; + return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset)); + } + + /** + * Creates a {@link LineageVertex} for a Paimon sink table. + * + * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"} + * @param namespace physical warehouse path, e.g. {@code + * "s3://my-bucket/warehouse/mydb/mytable"} + */ + public static LineageVertex sinkLineageVertex(String name, String namespace) { + LineageDataset dataset = new PaimonLineageDataset(name, namespace); + return new PaimonSinkLineageVertex(Collections.singletonList(dataset)); + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java new file mode 100644 index 000000000000..2813c273bb8f --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Collections; +import java.util.Map; + +/** + * A {@link LineageDataset} representing a Paimon table, identified by its fully qualified name and + * physical warehouse path as the namespace. + */ +public class PaimonLineageDataset implements LineageDataset { + + private final String name; + private final String namespace; + + public PaimonLineageDataset(String name, String namespace) { + this.name = name; + this.namespace = namespace; + } + + @Override + public String name() { + return name; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map facets() { + return Collections.emptyMap(); + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java new file mode 100644 index 000000000000..40024da5e919 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; + +import java.util.List; + +/** A {@link LineageVertex} representing a Paimon sink table. */ +public class PaimonSinkLineageVertex implements LineageVertex { + + private final List datasets; + + public PaimonSinkLineageVertex(List datasets) { + this.datasets = datasets; + } + + @Override + public List datasets() { + return datasets; + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java new file mode 100644 index 000000000000..cbacce2f8abb --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.List; + +/** + * A {@link SourceLineageVertex} representing a Paimon source table. Carries the {@link Boundedness} + * to indicate whether the source is bounded (batch) or unbounded (streaming). + */ +public class PaimonSourceLineageVertex implements SourceLineageVertex { + + private final Boundedness boundedness; + private final List datasets; + + public PaimonSourceLineageVertex(Boundedness boundedness, List datasets) { + this.boundedness = boundedness; + this.datasets = datasets; + } + + @Override + public Boundedness boundedness() { + return boundedness; + } + + @Override + public List datasets() { + return datasets; + } +}