diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index a1de1234ef317..918e8eac79bfa 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -248,8 +248,8 @@ statement
| createTableHeader (LEFT_PAREN tableElementList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
- | CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
- LIKE source=tableIdentifier
+ | CREATE TABLE (IF errorCapturingNot EXISTS)? target=identifierReference
+ LIKE source=identifierReference
(tableProvider |
rowFormat |
createFileFormat |
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 33bf615680063..e96777f43df76 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -295,6 +295,41 @@ default Table createTable(Identifier ident, TableInfo tableInfo)
return createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
}
+ /**
+ * Create a table in the catalog by copying metadata from an existing source table.
+ *
+ * This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog.
+ * The {@code userSpecifiedOverrides} parameter contains strictly user-specified overrides:
+ * TBLPROPERTIES, LOCATION, and the USING provider (only if explicitly specified).
+ * It does NOT contain schema, partitioning, properties, constraints, or owner from the source
+ * table. Connectors must read all source metadata directly from {@code sourceTable}, including
+ * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}),
+ * constraints ({@link Table#constraints()}), and format-specific properties
+ * ({@link Table#properties()}). Connectors are also responsible for setting the owner of the
+ * new table (e.g. via {@code org.apache.spark.sql.catalyst.CurrentUserContext#getCurrentUser}).
+ *
+ * The default implementation throws {@link UnsupportedOperationException}. Connectors that
+ * support {@code CREATE TABLE ... LIKE ...} must override this method.
+ *
+ * @param ident a table identifier for the new table
+ * @param sourceTable the resolved source table; connectors read schema, partitioning,
+ * constraints, properties, and any format-specific metadata from this object
+ * @param userSpecifiedOverrides strictly user-specified overrides: TBLPROPERTIES, LOCATION,
+ * and USING provider (if explicitly given); source schema,
+ * partitioning, provider, constraints, and owner are NOT included
+ * @return metadata for the new table
+ *
+ * @throws TableAlreadyExistsException If a table or view already exists for the identifier
+ * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+ * @throws UnsupportedOperationException If the catalog does not support CREATE TABLE LIKE
+ * @since 4.2.0
+ */
+ default Table createTableLike(
+ Identifier ident, Table sourceTable, TableInfo userSpecifiedOverrides)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE");
+ }
+
/**
* If true, mark all the fields of the query schema as nullable when executing
* CREATE/REPLACE TABLE ... AS SELECT ... and creating the table.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
index a5b4e333afa87..9870a3b0fa45d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
@@ -61,7 +61,7 @@ public Transform[] partitions() {
public Constraint[] constraints() { return constraints; }
public static class Builder {
- private Column[] columns;
+ private Column[] columns = new Column[0];
private Map<String, String> properties = new HashMap<>();
private Transform[] partitions = new Transform[0];
private Constraint[] constraints = new Constraint[0];
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 316dc8faff3ff..f34c6be9954e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -121,6 +121,10 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns)
r.copy(name = resolvedIdentifier)
+ case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _, _) =>
+ val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, Nil)
+ c.copy(name = resolvedIdentifier)
+
case UnresolvedIdentifier(nameParts, allowTemp) =>
resolveIdentifier(nameParts, allowTemp, Nil)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index c0fe58596c9fd..905797eb021c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -615,6 +615,41 @@ case class CreateTable(
}
}
+/**
+ * Create a new table with the same schema/partitioning as an existing table or view,
+ * for use with a v2 catalog.
+ *
+ * @param name Target table identifier. Starts as UnresolvedIdentifier, resolved to
+ * ResolvedIdentifier by ResolveCatalogs.
+ * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to
+ * ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations.
+ * @param location User-specified LOCATION. None if not specified.
+ * @param provider User-specified USING provider. None if not specified.
+ * @param serdeInfo User-specified STORED AS / ROW FORMAT (Hive-style). None if not specified.
+ * Kept separate from [[location]] so that [[ResolveSessionCatalog]] can
+ * reconstruct the full
+ * [[org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat]]
+ * for the V1 fallback path without pulling V1 catalog types into this plan.
+ * @param properties User-specified TBLPROPERTIES.
+ * @param ifNotExists IF NOT EXISTS flag.
+ */
+case class CreateTableLike(
+ name: LogicalPlan,
+ source: LogicalPlan,
+ location: Option[String],
+ provider: Option[String],
+ serdeInfo: Option[SerdeInfo],
+ properties: Map[String, String],
+ ifNotExists: Boolean) extends BinaryCommand {
+
+ override def left: LogicalPlan = name
+ override def right: LogicalPlan = source
+
+ override protected def withNewChildrenInternal(
+ newLeft: LogicalPlan, newRight: LogicalPlan): CreateTableLike =
+ copy(name = newLeft, source = newRight)
+}
+
/**
* Create a new table from a select query with a v2 catalog.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index b4a4c6f46cda4..4fd9f6f6da913 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -1377,4 +1377,122 @@ class CatalogSuite extends SparkFunSuite {
catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3"))
assert(catalog.loadTable(testIdent).version() == "4")
}
+
+ // ---- createTableLike tests ----
+
+ test("createTableLike: user-specified properties in tableInfo are applied to target") {
+ val catalog = newCatalog()
+ val srcIdent = Identifier.of(Array("ns"), "src")
+ val dstIdent = Identifier.of(Array("ns"), "dst")
+
+ val srcProps = Map("source.key" -> "source.value").asJava
+ catalog.createTable(srcIdent, columns, emptyTrans, srcProps)
+ val sourceTable = catalog.loadTable(srcIdent)
+
+ // tableInfo contains only user overrides; schema and partitioning come from sourceTable
+ val overrides = Map("user.key" -> "user.value").asJava
+ val tableInfo = new TableInfo.Builder()
+ .withProperties(overrides)
+ .build()
+ catalog.createTableLike(dstIdent, sourceTable, tableInfo)
+
+ val dst = catalog.loadTable(dstIdent)
+ assert(dst.properties.asScala("user.key") == "user.value",
+ "user-specified properties should be applied to the target")
+ }
+
+ test("createTableLike: source properties are copied to target by connector implementation") {
+ val catalog = newCatalog()
+ val srcIdent = Identifier.of(Array("ns"), "src")
+ val dstIdent = Identifier.of(Array("ns"), "dst")
+
+ val srcProps = Map("format.version" -> "2", "format.feature" -> "deletion-vectors").asJava
+ catalog.createTable(srcIdent, columns, emptyTrans, srcProps)
+ val sourceTable = catalog.loadTable(srcIdent)
+
+ // tableInfo contains no overrides; connector reads schema and properties from sourceTable
+ val tableInfo = new TableInfo.Builder()
+ .withProperties(emptyProps)
+ .build()
+ catalog.createTableLike(dstIdent, sourceTable, tableInfo)
+
+ val dst = catalog.loadTable(dstIdent)
+ assert(dst.properties.asScala("format.version") == "2",
+ "connector should copy source properties from sourceTable")
+ assert(dst.properties.asScala("format.feature") == "deletion-vectors",
+ "connector should copy source properties from sourceTable")
+ }
+
+ test("createTableLike: user-specified properties override source properties") {
+ val catalog = newCatalog()
+ val srcIdent = Identifier.of(Array("ns"), "src")
+ val dstIdent = Identifier.of(Array("ns"), "dst")
+
+ val srcProps = Map("format.version" -> "1", "source.only" -> "yes").asJava
+ catalog.createTable(srcIdent, columns, emptyTrans, srcProps)
+ val sourceTable = catalog.loadTable(srcIdent)
+
+ // user explicitly overrides format.version
+ val overrides = Map("format.version" -> "2").asJava
+ val tableInfo = new TableInfo.Builder()
+ .withProperties(overrides)
+ .build()
+ catalog.createTableLike(dstIdent, sourceTable, tableInfo)
+
+ val dst = catalog.loadTable(dstIdent)
+ assert(dst.properties.asScala("format.version") == "2",
+ "user-specified properties should override source properties")
+ assert(dst.properties.asScala("source.only") == "yes",
+ "non-overridden source properties should still be copied")
+ }
+
+ test("createTableLike: source constraints are copied to target by connector implementation") {
+ val catalog = newCatalog()
+ val srcIdent = Identifier.of(Array("ns"), "src")
+ val dstIdent = Identifier.of(Array("ns"), "dst")
+
+ val srcTableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withPartitions(emptyTrans)
+ .withProperties(emptyProps)
+ .withConstraints(constraints)
+ .build()
+ catalog.createTable(srcIdent, srcTableInfo)
+ val sourceTable = catalog.loadTable(srcIdent)
+
+ val tableInfo = new TableInfo.Builder()
+ .withProperties(emptyProps)
+ .build()
+ catalog.createTableLike(dstIdent, sourceTable, tableInfo)
+
+ val dst = catalog.loadTable(dstIdent)
+ assert(dst.constraints().toSet == constraints.toSet,
+ "connector should copy source constraints from sourceTable.constraints()")
+ }
+
+ test("createTableLike: catalog without createTableLike override throws " +
+ "UnsupportedOperationException") {
+ // BasicInMemoryTableCatalog does not override createTableLike. The default implementation
+ // throws UnsupportedOperationException to signal that connectors must explicitly implement
+ // CREATE TABLE LIKE support.
+ val catalog = new BasicInMemoryTableCatalog
+ catalog.initialize("basic", CaseInsensitiveStringMap.empty())
+
+ val srcIdent = Identifier.of(Array("ns"), "src")
+ val dstIdent = Identifier.of(Array("ns"), "dst")
+
+ catalog.createTable(srcIdent, columns, emptyTrans, Map.empty[String, String].asJava)
+ val sourceTable = catalog.loadTable(srcIdent)
+
+ val tableInfo = new TableInfo.Builder()
+ .withProperties(Map.empty[String, String].asJava)
+ .build()
+ val ex = intercept[UnsupportedOperationException] {
+ catalog.createTableLike(dstIdent, sourceTable, tableInfo)
+ }
+ assert(ex.getMessage.contains("basic"),
+ "Exception should mention the catalog name")
+ assert(ex.getMessage.contains("CREATE TABLE LIKE"),
+ "Exception should mention the unsupported operation")
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index dc580848d09de..81f4717fc4dfc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -24,8 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
@@ -238,6 +239,28 @@ class BasicInMemoryTableCatalog extends TableCatalog {
class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces
with ProcedureCatalog {
+ override def createTableLike(
+ ident: Identifier,
+ sourceTable: Table,
+ userSpecifiedOverrides: TableInfo): Table = {
+ // Read schema from source. For V1Table sources, apply CharVarcharUtils to preserve
+ // CHAR/VARCHAR types as declared rather than collapsed to StringType.
+ val columns = sourceTable match {
+ case v1: V1Table =>
+ CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema))
+ case _ =>
+ sourceTable.columns()
+ }
+ // Merge source properties with user overrides (user overrides win), then set current user
+ // as owner (overrides source owner). Connectors are responsible for setting the owner.
+ val mergedProps =
+ (sourceTable.properties().asScala ++
+ userSpecifiedOverrides.properties().asScala ++
+ Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)).asJava
+ createTable(ident, columns, sourceTable.partitioning(), mergedProps,
+ Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints())
+ }
+
override def capabilities: java.util.Set[TableCatalogCapability] = {
Set(
TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7efd2e1113179..8860815f182bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -293,6 +293,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}
+ // For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session
+ // catalog (or a V1-compatible catalog extension). If either one resolves to another catalog, fall
+ // through to the V2 execution path (CreateTableLikeExec via DataSourceV2Strategy).
+ case CreateTableLike(
+ ResolvedV1Identifier(targetIdent),
+ ResolvedV1TableOrViewIdentifier(sourceIdent),
+ location, provider, serdeInfo, properties, ifNotExists) =>
+ val fileFormat = buildStorageFormatFromSerdeInfo(location, serdeInfo)
+ CreateTableLikeCommand(
+ targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists)
+
case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command =>
DropTableCommand(ident, ifExists, isView = false, purge = purge)
@@ -626,6 +637,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
CreateTableV1(tableDesc, mode, query)
}
+ private def buildStorageFormatFromSerdeInfo(
+ location: Option[String],
+ maybeSerdeInfo: Option[SerdeInfo]): CatalogStorageFormat = {
+ HiveSerDe.buildStorageFormat(
+ location,
+ maybeSerdeInfo,
+ si => QueryCompilationErrors.invalidFileFormatForStoredAsError(si))
+ }
+
private def getStorageFormatAndProvider(
provider: Option[String],
options: Map[String, String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6c19f53d2dc42..f19c2063612d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1113,35 +1113,15 @@ class SparkSqlAstBuilder extends AstBuilder {
location: Option[String],
maybeSerdeInfo: Option[SerdeInfo],
ctx: ParserRuleContext): CatalogStorageFormat = {
- if (maybeSerdeInfo.isEmpty) {
- CatalogStorageFormat.empty.copy(locationUri = location.map(CatalogUtils.stringToURI))
- } else {
- val serdeInfo = maybeSerdeInfo.get
- if (serdeInfo.storedAs.isEmpty) {
- CatalogStorageFormat.empty.copy(
- locationUri = location.map(CatalogUtils.stringToURI),
- inputFormat = serdeInfo.formatClasses.map(_.input),
- outputFormat = serdeInfo.formatClasses.map(_.output),
- serde = serdeInfo.serde,
- properties = serdeInfo.serdeProperties)
- } else {
- HiveSerDe.sourceToSerDe(serdeInfo.storedAs.get) match {
- case Some(hiveSerde) =>
- CatalogStorageFormat.empty.copy(
- locationUri = location.map(CatalogUtils.stringToURI),
- inputFormat = hiveSerde.inputFormat,
- outputFormat = hiveSerde.outputFormat,
- serde = serdeInfo.serde.orElse(hiveSerde.serde),
- properties = serdeInfo.serdeProperties)
- case _ =>
- operationNotAllowed(s"STORED AS with file format '${serdeInfo.storedAs.get}'", ctx)
- }
- }
- }
+ HiveSerDe.buildStorageFormat(
+ location,
+ maybeSerdeInfo,
+ si => QueryParsingErrors.operationNotAllowedError(
+ s"STORED AS with file format '${si.storedAs.get}'", ctx))
}
/**
- * Create a [[CreateTableLikeCommand]] command.
+ * Create a [[CreateTableLike]] logical plan.
*
* For example:
* {{{
@@ -1158,8 +1138,6 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
- val targetTable = visitTableIdentifier(ctx.target)
- val sourceTable = visitTableIdentifier(ctx.source)
checkDuplicateClauses(ctx.tableProvider, "PROVIDER", ctx)
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
@@ -1185,11 +1163,16 @@ class SparkSqlAstBuilder extends AstBuilder {
case _ =>
}
- val storage = toStorageFormat(location, serdeInfo, ctx)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val cleanedProperties = cleanTableProperties(ctx, properties)
- CreateTableLikeCommand(
- targetTable, sourceTable, storage, provider, cleanedProperties, ctx.EXISTS != null)
+ CreateTableLike(
+ name = withIdentClause(ctx.target, UnresolvedIdentifier(_)),
+ source = createUnresolvedTableOrView(ctx.source, "CREATE TABLE LIKE", allowTempView = true),
+ location = location,
+ provider = provider,
+ serdeInfo = serdeInfo,
+ properties = cleanedProperties,
+ ifNotExists = ctx.EXISTS != null)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala
new file mode 100644
index 0000000000000..c4930b45375b4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.net.URI
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.internal.LogKeys.TABLE_NAME
+import org.apache.spark.internal.MDC
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableInfo}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Physical plan node for CREATE TABLE ... LIKE ... targeting a v2 catalog.
+ *
+ * Calls [[TableCatalog.createTableLike]] so that connectors can implement format-specific copy
+ * semantics (e.g. Delta protocol inheritance, Iceberg sort order and format version). Connectors
+ * must override [[TableCatalog.createTableLike]]; the default implementation throws
+ * [[UnsupportedOperationException]].
+ *
+ * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains strictly user-specified
+ * overrides: TBLPROPERTIES, LOCATION, and USING provider (only if explicitly given).
+ * Schema, partitioning, source provider, source TBLPROPERTIES, constraints, and owner are NOT
+ * pre-populated; connectors read all source metadata directly from [[sourceTable]] and are
+ * responsible for setting the owner.
+ */
+case class CreateTableLikeExec(
+ targetCatalog: TableCatalog,
+ targetIdent: Identifier,
+ sourceTable: Table,
+ location: Option[URI],
+ provider: Option[String],
+ properties: Map[String, String],
+ ifNotExists: Boolean) extends LeafV2CommandExec {
+
+ override def output: Seq[Attribute] = Seq.empty
+
+ override protected def run(): Seq[InternalRow] = {
+ if (!targetCatalog.tableExists(targetIdent)) {
+ // Build strictly user-specified overrides: explicit TBLPROPERTIES, LOCATION (if given),
+ // and USING provider (if given). The source table's provider and owner are not included; connectors
+ // are responsible for reading PROP_PROVIDER from sourceTable.properties() and for
+ // setting the owner via CurrentUserContext.getCurrentUser.
+ val locationProp: Option[(String, String)] =
+ location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri))
+
+ val finalProps =
+ properties ++
+ provider.map(TableCatalog.PROP_PROVIDER -> _) ++
+ locationProp
+
+ try {
+ val userSpecifiedOverrides = new TableInfo.Builder()
+ .withProperties(finalProps.asJava)
+ .build()
+ targetCatalog.createTableLike(targetIdent, sourceTable, userSpecifiedOverrides)
+ } catch {
+ case _: TableAlreadyExistsException if ifNotExists =>
+ logWarning(
+ log"Table ${MDC(TABLE_NAME, targetIdent.quoted)} was created concurrently. Ignoring.")
+ }
+ } else if (!ifNotExists) {
+ throw QueryCompilationErrors.tableAlreadyExistsError(targetIdent)
+ }
+
+ Seq.empty
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 91e753096a238..7932a0aa53bac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.EXPR
-import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedTempView}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder}
import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable, V1Table}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
@@ -240,6 +240,26 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
}
+ // CREATE TABLE ... LIKE ... for a v2 catalog target.
+ // Source is an already-resolved Table object; no extra catalog round-trip is needed.
+ // Views are wrapped in V1Table so the exec can extract schema and provider uniformly.
+ case CreateTableLike(
+ ResolvedIdentifier(catalog, ident), source,
+ locationStr, provider, _, properties, ifNotExists) =>
+ val table = source match {
+ case ResolvedTable(_, _, t, _) => t
+ case ResolvedPersistentView(_, _, meta) => V1Table(meta)
+ case ResolvedTempView(_, meta) => V1Table(meta)
+ }
+ val location = locationStr.map { loc =>
+ val uri = CatalogUtils.stringToURI(loc)
+ if (uri.isAbsolute) uri
+ else if (new Path(uri).isAbsolute) CatalogUtils.makeQualifiedPath(uri, hadoopConf)
+ else uri
+ }
+ CreateTableLikeExec(catalog.asTableCatalog, ident, table,
+ location, provider, properties, ifNotExists) :: Nil
+
case RefreshTable(r: ResolvedTable) =>
RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index cfcfeabbf1f6e..32ada4beb313a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.internal
import java.util.Locale
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.SerdeInfo
case class HiveSerDe(
inputFormat: Option[String] = None,
@@ -104,6 +105,44 @@ object HiveSerDe {
*/
def serdeToSource(serde: HiveSerDe): Option[String] = serdeInverseMap.get(serde)
+ /**
+ * Builds a [[CatalogStorageFormat]] from a user-specified location and optional serde info.
+ * Uses [[CatalogStorageFormat.empty]] as the base (no Hive defaults).
+ *
+ * @param location optional LOCATION URI string
+ * @param maybeSerdeInfo optional serde/format info from ROW FORMAT / STORED AS clauses
+ * @param invalidStoredAsError callback returning a [[Throwable]] when STORED AS names an
+ * unrecognized file format; the caller controls the error message
+ */
+ def buildStorageFormat(
+ location: Option[String],
+ maybeSerdeInfo: Option[SerdeInfo],
+ invalidStoredAsError: SerdeInfo => Throwable): CatalogStorageFormat = {
+ val locationUri = location.map(CatalogUtils.stringToURI)
+ maybeSerdeInfo match {
+ case None =>
+ CatalogStorageFormat.empty.copy(locationUri = locationUri)
+ case Some(si) if si.storedAs.isDefined =>
+ sourceToSerDe(si.storedAs.get) match {
+ case Some(hiveSerde) =>
+ CatalogStorageFormat.empty.copy(
+ locationUri = locationUri,
+ inputFormat = hiveSerde.inputFormat,
+ outputFormat = hiveSerde.outputFormat,
+ serde = si.serde.orElse(hiveSerde.serde),
+ properties = si.serdeProperties)
+ case _ => throw invalidStoredAsError(si)
+ }
+ case Some(si) =>
+ CatalogStorageFormat.empty.copy(
+ locationUri = locationUri,
+ inputFormat = si.formatClasses.map(_.input),
+ outputFormat = si.formatClasses.map(_.output),
+ serde = si.serde,
+ properties = si.serdeProperties)
+ }
+ }
+
def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
// To respect hive-site.xml, it peeks Hadoop configuration from existing Spark session,
// as an easy workaround. See SPARK-27555.
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out
index 4e864523368d7..7bd87a4267c77 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out
@@ -55,7 +55,7 @@ DescribeColumnCommand `spark_catalog`.`default`.`char_tbl2`, [spark_catalog, def
-- !query
create table char_tbl3 like char_tbl
-- !query analysis
-CreateTableLikeCommand `char_tbl3`, `char_tbl`, Storage(), false
+CreateTableLikeCommand `spark_catalog`.`default`.`char_tbl3`, `spark_catalog`.`default`.`char_tbl`, Storage(), false
-- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala
new file mode 100644
index 0000000000000..a67831cf63503
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog}
+import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType}
+
+class CreateTableLikeSuite extends DatasourceV2SQLBase {
+
+ private def testCatalog: InMemoryCatalog =
+ catalog("testcat").asInstanceOf[InMemoryCatalog]
+
+ private def testCatalog2: InMemoryCatalog =
+ catalog("testcat2").asInstanceOf[InMemoryCatalog]
+
+ // -------------------------------------------------------------------------
+ // Basic V2 path
+ // -------------------------------------------------------------------------
+
+ test("v2 target, v1 source: schema and partitioning are copied") {
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint, data string) USING parquet PARTITIONED BY (data)")
+ sql("CREATE TABLE testcat.dst LIKE src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.schema() === CatalogV2Util.v2ColumnsToStructType(dst.columns()))
+ val columnNames = dst.columns().map(_.name)
+ assert(columnNames === Array("id", "data"))
+ // the source's partition column should surface in partitioning (as an identity transform)
+ assert(dst.partitioning.nonEmpty)
+ }
+ }
+
+ test("v2 target, v2 source: pure v2 path") {
+ withTable("testcat.src", "testcat.dst") {
+ sql("CREATE TABLE testcat.src (id bigint, data string) USING foo")
+ sql("CREATE TABLE testcat.dst LIKE testcat.src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ val columnNames = dst.columns().map(_.name)
+ assert(columnNames === Array("id", "data"))
+ }
+ }
+
+ test("cross-catalog: source in testcat, target in testcat2") {
+ withTable("testcat.src", "testcat2.dst") {
+ sql("CREATE TABLE testcat.src (id bigint, name string) USING foo")
+ sql("CREATE TABLE testcat2.dst LIKE testcat.src")
+
+ val dst = testCatalog2.loadTable(Identifier.of(Array(), "dst"))
+ val columnNames = dst.columns().map(_.name)
+ assert(columnNames === Array("id", "name"))
+ }
+ }
+
+ test("3-part name: catalog.namespace.table for both target and source") {
+ withTable("testcat.ns.src", "testcat2.ns.dst") {
+ sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns")
+ sql("CREATE NAMESPACE IF NOT EXISTS testcat2.ns")
+ sql("CREATE TABLE testcat.ns.src (id bigint, data string) USING foo")
+ sql("CREATE TABLE testcat2.ns.dst LIKE testcat.ns.src")
+
+ val dst = testCatalog2.loadTable(Identifier.of(Array("ns"), "dst"))
+ val columnNames = dst.columns().map(_.name)
+ assert(columnNames === Array("id", "data"))
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // IF NOT EXISTS
+ // -------------------------------------------------------------------------
+
+ test("IF NOT EXISTS: second call is silent when table already exists") {
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet")
+ sql("CREATE TABLE testcat.dst LIKE src")
+ // With IF NOT EXISTS, re-creating an existing table must be a silent no-op, not an error.
+ sql("CREATE TABLE IF NOT EXISTS testcat.dst LIKE src")
+ }
+ }
+
+ test("without IF NOT EXISTS, duplicate create throws TableAlreadyExistsException") {
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet")
+ sql("CREATE TABLE testcat.dst LIKE src")
+ intercept[TableAlreadyExistsException] {
+ sql("CREATE TABLE testcat.dst LIKE src")
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Views as source
+ // -------------------------------------------------------------------------
+
+ test("persistent view as source, v2 target") {
+ withTable("src", "testcat.dst") {
+ withView("v") {
+ sql("CREATE TABLE src (id bigint, data string) USING parquet")
+ sql("INSERT INTO src VALUES (1, 'a')")
+ sql("CREATE VIEW v AS SELECT * FROM src")
+ sql("CREATE TABLE testcat.dst LIKE v")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ val columnNames = dst.columns().map(_.name)
+ assert(columnNames === Array("id", "data"))
+ }
+ }
+ }
+
+ test("temp view as source, v2 target") {
+ withTable("src", "testcat.dst") {
+ withTempView("tv") {
+ sql("CREATE TABLE src (id bigint, data string) USING parquet")
+ sql("CREATE TEMP VIEW tv AS SELECT id, data FROM src")
+ sql("CREATE TABLE testcat.dst LIKE tv")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ val columnNames = dst.columns().map(_.name)
+ assert(columnNames === Array("id", "data"))
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Property / provider behavior
+ // -------------------------------------------------------------------------
+
+ test("source TBLPROPERTIES are copied to target when connector implements createTableLike") {
+ // InMemoryTableCatalog overrides createTableLike to merge source properties into the target,
+ // demonstrating connector-specific copy semantics. Connectors that do not override
+ // createTableLike inherit the default implementation, which throws UnsupportedOperationException.
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('source_key' = 'source')")
+ sql("CREATE TABLE testcat.dst LIKE src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.properties.containsKey("source_key"),
+ "Connector-implemented createTableLike copies source TBLPROPERTIES to target")
+ }
+ }
+
+ test("user-specified TBLPROPERTIES override source TBLPROPERTIES in createTableLike") {
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('key' = 'source_value')")
+ sql("CREATE TABLE testcat.dst LIKE src TBLPROPERTIES ('key' = 'user_value')")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.properties.get("key") == "user_value",
+ "User-specified TBLPROPERTIES should override source TBLPROPERTIES")
+ }
+ }
+
+ test("user-specified TBLPROPERTIES are applied on target") {
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet")
+ sql("CREATE TABLE testcat.dst LIKE src TBLPROPERTIES ('custom' = 'value')")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.properties.get("custom") === "value")
+ }
+ }
+
+ test("USING clause overrides source provider") {
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet")
+ sql("CREATE TABLE testcat.dst LIKE src USING foo")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.properties.get("provider") === "foo")
+ }
+ }
+
+ test("v2 catalog target: non-existent provider is stored as property without validation") {
+ // Pure V2 catalogs (e.g. InMemoryCatalog) do not validate the provider — they store it
+ // as a plain property and let the catalog implementation decide what to do with it.
+ // This is consistent with how CreateTableExec works for CREATE TABLE targeting V2 catalogs.
+ withTable("testcat.src", "testcat.dst") {
+ sql("CREATE TABLE testcat.src (id bigint) USING foo")
+ sql("CREATE TABLE testcat.dst LIKE testcat.src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.properties.get("provider") === "foo",
+ "Provider should be copied from source as-is without validation")
+ }
+ }
+
+ test("source provider is copied to v2 target when no USING override") {
+ // When no USING clause is given, provider inheritance is handled by the connector:
+ // InMemoryTableCatalog.createTableLike merges sourceTable.properties() into the target,
+ // which includes PROP_PROVIDER set by the source table.
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint) USING parquet")
+ sql("CREATE TABLE testcat.dst LIKE src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ assert(dst.properties.get("provider") === "parquet",
+ "Source provider should be copied to the V2 target when no USING clause is specified")
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Column type fidelity
+ // -------------------------------------------------------------------------
+
+ test("CHAR and VARCHAR types are preserved from v1 source to v2 target") {
+ // InMemoryTableCatalog.createTableLike applies CharVarcharUtils.getRawSchema when
+ // the source is a V1Table, preserving CHAR/VARCHAR types as declared.
+ // This illustrates the pattern connectors should follow to preserve declared types.
+ withTable("src", "testcat.dst") {
+ sql("CREATE TABLE src (id bigint, name CHAR(10), tag VARCHAR(20)) USING parquet")
+ sql("CREATE TABLE testcat.dst LIKE src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns())
+ assert(schema("name").dataType === CharType(10))
+ assert(schema("tag").dataType === VarcharType(20))
+ }
+ }
+
+ test("multiple column types are preserved") {
+ withTable("src", "testcat.dst") {
+ sql(
+ """CREATE TABLE src (
+ | id bigint,
+ | name string,
+ | score int
+ |) USING parquet""".stripMargin)
+ sql("CREATE TABLE testcat.dst LIKE src")
+
+ val dst = testCatalog.loadTable(Identifier.of(Array(), "dst"))
+ val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns())
+ assert(schema("id").dataType === LongType)
+ assert(schema("name").dataType === StringType)
+ assert(schema("score").dataType === IntegerType)
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // V2 source, session catalog (V1) target: unsupported
+ // -------------------------------------------------------------------------
+
+ test("v2 source, session catalog target: throws because session catalog does not " +
+ "implement createTableLike") {
+ // CREATE TABLE LIKE targeting the session catalog with a V2 source goes through
+ // CreateTableLikeExec, which calls createTableLike on the session catalog.
+ // The session catalog (InMemoryTableSessionCatalog in tests) does not override
+ // createTableLike, so the default UnsupportedOperationException is thrown.
+ withTable("testcat.src") {
+ sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet")
+ val ex = intercept[UnsupportedOperationException] {
+ sql("CREATE TABLE dst LIKE testcat.src")
+ }
+ assert(ex.getMessage.contains("CREATE TABLE LIKE"))
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // V1 fallback regression
+ // -------------------------------------------------------------------------
+
+ test("v1 fallback: CREATE TABLE default.dst LIKE default.src still uses CreateTableLikeCommand") {
+ withTable("src", "dst") {
+ sql("CREATE TABLE src (id bigint, data string) USING parquet")
+ sql("CREATE TABLE dst LIKE src")
+ // Verify via session catalog that dst was created with the correct schema
+ val meta = spark.sessionState.catalog.getTableMetadata(
+ spark.sessionState.sqlParser.parseTableIdentifier("dst"))
+ assert(meta.schema.fieldNames === Array("id", "data"))
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 9db7ae2e2824c..2508a5cee3e02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, SchemaCompensation, UnresolvedAttribute, UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, SchemaCompensation, UnresolvedAttribute, UnresolvedIdentifier, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans
@@ -721,83 +721,76 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
}
test("create table like") {
- val v1 = "CREATE TABLE table1 LIKE table2"
- val (target, source, fileFormat, provider, properties, exists) =
- parser.parsePlan(v1).collect {
- case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
- }.head
+ // Helper to extract fields from the new CreateTableLike unresolved plan.
+ // The parser now emits CreateTableLike (v2 logical plan) instead of
+ // CreateTableLikeCommand; the target name is an UnresolvedIdentifier and the
+ // source is an UnresolvedTableOrView.
+ def extract(sql: String): (Seq[String], Seq[String],
+ Option[String], Option[String], Map[String, String], Boolean) =
+ parser.parsePlan(sql).collect {
+ case CreateTableLike(
+ UnresolvedIdentifier(targetParts, _),
+ UnresolvedTableOrView(sourceParts, _, _),
+ loc, p, _, pr, e) =>
+ (targetParts, sourceParts, loc, p, pr, e)
+ }.head
+
+ val (target, source, location, provider, properties, exists) =
+ extract("CREATE TABLE table1 LIKE table2")
assert(exists == false)
- assert(target.database.isEmpty)
- assert(target.table == "table1")
- assert(source.database.isEmpty)
- assert(source.table == "table2")
- assert(fileFormat.locationUri.isEmpty)
+ assert(target == Seq("table1"))
+ assert(source == Seq("table2"))
+ assert(location.isEmpty)
assert(provider.isEmpty)
- val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2"
- val (target2, source2, fileFormat2, provider2, properties2, exists2) =
- parser.parsePlan(v2).collect {
- case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
- }.head
+ val (target2, source2, location2, provider2, properties2, exists2) =
+ extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2")
assert(exists2)
- assert(target2.database.isEmpty)
- assert(target2.table == "table1")
- assert(source2.database.isEmpty)
- assert(source2.table == "table2")
- assert(fileFormat2.locationUri.isEmpty)
+ assert(target2 == Seq("table1"))
+ assert(source2 == Seq("table2"))
+ assert(location2.isEmpty)
assert(provider2.isEmpty)
- val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'"
- val (target3, source3, fileFormat3, provider3, properties3, exists3) =
- parser.parsePlan(v3).collect {
- case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
- }.head
+ val (target3, source3, location3, provider3, properties3, exists3) =
+ extract("CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'")
assert(!exists3)
- assert(target3.database.isEmpty)
- assert(target3.table == "table1")
- assert(source3.database.isEmpty)
- assert(source3.table == "table2")
- assert(fileFormat3.locationUri.map(_.toString) == Some("/spark/warehouse"))
+ assert(target3 == Seq("table1"))
+ assert(source3 == Seq("table2"))
+ assert(location3 == Some("/spark/warehouse"))
assert(provider3.isEmpty)
- val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'"
- val (target4, source4, fileFormat4, provider4, properties4, exists4) =
- parser.parsePlan(v4).collect {
- case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
- }.head
+ val (target4, source4, location4, provider4, properties4, exists4) =
+ extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'")
assert(exists4)
- assert(target4.database.isEmpty)
- assert(target4.table == "table1")
- assert(source4.database.isEmpty)
- assert(source4.table == "table2")
- assert(fileFormat4.locationUri.map(_.toString) == Some("/spark/warehouse"))
+ assert(target4 == Seq("table1"))
+ assert(source4 == Seq("table2"))
+ assert(location4 == Some("/spark/warehouse"))
assert(provider4.isEmpty)
- val v5 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet"
- val (target5, source5, fileFormat5, provider5, properties5, exists5) =
- parser.parsePlan(v5).collect {
- case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
- }.head
+ val (target5, source5, location5, provider5, properties5, exists5) =
+ extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet")
assert(exists5)
- assert(target5.database.isEmpty)
- assert(target5.table == "table1")
- assert(source5.database.isEmpty)
- assert(source5.table == "table2")
- assert(fileFormat5.locationUri.isEmpty)
+ assert(target5 == Seq("table1"))
+ assert(source5 == Seq("table2"))
+ assert(location5.isEmpty)
assert(provider5 == Some("parquet"))
- val v6 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC"
- val (target6, source6, fileFormat6, provider6, properties6, exists6) =
- parser.parsePlan(v6).collect {
- case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
- }.head
+ val (target6, source6, location6, provider6, properties6, exists6) =
+ extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC")
assert(exists6)
- assert(target6.database.isEmpty)
- assert(target6.table == "table1")
- assert(source6.database.isEmpty)
- assert(source6.table == "table2")
- assert(fileFormat6.locationUri.isEmpty)
+ assert(target6 == Seq("table1"))
+ assert(source6 == Seq("table2"))
+ assert(location6.isEmpty)
assert(provider6 == Some("ORC"))
+
+ // 3-part names: catalog.namespace.table
+ val (target7, source7, location7, provider7, properties7, exists7) =
+ extract("CREATE TABLE testcat.ns.dst LIKE testcat.ns.src")
+ assert(!exists7)
+ assert(target7 == Seq("testcat", "ns", "dst"))
+ assert(source7 == Seq("testcat", "ns", "src"))
+ assert(location7.isEmpty)
+ assert(provider7.isEmpty)
}
test("SET CATALOG") {