diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index a1de1234ef317..918e8eac79bfa 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -248,8 +248,8 @@ statement | createTableHeader (LEFT_PAREN tableElementList RIGHT_PAREN)? tableProvider? createTableClauses (AS? query)? #createTable - | CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier - LIKE source=tableIdentifier + | CREATE TABLE (IF errorCapturingNot EXISTS)? target=identifierReference + LIKE source=identifierReference (tableProvider | rowFormat | createFileFormat | diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 33bf615680063..e96777f43df76 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -295,6 +295,41 @@ default Table createTable(Identifier ident, TableInfo tableInfo) return createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties()); } + /** + * Create a table in the catalog by copying metadata from an existing source table. + *
<p>
+ * This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. + * The {@code userSpecifiedOverrides} parameter contains strictly user-specified overrides: + * TBLPROPERTIES, LOCATION, and the USING provider (only if explicitly specified). + * It does NOT contain schema, partitioning, properties, constraints, or owner from the source + * table. Connectors must read all source metadata directly from {@code sourceTable}, including + * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}), + * constraints ({@link Table#constraints()}), and format-specific properties + * ({@link Table#properties()}). Connectors are also responsible for setting the owner of the + * new table (e.g. via {@code org.apache.spark.sql.catalyst.CurrentUserContext#getCurrentUser}). + *
<p>
+ * The default implementation throws {@link UnsupportedOperationException}. Connectors that + * support {@code CREATE TABLE ... LIKE ...} must override this method. + * + * @param ident a table identifier for the new table + * @param sourceTable the resolved source table; connectors read schema, partitioning, + * constraints, properties, and any format-specific metadata from this object + * @param userSpecifiedOverrides strictly user-specified overrides: TBLPROPERTIES, LOCATION, + * and USING provider (if explicitly given); source schema, + * partitioning, provider, constraints, and owner are NOT included + * @return metadata for the new table + * + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + * @throws UnsupportedOperationException If the catalog does not support CREATE TABLE LIKE + * @since 4.2.0 + */ + default Table createTableLike( + Identifier ident, Table sourceTable, TableInfo userSpecifiedOverrides) + throws TableAlreadyExistsException, NoSuchNamespaceException { + throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE"); + } + /** * If true, mark all the fields of the query schema as nullable when executing * CREATE/REPLACE TABLE ... AS SELECT ... and creating the table. 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java index a5b4e333afa87..9870a3b0fa45d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java @@ -61,7 +61,7 @@ public Transform[] partitions() { public Constraint[] constraints() { return constraints; } public static class Builder { - private Column[] columns; + private Column[] columns = new Column[0]; private Map<String, String> properties = new HashMap<>(); private Transform[] partitions = new Transform[0]; private Constraint[] constraints = new Constraint[0]; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 316dc8faff3ff..f34c6be9954e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -121,6 +121,10 @@ class ResolveCatalogs(val catalogManager: CatalogManager) val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns) r.copy(name = resolvedIdentifier) + case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _, _) => + val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, Nil) + c.copy(name = resolvedIdentifier) + case UnresolvedIdentifier(nameParts, allowTemp) => resolveIdentifier(nameParts, allowTemp, Nil) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index c0fe58596c9fd..905797eb021c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -615,6 +615,41 @@ case class CreateTable( } } +/** + * Create a new table with the same schema/partitioning as an existing table or view, + * for use with a v2 catalog. + * + * @param name Target table identifier. Starts as UnresolvedIdentifier, resolved to + * ResolvedIdentifier by ResolveCatalogs. + * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to + * ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations. + * @param location User-specified LOCATION. None if not specified. + * @param provider User-specified USING provider. None if not specified. + * @param serdeInfo User-specified STORED AS / ROW FORMAT (Hive-style). None if not specified. + * Kept separate from [[location]] so that [[ResolveSessionCatalog]] can + * reconstruct the full + * [[org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat]] + * for the V1 fallback path without pulling V1 catalog types into this plan. + * @param properties User-specified TBLPROPERTIES. + * @param ifNotExists IF NOT EXISTS flag. + */ +case class CreateTableLike( + name: LogicalPlan, + source: LogicalPlan, + location: Option[String], + provider: Option[String], + serdeInfo: Option[SerdeInfo], + properties: Map[String, String], + ifNotExists: Boolean) extends BinaryCommand { + + override def left: LogicalPlan = name + override def right: LogicalPlan = source + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): CreateTableLike = + copy(name = newLeft, source = newRight) +} + /** * Create a new table from a select query with a v2 catalog. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index b4a4c6f46cda4..4fd9f6f6da913 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -1377,4 +1377,122 @@ class CatalogSuite extends SparkFunSuite { catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3")) assert(catalog.loadTable(testIdent).version() == "4") } + + // ---- createTableLike tests ---- + + test("createTableLike: user-specified properties in tableInfo are applied to target") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcProps = Map("source.key" -> "source.value").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + // tableInfo contains only user overrides; schema and partitioning come from sourceTable + val overrides = Map("user.key" -> "user.value").asJava + val tableInfo = new TableInfo.Builder() + .withProperties(overrides) + .build() + catalog.createTableLike(dstIdent, sourceTable, tableInfo) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("user.key") == "user.value", + "user-specified properties should be applied to the target") + } + + test("createTableLike: source properties are copied to target by connector implementation") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcProps = Map("format.version" -> "2", "format.feature" -> "deletion-vectors").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + // tableInfo contains no overrides; 
connector reads schema and properties from sourceTable + val tableInfo = new TableInfo.Builder() + .withProperties(emptyProps) + .build() + catalog.createTableLike(dstIdent, sourceTable, tableInfo) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("format.version") == "2", + "connector should copy source properties from sourceTable") + assert(dst.properties.asScala("format.feature") == "deletion-vectors", + "connector should copy source properties from sourceTable") + } + + test("createTableLike: user-specified properties override source properties") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcProps = Map("format.version" -> "1", "source.only" -> "yes").asJava + catalog.createTable(srcIdent, columns, emptyTrans, srcProps) + val sourceTable = catalog.loadTable(srcIdent) + + // user explicitly overrides format.version + val overrides = Map("format.version" -> "2").asJava + val tableInfo = new TableInfo.Builder() + .withProperties(overrides) + .build() + catalog.createTableLike(dstIdent, sourceTable, tableInfo) + + val dst = catalog.loadTable(dstIdent) + assert(dst.properties.asScala("format.version") == "2", + "user-specified properties should override source properties") + assert(dst.properties.asScala("source.only") == "yes", + "non-overridden source properties should still be copied") + } + + test("createTableLike: source constraints are copied to target by connector implementation") { + val catalog = newCatalog() + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + val srcTableInfo = new TableInfo.Builder() + .withColumns(columns) + .withPartitions(emptyTrans) + .withProperties(emptyProps) + .withConstraints(constraints) + .build() + catalog.createTable(srcIdent, srcTableInfo) + val sourceTable = catalog.loadTable(srcIdent) + + val tableInfo = new TableInfo.Builder() + 
.withProperties(emptyProps) + .build() + catalog.createTableLike(dstIdent, sourceTable, tableInfo) + + val dst = catalog.loadTable(dstIdent) + assert(dst.constraints().toSet == constraints.toSet, + "connector should copy source constraints from sourceTable.constraints()") + } + + test("createTableLike: catalog without createTableLike override throws " + + "UnsupportedOperationException") { + // BasicInMemoryTableCatalog does not override createTableLike. The default implementation + // throws UnsupportedOperationException to signal that connectors must explicitly implement + // CREATE TABLE LIKE support. + val catalog = new BasicInMemoryTableCatalog + catalog.initialize("basic", CaseInsensitiveStringMap.empty()) + + val srcIdent = Identifier.of(Array("ns"), "src") + val dstIdent = Identifier.of(Array("ns"), "dst") + + catalog.createTable(srcIdent, columns, emptyTrans, Map.empty[String, String].asJava) + val sourceTable = catalog.loadTable(srcIdent) + + val tableInfo = new TableInfo.Builder() + .withProperties(Map.empty[String, String].asJava) + .build() + val ex = intercept[UnsupportedOperationException] { + catalog.createTableLike(dstIdent, sourceTable, tableInfo) + } + assert(ex.getMessage.contains("basic"), + "Exception should mention the catalog name") + assert(ex.getMessage.contains("CREATE TABLE LIKE"), + "Exception should mention the unsupported operation") + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index dc580848d09de..81f4717fc4dfc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -24,8 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.catalyst.InternalRow 
+import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow} import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} @@ -238,6 +239,28 @@ class BasicInMemoryTableCatalog extends TableCatalog { class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces with ProcedureCatalog { + override def createTableLike( + ident: Identifier, + sourceTable: Table, + userSpecifiedOverrides: TableInfo): Table = { + // Read schema from source. For V1Table sources, apply CharVarcharUtils to preserve + // CHAR/VARCHAR types as declared rather than collapsed to StringType. + val columns = sourceTable match { + case v1: V1Table => + CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema)) + case _ => + sourceTable.columns() + } + // Merge source properties with user overrides (user overrides win), then set current user + // as owner (overrides source owner). Connectors are responsible for setting the owner. 
+ val mergedProps = + (sourceTable.properties().asScala ++ + userSpecifiedOverrides.properties().asScala ++ + Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)).asJava + createTable(ident, columns, sourceTable.partitioning(), mergedProps, + Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) + } + override def capabilities: java.util.Set[TableCatalogCapability] = { Set( TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 7efd2e1113179..8860815f182bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -293,6 +293,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } + // For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session + // catalog (or a V1-compatible catalog extension). If source is in a different catalog, fall + // through to the V2 execution path (CreateTableLikeExec via DataSourceV2Strategy). 
+ case CreateTableLike( + ResolvedV1Identifier(targetIdent), + ResolvedV1TableOrViewIdentifier(sourceIdent), + location, provider, serdeInfo, properties, ifNotExists) => + val fileFormat = buildStorageFormatFromSerdeInfo(location, serdeInfo) + CreateTableLikeCommand( + targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists) + case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command => DropTableCommand(ident, ifExists, isView = false, purge = purge) @@ -626,6 +637,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) CreateTableV1(tableDesc, mode, query) } + private def buildStorageFormatFromSerdeInfo( + location: Option[String], + maybeSerdeInfo: Option[SerdeInfo]): CatalogStorageFormat = { + HiveSerDe.buildStorageFormat( + location, + maybeSerdeInfo, + si => QueryCompilationErrors.invalidFileFormatForStoredAsError(si)) + } + private def getStorageFormatAndProvider( provider: Option[String], options: Map[String, String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 6c19f53d2dc42..f19c2063612d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1113,35 +1113,15 @@ class SparkSqlAstBuilder extends AstBuilder { location: Option[String], maybeSerdeInfo: Option[SerdeInfo], ctx: ParserRuleContext): CatalogStorageFormat = { - if (maybeSerdeInfo.isEmpty) { - CatalogStorageFormat.empty.copy(locationUri = location.map(CatalogUtils.stringToURI)) - } else { - val serdeInfo = maybeSerdeInfo.get - if (serdeInfo.storedAs.isEmpty) { - CatalogStorageFormat.empty.copy( - locationUri = location.map(CatalogUtils.stringToURI), - inputFormat = serdeInfo.formatClasses.map(_.input), - outputFormat = serdeInfo.formatClasses.map(_.output), - serde = serdeInfo.serde, - properties = 
serdeInfo.serdeProperties) - } else { - HiveSerDe.sourceToSerDe(serdeInfo.storedAs.get) match { - case Some(hiveSerde) => - CatalogStorageFormat.empty.copy( - locationUri = location.map(CatalogUtils.stringToURI), - inputFormat = hiveSerde.inputFormat, - outputFormat = hiveSerde.outputFormat, - serde = serdeInfo.serde.orElse(hiveSerde.serde), - properties = serdeInfo.serdeProperties) - case _ => - operationNotAllowed(s"STORED AS with file format '${serdeInfo.storedAs.get}'", ctx) - } - } - } + HiveSerDe.buildStorageFormat( + location, + maybeSerdeInfo, + si => QueryParsingErrors.operationNotAllowedError( + s"STORED AS with file format '${si.storedAs.get}'", ctx)) } /** - * Create a [[CreateTableLikeCommand]] command. + * Create a [[CreateTableLike]] logical plan. * * For example: * {{{ @@ -1158,8 +1138,6 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { - val targetTable = visitTableIdentifier(ctx.target) - val sourceTable = visitTableIdentifier(ctx.source) checkDuplicateClauses(ctx.tableProvider, "PROVIDER", ctx) checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx) checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx) @@ -1185,11 +1163,16 @@ class SparkSqlAstBuilder extends AstBuilder { case _ => } - val storage = toStorageFormat(location, serdeInfo, ctx) val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) val cleanedProperties = cleanTableProperties(ctx, properties) - CreateTableLikeCommand( - targetTable, sourceTable, storage, provider, cleanedProperties, ctx.EXISTS != null) + CreateTableLike( + name = withIdentClause(ctx.target, UnresolvedIdentifier(_)), + source = createUnresolvedTableOrView(ctx.source, "CREATE TABLE LIKE", allowTempView = true), + location = location, + provider = provider, + serdeInfo = serdeInfo, + properties = cleanedProperties, + ifNotExists = ctx.EXISTS != null) } /** diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala new file mode 100644 index 0000000000000..c4930b45375b4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.net.URI + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.internal.LogKeys.TABLE_NAME +import org.apache.spark.internal.MDC +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.catalog.CatalogUtils +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableInfo} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Physical plan node for CREATE TABLE ... LIKE ... targeting a v2 catalog.
+ * + * Calls [[TableCatalog.createTableLike]] so that connectors can implement format-specific copy + * semantics (e.g. Delta protocol inheritance, Iceberg sort order and format version). Connectors + * must override [[TableCatalog.createTableLike]]; the default implementation throws + * [[UnsupportedOperationException]]. + * + * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains strictly user-specified + * overrides: TBLPROPERTIES, LOCATION, and USING provider (only if explicitly given). + * Schema, partitioning, source provider, source TBLPROPERTIES, constraints, and owner are NOT + * pre-populated; connectors read all source metadata directly from [[sourceTable]] and are + * responsible for setting the owner. + */ +case class CreateTableLikeExec( + targetCatalog: TableCatalog, + targetIdent: Identifier, + sourceTable: Table, + location: Option[URI], + provider: Option[String], + properties: Map[String, String], + ifNotExists: Boolean) extends LeafV2CommandExec { + + override def output: Seq[Attribute] = Seq.empty + + override protected def run(): Seq[InternalRow] = { + if (!targetCatalog.tableExists(targetIdent)) { + // Build strictly user-specified overrides: explicit TBLPROPERTIES, LOCATION (if given), + // and USING provider (if given). Provider and owner are not included here; connectors + // are responsible for reading PROP_PROVIDER from sourceTable.properties() and for + // setting the owner via CurrentUserContext.getCurrentUser. 
+ val locationProp: Option[(String, String)] = + location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) + + val finalProps = + properties ++ + provider.map(TableCatalog.PROP_PROVIDER -> _) ++ + locationProp + + try { + val userSpecifiedOverrides = new TableInfo.Builder() + .withProperties(finalProps.asJava) + .build() + targetCatalog.createTableLike(targetIdent, sourceTable, userSpecifiedOverrides) + } catch { + case _: TableAlreadyExistsException if ifNotExists => + logWarning( + log"Table ${MDC(TABLE_NAME, targetIdent.quoted)} was created concurrently. Ignoring.") + } + } else if (!ifNotExists) { + throw QueryCompilationErrors.tableAlreadyExistsError(targetIdent) + } + + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 91e753096a238..7932a0aa53bac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkException, SparkIllegalArgumentException} import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.EXPR -import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} +import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedTempView} import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression} @@ -33,7 +33,7 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder} import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable} +import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable, V1Table} import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} @@ -240,6 +240,26 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil } + // CREATE TABLE ... LIKE ... for a v2 catalog target. + // Source is an already-resolved Table object; no extra catalog round-trip is needed. + // Views are wrapped in V1Table so the exec can extract schema and provider uniformly. 
+ case CreateTableLike( + ResolvedIdentifier(catalog, ident), source, + locationStr, provider, _, properties, ifNotExists) => + val table = source match { + case ResolvedTable(_, _, t, _) => t + case ResolvedPersistentView(_, _, meta) => V1Table(meta) + case ResolvedTempView(_, meta) => V1Table(meta) + } + val location = locationStr.map { loc => + val uri = CatalogUtils.stringToURI(loc) + if (uri.isAbsolute) uri + else if (new Path(uri).isAbsolute) CatalogUtils.makeQualifiedPath(uri, hadoopConf) + else uri + } + CreateTableLikeExec(catalog.asTableCatalog, ident, table, + location, provider, properties, ifNotExists) :: Nil + case RefreshTable(r: ResolvedTable) => RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index cfcfeabbf1f6e..32ada4beb313a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.internal import java.util.Locale import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils} +import org.apache.spark.sql.catalyst.plans.logical.SerdeInfo case class HiveSerDe( inputFormat: Option[String] = None, @@ -104,6 +105,44 @@ object HiveSerDe { */ def serdeToSource(serde: HiveSerDe): Option[String] = serdeInverseMap.get(serde) + /** + * Builds a [[CatalogStorageFormat]] from a user-specified location and optional serde info. + * Uses [[CatalogStorageFormat.empty]] as the base (no Hive defaults). 
+ * + * @param location optional LOCATION URI string + * @param maybeSerdeInfo optional serde/format info from ROW FORMAT / STORED AS clauses + * @param invalidStoredAsError callback returning a [[Throwable]] when STORED AS names an + * unrecognized file format; the caller controls the error message + */ + def buildStorageFormat( + location: Option[String], + maybeSerdeInfo: Option[SerdeInfo], + invalidStoredAsError: SerdeInfo => Throwable): CatalogStorageFormat = { + val locationUri = location.map(CatalogUtils.stringToURI) + maybeSerdeInfo match { + case None => + CatalogStorageFormat.empty.copy(locationUri = locationUri) + case Some(si) if si.storedAs.isDefined => + sourceToSerDe(si.storedAs.get) match { + case Some(hiveSerde) => + CatalogStorageFormat.empty.copy( + locationUri = locationUri, + inputFormat = hiveSerde.inputFormat, + outputFormat = hiveSerde.outputFormat, + serde = si.serde.orElse(hiveSerde.serde), + properties = si.serdeProperties) + case _ => throw invalidStoredAsError(si) + } + case Some(si) => + CatalogStorageFormat.empty.copy( + locationUri = locationUri, + inputFormat = si.formatClasses.map(_.input), + outputFormat = si.formatClasses.map(_.output), + serde = si.serde, + properties = si.serdeProperties) + } + } + def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = { // To respect hive-site.xml, it peeks Hadoop configuration from existing Spark session, // as an easy workaround. See SPARK-27555. 
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out index 4e864523368d7..7bd87a4267c77 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out @@ -55,7 +55,7 @@ DescribeColumnCommand `spark_catalog`.`default`.`char_tbl2`, [spark_catalog, def -- !query create table char_tbl3 like char_tbl -- !query analysis -CreateTableLikeCommand `char_tbl3`, `char_tbl`, Storage(), false +CreateTableLikeCommand `spark_catalog`.`default`.`char_tbl3`, `spark_catalog`.`default`.`char_tbl`, Storage(), false -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala new file mode 100644 index 0000000000000..a67831cf63503 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog} +import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType} + +class CreateTableLikeSuite extends DatasourceV2SQLBase { + + private def testCatalog: InMemoryCatalog = + catalog("testcat").asInstanceOf[InMemoryCatalog] + + private def testCatalog2: InMemoryCatalog = + catalog("testcat2").asInstanceOf[InMemoryCatalog] + + // ------------------------------------------------------------------------- + // Basic V2 path + // ------------------------------------------------------------------------- + + test("v2 target, v1 source: schema and partitioning are copied") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint, data string) USING parquet PARTITIONED BY (data)") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.schema() === CatalogV2Util.v2ColumnsToStructType(dst.columns())) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + // partition column encoded in partitioning (identity transform on the partition column) + assert(dst.partitioning.nonEmpty) + } + } + + test("v2 target, v2 source: pure v2 path") { + withTable("testcat.src", "testcat.dst") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING foo") + sql("CREATE TABLE testcat.dst LIKE testcat.src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + + test("cross-catalog: source in testcat, target in testcat2") { + withTable("testcat.src", "testcat2.dst") { + sql("CREATE TABLE testcat.src (id bigint, name string) USING foo") + sql("CREATE TABLE testcat2.dst LIKE testcat.src") + + val dst = 
testCatalog2.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "name")) + } + } + + test("3-part name: catalog.namespace.table for both target and source") { + withTable("testcat.ns.src", "testcat2.ns.dst") { + sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns") + sql("CREATE NAMESPACE IF NOT EXISTS testcat2.ns") + sql("CREATE TABLE testcat.ns.src (id bigint, data string) USING foo") + sql("CREATE TABLE testcat2.ns.dst LIKE testcat.ns.src") + + val dst = testCatalog2.loadTable(Identifier.of(Array("ns"), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + + // ------------------------------------------------------------------------- + // IF NOT EXISTS + // ------------------------------------------------------------------------- + + test("IF NOT EXISTS: second call is silent when table already exists") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + // Should not throw + sql("CREATE TABLE IF NOT EXISTS testcat.dst LIKE src") + } + } + + test("without IF NOT EXISTS, duplicate create throws TableAlreadyExistsException") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + intercept[TableAlreadyExistsException] { + sql("CREATE TABLE testcat.dst LIKE src") + } + } + } + + // ------------------------------------------------------------------------- + // Views as source + // ------------------------------------------------------------------------- + + test("persistent view as source, v2 target") { + withTable("src", "testcat.dst") { + withView("v") { + sql("CREATE TABLE src (id bigint, data string) USING parquet") + sql("INSERT INTO src VALUES (1, 'a')") + sql("CREATE VIEW v AS SELECT * FROM src") + sql("CREATE TABLE testcat.dst LIKE v") + + val dst = 
testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + } + + test("temp view as source, v2 target") { + withTable("src", "testcat.dst") { + withTempView("tv") { + sql("CREATE TABLE src (id bigint, data string) USING parquet") + sql("CREATE TEMP VIEW tv AS SELECT id, data FROM src") + sql("CREATE TABLE testcat.dst LIKE tv") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + } + } + } + + // ------------------------------------------------------------------------- + // Property / provider behavior + // ------------------------------------------------------------------------- + + test("source TBLPROPERTIES are copied to target when connector implements createTableLike") { + // InMemoryTableCatalog overrides createTableLike to merge source properties into the target, + // demonstrating connector-specific copy semantics. Connectors that do not override + // createTableLike will throw UnsupportedOperationException. 
+ withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('source_key' = 'source')") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.containsKey("source_key"), + "Connector-implemented createTableLike copies source TBLPROPERTIES to target") + } + } + + test("user-specified TBLPROPERTIES override source TBLPROPERTIES in createTableLike") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet TBLPROPERTIES ('key' = 'source_value')") + sql("CREATE TABLE testcat.dst LIKE src TBLPROPERTIES ('key' = 'user_value')") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("key") == "user_value", + "User-specified TBLPROPERTIES should override source TBLPROPERTIES") + } + } + + test("user-specified TBLPROPERTIES are applied on target") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src TBLPROPERTIES ('custom' = 'value')") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("custom") === "value") + } + } + + test("USING clause overrides source provider") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src USING foo") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("provider") === "foo") + } + } + + test("v2 catalog target: non-existent provider is stored as property without validation") { + // Pure V2 catalogs (e.g. InMemoryCatalog) do not validate the provider — they store it + // as a plain property and let the catalog implementation decide what to do with it. + // This is consistent with how CreateTableExec works for CREATE TABLE targeting V2 catalogs. 
+ withTable("testcat.src", "testcat.dst") { + sql("CREATE TABLE testcat.src (id bigint) USING foo") + sql("CREATE TABLE testcat.dst LIKE testcat.src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("provider") === "foo", + "Provider should be copied from source as-is without validation") + } + } + + test("source provider is copied to v2 target when no USING override") { + // When no USING clause is given, provider inheritance is handled by the connector: + // InMemoryTableCatalog.createTableLike merges sourceTable.properties() into the target, + // which includes PROP_PROVIDER set by the source table. + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.get("provider") === "parquet", + "Source provider should be copied to the V2 target when no USING clause is specified") + } + } + + // ------------------------------------------------------------------------- + // Column type fidelity + // ------------------------------------------------------------------------- + + test("CHAR and VARCHAR types are preserved from v1 source to v2 target") { + // InMemoryTableCatalog.createTableLike applies CharVarcharUtils.getRawSchema when + // the source is a V1Table, preserving CHAR/VARCHAR types as declared. + // This illustrates the pattern connectors should follow to preserve declared types. 
+ withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint, name CHAR(10), tag VARCHAR(20)) USING parquet") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns()) + assert(schema("name").dataType === CharType(10)) + assert(schema("tag").dataType === VarcharType(20)) + } + } + + test("multiple column types are preserved") { + withTable("src", "testcat.dst") { + sql( + """CREATE TABLE src ( + | id bigint, + | name string, + | score int + |) USING parquet""".stripMargin) + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val schema = CatalogV2Util.v2ColumnsToStructType(dst.columns()) + assert(schema("id").dataType === LongType) + assert(schema("name").dataType === StringType) + assert(schema("score").dataType === IntegerType) + } + } + + // ------------------------------------------------------------------------- + // V2 source, session catalog (V1) target: unsupported + // ------------------------------------------------------------------------- + + test("v2 source, session catalog target: throws because session catalog does not " + + "implement createTableLike") { + // CREATE TABLE LIKE targeting the session catalog with a V2 source goes through + // CreateTableLikeExec, which calls createTableLike on the session catalog. + // The session catalog (InMemoryTableSessionCatalog in tests) does not override + // createTableLike, so the default UnsupportedOperationException is thrown. 
+ withTable("testcat.src") { + sql("CREATE TABLE testcat.src (id bigint, data string) USING parquet") + val ex = intercept[UnsupportedOperationException] { + sql("CREATE TABLE dst LIKE testcat.src") + } + assert(ex.getMessage.contains("CREATE TABLE LIKE")) + } + } + + // ------------------------------------------------------------------------- + // V1 fallback regression + // ------------------------------------------------------------------------- + + test("v1 fallback: CREATE TABLE default.dst LIKE default.src still uses CreateTableLikeCommand") { + withTable("src", "dst") { + sql("CREATE TABLE src (id bigint, data string) USING parquet") + sql("CREATE TABLE dst LIKE src") + // Verify via session catalog that dst was created with the correct schema + val meta = spark.sessionState.catalog.getTableMetadata( + spark.sessionState.sqlParser.parseTableIdentifier("dst")) + assert(meta.schema.fieldNames === Array("id", "data")) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 9db7ae2e2824c..2508a5cee3e02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.SparkThrowable import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, SchemaCompensation, UnresolvedAttribute, UnresolvedIdentifier} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, SchemaCompensation, UnresolvedAttribute, UnresolvedIdentifier, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.dsl.expressions._ 
import org.apache.spark.sql.catalyst.dsl.plans @@ -721,83 +721,76 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { } test("create table like") { - val v1 = "CREATE TABLE table1 LIKE table2" - val (target, source, fileFormat, provider, properties, exists) = - parser.parsePlan(v1).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + // Helper to extract fields from the new CreateTableLike unresolved plan. + // The parser now emits CreateTableLike (v2 logical plan) instead of + // CreateTableLikeCommand; the name is an UnresolvedIdentifier and the source is + // an UnresolvedTableOrView. + def extract(sql: String): (Seq[String], Seq[String], + Option[String], Option[String], Map[String, String], Boolean) = + parser.parsePlan(sql).collect { + case CreateTableLike( + UnresolvedIdentifier(targetParts, _), + UnresolvedTableOrView(sourceParts, _, _), + loc, p, _, pr, e) => + (targetParts, sourceParts, loc, p, pr, e) + }.head + + val (target, source, location, provider, properties, exists) = + extract("CREATE TABLE table1 LIKE table2") assert(exists == false) - assert(target.database.isEmpty) - assert(target.table == "table1") - assert(source.database.isEmpty) - assert(source.table == "table2") - assert(fileFormat.locationUri.isEmpty) + assert(target == Seq("table1")) + assert(source == Seq("table2")) + assert(location.isEmpty) assert(provider.isEmpty) - val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2" - val (target2, source2, fileFormat2, provider2, properties2, exists2) = - parser.parsePlan(v2).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + val (target2, source2, location2, provider2, properties2, exists2) = + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2") assert(exists2) - assert(target2.database.isEmpty) - assert(target2.table == "table1") - assert(source2.database.isEmpty) - assert(source2.table == "table2") - 
assert(fileFormat2.locationUri.isEmpty) + assert(target2 == Seq("table1")) + assert(source2 == Seq("table2")) + assert(location2.isEmpty) assert(provider2.isEmpty) - val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'" - val (target3, source3, fileFormat3, provider3, properties3, exists3) = - parser.parsePlan(v3).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + val (target3, source3, location3, provider3, properties3, exists3) = + extract("CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'") assert(!exists3) - assert(target3.database.isEmpty) - assert(target3.table == "table1") - assert(source3.database.isEmpty) - assert(source3.table == "table2") - assert(fileFormat3.locationUri.map(_.toString) == Some("/spark/warehouse")) + assert(target3 == Seq("table1")) + assert(source3 == Seq("table2")) + assert(location3 == Some("/spark/warehouse")) assert(provider3.isEmpty) - val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'" - val (target4, source4, fileFormat4, provider4, properties4, exists4) = - parser.parsePlan(v4).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + val (target4, source4, location4, provider4, properties4, exists4) = + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'") assert(exists4) - assert(target4.database.isEmpty) - assert(target4.table == "table1") - assert(source4.database.isEmpty) - assert(source4.table == "table2") - assert(fileFormat4.locationUri.map(_.toString) == Some("/spark/warehouse")) + assert(target4 == Seq("table1")) + assert(source4 == Seq("table2")) + assert(location4 == Some("/spark/warehouse")) assert(provider4.isEmpty) - val v5 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet" - val (target5, source5, fileFormat5, provider5, properties5, exists5) = - parser.parsePlan(v5).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, 
f, p, pr, e) - }.head + val (target5, source5, location5, provider5, properties5, exists5) = + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet") assert(exists5) - assert(target5.database.isEmpty) - assert(target5.table == "table1") - assert(source5.database.isEmpty) - assert(source5.table == "table2") - assert(fileFormat5.locationUri.isEmpty) + assert(target5 == Seq("table1")) + assert(source5 == Seq("table2")) + assert(location5.isEmpty) assert(provider5 == Some("parquet")) - val v6 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC" - val (target6, source6, fileFormat6, provider6, properties6, exists6) = - parser.parsePlan(v6).collect { - case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e) - }.head + val (target6, source6, location6, provider6, properties6, exists6) = + extract("CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC") assert(exists6) - assert(target6.database.isEmpty) - assert(target6.table == "table1") - assert(source6.database.isEmpty) - assert(source6.table == "table2") - assert(fileFormat6.locationUri.isEmpty) + assert(target6 == Seq("table1")) + assert(source6 == Seq("table2")) + assert(location6.isEmpty) assert(provider6 == Some("ORC")) + + // 3-part names: catalog.namespace.table + val (target7, source7, location7, provider7, properties7, exists7) = + extract("CREATE TABLE testcat.ns.dst LIKE testcat.ns.src") + assert(!exists7) + assert(target7 == Seq("testcat", "ns", "dst")) + assert(source7 == Seq("testcat", "ns", "src")) + assert(location7.isEmpty) + assert(provider7.isEmpty) } test("SET CATALOG") {