diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index 9e1c0d4545..361542d769 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -93,6 +93,8 @@ trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters def tablePath: TablePath + def flussConfig: FlussConfiguration + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { val pairs = SparkPredicateConverter.convertPerPredicate(tableInfo.getRowType, predicates.toSeq) @@ -100,7 +102,7 @@ trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters (Seq.empty[Predicate], Seq.empty[FlussPredicate]) } else { val lakeSource = - FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath) val result = FlussLakeBatch.applyLakeFilters(lakeSource, pairs.map(_._2).asJava) // Identity-match: lake sources are expected to return the same instances they received. val acceptedSet: JSet[FlussPredicate] = @@ -119,7 +121,7 @@ class FlussAppendScanBuilder( tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussSupportsPushDownV2Filters { override def build(): Scan = { @@ -140,7 +142,7 @@ class FlussLakeAppendScanBuilder( val tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussLakeSupportsPushDownV2Filters { override def build(): Scan = { @@ -160,7 +162,7 @@ class FlussUpsertScanBuilder( tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussSupportsPushDownPartitionFilters { override def build(): Scan = { @@ -173,7 +175,7 @@ class FlussLakeUpsertScanBuilder( val tablePath: TablePath, val tableInfo: TableInfo, options: CaseInsensitiveStringMap, - flussConfig: FlussConfiguration) + val flussConfig: FlussConfiguration) extends FlussLakeSupportsPushDownV2Filters { override def build(): Scan = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala index d451039299..0eb701555f 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala @@ -86,7 +86,8 @@ class FlussLakeAppendBatch( throw e } - val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + val lakeSource = + FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath) lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _)) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala index 369c45f748..30014b3c9c 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala @@ -39,7 +39,7 @@ class FlussLakePartitionReaderFactory( extends PartitionReaderFactory { @transient private lazy val lakeSource: LakeSource[LakeSplit] = { - val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath) + val source = FlussLakeUtils.createLakeSource(flussConfig.toMap, tableProperties, tablePath) source.withProject(FlussLakeUtils.lakeProjection(projection)) flussPredicate.foreach(FlussLakeBatch.applyLakeFilters(source, _)) source diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala index 2ef6ab7701..1b095751ef 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -86,7 +86,8 @@ class FlussLakeUpsertBatch( throw e } - val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + val lakeSource = + FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath) lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _)) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala index b797891ab7..62a6b6d989 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala @@ -25,17 +25,25 @@ import org.apache.fluss.utils.PropertiesUtils import java.util +import scala.collection.JavaConverters._ + object FlussLakeUtils { + private val SPARK_CATALOG_PREFIX = "spark.sql.catalog." + def createLakeSource( + catalogProperties: util.Map[String, String], tableProperties: util.Map[String, String], tablePath: TablePath): LakeSource[LakeSplit] = { val tableConfig = Configuration.fromMap(tableProperties) val datalakeFormat = tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT) val dataLakePrefix = "table.datalake." + datalakeFormat + "." - val catalogProperties = PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix) - val lakeConfig = Configuration.fromMap(catalogProperties) + val lakeConfig = + Configuration.fromMap(PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix)) + catalogProperties.asScala.foreach { + case (k, v) => lakeConfig.setString(s"$SPARK_CATALOG_PREFIX$k", v) + } val lakeStoragePlugin = LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null) val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig)