diff --git a/paimon-spark/paimon-spark-4.1/pom.xml b/paimon-spark/paimon-spark-4.1/pom.xml
new file mode 100644
index 000000000000..87ebe4eedf24
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/pom.xml
@@ -0,0 +1,161 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.paimon
+ paimon-spark
+ 1.5-SNAPSHOT
+
+
+ paimon-spark-4.1_2.13
+ Paimon : Spark : 4.1 : 2.13
+
+
+ 4.1.1
+
+
+
+
+ org.apache.paimon
+ paimon-format
+
+
+
+ org.apache.paimon
+ paimon-spark4-common_${scala.binary.version}
+ ${project.version}
+
+
+
+ org.apache.paimon
+ paimon-spark-common_${scala.binary.version}
+ ${project.version}
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.version}
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+
+
+
+ org.apache.spark
+ spark-catalyst_${scala.binary.version}
+ ${spark.version}
+
+
+
+ org.apache.spark
+ spark-hive_${scala.binary.version}
+ ${spark.version}
+
+
+
+
+
+ org.apache.paimon
+ paimon-spark-ut_${scala.binary.version}
+ ${project.version}
+ tests
+ test
+
+
+ *
+ *
+
+
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.version}
+ tests
+ test
+
+
+ org.apache.spark
+ spark-connect-shims_${scala.binary.version}
+
+
+
+
+
+ org.apache.spark
+ spark-catalyst_${scala.binary.version}
+ ${spark.version}
+ tests
+ test
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+ tests
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-paimon
+ package
+
+ shade
+
+
+
+
+ *
+
+ com/github/luben/zstd/**
+ **/*libzstd-jni-*.so
+ **/*libzstd-jni-*.dll
+
+
+
+
+
+ org.apache.paimon:paimon-spark4-common_${scala.binary.version}
+
+
+
+
+
+
+
+
+
diff --git a/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
new file mode 100644
index 000000000000..e86195f1af0b
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.optimizer
+
+import org.apache.paimon.spark.PaimonScan
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, ExprId, ScalarSubquery, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
+
+ override def tryMergeDataSourceV2ScanRelation(
+ newV2ScanRelation: DataSourceV2ScanRelation,
+ cachedV2ScanRelation: DataSourceV2ScanRelation)
+ : Option[(LogicalPlan, AttributeMap[Attribute])] = {
+ (newV2ScanRelation, cachedV2ScanRelation) match {
+ case (
+ DataSourceV2ScanRelation(
+ newRelation,
+ newScan: PaimonScan,
+ newOutput,
+ newPartitioning,
+ newOrdering),
+ DataSourceV2ScanRelation(
+ cachedRelation,
+ cachedScan: PaimonScan,
+ _,
+ cachedPartitioning,
+ cacheOrdering)) =>
+ checkIdenticalPlans(newRelation, cachedRelation).flatMap {
+ outputMap =>
+ if (
+ samePartitioning(newPartitioning, cachedPartitioning, outputMap) && sameOrdering(
+ newOrdering,
+ cacheOrdering,
+ outputMap)
+ ) {
+ mergePaimonScan(newScan, cachedScan).map {
+ mergedScan =>
+ val mergedAttributes = mergedScan
+ .readSchema()
+ .map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+ val cachedOutputNameMap = cachedRelation.output.map(a => a.name -> a).toMap
+ val mergedOutput =
+ mergedAttributes.map(a => cachedOutputNameMap.getOrElse(a.name, a))
+ val newV2ScanRelation =
+ cachedV2ScanRelation.copy(scan = mergedScan, output = mergedOutput)
+
+ val mergedOutputNameMap = mergedOutput.map(a => a.name -> a).toMap
+ val newOutputMap =
+ AttributeMap(newOutput.map(a => a -> mergedOutputNameMap(a.name).toAttribute))
+
+ newV2ScanRelation -> newOutputMap
+ }
+ } else {
+ None
+ }
+ }
+
+ case _ => None
+ }
+ }
+
+ private def sameOrdering(
+ newOrdering: Option[Seq[SortOrder]],
+ cachedOrdering: Option[Seq[SortOrder]],
+ outputAttrMap: AttributeMap[Attribute]): Boolean = {
+ val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputAttrMap)))
+ mappedNewOrdering.map(_.map(_.canonicalized)) == cachedOrdering.map(_.map(_.canonicalized))
+ }
+
+ override protected def createScalarSubquery(plan: LogicalPlan, exprId: ExprId): ScalarSubquery = {
+ ScalarSubquery(plan, exprId = exprId)
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar b/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 000000000000..a5bfa456f668
Binary files /dev/null and b/paimon-spark/paimon-spark-4.1/src/test/resources/function/hive-test-udfs.jar differ
diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml
new file mode 100644
index 000000000000..bdf2bb090760
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/resources/hive-site.xml
@@ -0,0 +1,56 @@
+
+
+
+
+ hive.metastore.integral.jdo.pushdown
+ true
+
+
+
+ hive.metastore.schema.verification
+ false
+
+
+
+ hive.metastore.client.capability.check
+ false
+
+
+
+ datanucleus.schema.autoCreateTables
+ true
+
+
+
+ datanucleus.schema.autoCreateAll
+ true
+
+
+
+
+ datanucleus.connectionPoolingType
+ DBCP
+
+
+
+ hive.metastore.uris
+ thrift://localhost:9090
+ Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
+
+
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties b/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000000..6f324f5863ac
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
new file mode 100644
index 000000000000..322d50a62127
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+class CompactProcedureTest extends CompactProcedureTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala
new file mode 100644
index 000000000000..d57846709877
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+class ProcedureTest extends ProcedureTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
new file mode 100644
index 000000000000..255906d04bf2
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class AnalyzeTableTest extends AnalyzeTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
new file mode 100644
index 000000000000..b729f57b33e7
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLTest extends DDLTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
new file mode 100644
index 000000000000..cb139d2a57be
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {}
+
+class DefaultDatabaseTest extends DefaultDatabaseTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
new file mode 100644
index 000000000000..6170e2fd6c5c
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DataFrameWriteTest extends DataFrameWriteTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
new file mode 100644
index 000000000000..8d620ece8245
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.SparkConf
+
+class DeleteFromTableTest extends DeleteFromTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
new file mode 100644
index 000000000000..c6aa77419241
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DescribeTableTest extends DescribeTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 000000000000..ba49976ab6c0
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala
new file mode 100644
index 000000000000..4f66584c303b
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class InsertOverwriteTableTest extends InsertOverwriteTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
new file mode 100644
index 000000000000..c83ee5493867
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.{PaimonAppendBucketedTableTest, PaimonAppendNonBucketTableTest, PaimonPrimaryKeyBucketedTableTest, PaimonPrimaryKeyNonBucketTableTest}
+
+import org.apache.spark.SparkConf
+
+class MergeIntoPrimaryKeyBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoPrimaryKeyTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonPrimaryKeyBucketedTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+}
+
+class MergeIntoPrimaryKeyNonBucketTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoPrimaryKeyTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonPrimaryKeyNonBucketTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+}
+
+class MergeIntoAppendBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonAppendBucketedTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+}
+
+class MergeIntoAppendNonBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonAppendNonBucketTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000000..635185a9ed0e
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends PaimonCompositePartitionKeyTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
new file mode 100644
index 000000000000..ec140a89bbd3
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, NamedExpression, ScalarSubquery}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+class PaimonOptimizationTest extends PaimonOptimizationTestBase {
+
+ override def extractorExpression(
+ cteIndex: Int,
+ output: Seq[Attribute],
+ fieldIndex: Int): NamedExpression = {
+ GetStructField(
+ ScalarSubquery(
+ SparkShimLoader.shim
+ .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)),
+ fieldIndex,
+ None)
+ .as("scalarsubquery()")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
new file mode 100644
index 000000000000..26677d85c71a
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonPushDownTest extends PaimonPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
new file mode 100644
index 000000000000..f37fbad27033
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
new file mode 100644
index 000000000000..6ab8a2671b51
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala
new file mode 100644
index 000000000000..412aa3b30351
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RewriteUpsertTableTest extends RewriteUpsertTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 000000000000..da4c9b854df3
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
new file mode 100644
index 000000000000..9f96840a7788
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala
new file mode 100644
index 000000000000..6601dc2fca37
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/ShowColumnsTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class ShowColumnsTest extends PaimonShowColumnsTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 000000000000..21c4c8a495ed
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala
new file mode 100644
index 000000000000..92309d54167b
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class TagDdlTest extends PaimonTagDdlTestBase {}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000000..3a0f56cd4820
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.SparkConf
+
+class UpdateTableTest extends UpdateTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+ }
+}
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
new file mode 100644
index 000000000000..94e9ac683f02
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/VariantTest.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.SparkConf
+
+class VariantTest extends VariantTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "false")
+ }
+}
+
+class VariantInferShreddingTest extends VariantTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.variant.inferShreddingSchema", "true")
+ }
+}
diff --git a/pom.xml b/pom.xml
index 5a329e76f6cf..cee8c295229d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -427,6 +427,7 @@ under the License.
paimon-spark/paimon-spark4-common
paimon-spark/paimon-spark-4.0
+ paimon-spark/paimon-spark-4.1
17