Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,16 @@ trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters

def tablePath: TablePath

def flussConfig: FlussConfiguration

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
val pairs =
SparkPredicateConverter.convertPerPredicate(tableInfo.getRowType, predicates.toSeq)
val (acceptedSpark, acceptedFluss) = if (pairs.isEmpty) {
(Seq.empty[Predicate], Seq.empty[FlussPredicate])
} else {
val lakeSource =
FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath)
val result = FlussLakeBatch.applyLakeFilters(lakeSource, pairs.map(_._2).asJava)
// Identity-match: lake sources are expected to return the same instances they received.
val acceptedSet: JSet[FlussPredicate] =
Expand All @@ -119,7 +121,7 @@ class FlussAppendScanBuilder(
tablePath: TablePath,
val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
val flussConfig: FlussConfiguration)
extends FlussSupportsPushDownV2Filters {

override def build(): Scan = {
Expand All @@ -140,7 +142,7 @@ class FlussLakeAppendScanBuilder(
val tablePath: TablePath,
val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
val flussConfig: FlussConfiguration)
extends FlussLakeSupportsPushDownV2Filters {

override def build(): Scan = {
Expand All @@ -160,7 +162,7 @@ class FlussUpsertScanBuilder(
tablePath: TablePath,
val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
val flussConfig: FlussConfiguration)
extends FlussSupportsPushDownPartitionFilters {

override def build(): Scan = {
Expand All @@ -173,7 +175,7 @@ class FlussLakeUpsertScanBuilder(
val tablePath: TablePath,
val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
val flussConfig: FlussConfiguration)
extends FlussLakeSupportsPushDownV2Filters {

override def build(): Scan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class FlussLakeAppendBatch(
throw e
}

val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
val lakeSource =
FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath)
lakeSource.withProject(FlussLakeUtils.lakeProjection(projection))
pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class FlussLakePartitionReaderFactory(
extends PartitionReaderFactory {

@transient private lazy val lakeSource: LakeSource[LakeSplit] = {
val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath)
val source = FlussLakeUtils.createLakeSource(flussConfig.toMap, tableProperties, tablePath)
source.withProject(FlussLakeUtils.lakeProjection(projection))
flussPredicate.foreach(FlussLakeBatch.applyLakeFilters(source, _))
source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class FlussLakeUpsertBatch(
throw e
}

val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
val lakeSource =
FlussLakeUtils.createLakeSource(flussConfig.toMap, tableInfo.getProperties.toMap, tablePath)
lakeSource.withProject(FlussLakeUtils.lakeProjection(projection))
pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,25 @@ import org.apache.fluss.utils.PropertiesUtils

import java.util

import scala.collection.JavaConverters._

object FlussLakeUtils {

private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."

def createLakeSource(
catalogProperties: util.Map[String, String],
tableProperties: util.Map[String, String],
tablePath: TablePath): LakeSource[LakeSplit] = {
val tableConfig = Configuration.fromMap(tableProperties)
val datalakeFormat = tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT)
val dataLakePrefix = "table.datalake." + datalakeFormat + "."

val catalogProperties = PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix)
val lakeConfig = Configuration.fromMap(catalogProperties)
val lakeConfig =
Configuration.fromMap(PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix))
catalogProperties.asScala.foreach {
case (k, v) => lakeConfig.setString(s"$SPARK_CATALOG_PREFIX$k", v)
}
Comment on lines +44 to +46
val lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null)
val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig)
Expand Down
Loading