Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,35 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.lineage.LineageUtils;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

import java.util.function.Function;

/** Paimon {@link DataStreamScanProvider}. */
public class PaimonDataStreamScanProvider implements DataStreamScanProvider {
public class PaimonDataStreamScanProvider implements DataStreamScanProvider, LineageVertexProvider {

private final boolean isBounded;
private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;
private final String name;
private final String namespace;

public PaimonDataStreamScanProvider(
boolean isBounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) {
boolean isBounded,
Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
String name,
String namespace) {
this.isBounded = isBounded;
this.producer = producer;
this.name = name;
this.namespace = namespace;
}

@Override
Expand All @@ -48,4 +59,9 @@ public DataStream<RowData> produceDataStream(
public boolean isBounded() {
return isBounded;
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sourceLineageVertex(name, namespace, isBounded);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,42 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.lineage.LineageUtils;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

import java.util.function.Function;

/** Paimon {@link DataStreamSinkProvider}. */
public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider, LineageVertexProvider {

private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
private final String name;
private final String namespace;

public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
public PaimonDataStreamSinkProvider(
Function<DataStream<RowData>, DataStreamSink<?>> producer,
String name,
String namespace) {
this.producer = producer;
this.name = name;
this.namespace = namespace;
}

@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return producer.apply(dataStream);
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sinkLineageVertex(name, namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
import org.apache.paimon.table.FormatTable;

Expand Down Expand Up @@ -60,7 +61,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions)
.sinkFrom(dataStream));
.sinkFrom(dataStream),
tableIdentifier.asSummaryString(),
CoreOptions.path(table.options()).toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,18 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
throw new UnsupportedOperationException(
"Paimon doesn't support streaming INSERT OVERWRITE.");
}
String name = tableIdentifier.asSummaryString();
String namespace = CoreOptions.path(table.options()).toString();

if (table instanceof FormatTable) {
FormatTable formatTable = (FormatTable) table;
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkFormatTableDataStreamSink(
formatTable, overwrite, staticPartitions)
.sinkFrom(dataStream));
.sinkFrom(dataStream),
name,
namespace);
}

Options conf = Options.fromMap(table.options());
Expand All @@ -134,7 +139,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
return builder.build();
});
},
name,
namespace);
}

protected FlinkSinkBuilder createSinkBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
sourceBuilder
.sourceParallelism(inferSourceParallelism(env))
.env(env)
.build());
.build(),
tableIdentifier.asSummaryString(),
CoreOptions.path(table.options()).toString());
}

private ScanRuntimeProvider createCountStarScan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
dataStreamSource.setParallelism(parallelism);
}
return dataStreamSource;
});
},
tableIdentifier.asSummaryString(),
CoreOptions.path(table.options()).toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

/**
* Stub implementation of lineage utilities for Flink 1.x. Even though {@code LineageVertexProvider}
* exists in Flink 1.x, Flink's lineage graph construction only became functional in Flink 2.0+, so
* these methods return {@code null}.
*/
public class LineageUtils {

public static SourceLineageVertex sourceLineageVertex(
String name, String namespace, boolean isBounded) {
return null;
}

public static LineageVertex sinkLineageVertex(String name, String namespace) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.Collections;

/**
* Lineage utilities for Flink 2.0+. Builds {@link SourceLineageVertex} and {@link LineageVertex}
* from a Paimon table name and its physical warehouse path (namespace).
*/
public class LineageUtils {

/**
* Creates a {@link SourceLineageVertex} for a Paimon source table.
*
* @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
* @param namespace physical warehouse path, e.g. {@code
* "s3://my-bucket/warehouse/mydb/mytable"}
* @param isBounded whether the source is bounded (batch) or unbounded (streaming)
*/
public static SourceLineageVertex sourceLineageVertex(
String name, String namespace, boolean isBounded) {
LineageDataset dataset = new PaimonLineageDataset(name, namespace);
Boundedness boundedness =
isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset));
}

/**
* Creates a {@link LineageVertex} for a Paimon sink table.
*
* @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
* @param namespace physical warehouse path, e.g. {@code
* "s3://my-bucket/warehouse/mydb/mytable"}
*/
public static LineageVertex sinkLineageVertex(String name, String namespace) {
LineageDataset dataset = new PaimonLineageDataset(name, namespace);
return new PaimonSinkLineageVertex(Collections.singletonList(dataset));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Collections;
import java.util.Map;

/**
* A {@link LineageDataset} representing a Paimon table, identified by its fully qualified name and
* physical warehouse path as the namespace.
*/
public class PaimonLineageDataset implements LineageDataset {

private final String name;
private final String namespace;

public PaimonLineageDataset(String name, String namespace) {
this.name = name;
this.namespace = namespace;
}

@Override
public String name() {
return name;
}

@Override
public String namespace() {
return namespace;
}

@Override
public Map<String, LineageDatasetFacet> facets() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lineage;

import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;

import java.util.List;

/** A {@link LineageVertex} representing a Paimon sink table. */
public class PaimonSinkLineageVertex implements LineageVertex {

private final List<LineageDataset> datasets;

public PaimonSinkLineageVertex(List<LineageDataset> datasets) {
this.datasets = datasets;
}

@Override
public List<LineageDataset> datasets() {
return datasets;
}
}
Loading
Loading