From 9a90ba460708da9509f2c21ee16df1aff6ec370e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 14:23:20 -0700 Subject: [PATCH 01/19] [SPARK-XXXXX][SQL] Support CREATE TABLE LIKE for V2 catalogs ## What changes were proposed in this pull request? Previously, `CREATE TABLE LIKE` was implemented only via `CreateTableLikeCommand`, which bypassed the V2 catalog pipeline entirely. This meant: - 3-part names (catalog.namespace.table) caused a parse error - 2-part names targeting a V2 catalog caused `NoSuchDatabaseException` This PR adds a V2 execution path for `CREATE TABLE LIKE`: - Grammar: change `tableIdentifier` (2-part max) to `identifierReference` (N-part) for both target and source, consistent with all other DDL commands - Parser: emit `CreateTableLike` (new V2 logical plan) instead of `CreateTableLikeCommand` directly - `ResolveCatalogs`: resolve the target `UnresolvedIdentifier` to `ResolvedIdentifier` - `ResolveSessionCatalog`: route back to `CreateTableLikeCommand` when both target and source are V1 tables/views in the session catalog (V1->V1 path) - `DataSourceV2Strategy`: convert `CreateTableLike` to new `CreateTableLikeExec` - `CreateTableLikeExec`: physical exec that copies schema and partitioning from the resolved source `Table` and calls `TableCatalog.createTable()` ## How was this patch tested? 
- `CreateTableLikeSuite`: new integration tests covering V2 target with V1/V2 source, cross-catalog, views as source, IF NOT EXISTS, property behavior, and V1 fallback regression - `DDLParserSuite`: updated existing `create table like` test to match the new `CreateTableLike` plan shape; added 3-part name parsing test Co-Authored-By: Claude Sonnet 4.6 --- .../sql/catalyst/parser/SqlBaseParser.g4 | 4 +- .../catalyst/analysis/ResolveCatalogs.scala | 4 + .../catalyst/plans/logical/v2Commands.scala | 31 ++- .../analysis/ResolveSessionCatalog.scala | 10 + .../spark/sql/execution/SparkSqlParser.scala | 11 +- .../datasources/v2/CreateTableLikeExec.scala | 110 +++++++++ .../datasources/v2/DataSourceV2Strategy.scala | 33 ++- .../sql/connector/CreateTableLikeSuite.scala | 212 ++++++++++++++++++ .../execution/command/DDLParserSuite.scala | 91 ++++---- 9 files changed, 447 insertions(+), 59 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index a1de1234ef317..918e8eac79bfa 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -248,8 +248,8 @@ statement | createTableHeader (LEFT_PAREN tableElementList RIGHT_PAREN)? tableProvider? createTableClauses (AS? query)? #createTable - | CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier - LIKE source=tableIdentifier + | CREATE TABLE (IF errorCapturingNot EXISTS)? 
target=identifierReference + LIKE source=identifierReference (tableProvider | rowFormat | createFileFormat | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 316dc8faff3ff..b7c9638ca75e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -121,6 +121,10 @@ class ResolveCatalogs(val catalogManager: CatalogManager) val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns) r.copy(name = resolvedIdentifier) + case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _) => + val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, Nil) + c.copy(name = resolvedIdentifier) + case UnresolvedIdentifier(nameParts, allowTemp) => resolveIdentifier(nameParts, allowTemp, Nil) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index c0fe58596c9fd..052681d41ad18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -21,7 +21,7 @@ import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkUns import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, ResolvedProcedure, ResolveSchemaEvolution, TypeCheckResult, UnresolvedAttribute, UnresolvedException, UnresolvedProcedure, ViewSchemaMode} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} -import 
org.apache.spark.sql.catalyst.catalog.{FunctionResource, RoutineLanguage} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, FunctionResource, RoutineLanguage} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema @@ -615,6 +615,35 @@ case class CreateTable( } } +/** + * Create a new table with the same schema/partitioning as an existing table or view, + * for use with a v2 catalog. + * + * @param name Target table identifier. Starts as UnresolvedIdentifier, resolved to + * ResolvedIdentifier by ResolveCatalogs. + * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to + * ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations. + * @param fileFormat User-specified STORED AS / ROW FORMAT (Hive-style). Empty if not specified. + * @param provider User-specified USING provider. None if not specified. + * @param properties User-specified TBLPROPERTIES. + * @param ifNotExists IF NOT EXISTS flag. + */ +case class CreateTableLike( + name: LogicalPlan, + source: LogicalPlan, + fileFormat: CatalogStorageFormat, + provider: Option[String], + properties: Map[String, String], + ifNotExists: Boolean) extends BinaryCommand { + + override def left: LogicalPlan = name + override def right: LogicalPlan = source + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): CreateTableLike = + copy(name = newLeft, source = newRight) +} + /** * Create a new table from a select query with a v2 catalog. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 7efd2e1113179..830d495923057 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -293,6 +293,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } + // For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session + // catalog (or a V1-compatible catalog extension). If source is in a different catalog, fall + // through to the V2 execution path (CreateTableLikeExec via DataSourceV2Strategy). + case CreateTableLike( + ResolvedV1Identifier(targetIdent), + ResolvedV1TableOrViewIdentifier(sourceIdent), + fileFormat, provider, properties, ifNotExists) => + CreateTableLikeCommand( + targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists) + case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command => DropTableCommand(ident, ifExists, isView = false, purge = purge) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 6c19f53d2dc42..1cf15be1d3a7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1158,8 +1158,6 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { - val targetTable = visitTableIdentifier(ctx.target) - val sourceTable = visitTableIdentifier(ctx.source) checkDuplicateClauses(ctx.tableProvider, "PROVIDER", ctx) checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx) 
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx) @@ -1188,8 +1186,13 @@ class SparkSqlAstBuilder extends AstBuilder { val storage = toStorageFormat(location, serdeInfo, ctx) val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) val cleanedProperties = cleanTableProperties(ctx, properties) - CreateTableLikeCommand( - targetTable, sourceTable, storage, provider, cleanedProperties, ctx.EXISTS != null) + CreateTableLike( + name = withIdentClause(ctx.target, UnresolvedIdentifier(_)), + source = createUnresolvedTableOrView(ctx.source, "CREATE TABLE LIKE", allowTempView = true), + fileFormat = storage, + provider = provider, + properties = cleanedProperties, + ifNotExists = ctx.EXISTS != null) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala new file mode 100644 index 0000000000000..b4c1f9c0336b8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.internal.LogKeys.TABLE_NAME +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo, V1Table} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Physical plan node for CREATE TABLE ... LIKE ... targeting a v2 catalog. + * + * Copies schema (columns) and partitioning from `sourceTable`. The following properties of the + * source table are intentionally NOT copied (matching v1 behavior): + * - Table-level comments + * - Source table's TBLPROPERTIES (user-specified `properties` are used instead) + * - Statistics, owner, create time + */ +case class CreateTableLikeExec( + targetCatalog: TableCatalog, + targetIdent: Identifier, + sourceTable: Table, + fileFormat: CatalogStorageFormat, + provider: Option[String], + properties: Map[String, String], + ifNotExists: Boolean) extends LeafV2CommandExec { + + override def output: Seq[Attribute] = Seq.empty + + override protected def run(): Seq[InternalRow] = { + if (!targetCatalog.tableExists(targetIdent)) { + // 1. Extract columns from source. For V1Table sources use the raw schema so that + // CHAR/VARCHAR types are preserved as declared (without internal metadata expansion). + val columns = sourceTable match { + case v1: V1Table => + val rawSchema = CharVarcharUtils.getRawSchema(v1.catalogTable.schema) + CatalogV2Util.structTypeToV2Columns(rawSchema) + case _ => + sourceTable.columns() + } + + // 2. 
Extract partitioning from source (includes both partition columns and bucket spec + // for V1Table, as V1Table.partitioning encodes both). + val partitioning = sourceTable.partitioning + + // 3. Resolve provider: USING clause overrides, else copy from source. + val resolvedProvider = provider.orElse { + sourceTable match { + case v1: V1Table if v1.catalogTable.tableType == CatalogTableType.VIEW => + // When the source is a view, default to the session's default data source. + // This matches V1 CreateTableLikeCommand behavior. + Some(session.sessionState.conf.defaultDataSourceName) + case v1: V1Table => + v1.catalogTable.provider + case _ => + Option(sourceTable.properties.get(TableCatalog.PROP_PROVIDER)) + } + } + + // 4. Build final properties. User-specified TBLPROPERTIES are used as-is; source table + // properties are NOT copied. Provider and location are added if determined above. + val locationProp: Option[(String, String)] = + fileFormat.locationUri.map(uri => + TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) + + val finalProps = properties ++ + resolvedProvider.map(TableCatalog.PROP_PROVIDER -> _) ++ + locationProp + + try { + val tableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(partitioning) + .withProperties(finalProps.asJava) + .build() + targetCatalog.createTable(targetIdent, tableInfo) + } catch { + case _: TableAlreadyExistsException if ifNotExists => + logWarning( + log"Table ${MDC(TABLE_NAME, targetIdent.quoted)} was created concurrently. 
Ignoring.") + } + } else if (!ifNotExists) { + throw QueryCompilationErrors.tableAlreadyExistsError(targetIdent) + } + + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 91e753096a238..e1a8e8b502f15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,8 +24,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkException, SparkIllegalArgumentException} import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.EXPR -import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} -import org.apache.spark.sql.catalyst.catalog.CatalogUtils +import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedTempView} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder} import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, 
SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable} +import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable, V1Table} import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} @@ -240,6 +240,33 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil } + // CREATE TABLE ... LIKE ... for a v2 catalog target. + // Source is an already-resolved Table object; no extra catalog round-trip is needed. + case CreateTableLike( + ResolvedIdentifier(catalog, ident), + ResolvedTable(_, _, table, _), + fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => + CreateTableLikeExec( + catalog.asTableCatalog, ident, table, fileFormat, provider, properties, ifNotExists) :: Nil + + // Source is a persistent or temporary view; wrap its CatalogTable in V1Table so the + // exec can extract schema and provider uniformly. 
+ case CreateTableLike( + ResolvedIdentifier(catalog, ident), + ResolvedPersistentView(_, _, meta), + fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => + CreateTableLikeExec( + catalog.asTableCatalog, ident, V1Table(meta), + fileFormat, provider, properties, ifNotExists) :: Nil + + case CreateTableLike( + ResolvedIdentifier(catalog, ident), + ResolvedTempView(_, meta), + fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => + CreateTableLikeExec( + catalog.asTableCatalog, ident, V1Table(meta), + fileFormat, provider, properties, ifNotExists) :: Nil + case RefreshTable(r: ResolvedTable) => RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala new file mode 100644 index 0000000000000..09c05c96fb311 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} + +class CreateTableLikeSuite extends DatasourceV2SQLBase { + + private def testCatalog: InMemoryCatalog = + catalog("testcat").asInstanceOf[InMemoryCatalog] + + private def testCatalog2: InMemoryCatalog = + catalog("testcat2").asInstanceOf[InMemoryCatalog] + + // ------------------------------------------------------------------------- + // Basic V2 path + // ------------------------------------------------------------------------- + + test("v2 target, v1 source: schema and partitioning are copied") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint, data string) USING parquet PARTITIONED BY (data)") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.schema() === CatalogV2Util.v2ColumnsToStructType(dst.columns())) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + // partition column encoded in partitioning (identity transform on the partition column) + assert(dst.partitioning.nonEmpty) + } + } + + test("v2 target, v2 source: pure v2 path") { + withTable("testcat.src", "testcat.dst") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING foo") + sql("CREATE TABLE testcat.dst LIKE testcat.src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + + test("cross-catalog: source in testcat, target in testcat2") { + withTable("testcat.src", "testcat2.dst") { + sql("CREATE TABLE testcat.src (id bigint, name string) USING foo") + sql("CREATE TABLE testcat2.dst LIKE testcat.src") + + val dst = 
testCatalog2.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "name")) + } + } + + test("3-part name: catalog.namespace.table for both target and source") { + withTable("testcat.ns.src", "testcat2.ns.dst") { + sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns") + sql("CREATE NAMESPACE IF NOT EXISTS testcat2.ns") + sql("CREATE TABLE testcat.ns.src (id bigint, data string) USING foo") + sql("CREATE TABLE testcat2.ns.dst LIKE testcat.ns.src") + + val dst = testCatalog2.loadTable(Identifier.of(Array("ns"), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + + // ------------------------------------------------------------------------- + // IF NOT EXISTS + // ------------------------------------------------------------------------- + + test("IF NOT EXISTS: second call is silent when table already exists") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + // Should not throw + sql("CREATE TABLE IF NOT EXISTS testcat.dst LIKE src") + } + } + + test("without IF NOT EXISTS, duplicate create throws TableAlreadyExistsException") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + intercept[TableAlreadyExistsException] { + sql("CREATE TABLE testcat.dst LIKE src") + } + } + } + + // ------------------------------------------------------------------------- + // Views as source + // ------------------------------------------------------------------------- + + test("persistent view as source, v2 target") { + withTable("src", "testcat.dst") { + withView("v") { + sql("CREATE TABLE src (id bigint, data string) USING parquet") + sql("INSERT INTO src VALUES (1, 'a')") + sql("CREATE VIEW v AS SELECT * FROM src") + sql("CREATE TABLE testcat.dst LIKE v") + + val dst = 
testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + } + + test("temp view as source, v2 target") { + withTable("src", "testcat.dst") { + withTempView("tv") { + sql("CREATE TABLE src (id bigint, data string) USING parquet") + sql("CREATE TEMP VIEW tv AS SELECT id, data FROM src") + sql("CREATE TABLE testcat.dst LIKE tv") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + } + + // ------------------------------------------------------------------------- + // Property / provider behavior + // ------------------------------------------------------------------------- + + test("source TBLPROPERTIES are NOT copied to target") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('secret_key' = 'secret')") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(!dst.properties.containsKey("secret_key"), + "Source TBLPROPERTIES should not be copied to target") + } + } + + test("user-specified TBLPROPERTIES are applied on target") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src TBLPROPERTIES ('custom' = 'value')") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("custom") === "value") + } + } + + test("USING clause overrides source provider") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src USING foo") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("provider") === "foo") + } + } + + // ------------------------------------------------------------------------- + // Column type fidelity + // 
------------------------------------------------------------------------- + + test("multiple column types are preserved") { + withTable("src", "testcat.dst") { + sql( + """CREATE TABLE src ( + | id bigint, + | name string, + | score int + |) USING parquet""".stripMargin) + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns()) + assert(schema("id").dataType === LongType) + assert(schema("name").dataType === StringType) + assert(schema("score").dataType === IntegerType) + } + } + + // ------------------------------------------------------------------------- + // V1 fallback regression + // ------------------------------------------------------------------------- + + test("v1 fallback: CREATE TABLE default.dst LIKE default.src still uses CreateTableLikeCommand") { + withTable("src", "dst") { + sql("CREATE TABLE src (id bigint, data string) USING parquet") + sql("CREATE TABLE dst LIKE src") + // Verify via session catalog that dst was created with the correct schema + val meta = spark.sessionState.catalog.getTableMetadata( + spark.sessionState.sqlParser.parseTableIdentifier("dst")) + assert(meta.schema.fieldNames === Array("id", "data")) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 9db7ae2e2824c..2fa323c1dc9cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.SparkThrowable import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, SchemaCompensation, UnresolvedAttribute, UnresolvedIdentifier} 
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, SchemaCompensation, UnresolvedAttribute, UnresolvedIdentifier, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans @@ -721,83 +721,76 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { } test("create table like") { - val v1 = "CREATE TABLE table1 LIKE table2" + // Helper to extract fields from the new CreateTableLike unresolved plan. + // The parser now emits CreateTableLike (v2 logical plan) instead of + // CreateTableLikeCommand, so both name and source are unresolved identifiers. + def extract(sql: String): (Seq[String], Seq[String], + org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat, + Option[String], Map[String, String], Boolean) = + parser.parsePlan(sql).collect { + case CreateTableLike( + UnresolvedIdentifier(targetParts, _), + UnresolvedTableOrView(sourceParts, _, _), + f, p, pr, e) => + (targetParts, sourceParts, f, p, pr, e) + }.head + val (target, source, fileFormat, provider, properties, exists) = - parser.parsePlan(v1).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + extract("CREATE TABLE table1 LIKE table2") assert(exists == false) - assert(target.database.isEmpty) - assert(target.table == "table1") - assert(source.database.isEmpty) - assert(source.table == "table2") + assert(target == Seq("table1")) + assert(source == Seq("table2")) assert(fileFormat.locationUri.isEmpty) assert(provider.isEmpty) - val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2" val (target2, source2, fileFormat2, provider2, properties2, exists2) = - parser.parsePlan(v2).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2") assert(exists2) 
- assert(target2.database.isEmpty) - assert(target2.table == "table1") - assert(source2.database.isEmpty) - assert(source2.table == "table2") + assert(target2 == Seq("table1")) + assert(source2 == Seq("table2")) assert(fileFormat2.locationUri.isEmpty) assert(provider2.isEmpty) - val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'" val (target3, source3, fileFormat3, provider3, properties3, exists3) = - parser.parsePlan(v3).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + extract("CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'") assert(!exists3) - assert(target3.database.isEmpty) - assert(target3.table == "table1") - assert(source3.database.isEmpty) - assert(source3.table == "table2") + assert(target3 == Seq("table1")) + assert(source3 == Seq("table2")) assert(fileFormat3.locationUri.map(_.toString) == Some("/spark/warehouse")) assert(provider3.isEmpty) - val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'" val (target4, source4, fileFormat4, provider4, properties4, exists4) = - parser.parsePlan(v4).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'") assert(exists4) - assert(target4.database.isEmpty) - assert(target4.table == "table1") - assert(source4.database.isEmpty) - assert(source4.table == "table2") + assert(target4 == Seq("table1")) + assert(source4 == Seq("table2")) assert(fileFormat4.locationUri.map(_.toString) == Some("/spark/warehouse")) assert(provider4.isEmpty) - val v5 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet" val (target5, source5, fileFormat5, provider5, properties5, exists5) = - parser.parsePlan(v5).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet") assert(exists5) - 
assert(target5.database.isEmpty) - assert(target5.table == "table1") - assert(source5.database.isEmpty) - assert(source5.table == "table2") + assert(target5 == Seq("table1")) + assert(source5 == Seq("table2")) assert(fileFormat5.locationUri.isEmpty) assert(provider5 == Some("parquet")) - val v6 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC" val (target6, source6, fileFormat6, provider6, properties6, exists6) = - parser.parsePlan(v6).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC") assert(exists6) - assert(target6.database.isEmpty) - assert(target6.table == "table1") - assert(source6.database.isEmpty) - assert(source6.table == "table2") + assert(target6 == Seq("table1")) + assert(source6 == Seq("table2")) assert(fileFormat6.locationUri.isEmpty) assert(provider6 == Some("ORC")) + + // 3-part names: catalog.namespace.table + val (target7, source7, fileFormat7, provider7, properties7, exists7) = + extract("CREATE TABLE testcat.ns.dst LIKE testcat.ns.src") + assert(!exists7) + assert(target7 == Seq("testcat", "ns", "dst")) + assert(source7 == Seq("testcat", "ns", "src")) + assert(fileFormat7.locationUri.isEmpty) + assert(provider7.isEmpty) } test("SET CATALOG") { From c1dc58a13172d78339a3d726b9dbe36d102604fb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 14:31:45 -0700 Subject: [PATCH 02/19] =?UTF-8?q?[SPARK]=20Add=20V2=E2=86=92V1=20cross-cat?= =?UTF-8?q?alog=20tests=20for=20CREATE=20TABLE=20LIKE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add two tests covering the case where the source is a V2 table in a non-session catalog and the target resolves to the session catalog. These exercise the CreateTableLikeExec → V2SessionCatalog path and confirm that schema and partitioning are correctly propagated. 
Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/CreateTableLikeSuite.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index 09c05c96fb311..d715a009b59ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -195,6 +195,39 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + // ------------------------------------------------------------------------- + // V2 source, session catalog (V1) target + // ------------------------------------------------------------------------- + + test("v2 source, v1 target: session catalog target with v2 catalog source") { + // Source is a pure V2 table in testcat (InMemoryTable, not a V1Table). + // Target is the session catalog. ResolvedV1TableOrViewIdentifier does not match + // the V2 source, so ResolveSessionCatalog falls through and CreateTableLikeExec + // is used (targeting V2SessionCatalog which backs the session catalog). + // The source must use a real provider (parquet) so that V2SessionCatalog can + // validate it when creating the target table. 
+ withTable("testcat.src", "dst") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet") + sql("CREATE TABLE dst LIKE testcat.src") + + assert(spark.catalog.tableExists("dst")) + val schema = spark.table("dst").schema + assert(schema.fieldNames === Array("id", "data")) + } + } + + test("v2 source, v1 target: schema and partitioning are copied") { + withTable("testcat.src", "dst") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet " + + "PARTITIONED BY (data)") + sql("CREATE TABLE dst LIKE testcat.src") + + assert(spark.catalog.tableExists("dst")) + val schema = spark.table("dst").schema + assert(schema.fieldNames === Array("id", "data")) + } + } + // ------------------------------------------------------------------------- // V1 fallback regression // ------------------------------------------------------------------------- From 590b4fabcd8da5df3de584c04630dad67de2807f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 15:59:46 -0700 Subject: [PATCH 03/19] [SPARK] Add tests verifying V2 catalog provider validation behavior Add two tests to CreateTableLikeSuite documenting that pure V2 catalogs (e.g. InMemoryCatalog) accept any provider string without validation, while V2SessionCatalog rejects non-existent providers by delegating to DataSource.lookupDataSource. This is consistent with how CreateTableExec handles the USING clause for other V2 DDL commands. 
Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/CreateTableLikeSuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index d715a009b59ed..fecb3c0518e93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -173,6 +173,35 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + test("v2 catalog target: non-existent provider is stored as property without validation") { + // Pure V2 catalogs (e.g. InMemoryCatalog) do not validate the provider — they store it + // as a plain property and let the catalog implementation decide what to do with it. + // This is consistent with how CreateTableExec works for CREATE TABLE targeting V2 catalogs. + withTable("testcat.src", "testcat.dst") { + sql("CREATE TABLE testcat.src (id bigint) USING foo") + sql("CREATE TABLE testcat.dst LIKE testcat.src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("provider") === "foo", + "Provider should be copied from source as-is without validation") + } + } + + test("session catalog target: non-existent provider from source is rejected") { + // V2SessionCatalog bridges to the V1 DataSource world and calls + // DataSource.lookupDataSource(provider) when creating the target table. + // A non-existent provider therefore causes a DATA_SOURCE_NOT_FOUND error, + // unlike pure V2 catalogs which accept any provider string. 
+ withTable("testcat.src") { + sql("CREATE TABLE testcat.src (id bigint) USING foo") + val ex = intercept[Exception] { + sql("CREATE TABLE dst LIKE testcat.src") + } + assert(ex.getMessage.contains("foo"), + "Error should mention the unresolvable provider") + } + } + // ------------------------------------------------------------------------- // Column type fidelity // ------------------------------------------------------------------------- From 7519e0cf8e332f889c8945bc41c6a2d4e8d3bb71 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 16:10:12 -0700 Subject: [PATCH 04/19] [SPARK] Add tests for provider copy and CHAR/VARCHAR preservation in CREATE TABLE LIKE Two new tests covering previously untested code paths in CreateTableLikeExec: - Source provider is copied to V2 target as PROP_PROVIDER when no USING override is given, consistent with how CreateTableExec handles other V2 DDL. - CHAR(n)/VARCHAR(n) types declared on a V1 source are preserved in the V2 target via CharVarcharUtils.getRawSchema, not collapsed to StringType. 
Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/CreateTableLikeSuite.scala | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index fecb3c0518e93..0dcb454e28d99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog} -import org.apache.spark.sql.types.{IntegerType, LongType, StringType} +import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType} class CreateTableLikeSuite extends DatasourceV2SQLBase { @@ -202,6 +202,19 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + test("source provider is copied to v2 target when no USING override") { + // When no USING clause is given, CreateTableLikeExec copies the provider from the + // source table into PROP_PROVIDER of the target's TableInfo properties. 
+ withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("provider") === "parquet", + "Source provider should be copied to the V2 target when no USING clause is specified") + } + } + // ------------------------------------------------------------------------- // Column type fidelity // ------------------------------------------------------------------------- @@ -224,6 +237,20 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + test("CHAR and VARCHAR types are preserved from v1 source to v2 target") { + // CreateTableLikeExec calls CharVarcharUtils.getRawSchema on V1Table sources so that + // CHAR(n)/VARCHAR(n) declarations survive the copy instead of being collapsed to StringType. + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint, name CHAR(10), tag VARCHAR(20)) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns()) + assert(schema("name").dataType === CharType(10)) + assert(schema("tag").dataType === VarcharType(20)) + } + } + // ------------------------------------------------------------------------- // V2 source, session catalog (V1) target // ------------------------------------------------------------------------- From b0000592a112b95c12165b9cbd7e848feb299b3f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 17:15:59 -0700 Subject: [PATCH 05/19] [SPARK] Document why constraints are not copied in CreateTableLikeExec Add inline comment explaining the six reasons withConstraints is intentionally omitted: V1 behavior parity, ForeignKey cross-catalog dangling references, constraint name collision risk, validation status semantics on empty tables, NOT NULL already captured in nullability, and PostgreSQL 
precedent (INCLUDING CONSTRAINTS opt-in). Also notes the path forward if constraint copying is added in the future. Co-Authored-By: Claude Sonnet 4.6 --- .../datasources/v2/CreateTableLikeExec.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index b4c1f9c0336b8..0819875408bf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -90,6 +90,20 @@ case class CreateTableLikeExec( locationProp try { + // Constraints from the source table are intentionally NOT copied for several reasons: + // 1. Matches V1 behavior: CreateTableLikeCommand never copied constraints. + // 2. ForeignKey constraints carry a refTable Identifier bound to the source catalog; + // copying them to a different catalog creates dangling cross-catalog references. + // 3. Constraint names must be unique within a namespace; blindly copying them risks + // collisions with existing constraints in the target namespace. + // 4. Validation status (VALID/INVALID/UNVALIDATED) is tied to the source table's + // existing data and is meaningless on a newly created empty target table. + // 5. NOT NULL is already captured in Column.nullable() and copied via withColumns. + // 6. Consistent with PostgreSQL semantics: CREATE TABLE LIKE does not include + // constraints by default; users must explicitly opt in via INCLUDING CONSTRAINTS. + // If constraint copying is desired, use ALTER TABLE ADD CONSTRAINT after creation. + // If we wanted to support them in the future, the right approach would be to add an + // INCLUDING CONSTRAINTS clause (as PostgreSQL does) rather than copying blindly. 
val tableInfo = new TableInfo.Builder() .withColumns(columns) .withPartitions(partitioning) From 6fa11f629536027dd3f22bb2c5f223b5aae49e4c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 17:27:02 -0700 Subject: [PATCH 06/19] [SPARK] Correct constraint comment in CreateTableLikeExec Clarify that V1 tables (CatalogTable) have no constraint objects at all since CHECK/PRIMARY KEY/UNIQUE/FOREIGN KEY are V2-only concepts added in Spark 4.1.0, rather than saying CreateTableLikeCommand "never copied" them which implies an intentional decision rather than absence of the feature. Co-Authored-By: Claude Sonnet 4.6 --- .../sql/execution/datasources/v2/CreateTableLikeExec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 0819875408bf7..a85207de41c54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -91,7 +91,10 @@ case class CreateTableLikeExec( try { // Constraints from the source table are intentionally NOT copied for several reasons: - // 1. Matches V1 behavior: CreateTableLikeCommand never copied constraints. + // 1. V1 tables (CatalogTable) have no constraint objects — CHECK, PRIMARY KEY, + // UNIQUE and FOREIGN KEY are V2-only concepts (introduced in Spark 4.1.0). + // CreateTableLikeCommand had nothing to copy, so not copying here preserves + // behavioral parity with the V1 path. // 2. ForeignKey constraints carry a refTable Identifier bound to the source catalog; // copying them to a different catalog creates dangling cross-catalog references. // 3. 
Constraint names must be unique within a namespace; blindly copying them risks From 62bda93b9230ff0a7e060f14a0296f88a2ea508d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Mar 2026 23:20:17 -0700 Subject: [PATCH 07/19] [SPARK] Update charvarchar.sql analyzer golden file for fully qualified identifiers After the CREATE TABLE LIKE V2 change, the target and source identifiers in CreateTableLikeCommand are now fully qualified (spark_catalog.default.*) because ResolvedV1Identifier explicitly adds the catalog component via ident.asTableIdentifier.copy(catalog = Some(catalog.name)), and ResolvedV1TableIdentifier returns t.catalogTable.identifier which also includes the catalog. Update the analyzer golden file accordingly. Co-Authored-By: Claude Sonnet 4.6 --- .../resources/sql-tests/analyzer-results/charvarchar.sql.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out index 4e864523368d7..7bd87a4267c77 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out @@ -55,7 +55,7 @@ DescribeColumnCommand `spark_catalog`.`default`.`char_tbl2`, [spark_catalog, def -- !query create table char_tbl3 like char_tbl -- !query analysis -CreateTableLikeCommand `char_tbl3`, `char_tbl`, Storage(), false +CreateTableLikeCommand `spark_catalog`.`default`.`char_tbl3`, `spark_catalog`.`default`.`char_tbl`, Storage(), false -- !query From 1823a9537047cebfd635c9675680a84df1ae4242 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Mar 2026 22:55:03 -0700 Subject: [PATCH 08/19] [SPARK] Address PR review comments on CREATE TABLE LIKE V2 - Narrow CreateTableLikeExec parameter from fileFormat: CatalogStorageFormat to location: Option[URI] since only locationUri is used; extract at the 
DataSourceV2Strategy callsite (gengliangwang) - Add withDefaultOwnership to finalProps so the target table records the current user as owner, consistent with CreateTableExec (aokolnychyi) - Consolidate three CreateTableLike pattern match cases in DataSourceV2Strategy into a single case with an inner match on the source (gengliangwang) - Shorten the constraint comment and add a note on source provider inheritance (aokolnychyi) - Fix DDLParserSuite comment: source is UnresolvedTableOrView, not an unresolved identifier (gengliangwang) Co-Authored-By: Claude Sonnet 4.6 --- .../datasources/v2/CreateTableLikeExec.scala | 44 ++++++++----------- .../datasources/v2/DataSourceV2Strategy.scala | 31 ++++--------- .../execution/command/DDLParserSuite.scala | 3 +- 3 files changed, 30 insertions(+), 48 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index a85207de41c54..93f99f1c52633 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -17,12 +17,14 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.net.URI + import scala.jdk.CollectionConverters._ import org.apache.spark.internal.LogKeys.TABLE_NAME import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo, V1Table} @@ -42,7 
+44,7 @@ case class CreateTableLikeExec( targetCatalog: TableCatalog, targetIdent: Identifier, sourceTable: Table, - fileFormat: CatalogStorageFormat, + location: Option[URI], provider: Option[String], properties: Map[String, String], ifNotExists: Boolean) extends LeafV2CommandExec { @@ -66,6 +68,9 @@ case class CreateTableLikeExec( val partitioning = sourceTable.partitioning // 3. Resolve provider: USING clause overrides, else copy from source. + // The source provider is inherited so that the target table uses the same format, + // matching V1 CreateTableLikeCommand behavior. Whether the target catalog validates + // or uses this property is catalog-specific (e.g. V2SessionCatalog validates it). val resolvedProvider = provider.orElse { sourceTable match { case v1: V1Table if v1.catalogTable.tableType == CatalogTableType.VIEW => @@ -80,33 +85,22 @@ case class CreateTableLikeExec( } // 4. Build final properties. User-specified TBLPROPERTIES are used as-is; source table - // properties are NOT copied. Provider and location are added if determined above. + // properties are NOT copied. Provider, location, and owner are added if applicable. val locationProp: Option[(String, String)] = - fileFormat.locationUri.map(uri => - TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) + location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) - val finalProps = properties ++ - resolvedProvider.map(TableCatalog.PROP_PROVIDER -> _) ++ - locationProp + val finalProps = CatalogV2Util.withDefaultOwnership( + properties ++ + resolvedProvider.map(TableCatalog.PROP_PROVIDER -> _) ++ + locationProp) try { - // Constraints from the source table are intentionally NOT copied for several reasons: - // 1. V1 tables (CatalogTable) have no constraint objects — CHECK, PRIMARY KEY, - // UNIQUE and FOREIGN KEY are V2-only concepts (introduced in Spark 4.1.0). 
- // CreateTableLikeCommand had nothing to copy, so not copying here preserves - // behavioral parity with the V1 path. - // 2. ForeignKey constraints carry a refTable Identifier bound to the source catalog; - // copying them to a different catalog creates dangling cross-catalog references. - // 3. Constraint names must be unique within a namespace; blindly copying them risks - // collisions with existing constraints in the target namespace. - // 4. Validation status (VALID/INVALID/UNVALIDATED) is tied to the source table's - // existing data and is meaningless on a newly created empty target table. - // 5. NOT NULL is already captured in Column.nullable() and copied via withColumns. - // 6. Consistent with PostgreSQL semantics: CREATE TABLE LIKE does not include - // constraints by default; users must explicitly opt in via INCLUDING CONSTRAINTS. - // If constraint copying is desired, use ALTER TABLE ADD CONSTRAINT after creation. - // If we wanted to support them in the future, the right approach would be to add an - // INCLUDING CONSTRAINTS clause (as PostgreSQL does) rather than copying blindly. + // Constraints are intentionally NOT copied: V1 tables have no constraint objects + // (CHECK/PRIMARY KEY/UNIQUE/FOREIGN KEY are V2-only, added in Spark 4.1.0); + // ForeignKey carries a catalog-specific Identifier that becomes stale cross-catalog; + // constraint names risk collisions in the target namespace; and NOT NULL is already + // captured in Column.nullable(). Use ALTER TABLE ADD CONSTRAINT after creation, or + // add an INCLUDING CONSTRAINTS clause in the future (following PostgreSQL semantics). 
val tableInfo = new TableInfo.Builder() .withColumns(columns) .withPartitions(partitioning) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index e1a8e8b502f15..b73ddbb96876f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -242,30 +242,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat // CREATE TABLE ... LIKE ... for a v2 catalog target. // Source is an already-resolved Table object; no extra catalog round-trip is needed. + // Views are wrapped in V1Table so the exec can extract schema and provider uniformly. case CreateTableLike( - ResolvedIdentifier(catalog, ident), - ResolvedTable(_, _, table, _), + ResolvedIdentifier(catalog, ident), source, fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => - CreateTableLikeExec( - catalog.asTableCatalog, ident, table, fileFormat, provider, properties, ifNotExists) :: Nil - - // Source is a persistent or temporary view; wrap its CatalogTable in V1Table so the - // exec can extract schema and provider uniformly. 
- case CreateTableLike( - ResolvedIdentifier(catalog, ident), - ResolvedPersistentView(_, _, meta), - fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => - CreateTableLikeExec( - catalog.asTableCatalog, ident, V1Table(meta), - fileFormat, provider, properties, ifNotExists) :: Nil - - case CreateTableLike( - ResolvedIdentifier(catalog, ident), - ResolvedTempView(_, meta), - fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => - CreateTableLikeExec( - catalog.asTableCatalog, ident, V1Table(meta), - fileFormat, provider, properties, ifNotExists) :: Nil + val table = source match { + case ResolvedTable(_, _, t, _) => t + case ResolvedPersistentView(_, _, meta) => V1Table(meta) + case ResolvedTempView(_, meta) => V1Table(meta) + } + CreateTableLikeExec(catalog.asTableCatalog, ident, table, + fileFormat.locationUri, provider, properties, ifNotExists) :: Nil case RefreshTable(r: ResolvedTable) => RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 2fa323c1dc9cb..24ca86b26a52d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -723,7 +723,8 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { test("create table like") { // Helper to extract fields from the new CreateTableLike unresolved plan. // The parser now emits CreateTableLike (v2 logical plan) instead of - // CreateTableLikeCommand, so both name and source are unresolved identifiers. + // CreateTableLikeCommand; the name is an UnresolvedIdentifier and the source is + // an UnresolvedTableOrView. 
def extract(sql: String): (Seq[String], Seq[String], org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat, Option[String], Map[String, String], Boolean) = From 7637e1f618ad1d1635bad0d85249e86ca6c4b55f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Mar 2026 23:10:55 -0700 Subject: [PATCH 09/19] [SPARK] Add CatalogExtension test for CREATE TABLE LIKE V2 path Tests that when a DSv2 connector overrides the session catalog via CatalogExtension (e.g. Iceberg SparkSessionCatalog), CREATE TABLE LIKE with a native V2 source correctly uses CreateTableLikeExec and creates the target as a native V2 table in the extension catalog. Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/CreateTableLikeSuite.scala | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index 0dcb454e28d99..b728cc7f242c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -284,6 +284,41 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + // ------------------------------------------------------------------------- + // CatalogExtension (Iceberg-style session catalog override) scenario + // ------------------------------------------------------------------------- + + test("CatalogExtension session catalog override: source is native V2 table, uses V2 exec path") { + // In this suite the session catalog is already overridden with InMemoryTableSessionCatalog, + // which implements CatalogExtension — the same pattern used by Iceberg's SparkSessionCatalog. 
+ // + // When the source is a native V2 InMemoryTable (from testcat, not a V1Table): + // - supportsV1Command returns true (CatalogExtension catalog) + // - but ResolvedV1TableOrViewIdentifier does NOT match a non-V1Table source + // - so CreateTableLikeExec (V2 exec path) is used instead of CreateTableLikeCommand + // - CreateTableLikeExec calls InMemoryTableSessionCatalog.createTable which stores + // the target as a native InMemoryTable in the extension catalog + withTable("testcat.src", "dst") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet") + sql("CREATE TABLE dst LIKE testcat.src") + + // The target should exist and have the correct schema + assert(spark.catalog.tableExists("dst")) + val schema = spark.table("dst").schema + assert(schema.fieldNames === Array("id", "data")) + + // The target was created through the CatalogExtension catalog; verify it is an + // InMemoryTable (the native V2 type used by InMemoryTableSessionCatalog) rather than + // a V1Table backed by the Hive metastore. + val extCatalog = spark.sessionState.catalogManager + .catalog("spark_catalog") + .asInstanceOf[InMemoryTableSessionCatalog] + val dst = extCatalog.loadTable(Identifier.of(Array("default"), "dst")) + assert(dst.isInstanceOf[org.apache.spark.sql.connector.catalog.InMemoryTable], + "Target table should be a native V2 InMemoryTable in the CatalogExtension catalog") + } + } + // ------------------------------------------------------------------------- // V1 fallback regression // ------------------------------------------------------------------------- From b16c76ce96c1a7215cc614da1d6292dac2bb6bc1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 19 Mar 2026 13:41:11 -0700 Subject: [PATCH 10/19] [SPARK] Add createTableLike API to TableCatalog for connector-delegated copy semantics Add a new default method `createTableLike(Identifier, TableInfo, Table)` to `TableCatalog` so that connectors (Delta, Iceberg, etc.) 
can implement format-specific CREATE TABLE LIKE semantics by accessing the resolved source `Table` object directly (e.g. Delta protocol inheritance, Iceberg sort order and format version). `TableInfo` contains user-specified overrides (TBLPROPERTIES, LOCATION), resolved provider, and current user as owner. Source TBLPROPERTIES and constraints are NOT bulk-copied; connectors read them from `sourceTable`. The default implementation falls back to `createTable(ident, tableInfo)`. `CreateTableLikeExec` is updated to call `createTableLike` instead of `createTable`. Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/catalog/TableCatalog.java | 30 +++++++++++++++++++ .../datasources/v2/CreateTableLikeExec.scala | 27 +++++++++-------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 33bf615680063..6b92f378b614d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -295,6 +295,36 @@ default Table createTable(Identifier ident, TableInfo tableInfo) return createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties()); } + /** + * Create a table in the catalog by copying metadata from an existing source table. + *

+ * This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. + * The {@code tableInfo} parameter contains only the explicit overrides provided by the user + * (TBLPROPERTIES, LOCATION, USING clause) — it does NOT contain properties copied from the + * source table. Connectors that want to copy source-format-specific metadata (e.g. Delta + * protocol version, Iceberg sort order, format version) should read it directly from + * {@code sourceTable}. + *

+ * The default implementation falls back to {@link #createTable(Identifier, TableInfo)}, which + * creates a new table using only the schema and partitioning extracted from {@code sourceTable} + * by Spark, plus the user-specified overrides in {@code tableInfo}. Connectors with + * format-specific copy semantics should override this method. + * + * @param ident a table identifier for the new table + * @param tableInfo user-specified overrides (TBLPROPERTIES, LOCATION), resolved provider, and + * current user as owner; source TBLPROPERTIES are NOT bulk-copied + * @param sourceTable the resolved source table whose metadata is being copied + * @return metadata for the new table + * + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + * @since 4.2.0 + */ + default Table createTableLike(Identifier ident, TableInfo tableInfo, Table sourceTable) + throws TableAlreadyExistsException, NoSuchNamespaceException { + return createTable(ident, tableInfo); + } + /** * If true, mark all the fields of the query schema as nullable when executing * CREATE/REPLACE TABLE ... AS SELECT ... and creating the table. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 93f99f1c52633..10abbb34c3f6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -34,11 +34,15 @@ import org.apache.spark.sql.errors.QueryCompilationErrors /** * Physical plan node for CREATE TABLE ... LIKE ... targeting a v2 catalog. * - * Copies schema (columns) and partitioning from `sourceTable`. 
The following properties of the - * source table are intentionally NOT copied (matching v1 behavior): - * - Table-level comments - * - Source table's TBLPROPERTIES (user-specified `properties` are used instead) - * - Statistics, owner, create time + * Calls [[TableCatalog.createTableLike]] so that connectors can implement format-specific copy + * semantics (e.g. Delta protocol inheritance, Iceberg sort order and format version). Connectors + * that do not override [[TableCatalog.createTableLike]] fall back to [[TableCatalog.createTable]] + * with only the schema and partitioning extracted from `sourceTable` by Spark. + * + * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains user-specified overrides + * (TBLPROPERTIES, LOCATION), the resolved provider, and the current user as owner. Source + * TBLPROPERTIES and constraints are NOT bulk-copied; connectors can read them directly from + * [[sourceTable]]. */ case class CreateTableLikeExec( targetCatalog: TableCatalog, @@ -95,18 +99,17 @@ case class CreateTableLikeExec( locationProp) try { - // Constraints are intentionally NOT copied: V1 tables have no constraint objects - // (CHECK/PRIMARY KEY/UNIQUE/FOREIGN KEY are V2-only, added in Spark 4.1.0); - // ForeignKey carries a catalog-specific Identifier that becomes stale cross-catalog; - // constraint names risk collisions in the target namespace; and NOT NULL is already - // captured in Column.nullable(). Use ALTER TABLE ADD CONSTRAINT after creation, or - // add an INCLUDING CONSTRAINTS clause in the future (following PostgreSQL semantics). + // The TableInfo passed to createTableLike contains user-specified overrides + // (TBLPROPERTIES, LOCATION), the resolved provider (from USING clause if specified, + // otherwise inherited from the source), and the current user as owner. 
Source + // TBLPROPERTIES are NOT bulk-copied; connectors that override createTableLike can + // read source metadata, including properties and constraints, directly from sourceTable. val tableInfo = new TableInfo.Builder() .withColumns(columns) .withPartitions(partitioning) .withProperties(finalProps.asJava) .build() - targetCatalog.createTable(targetIdent, tableInfo) + targetCatalog.createTableLike(targetIdent, tableInfo, sourceTable) } catch { case _: TableAlreadyExistsException if ifNotExists => logWarning( From ffc7166620b9384e0f8de6e79f4882e1d7680b0b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 19 Mar 2026 15:38:48 -0700 Subject: [PATCH 11/19] [SPARK] Implement createTableLike in InMemoryTableCatalog with tests InMemoryTableCatalog overrides createTableLike to demonstrate connector-specific copy semantics: source properties are merged into the target (user overrides win), and source constraints are copied from sourceTable.constraints() directly. BasicInMemoryTableCatalog does not override createTableLike and uses the default fallback, which copies only schema, partitioning, and user-specified overrides. Tests added to CatalogSuite covering: - User-specified properties in tableInfo are applied to the target - Source properties are copied by the connector implementation - User-specified properties override source properties - Source constraints are copied by the connector implementation - Default fallback does not copy source properties CreateTableLikeSuite updated to reflect that InMemoryTableCatalog's createTableLike copies source properties, and adds a test for user override precedence. 
Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/catalog/CatalogSuite.scala | 128 ++++++++++++++++++ .../catalog/InMemoryTableCatalog.scala | 12 ++ .../sql/connector/CreateTableLikeSuite.scala | 22 ++- 3 files changed, 158 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index b4a4c6f46cda4..2949a1636b641 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -1377,4 +1377,132 @@ class CatalogSuite extends SparkFunSuite { catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3")) assert(catalog.loadTable(testIdent).version() == "4") } + + // ---- createTableLike tests ---- + + test("createTableLike: user-specified properties in tableInfo are applied to target") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcProps = Map("source.key" -> "source.value").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + // tableInfo contains only user overrides + val overrides = Map("user.key" -> "user.value").asJava + val tableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(overrides) + .build() + catalog.createTableLike(dstIdent, tableInfo, sourceTable) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("user.key") == "user.value", + "user-specified properties should be applied to the target") + } + + test("createTableLike: source properties are copied to target by connector implementation") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = 
Identifier.of(Array("ns"), "dst") + + val srcProps = Map("format.version" -> "2", "format.feature" -> "deletion-vectors").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + // tableInfo contains no overrides; connector should copy from sourceTable + val tableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(emptyProps) + .build() + catalog.createTableLike(dstIdent, tableInfo, sourceTable) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("format.version") == "2", + "connector should copy source properties from sourceTable") + assert(dst.properties.asScala("format.feature") == "deletion-vectors", + "connector should copy source properties from sourceTable") + } + + test("createTableLike: user-specified properties override source properties") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcProps = Map("format.version" -> "1", "source.only" -> "yes").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + // user explicitly overrides format.version + val overrides = Map("format.version" -> "2").asJava + val tableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(overrides) + .build() + catalog.createTableLike(dstIdent, tableInfo, sourceTable) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("format.version") == "2", + "user-specified properties should override source properties") + assert(dst.properties.asScala("source.only") == "yes", + "non-overridden source properties should still be copied") + } + + test("createTableLike: source constraints are copied to target by connector implementation") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val 
dstIdent = Identifier.of(Array("ns"), "dst") + + val srcTableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(emptyProps) + .withConstraints(constraints) + .build() + catalog.createTable(srcIdent, srcTableInfo) + val sourceTable = catalog.loadTable(srcIdent) + + val tableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(emptyProps) + .build() + catalog.createTableLike(dstIdent, tableInfo, sourceTable) + + val dst = catalog.loadTable(dstIdent) + assert(dst.constraints().toSet == constraints.toSet, + "connector should copy source constraints from sourceTable.constraints()") + } + + test("createTableLike: default implementation falls back to createTable with tableInfo only") { + // BasicInMemoryTableCatalog does not override createTableLike, so it uses the default + // implementation which calls createTable(ident, tableInfo); source properties are NOT copied. + val catalog = new BasicInMemoryTableCatalog + catalog.initialize("basic", CaseInsensitiveStringMap.empty()) + + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcProps = Map("source.key" -> "source.value").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + val overrides = Map("user.key" -> "user.value").asJava + val tableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(overrides) + .build() + catalog.createTableLike(dstIdent, tableInfo, sourceTable) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("user.key") == "user.value", + "user-specified properties should be present") + assert(!dst.properties.containsKey("source.key"), + "default createTableLike does not copy source properties; connector must override to do so") + } } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index dc580848d09de..4e2a6f6f3ac4f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -238,6 +238,18 @@ class BasicInMemoryTableCatalog extends TableCatalog { class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces with ProcedureCatalog { + override def createTableLike( + ident: Identifier, + tableInfo: TableInfo, + sourceTable: Table): Table = { + // Format-specific behavior: merge source properties with user overrides, with user overrides + // taking precedence. Copy source constraints from sourceTable directly. This demonstrates + // how a connector uses sourceTable to access source-format-specific metadata. + val mergedProps = (sourceTable.properties().asScala ++ tableInfo.properties().asScala).asJava + createTable(ident, tableInfo.columns(), tableInfo.partitions(), mergedProps, + Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) + } + override def capabilities: java.util.Set[TableCatalogCapability] = { Set( TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index b728cc7f242c9..d68f3a6a9c033 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -142,14 +142,28 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { // Property / provider behavior // ------------------------------------------------------------------------- - test("source 
TBLPROPERTIES are NOT copied to target") { + test("source TBLPROPERTIES are copied to target when connector implements createTableLike") { + // InMemoryTableCatalog overrides createTableLike to merge source properties into the target, + // demonstrating connector-specific copy semantics. Connectors that do not override + // createTableLike fall back to createTable and do not copy source properties. withTable("src", "testcat.dst") { - sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('secret_key' = 'secret')") + sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('source_key' = 'source')") sql("CREATE TABLE testcat.dst LIKE src") val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) - assert(!dst.properties.containsKey("secret_key"), - "Source TBLPROPERTIES should not be copied to target") + assert(dst.properties.containsKey("source_key"), + "Connector-implemented createTableLike copies source TBLPROPERTIES to target") + } + } + + test("user-specified TBLPROPERTIES override source TBLPROPERTIES in createTableLike") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('key' = 'source_value')") + sql("CREATE TABLE testcat.dst LIKE src TBLPROPERTIES ('key' = 'user_value')") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("key") == "user_value", + "User-specified TBLPROPERTIES should override source TBLPROPERTIES") } } From dd35048bd3f0b405b182104a72b50e8a8e14b660 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Mar 2026 11:07:36 -0700 Subject: [PATCH 12/19] [SPARK] Redesign createTableLike API: overrides-only TableInfo, throw by default Key changes: - TableCatalog.createTableLike default now throws UnsupportedOperationException instead of falling back to createTable; connectors must explicitly implement it - TableInfo passed to createTableLike contains only user-specified overrides (TBLPROPERTIES, LOCATION, resolved provider, 
owner); schema, partitioning, and constraints are NOT pre-populated -- connectors read all source metadata directly from sourceTable - CreateTableLikeExec no longer extracts columns/partitioning into TableInfo; removed CharVarcharUtils usage (CHAR/VARCHAR preservation is connector-specific) - InMemoryTableCatalog.createTableLike updated to read columns, partitioning, and constraints from sourceTable directly - TableInfo.Builder.columns defaults to empty array (no longer null) so properties-only builds succeed while requireNonNull guard is preserved - Removed V2->V1 (session catalog target) support and related tests; that path is unsupported -- connectors targeting the session catalog must override createTableLike themselves - Updated CatalogSuite test to verify UnsupportedOperationException is thrown for catalogs that do not override createTableLike Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/catalog/TableCatalog.java | 27 ++--- .../sql/connector/catalog/TableInfo.java | 2 +- .../sql/connector/catalog/CatalogSuite.scala | 42 +++----- .../catalog/InMemoryTableCatalog.scala | 8 +- .../datasources/v2/CreateTableLikeExec.scala | 47 +++------ .../sql/connector/CreateTableLikeSuite.scala | 99 +------------------ 6 files changed, 50 insertions(+), 175 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 6b92f378b614d..6f711852a890c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -300,29 +300,32 @@ default Table createTable(Identifier ident, TableInfo tableInfo) *

* This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. * The {@code tableInfo} parameter contains only the explicit overrides provided by the user - * (TBLPROPERTIES, LOCATION, USING clause) — it does NOT contain properties copied from the - * source table. Connectors that want to copy source-format-specific metadata (e.g. Delta - * protocol version, Iceberg sort order, format version) should read it directly from - * {@code sourceTable}. + * (TBLPROPERTIES, LOCATION, USING clause), the resolved provider, and the current user as owner. + * It does NOT contain schema, partitioning, properties, or constraints from the source table. + * Connectors must read all source metadata directly from {@code sourceTable}, including + * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}), + * constraints ({@link Table#constraints()}), and format-specific properties + * ({@link Table#properties()}). *

- * The default implementation falls back to {@link #createTable(Identifier, TableInfo)}, which - * creates a new table using only the schema and partitioning extracted from {@code sourceTable} - * by Spark, plus the user-specified overrides in {@code tableInfo}. Connectors with - * format-specific copy semantics should override this method. + * There is no default implementation. Connectors that support {@code CREATE TABLE ... LIKE ...} + * must override this method. Connectors that do not override it will throw + * {@link UnsupportedOperationException} when the command is issued against their catalog. * * @param ident a table identifier for the new table - * @param tableInfo user-specified overrides (TBLPROPERTIES, LOCATION), resolved provider, and - * current user as owner; source TBLPROPERTIES are NOT bulk-copied - * @param sourceTable the resolved source table whose metadata is being copied + * @param tableInfo user-specified overrides only: TBLPROPERTIES, LOCATION, resolved provider, + * and owner; source schema, partitioning, and constraints are NOT included + * @param sourceTable the resolved source table; connectors read schema, partitioning, + * constraints, properties, and any format-specific metadata from this object * @return metadata for the new table * * @throws TableAlreadyExistsException If a table or view already exists for the identifier * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + * @throws UnsupportedOperationException If the catalog does not support CREATE TABLE LIKE * @since 4.2.0 */ default Table createTableLike(Identifier ident, TableInfo tableInfo, Table sourceTable) throws TableAlreadyExistsException, NoSuchNamespaceException { - return createTable(ident, tableInfo); + throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE"); } /** diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java index a5b4e333afa87..9870a3b0fa45d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java @@ -61,7 +61,7 @@ public Transform[] partitions() { public Constraint[] constraints() { return constraints; } public static class Builder { - private Column[] columns; + private Column[] columns = new Column[0]; private Map properties = new HashMap<>(); private Transform[] partitions = new Transform[0]; private Constraint[] constraints = new Constraint[0]; diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 2949a1636b641..cc7aaf4ccfffb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -1389,11 +1389,9 @@ class CatalogSuite extends SparkFunSuite { catalog.createTable(srcIdent, columns, emptyTrans, srcProps) val sourceTable = catalog.loadTable(srcIdent) - // tableInfo contains only user overrides + // tableInfo contains only user overrides; schema and partitioning come from sourceTable val overrides = Map("user.key" -> "user.value").asJava val tableInfo = new TableInfo.Builder() - .withColumns(columns) - .withPartitions(emptyTrans) .withProperties(overrides) .build() catalog.createTableLike(dstIdent, tableInfo, sourceTable) @@ -1412,10 +1410,8 @@ class CatalogSuite extends SparkFunSuite { catalog.createTable(srcIdent, columns, emptyTrans, srcProps) val sourceTable = catalog.loadTable(srcIdent) - // tableInfo contains no overrides; connector should copy from sourceTable + // tableInfo contains no overrides; connector reads schema and properties from sourceTable val tableInfo = new 
TableInfo.Builder() - .withColumns(columns) - .withPartitions(emptyTrans) .withProperties(emptyProps) .build() catalog.createTableLike(dstIdent, tableInfo, sourceTable) @@ -1439,8 +1435,6 @@ class CatalogSuite extends SparkFunSuite { // user explicitly overrides format.version val overrides = Map("format.version" -> "2").asJava val tableInfo = new TableInfo.Builder() - .withColumns(columns) - .withPartitions(emptyTrans) .withProperties(overrides) .build() catalog.createTableLike(dstIdent, tableInfo, sourceTable) @@ -1467,8 +1461,6 @@ class CatalogSuite extends SparkFunSuite { val sourceTable = catalog.loadTable(srcIdent) val tableInfo = new TableInfo.Builder() - .withColumns(columns) - .withPartitions(emptyTrans) .withProperties(emptyProps) .build() catalog.createTableLike(dstIdent, tableInfo, sourceTable) @@ -1478,31 +1470,29 @@ class CatalogSuite extends SparkFunSuite { "connector should copy source constraints from sourceTable.constraints()") } - test("createTableLike: default implementation falls back to createTable with tableInfo only") { - // BasicInMemoryTableCatalog does not override createTableLike, so it uses the default - // implementation which calls createTable(ident, tableInfo); source properties are NOT copied. + test("createTableLike: catalog without createTableLike override throws " + + "UnsupportedOperationException") { + // BasicInMemoryTableCatalog does not override createTableLike. The default implementation + // throws UnsupportedOperationException to signal that connectors must explicitly implement + // CREATE TABLE LIKE support. 
val catalog = new BasicInMemoryTableCatalog catalog.initialize("basic", CaseInsensitiveStringMap.empty()) val srcIdent = Identifier.of(Array("ns"), "src") val dstIdent = Identifier.of(Array("ns"), "dst") - val srcProps = Map("source.key" -> "source.value").asJava - catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + catalog.createTable(srcIdent, columns, emptyTrans, Map.empty[String, String].asJava) val sourceTable = catalog.loadTable(srcIdent) - val overrides = Map("user.key" -> "user.value").asJava val tableInfo = new TableInfo.Builder() - .withColumns(columns) - .withPartitions(emptyTrans) - .withProperties(overrides) + .withProperties(Map.empty[String, String].asJava) .build() - catalog.createTableLike(dstIdent, tableInfo, sourceTable) - - val dst = catalog.loadTable(dstIdent) - assert(dst.properties.asScala("user.key") == "user.value", - "user-specified properties should be present") - assert(!dst.properties.containsKey("source.key"), - "default createTableLike does not copy source properties; connector must override to do so") + val ex = intercept[UnsupportedOperationException] { + catalog.createTableLike(dstIdent, tableInfo, sourceTable) + } + assert(ex.getMessage.contains("basic"), + "Exception should mention the catalog name") + assert(ex.getMessage.contains("CREATE TABLE LIKE"), + "Exception should mention the unsupported operation") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 4e2a6f6f3ac4f..3e1eab0b3cfda 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -242,11 +242,11 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp ident: Identifier, tableInfo: TableInfo, sourceTable: Table): Table = 
{ - // Format-specific behavior: merge source properties with user overrides, with user overrides - // taking precedence. Copy source constraints from sourceTable directly. This demonstrates - // how a connector uses sourceTable to access source-format-specific metadata. + // Format-specific behavior: read schema and partitioning from sourceTable, merge source + // properties with user overrides (user overrides win), and copy source constraints. + // This demonstrates how a connector accesses all source metadata from sourceTable. val mergedProps = (sourceTable.properties().asScala ++ tableInfo.properties().asScala).asJava - createTable(ident, tableInfo.columns(), tableInfo.partitions(), mergedProps, + createTable(ident, sourceTable.columns(), sourceTable.partitioning(), mergedProps, Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 10abbb34c3f6e..78eed2a9b30a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo, V1Table} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -36,13 +35,13 @@ import org.apache.spark.sql.errors.QueryCompilationErrors * * Calls 
[[TableCatalog.createTableLike]] so that connectors can implement format-specific copy * semantics (e.g. Delta protocol inheritance, Iceberg sort order and format version). Connectors - * that do not override [[TableCatalog.createTableLike]] fall back to [[TableCatalog.createTable]] - * with only the schema and partitioning extracted from `sourceTable` by Spark. + * must override [[TableCatalog.createTableLike]]; the default implementation throws + * [[UnsupportedOperationException]]. * - * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains user-specified overrides - * (TBLPROPERTIES, LOCATION), the resolved provider, and the current user as owner. Source - * TBLPROPERTIES and constraints are NOT bulk-copied; connectors can read them directly from - * [[sourceTable]]. + * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains only user-specified + * overrides (TBLPROPERTIES, LOCATION), the resolved provider, and the current user as owner. + * Schema, partitioning, source TBLPROPERTIES, and constraints are NOT pre-populated; connectors + * read all source metadata directly from [[sourceTable]]. */ case class CreateTableLikeExec( targetCatalog: TableCatalog, @@ -57,24 +56,10 @@ case class CreateTableLikeExec( override protected def run(): Seq[InternalRow] = { if (!targetCatalog.tableExists(targetIdent)) { - // 1. Extract columns from source. For V1Table sources use the raw schema so that - // CHAR/VARCHAR types are preserved as declared (without internal metadata expansion). - val columns = sourceTable match { - case v1: V1Table => - val rawSchema = CharVarcharUtils.getRawSchema(v1.catalogTable.schema) - CatalogV2Util.structTypeToV2Columns(rawSchema) - case _ => - sourceTable.columns() - } - - // 2. Extract partitioning from source (includes both partition columns and bucket spec - // for V1Table, as V1Table.partitioning encodes both). - val partitioning = sourceTable.partitioning - - // 3. 
Resolve provider: USING clause overrides, else copy from source. - // The source provider is inherited so that the target table uses the same format, - // matching V1 CreateTableLikeCommand behavior. Whether the target catalog validates - // or uses this property is catalog-specific (e.g. V2SessionCatalog validates it). + // Resolve provider: USING clause overrides, else inherit from source. + // The source provider is inherited so that the target uses the same format, + // matching V1 CreateTableLikeCommand behavior. Whether the target catalog validates + // or uses this property is catalog-specific (e.g. V2SessionCatalog validates it). val resolvedProvider = provider.orElse { sourceTable match { case v1: V1Table if v1.catalogTable.tableType == CatalogTableType.VIEW => @@ -88,8 +73,9 @@ case class CreateTableLikeExec( } } - // 4. Build final properties. User-specified TBLPROPERTIES are used as-is; source table - // properties are NOT copied. Provider, location, and owner are added if applicable. + // Build overrides-only properties: user TBLPROPERTIES, resolved provider, location, owner. + // Schema, partitioning, source TBLPROPERTIES, and constraints are NOT included here; + // connectors read them directly from sourceTable. val locationProp: Option[(String, String)] = location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) @@ -99,14 +85,7 @@ case class CreateTableLikeExec( locationProp) try { - // The TableInfo passed to createTableLike contains user-specified overrides - // (TBLPROPERTIES, LOCATION), the resolved provider (from USING clause if specified, - // otherwise inherited from the source), and the current user as owner. Source - // TBLPROPERTIES are NOT bulk-copied; connectors that override createTableLike can - // read source metadata, including properties and constraints, directly from sourceTable. 
val tableInfo = new TableInfo.Builder() - .withColumns(columns) - .withPartitions(partitioning) .withProperties(finalProps.asJava) .build() targetCatalog.createTableLike(targetIdent, tableInfo, sourceTable) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index d68f3a6a9c033..da03dae0afb95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog} -import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} class CreateTableLikeSuite extends DatasourceV2SQLBase { @@ -201,21 +201,6 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } - test("session catalog target: non-existent provider from source is rejected") { - // V2SessionCatalog bridges to the V1 DataSource world and calls - // DataSource.lookupDataSource(provider) when creating the target table. - // A non-existent provider therefore causes a DATA_SOURCE_NOT_FOUND error, - // unlike pure V2 catalogs which accept any provider string. - withTable("testcat.src") { - sql("CREATE TABLE testcat.src (id bigint) USING foo") - val ex = intercept[Exception] { - sql("CREATE TABLE dst LIKE testcat.src") - } - assert(ex.getMessage.contains("foo"), - "Error should mention the unresolvable provider") - } - } - test("source provider is copied to v2 target when no USING override") { // When no USING clause is given, CreateTableLikeExec copies the provider from the // source table into PROP_PROVIDER of the target's TableInfo properties. 
@@ -251,88 +236,6 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } - test("CHAR and VARCHAR types are preserved from v1 source to v2 target") { - // CreateTableLikeExec calls CharVarcharUtils.getRawSchema on V1Table sources so that - // CHAR(n)/VARCHAR(n) declarations survive the copy instead of being collapsed to StringType. - withTable("src", "testcat.dst") { - sql("CREATE TABLE src (id bigint, name CHAR(10), tag VARCHAR(20)) USING parquet") - sql("CREATE TABLE testcat.dst LIKE src") - - val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) - val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns()) - assert(schema("name").dataType === CharType(10)) - assert(schema("tag").dataType === VarcharType(20)) - } - } - - // ------------------------------------------------------------------------- - // V2 source, session catalog (V1) target - // ------------------------------------------------------------------------- - - test("v2 source, v1 target: session catalog target with v2 catalog source") { - // Source is a pure V2 table in testcat (InMemoryTable, not a V1Table). - // Target is the session catalog. ResolvedV1TableOrViewIdentifier does not match - // the V2 source, so ResolveSessionCatalog falls through and CreateTableLikeExec - // is used (targeting V2SessionCatalog which backs the session catalog). - // The source must use a real provider (parquet) so that V2SessionCatalog can - // validate it when creating the target table. 
- withTable("testcat.src", "dst") { - sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet") - sql("CREATE TABLE dst LIKE testcat.src") - - assert(spark.catalog.tableExists("dst")) - val schema = spark.table("dst").schema - assert(schema.fieldNames === Array("id", "data")) - } - } - - test("v2 source, v1 target: schema and partitioning are copied") { - withTable("testcat.src", "dst") { - sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet " + - "PARTITIONED BY (data)") - sql("CREATE TABLE dst LIKE testcat.src") - - assert(spark.catalog.tableExists("dst")) - val schema = spark.table("dst").schema - assert(schema.fieldNames === Array("id", "data")) - } - } - - // ------------------------------------------------------------------------- - // CatalogExtension (Iceberg-style session catalog override) scenario - // ------------------------------------------------------------------------- - - test("CatalogExtension session catalog override: source is native V2 table, uses V2 exec path") { - // In this suite the session catalog is already overridden with InMemoryTableSessionCatalog, - // which implements CatalogExtension — the same pattern used by Iceberg's SparkSessionCatalog. 
- // - // When the source is a native V2 InMemoryTable (from testcat, not a V1Table): - // - supportsV1Command returns true (CatalogExtension catalog) - // - but ResolvedV1TableOrViewIdentifier does NOT match a non-V1Table source - // - so CreateTableLikeExec (V2 exec path) is used instead of CreateTableLikeCommand - // - CreateTableLikeExec calls InMemoryTableSessionCatalog.createTable which stores - // the target as a native InMemoryTable in the extension catalog - withTable("testcat.src", "dst") { - sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet") - sql("CREATE TABLE dst LIKE testcat.src") - - // The target should exist and have the correct schema - assert(spark.catalog.tableExists("dst")) - val schema = spark.table("dst").schema - assert(schema.fieldNames === Array("id", "data")) - - // The target was created through the CatalogExtension catalog; verify it is an - // InMemoryTable (the native V2 type used by InMemoryTableSessionCatalog) rather than - // a V1Table backed by the Hive metastore. - val extCatalog = spark.sessionState.catalogManager - .catalog("spark_catalog") - .asInstanceOf[InMemoryTableSessionCatalog] - val dst = extCatalog.loadTable(Identifier.of(Array("default"), "dst")) - assert(dst.isInstanceOf[org.apache.spark.sql.connector.catalog.InMemoryTable], - "Target table should be a native V2 InMemoryTable in the CatalogExtension catalog") - } - } - // ------------------------------------------------------------------------- // V1 fallback regression // ------------------------------------------------------------------------- From ba70454666c218f81f68dc24ba7c72b6a585aff0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Mar 2026 11:09:31 -0700 Subject: [PATCH 13/19] [SPARK] Add test verifying V2->session catalog LIKE throws UnsupportedOperationException The session catalog does not implement createTableLike, so CREATE TABLE LIKE targeting it with a V2 source should throw UnsupportedOperationException. 
Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/CreateTableLikeSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index da03dae0afb95..2c262868deab9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -236,6 +236,25 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + // ------------------------------------------------------------------------- + // V2 source, session catalog (V1) target: unsupported + // ------------------------------------------------------------------------- + + test("v2 source, session catalog target: throws because session catalog does not " + + "implement createTableLike") { + // CREATE TABLE LIKE targeting the session catalog with a V2 source goes through + // CreateTableLikeExec, which calls createTableLike on the session catalog. + // The session catalog (InMemoryTableSessionCatalog in tests) does not override + // createTableLike, so the default UnsupportedOperationException is thrown. 
+ withTable("testcat.src") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet") + val ex = intercept[UnsupportedOperationException] { + sql("CREATE TABLE dst LIKE testcat.src") + } + assert(ex.getMessage.contains("CREATE TABLE LIKE")) + } + } + // ------------------------------------------------------------------------- // V1 fallback regression // ------------------------------------------------------------------------- From 4057b3b54f6f16f368cb77e751b19e795c7107f9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Mar 2026 20:23:31 -0700 Subject: [PATCH 14/19] [SPARK] Fix Javadoc contradiction in createTableLike default method "There is no default implementation" was contradictory since the method is declared with 'default' and has a body. Reword to accurately say the default implementation throws UnsupportedOperationException. Co-Authored-By: Claude Sonnet 4.6 --- .../org/apache/spark/sql/connector/catalog/TableCatalog.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 6f711852a890c..a3575f53e044e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -307,9 +307,8 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * constraints ({@link Table#constraints()}), and format-specific properties * ({@link Table#properties()}). *

- * There is no default implementation. Connectors that support {@code CREATE TABLE ... LIKE ...} - * must override this method. Connectors that do not override it will throw - * {@link UnsupportedOperationException} when the command is issued against their catalog. + * The default implementation throws {@link UnsupportedOperationException}. Connectors that + * support {@code CREATE TABLE ... LIKE ...} must override this method. * * @param ident a table identifier for the new table * @param tableInfo user-specified overrides only: TBLPROPERTIES, LOCATION, resolved provider, From f1e0955f1c0589cc5820cb965ea576ba05e9254c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Mar 2026 20:33:51 -0700 Subject: [PATCH 15/19] [SPARK] Add CHAR/VARCHAR preservation in InMemoryTableCatalog and fix stale Scaladoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - InMemoryTableCatalog.createTableLike now applies CharVarcharUtils.getRawSchema when the source is a V1Table, preserving CHAR/VARCHAR column types as declared rather than collapsed to StringType. This illustrates the pattern connectors should follow to preserve declared types from V1 sources. - Add test "CHAR and VARCHAR types are preserved from v1 source to v2 target" in CreateTableLikeSuite to verify end-to-end type fidelity. - Fix stale Scaladoc in SparkSqlParser.scala: "Create a [[CreateTableLikeCommand]] command" → "Create a [[CreateTableLike]] logical plan." 
Co-Authored-By: Claude Sonnet 4.6 --- .../catalog/InMemoryTableCatalog.scala | 15 +++++++++++---- .../spark/sql/execution/SparkSqlParser.scala | 2 +- .../sql/connector/CreateTableLikeSuite.scala | 17 ++++++++++++++++- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 3e1eab0b3cfda..a0f90e025e5cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} @@ -242,11 +243,17 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp ident: Identifier, tableInfo: TableInfo, sourceTable: Table): Table = { - // Format-specific behavior: read schema and partitioning from sourceTable, merge source - // properties with user overrides (user overrides win), and copy source constraints. - // This demonstrates how a connector accesses all source metadata from sourceTable. + // Read schema from source. For V1Table sources, apply CharVarcharUtils to preserve + // CHAR/VARCHAR types as declared rather than collapsed to StringType. 
+ val columns = sourceTable match { + case v1: V1Table => + CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema)) + case _ => + sourceTable.columns() + } + // Merge source properties with user overrides (user overrides win), copy constraints. val mergedProps = (sourceTable.properties().asScala ++ tableInfo.properties().asScala).asJava - createTable(ident, sourceTable.columns(), sourceTable.partitioning(), mergedProps, + createTable(ident, columns, sourceTable.partitioning(), mergedProps, Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 1cf15be1d3a7f..2beae505e59f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1141,7 +1141,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create a [[CreateTableLikeCommand]] command. + * Create a [[CreateTableLike]] logical plan. 
* * For example: * {{{ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index 2c262868deab9..00afb6166341c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog} -import org.apache.spark.sql.types.{IntegerType, LongType, StringType} +import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType} class CreateTableLikeSuite extends DatasourceV2SQLBase { @@ -218,6 +218,21 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { // Column type fidelity // ------------------------------------------------------------------------- + test("CHAR and VARCHAR types are preserved from v1 source to v2 target") { + // InMemoryTableCatalog.createTableLike applies CharVarcharUtils.getRawSchema when + // the source is a V1Table, preserving CHAR/VARCHAR types as declared. + // This illustrates the pattern connectors should follow to preserve declared types. 
+ withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint, name CHAR(10), tag VARCHAR(20)) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns()) + assert(schema("name").dataType === CharType(10)) + assert(schema("tag").dataType === VarcharType(20)) + } + } + test("multiple column types are preserved") { withTable("src", "testcat.dst") { sql( From 029a4ce2cd068928ad019a766c76edc62faf70e6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 23 Mar 2026 15:42:46 -0700 Subject: [PATCH 16/19] [SPARK] Address gengliangwang review: rename tableInfo param, qualify location, fix comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address gengliangwang's Mar 23 review comments on the createTableLike API: - Rename `tableInfo` → `userSpecifiedOverrides` and move `sourceTable` before it in `TableCatalog.createTableLike` signature for clarity. Update all callers (InMemoryTableCatalog, CreateTableLikeExec, CatalogSuite). - Fix Javadoc on resolved provider: the provider can come from the USING clause OR be inherited from the source table (not only from USING). - Qualify LOCATION URI in DataSourceV2Strategy like other CREATE TABLE operations do via qualifyLocInTableSpec (absolute URI passthrough, Hadoop-qualified relative). - Add Scaladoc to CreateTableLike explaining why CatalogStorageFormat is retained (ResolveSessionCatalog passes full serde info to CreateTableLikeCommand for V1). - Fix stale test comment: "fall back to createTable" → "throw UnsupportedOperationException". 
Co-Authored-By: Claude Sonnet 4.6 --- .../spark/sql/connector/catalog/TableCatalog.java | 13 ++++++++----- .../sql/catalyst/plans/logical/v2Commands.scala | 4 ++++ .../spark/sql/connector/catalog/CatalogSuite.scala | 10 +++++----- .../connector/catalog/InMemoryTableCatalog.scala | 7 ++++--- .../datasources/v2/CreateTableLikeExec.scala | 4 ++-- .../datasources/v2/DataSourceV2Strategy.scala | 7 ++++++- .../spark/sql/connector/CreateTableLikeSuite.scala | 2 +- 7 files changed, 30 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index a3575f53e044e..c82df476adc8f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -299,8 +299,9 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * Create a table in the catalog by copying metadata from an existing source table. *

* This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. - * The {@code tableInfo} parameter contains only the explicit overrides provided by the user - * (TBLPROPERTIES, LOCATION, USING clause), the resolved provider, and the current user as owner. + * The {@code userSpecifiedOverrides} parameter contains user-specified overrides + * (TBLPROPERTIES, LOCATION), the resolved provider (from the USING clause or inherited + * from the source table), and the current user as owner. * It does NOT contain schema, partitioning, properties, or constraints from the source table. * Connectors must read all source metadata directly from {@code sourceTable}, including * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}), @@ -311,10 +312,11 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * support {@code CREATE TABLE ... LIKE ...} must override this method. * * @param ident a table identifier for the new table - * @param tableInfo user-specified overrides only: TBLPROPERTIES, LOCATION, resolved provider, - * and owner; source schema, partitioning, and constraints are NOT included * @param sourceTable the resolved source table; connectors read schema, partitioning, * constraints, properties, and any format-specific metadata from this object + * @param userSpecifiedOverrides user-specified overrides: TBLPROPERTIES, LOCATION, resolved + * provider, and owner; source schema, partitioning, and + * constraints are NOT included * @return metadata for the new table * * @throws TableAlreadyExistsException If a table or view already exists for the identifier @@ -322,7 +324,8 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * @throws UnsupportedOperationException If the catalog does not support CREATE TABLE LIKE * @since 4.2.0 */ - default Table createTableLike(Identifier ident, TableInfo tableInfo, Table sourceTable) + default Table createTableLike( + Identifier ident, Table 
sourceTable, TableInfo userSpecifiedOverrides) throws TableAlreadyExistsException, NoSuchNamespaceException { throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE"); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 052681d41ad18..14b1cfc3ab9fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -624,6 +624,10 @@ case class CreateTable( * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to * ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations. * @param fileFormat User-specified STORED AS / ROW FORMAT (Hive-style). Empty if not specified. + * [[CatalogStorageFormat]] is carried here because ResolveSessionCatalog + * passes the full format (including serde info) to [[CreateTableLikeCommand]] + * for the V1 fallback path. The V2 execution path (DataSourceV2Strategy) + * extracts only [[CatalogStorageFormat#locationUri]] from this field. * @param provider User-specified USING provider. None if not specified. * @param properties User-specified TBLPROPERTIES. * @param ifNotExists IF NOT EXISTS flag. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index cc7aaf4ccfffb..4fd9f6f6da913 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -1394,7 +1394,7 @@ class CatalogSuite extends SparkFunSuite { val tableInfo = new TableInfo.Builder() .withProperties(overrides) .build() - catalog.createTableLike(dstIdent, tableInfo, sourceTable) + catalog.createTableLike(dstIdent, sourceTable, tableInfo) val dst = catalog.loadTable(dstIdent) assert(dst.properties.asScala("user.key") == "user.value", @@ -1414,7 +1414,7 @@ class CatalogSuite extends SparkFunSuite { val tableInfo = new TableInfo.Builder() .withProperties(emptyProps) .build() - catalog.createTableLike(dstIdent, tableInfo, sourceTable) + catalog.createTableLike(dstIdent, sourceTable, tableInfo) val dst = catalog.loadTable(dstIdent) assert(dst.properties.asScala("format.version") == "2", @@ -1437,7 +1437,7 @@ class CatalogSuite extends SparkFunSuite { val tableInfo = new TableInfo.Builder() .withProperties(overrides) .build() - catalog.createTableLike(dstIdent, tableInfo, sourceTable) + catalog.createTableLike(dstIdent, sourceTable, tableInfo) val dst = catalog.loadTable(dstIdent) assert(dst.properties.asScala("format.version") == "2", @@ -1463,7 +1463,7 @@ class CatalogSuite extends SparkFunSuite { val tableInfo = new TableInfo.Builder() .withProperties(emptyProps) .build() - catalog.createTableLike(dstIdent, tableInfo, sourceTable) + catalog.createTableLike(dstIdent, sourceTable, tableInfo) val dst = catalog.loadTable(dstIdent) assert(dst.constraints().toSet == constraints.toSet, @@ -1488,7 +1488,7 @@ class CatalogSuite extends SparkFunSuite { .withProperties(Map.empty[String, String].asJava) .build() val ex = 
intercept[UnsupportedOperationException] { - catalog.createTableLike(dstIdent, tableInfo, sourceTable) + catalog.createTableLike(dstIdent, sourceTable, tableInfo) } assert(ex.getMessage.contains("basic"), "Exception should mention the catalog name") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index a0f90e025e5cc..5a9f72ba80c82 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -241,8 +241,8 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp override def createTableLike( ident: Identifier, - tableInfo: TableInfo, - sourceTable: Table): Table = { + sourceTable: Table, + userSpecifiedOverrides: TableInfo): Table = { // Read schema from source. For V1Table sources, apply CharVarcharUtils to preserve // CHAR/VARCHAR types as declared rather than collapsed to StringType. val columns = sourceTable match { @@ -252,7 +252,8 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp sourceTable.columns() } // Merge source properties with user overrides (user overrides win), copy constraints. 
- val mergedProps = (sourceTable.properties().asScala ++ tableInfo.properties().asScala).asJava + val mergedProps = + (sourceTable.properties().asScala ++ userSpecifiedOverrides.properties().asScala).asJava createTable(ident, columns, sourceTable.partitioning(), mergedProps, Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 78eed2a9b30a6..5e0ef9ec07afa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -85,10 +85,10 @@ case class CreateTableLikeExec( locationProp) try { - val tableInfo = new TableInfo.Builder() + val userSpecifiedOverrides = new TableInfo.Builder() .withProperties(finalProps.asJava) .build() - targetCatalog.createTableLike(targetIdent, tableInfo, sourceTable) + targetCatalog.createTableLike(targetIdent, sourceTable, userSpecifiedOverrides) } catch { case _: TableAlreadyExistsException if ifNotExists => logWarning( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b73ddbb96876f..4ae3285ac6897 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -251,8 +251,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case ResolvedPersistentView(_, _, meta) => V1Table(meta) case ResolvedTempView(_, meta) => V1Table(meta) } + val location = fileFormat.locationUri.map { uri => + if (uri.isAbsolute) uri + 
else if (new Path(uri).isAbsolute) CatalogUtils.makeQualifiedPath(uri, hadoopConf) + else uri + } CreateTableLikeExec(catalog.asTableCatalog, ident, table, - fileFormat.locationUri, provider, properties, ifNotExists) :: Nil + location, provider, properties, ifNotExists) :: Nil case RefreshTable(r: ResolvedTable) => RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index 00afb6166341c..e4cf0687334fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -145,7 +145,7 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { test("source TBLPROPERTIES are copied to target when connector implements createTableLike") { // InMemoryTableCatalog overrides createTableLike to merge source properties into the target, // demonstrating connector-specific copy semantics. Connectors that do not override - // createTableLike fall back to createTable and do not copy source properties. + // createTableLike will throw UnsupportedOperationException. withTable("src", "testcat.dst") { sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('source_key' = 'source')") sql("CREATE TABLE testcat.dst LIKE src") From bfbbe9626ea254971b5616f03ef28cf1cbd5766b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 23 Mar 2026 17:56:22 -0700 Subject: [PATCH 17/19] [SPARK] Make userSpecifiedOverrides strictly user-overrides-only; remove provider inheritance The provider inherited from the source table is not a user override and should not be placed in userSpecifiedOverrides. 
Remove the resolvedProvider fallback logic from CreateTableLikeExec; connectors that want to inherit the source provider can read PROP_PROVIDER from sourceTable.properties() themselves (V1Table.properties() already exposes it via v1Table.provider). InMemoryTableCatalog.createTableLike merges sourceTable.properties() into the target, so provider inheritance continues to work for that connector without any change. The exec now only adds PROP_PROVIDER to userSpecifiedOverrides when the user explicitly specifies a USING clause. Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/catalog/TableCatalog.java | 12 +++--- .../datasources/v2/CreateTableLikeExec.scala | 38 ++++++------------- .../sql/connector/CreateTableLikeSuite.scala | 5 ++- 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index c82df476adc8f..538031b16c50c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -299,9 +299,9 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * Create a table in the catalog by copying metadata from an existing source table. *

* This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. - * The {@code userSpecifiedOverrides} parameter contains user-specified overrides - * (TBLPROPERTIES, LOCATION), the resolved provider (from the USING clause or inherited - * from the source table), and the current user as owner. + * The {@code userSpecifiedOverrides} parameter contains strictly user-specified overrides: + * TBLPROPERTIES, LOCATION, the USING provider (only if explicitly specified), and the + * current user as owner. * It does NOT contain schema, partitioning, properties, or constraints from the source table. * Connectors must read all source metadata directly from {@code sourceTable}, including * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}), @@ -314,9 +314,9 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * @param ident a table identifier for the new table * @param sourceTable the resolved source table; connectors read schema, partitioning, * constraints, properties, and any format-specific metadata from this object - * @param userSpecifiedOverrides user-specified overrides: TBLPROPERTIES, LOCATION, resolved - * provider, and owner; source schema, partitioning, and - * constraints are NOT included + * @param userSpecifiedOverrides strictly user-specified overrides: TBLPROPERTIES, LOCATION, + * USING provider (if explicitly given), and owner; source schema, + * partitioning, provider, and constraints are NOT included * @return metadata for the new table * * @throws TableAlreadyExistsException If a table or view already exists for the identifier diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 5e0ef9ec07afa..972139fe52aaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -24,9 +24,9 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.internal.LogKeys.TABLE_NAME import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException -import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -38,10 +38,10 @@ import org.apache.spark.sql.errors.QueryCompilationErrors * must override [[TableCatalog.createTableLike]]; the default implementation throws * [[UnsupportedOperationException]]. * - * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains only user-specified - * overrides (TBLPROPERTIES, LOCATION), the resolved provider, and the current user as owner. - * Schema, partitioning, source TBLPROPERTIES, and constraints are NOT pre-populated; connectors - * read all source metadata directly from [[sourceTable]]. + * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains strictly user-specified + * overrides: TBLPROPERTIES, LOCATION, USING provider (only if explicitly given), and owner. + * Schema, partitioning, source provider, source TBLPROPERTIES, and constraints are NOT + * pre-populated; connectors read all source metadata directly from [[sourceTable]]. 
*/ case class CreateTableLikeExec( targetCatalog: TableCatalog, @@ -56,32 +56,16 @@ case class CreateTableLikeExec( override protected def run(): Seq[InternalRow] = { if (!targetCatalog.tableExists(targetIdent)) { - // Resolve provider: USING clause overrides, else inherit from source. - // The source provider is inherited so that the target uses the same format, - // matching V1 CreateTableLikeCommand behavior. Whether the target catalog validates - // or uses this property is catalog-specific (e.g. V2SessionCatalog validates it). - val resolvedProvider = provider.orElse { - sourceTable match { - case v1: V1Table if v1.catalogTable.tableType == CatalogTableType.VIEW => - // When the source is a view, default to the session's default data source. - // This matches V1 CreateTableLikeCommand behavior. - Some(session.sessionState.conf.defaultDataSourceName) - case v1: V1Table => - v1.catalogTable.provider - case _ => - Option(sourceTable.properties.get(TableCatalog.PROP_PROVIDER)) - } - } - - // Build overrides-only properties: user TBLPROPERTIES, resolved provider, location, owner. - // Schema, partitioning, source TBLPROPERTIES, and constraints are NOT included here; - // connectors read them directly from sourceTable. + // Build strictly user-specified overrides: explicit TBLPROPERTIES, LOCATION (if given), + // USING provider (if given), and the current user as owner. Provider inheritance from + // the source is left to the connector — it can read PROP_PROVIDER from + // sourceTable.properties() and apply its own format-specific semantics. 
val locationProp: Option[(String, String)] = location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) val finalProps = CatalogV2Util.withDefaultOwnership( properties ++ - resolvedProvider.map(TableCatalog.PROP_PROVIDER -> _) ++ + provider.map(TableCatalog.PROP_PROVIDER -> _) ++ locationProp) try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index e4cf0687334fd..a67831cf63503 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -202,8 +202,9 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } test("source provider is copied to v2 target when no USING override") { - // When no USING clause is given, CreateTableLikeExec copies the provider from the - // source table into PROP_PROVIDER of the target's TableInfo properties. + // When no USING clause is given, provider inheritance is handled by the connector: + // InMemoryTableCatalog.createTableLike merges sourceTable.properties() into the target, + // which includes PROP_PROVIDER set by the source table. withTable("src", "testcat.dst") { sql("CREATE TABLE src (id bigint) USING parquet") sql("CREATE TABLE testcat.dst LIKE src") From 0cf15703509d65aa26c0403cbee5f2781b91d102 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 23 Mar 2026 18:52:43 -0700 Subject: [PATCH 18/19] [SPARK] Remove owner from userSpecifiedOverrides; connector is responsible for setting it The current user as owner is Spark-injected metadata, not a user override, so it should not be in userSpecifiedOverrides. Remove withDefaultOwnership from CreateTableLikeExec; connectors are responsible for setting the owner themselves. 
InMemoryTableCatalog.createTableLike now explicitly stamps the current user as owner via CurrentUserContext.getCurrentUser after merging source and override properties, ensuring the new table is owned by the user issuing the command (not inherited from the source table's owner). Update TableCatalog.java Javadoc to reflect that owner is not included in userSpecifiedOverrides and is the connector's responsibility. Co-Authored-By: Claude Sonnet 4.6 --- .../sql/connector/catalog/TableCatalog.java | 14 +++++++------- .../catalog/InMemoryTableCatalog.scala | 9 ++++++--- .../datasources/v2/CreateTableLikeExec.scala | 19 ++++++++++--------- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 538031b16c50c..e96777f43df76 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -300,13 +300,13 @@ default Table createTable(Identifier ident, TableInfo tableInfo) *

* This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. * The {@code userSpecifiedOverrides} parameter contains strictly user-specified overrides: - * TBLPROPERTIES, LOCATION, the USING provider (only if explicitly specified), and the - * current user as owner. - * It does NOT contain schema, partitioning, properties, or constraints from the source table. - * Connectors must read all source metadata directly from {@code sourceTable}, including + * TBLPROPERTIES, LOCATION, and the USING provider (only if explicitly specified). + * It does NOT contain schema, partitioning, properties, constraints, or owner from the source + * table. Connectors must read all source metadata directly from {@code sourceTable}, including * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}), * constraints ({@link Table#constraints()}), and format-specific properties - * ({@link Table#properties()}). + * ({@link Table#properties()}). Connectors are also responsible for setting the owner of the + * new table (e.g. via {@code org.apache.spark.sql.catalyst.CurrentUserContext#getCurrentUser}). *

* The default implementation throws {@link UnsupportedOperationException}. Connectors that * support {@code CREATE TABLE ... LIKE ...} must override this method. @@ -315,8 +315,8 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * @param sourceTable the resolved source table; connectors read schema, partitioning, * constraints, properties, and any format-specific metadata from this object * @param userSpecifiedOverrides strictly user-specified overrides: TBLPROPERTIES, LOCATION, - * USING provider (if explicitly given), and owner; source schema, - * partitioning, provider, and constraints are NOT included + * and USING provider (if explicitly given); source schema, + * partitioning, provider, constraints, and owner are NOT included * @return metadata for the new table * * @throws TableAlreadyExistsException If a table or view already exists for the identifier diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 5a9f72ba80c82..81f4717fc4dfc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow} import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.constraints.Constraint @@ -251,9 +251,12 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp case _ => 
sourceTable.columns() } - // Merge source properties with user overrides (user overrides win), copy constraints. + // Merge source properties with user overrides (user overrides win), then set current user + // as owner (overrides source owner). Connectors are responsible for setting the owner. val mergedProps = - (sourceTable.properties().asScala ++ userSpecifiedOverrides.properties().asScala).asJava + (sourceTable.properties().asScala ++ + userSpecifiedOverrides.properties().asScala ++ + Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)).asJava createTable(ident, columns, sourceTable.partitioning(), mergedProps, Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 972139fe52aaa..c4930b45375b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableInfo} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -39,9 +39,10 @@ import org.apache.spark.sql.errors.QueryCompilationErrors * [[UnsupportedOperationException]]. 
* * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains strictly user-specified - * overrides: TBLPROPERTIES, LOCATION, USING provider (only if explicitly given), and owner. - * Schema, partitioning, source provider, source TBLPROPERTIES, and constraints are NOT - * pre-populated; connectors read all source metadata directly from [[sourceTable]]. + * overrides: TBLPROPERTIES, LOCATION, and USING provider (only if explicitly given). + * Schema, partitioning, source provider, source TBLPROPERTIES, constraints, and owner are NOT + * pre-populated; connectors read all source metadata directly from [[sourceTable]] and are + * responsible for setting the owner. */ case class CreateTableLikeExec( targetCatalog: TableCatalog, @@ -57,16 +58,16 @@ case class CreateTableLikeExec( override protected def run(): Seq[InternalRow] = { if (!targetCatalog.tableExists(targetIdent)) { // Build strictly user-specified overrides: explicit TBLPROPERTIES, LOCATION (if given), - // USING provider (if given), and the current user as owner. Provider inheritance from - // the source is left to the connector — it can read PROP_PROVIDER from - // sourceTable.properties() and apply its own format-specific semantics. + // and USING provider (if given). The source-inherited provider and the owner are not + // injected here; connectors are responsible for reading PROP_PROVIDER from + // sourceTable.properties() and for setting the owner via CurrentUserContext.getCurrentUser.
val locationProp: Option[(String, String)] = location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) - val finalProps = CatalogV2Util.withDefaultOwnership( + val finalProps = properties ++ provider.map(TableCatalog.PROP_PROVIDER -> _) ++ - locationProp) + locationProp try { val userSpecifiedOverrides = new TableInfo.Builder() From c482cdbb6e2e82f63440b88f943ab1345f266ab5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 23 Mar 2026 21:25:54 -0700 Subject: [PATCH 19/19] [SPARK] Replace CatalogStorageFormat in CreateTableLike with location+serdeInfo; deduplicate via HiveSerDe - Replace `fileFormat: CatalogStorageFormat` in the `CreateTableLike` logical plan with `location: Option[String]` + `serdeInfo: Option[SerdeInfo]` to keep V2 logical plans free of V1 catalog types. - `SparkSqlParser` passes location and serdeInfo directly; `ResolveSessionCatalog` reconstructs `CatalogStorageFormat` for the V1 fallback path. - Add `HiveSerDe.buildStorageFormat(location, maybeSerdeInfo, invalidStoredAsError)` as a shared helper with a caller-supplied error callback, eliminating the duplicate logic between `SparkSqlParser.toStorageFormat` and `ResolveSessionCatalog.buildStorageFormatFromSerdeInfo`. 
Co-Authored-By: Claude Sonnet 4.6 --- .../catalyst/analysis/ResolveCatalogs.scala | 2 +- .../catalyst/plans/logical/v2Commands.scala | 16 ++++---- .../analysis/ResolveSessionCatalog.scala | 12 +++++- .../spark/sql/execution/SparkSqlParser.scala | 34 ++++----------- .../datasources/v2/DataSourceV2Strategy.scala | 7 ++-- .../apache/spark/sql/internal/HiveSerDe.scala | 41 ++++++++++++++++++- .../execution/command/DDLParserSuite.scala | 35 ++++++++-------- 7 files changed, 89 insertions(+), 58 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index b7c9638ca75e0..f34c6be9954e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -121,7 +121,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns) r.copy(name = resolvedIdentifier) - case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _) => + case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _, _) => val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, Nil) c.copy(name = resolvedIdentifier) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 14b1cfc3ab9fc..905797eb021c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -21,7 +21,7 @@ import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkUns import org.apache.spark.sql.AnalysisException import 
org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, ResolvedProcedure, ResolveSchemaEvolution, TypeCheckResult, UnresolvedAttribute, UnresolvedException, UnresolvedProcedure, ViewSchemaMode} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, FunctionResource, RoutineLanguage} +import org.apache.spark.sql.catalyst.catalog.{FunctionResource, RoutineLanguage} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema @@ -623,20 +623,22 @@ case class CreateTable( * ResolvedIdentifier by ResolveCatalogs. * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to * ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations. - * @param fileFormat User-specified STORED AS / ROW FORMAT (Hive-style). Empty if not specified. - * [[CatalogStorageFormat]] is carried here because ResolveSessionCatalog - * passes the full format (including serde info) to [[CreateTableLikeCommand]] - * for the V1 fallback path. The V2 execution path (DataSourceV2Strategy) - * extracts only [[CatalogStorageFormat#locationUri]] from this field. + * @param location User-specified LOCATION. None if not specified. * @param provider User-specified USING provider. None if not specified. + * @param serdeInfo User-specified STORED AS / ROW FORMAT (Hive-style). None if not specified. + * Kept separate from [[location]] so that [[ResolveSessionCatalog]] can + * reconstruct the full + * [[org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat]] + * for the V1 fallback path without pulling V1 catalog types into this plan. * @param properties User-specified TBLPROPERTIES. * @param ifNotExists IF NOT EXISTS flag. 
*/ case class CreateTableLike( name: LogicalPlan, source: LogicalPlan, - fileFormat: CatalogStorageFormat, + location: Option[String], provider: Option[String], + serdeInfo: Option[SerdeInfo], properties: Map[String, String], ifNotExists: Boolean) extends BinaryCommand { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 830d495923057..8860815f182bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -299,7 +299,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case CreateTableLike( ResolvedV1Identifier(targetIdent), ResolvedV1TableOrViewIdentifier(sourceIdent), - fileFormat, provider, properties, ifNotExists) => + location, provider, serdeInfo, properties, ifNotExists) => + val fileFormat = buildStorageFormatFromSerdeInfo(location, serdeInfo) CreateTableLikeCommand( targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists) @@ -636,6 +637,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) CreateTableV1(tableDesc, mode, query) } + private def buildStorageFormatFromSerdeInfo( + location: Option[String], + maybeSerdeInfo: Option[SerdeInfo]): CatalogStorageFormat = { + HiveSerDe.buildStorageFormat( + location, + maybeSerdeInfo, + si => QueryCompilationErrors.invalidFileFormatForStoredAsError(si)) + } + private def getStorageFormatAndProvider( provider: Option[String], options: Map[String, String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 2beae505e59f6..f19c2063612d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1113,31 +1113,11 @@ class SparkSqlAstBuilder extends AstBuilder { location: Option[String], maybeSerdeInfo: Option[SerdeInfo], ctx: ParserRuleContext): CatalogStorageFormat = { - if (maybeSerdeInfo.isEmpty) { - CatalogStorageFormat.empty.copy(locationUri = location.map(CatalogUtils.stringToURI)) - } else { - val serdeInfo = maybeSerdeInfo.get - if (serdeInfo.storedAs.isEmpty) { - CatalogStorageFormat.empty.copy( - locationUri = location.map(CatalogUtils.stringToURI), - inputFormat = serdeInfo.formatClasses.map(_.input), - outputFormat = serdeInfo.formatClasses.map(_.output), - serde = serdeInfo.serde, - properties = serdeInfo.serdeProperties) - } else { - HiveSerDe.sourceToSerDe(serdeInfo.storedAs.get) match { - case Some(hiveSerde) => - CatalogStorageFormat.empty.copy( - locationUri = location.map(CatalogUtils.stringToURI), - inputFormat = hiveSerde.inputFormat, - outputFormat = hiveSerde.outputFormat, - serde = serdeInfo.serde.orElse(hiveSerde.serde), - properties = serdeInfo.serdeProperties) - case _ => - operationNotAllowed(s"STORED AS with file format '${serdeInfo.storedAs.get}'", ctx) - } - } - } + HiveSerDe.buildStorageFormat( + location, + maybeSerdeInfo, + si => QueryParsingErrors.operationNotAllowedError( + s"STORED AS with file format '${si.storedAs.get}'", ctx)) } /** @@ -1183,14 +1163,14 @@ class SparkSqlAstBuilder extends AstBuilder { case _ => } - val storage = toStorageFormat(location, serdeInfo, ctx) val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) val cleanedProperties = cleanTableProperties(ctx, properties) CreateTableLike( name = withIdentClause(ctx.target, UnresolvedIdentifier(_)), source = createUnresolvedTableOrView(ctx.source, "CREATE TABLE LIKE", allowTempView = true), - fileFormat = storage, + location = location, provider = provider, + serdeInfo = serdeInfo, properties = cleanedProperties, ifNotExists = ctx.EXISTS != 
null) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 4ae3285ac6897..7932a0aa53bac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -25,7 +25,7 @@ import org.apache.spark.{SparkException, SparkIllegalArgumentException} import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.EXPR import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedTempView} -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral @@ -245,13 +245,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat // Views are wrapped in V1Table so the exec can extract schema and provider uniformly. 
case CreateTableLike( ResolvedIdentifier(catalog, ident), source, - fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) => + locationStr, provider, _, properties, ifNotExists) => val table = source match { case ResolvedTable(_, _, t, _) => t case ResolvedPersistentView(_, _, meta) => V1Table(meta) case ResolvedTempView(_, meta) => V1Table(meta) } - val location = fileFormat.locationUri.map { uri => + val location = locationStr.map { loc => + val uri = CatalogUtils.stringToURI(loc) if (uri.isAbsolute) uri else if (new Path(uri).isAbsolute) CatalogUtils.makeQualifiedPath(uri, hadoopConf) else uri diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index cfcfeabbf1f6e..32ada4beb313a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.internal import java.util.Locale import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils} +import org.apache.spark.sql.catalyst.plans.logical.SerdeInfo case class HiveSerDe( inputFormat: Option[String] = None, @@ -104,6 +105,44 @@ object HiveSerDe { */ def serdeToSource(serde: HiveSerDe): Option[String] = serdeInverseMap.get(serde) + /** + * Builds a [[CatalogStorageFormat]] from a user-specified location and optional serde info. + * Uses [[CatalogStorageFormat.empty]] as the base (no Hive defaults). 
+ * + * @param location optional LOCATION URI string + * @param maybeSerdeInfo optional serde/format info from ROW FORMAT / STORED AS clauses + * @param invalidStoredAsError callback returning a [[Throwable]] when STORED AS names an + * unrecognized file format; the caller controls the error message + */ + def buildStorageFormat( + location: Option[String], + maybeSerdeInfo: Option[SerdeInfo], + invalidStoredAsError: SerdeInfo => Throwable): CatalogStorageFormat = { + val locationUri = location.map(CatalogUtils.stringToURI) + maybeSerdeInfo match { + case None => + CatalogStorageFormat.empty.copy(locationUri = locationUri) + case Some(si) if si.storedAs.isDefined => + sourceToSerDe(si.storedAs.get) match { + case Some(hiveSerde) => + CatalogStorageFormat.empty.copy( + locationUri = locationUri, + inputFormat = hiveSerde.inputFormat, + outputFormat = hiveSerde.outputFormat, + serde = si.serde.orElse(hiveSerde.serde), + properties = si.serdeProperties) + case _ => throw invalidStoredAsError(si) + } + case Some(si) => + CatalogStorageFormat.empty.copy( + locationUri = locationUri, + inputFormat = si.formatClasses.map(_.input), + outputFormat = si.formatClasses.map(_.output), + serde = si.serde, + properties = si.serdeProperties) + } + } + def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = { // To respect hive-site.xml, it peeks Hadoop configuration from existing Spark session, // as an easy workaround. See SPARK-27555. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 24ca86b26a52d..2508a5cee3e02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -726,71 +726,70 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { // CreateTableLikeCommand; the name is an UnresolvedIdentifier and the source is // an UnresolvedTableOrView. def extract(sql: String): (Seq[String], Seq[String], - org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat, - Option[String], Map[String, String], Boolean) = + Option[String], Option[String], Map[String, String], Boolean) = parser.parsePlan(sql).collect { case CreateTableLike( UnresolvedIdentifier(targetParts, _), UnresolvedTableOrView(sourceParts, _, _), - f, p, pr, e) => - (targetParts, sourceParts, f, p, pr, e) + loc, p, _, pr, e) => + (targetParts, sourceParts, loc, p, pr, e) }.head - val (target, source, fileFormat, provider, properties, exists) = + val (target, source, location, provider, properties, exists) = extract("CREATE TABLE table1 LIKE table2") assert(exists == false) assert(target == Seq("table1")) assert(source == Seq("table2")) - assert(fileFormat.locationUri.isEmpty) + assert(location.isEmpty) assert(provider.isEmpty) - val (target2, source2, fileFormat2, provider2, properties2, exists2) = + val (target2, source2, location2, provider2, properties2, exists2) = extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2") assert(exists2) assert(target2 == Seq("table1")) assert(source2 == Seq("table2")) - assert(fileFormat2.locationUri.isEmpty) + assert(location2.isEmpty) assert(provider2.isEmpty) - val (target3, source3, fileFormat3, provider3, properties3, exists3) = + val (target3, source3, location3, provider3, properties3, exists3) = extract("CREATE 
TABLE table1 LIKE table2 LOCATION '/spark/warehouse'") assert(!exists3) assert(target3 == Seq("table1")) assert(source3 == Seq("table2")) - assert(fileFormat3.locationUri.map(_.toString) == Some("/spark/warehouse")) + assert(location3 == Some("/spark/warehouse")) assert(provider3.isEmpty) - val (target4, source4, fileFormat4, provider4, properties4, exists4) = + val (target4, source4, location4, provider4, properties4, exists4) = extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'") assert(exists4) assert(target4 == Seq("table1")) assert(source4 == Seq("table2")) - assert(fileFormat4.locationUri.map(_.toString) == Some("/spark/warehouse")) + assert(location4 == Some("/spark/warehouse")) assert(provider4.isEmpty) - val (target5, source5, fileFormat5, provider5, properties5, exists5) = + val (target5, source5, location5, provider5, properties5, exists5) = extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet") assert(exists5) assert(target5 == Seq("table1")) assert(source5 == Seq("table2")) - assert(fileFormat5.locationUri.isEmpty) + assert(location5.isEmpty) assert(provider5 == Some("parquet")) - val (target6, source6, fileFormat6, provider6, properties6, exists6) = + val (target6, source6, location6, provider6, properties6, exists6) = extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC") assert(exists6) assert(target6 == Seq("table1")) assert(source6 == Seq("table2")) - assert(fileFormat6.locationUri.isEmpty) + assert(location6.isEmpty) assert(provider6 == Some("ORC")) // 3-part names: catalog.namespace.table - val (target7, source7, fileFormat7, provider7, properties7, exists7) = + val (target7, source7, location7, provider7, properties7, exists7) = extract("CREATE TABLE testcat.ns.dst LIKE testcat.ns.src") assert(!exists7) assert(target7 == Seq("testcat", "ns", "dst")) assert(source7 == Seq("testcat", "ns", "src")) - assert(fileFormat7.locationUri.isEmpty) + assert(location7.isEmpty) 
assert(provider7.isEmpty) }