From 6e8cb819a355e5942b5529488564383f02863bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Fri, 22 May 2026 17:17:12 +0800 Subject: [PATCH 1/2] allow specific plugin impl can use catalog config to overwrite default table properties --- .../apache/fluss/spark/read/FlussScanBuilder.scala | 12 +++++++----- .../fluss/spark/read/lake/FlussLakeAppendBatch.scala | 3 ++- .../read/lake/FlussLakePartitionReaderFactory.scala | 2 +- .../fluss/spark/read/lake/FlussLakeUpsertBatch.scala | 3 ++- .../fluss/spark/read/lake/FlussLakeUtils.scala | 12 ++++++++++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index 9e1c0d4545..361542d769 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -93,6 +93,8 @@ trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters def tablePath: TablePath + def flussConfig: FlussConfiguration + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { val pairs = SparkPredicateConverter.convertPerPredicate(tableInfo.getRowType, predicates.toSeq) @@ -100,7 +102,7 @@ trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters (Seq.empty[Predicate], Seq.empty[FlussPredicate]) } else { val lakeSource = - FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath) val result = FlussLakeBatch.applyLakeFilters(lakeSource, pairs.map(_._2).asJava) // Identity-match: lake sources are expected to return the same instances they received. val acceptedSet: JSet[FlussPredicate] = @@ -119,7 +121,7 @@ class FlussAppendScanBuilder( tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussSupportsPushDownV2Filters { override def build(): Scan = { @@ -140,7 +142,7 @@ class FlussLakeAppendScanBuilder( val tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussLakeSupportsPushDownV2Filters { override def build(): Scan = { @@ -160,7 +162,7 @@ class FlussUpsertScanBuilder( tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussSupportsPushDownPartitionFilters { override def build(): Scan = { @@ -173,7 +175,7 @@ class FlussLakeUpsertScanBuilder( val tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussLakeSupportsPushDownV2Filters { override def build(): Scan = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala index d451039299..0eb701555f 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala @@ -86,7 +86,8 @@ class FlussLakeAppendBatch( throw e } - val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + val lakeSource = + FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath) lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _)) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala index 369c45f748..30014b3c9c 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala @@ -39,7 +39,7 @@ class FlussLakePartitionReaderFactory( extends PartitionReaderFactory { @transient private lazy val lakeSource: LakeSource[LakeSplit] = { - val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath) + val source = FlussLakeUtils.createLakeSource(flussConfig.toMap, tableProperties, tablePath) source.withProject(FlussLakeUtils.lakeProjection(projection)) flussPredicate.foreach(FlussLakeBatch.applyLakeFilters(source, _)) source diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala index 2ef6ab7701..1b095751ef 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -86,7 +86,8 @@ class FlussLakeUpsertBatch( throw e } - val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + val lakeSource = + FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath) lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _)) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala index b797891ab7..8ec4a4b9b2 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala @@ -25,17 +25,25 @@ import org.apache.fluss.utils.PropertiesUtils import java.util +import scala.collection.JavaConverters._ + object FlussLakeUtils { + private val SPARK_CATALOG_PREFIX = "spark.catalog." + def createLakeSource( + catalogProperties: util.Map[String, String], tableProperties: util.Map[String, String], tablePath: TablePath): LakeSource[LakeSplit] = { val tableConfig = Configuration.fromMap(tableProperties) val datalakeFormat = tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT) val dataLakePrefix = "table.datalake." + datalakeFormat + "." - val catalogProperties = PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix) - val lakeConfig = Configuration.fromMap(catalogProperties) + val lakeConfig = + Configuration.fromMap(PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix)) + catalogProperties.asScala.foreach { + case (k, v) => lakeConfig.setString(s"$SPARK_CATALOG_PREFIX$k", v) + } val lakeStoragePlugin = LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null) val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig) From f2cff55b817f6556ba92a2f7e43d0e2ef85cbca6 Mon Sep 17 00:00:00 2001 From: Yang Zhang <714615435@qq.com> Date: Fri, 22 May 2026 19:42:04 +0800 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala index 8ec4a4b9b2..62a6b6d989 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala @@ -29,7 +29,7 @@ import scala.collection.JavaConverters._ object FlussLakeUtils { - private val SPARK_CATALOG_PREFIX = "spark.catalog." + private val SPARK_CATALOG_PREFIX = "spark.sql.catalog." def createLakeSource( catalogProperties: util.Map[String, String],