Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f26c1a6
[SPARK-XXXXX][SQL] Support CREATE TABLE LIKE for V2 catalogs
viirya Mar 14, 2026
a13cfbb
[SPARK] Add V2→V1 cross-catalog tests for CREATE TABLE LIKE
viirya Mar 14, 2026
ab99578
[SPARK] Add tests verifying V2 catalog provider validation behavior
viirya Mar 14, 2026
9f92a9b
[SPARK] Add tests for provider copy and CHAR/VARCHAR preservation in …
viirya Mar 14, 2026
36a95d0
[SPARK] Document why constraints are not copied in CreateTableLikeExec
viirya Mar 15, 2026
1755580
[SPARK] Correct constraint comment in CreateTableLikeExec
viirya Mar 15, 2026
8fdaf69
[SPARK] Update charvarchar.sql analyzer golden file for fully qualifi…
viirya Mar 15, 2026
4788469
[SPARK] Address PR review comments on CREATE TABLE LIKE V2
viirya Mar 17, 2026
7912780
[SPARK] Add CatalogExtension test for CREATE TABLE LIKE V2 path
viirya Mar 17, 2026
b0a26e4
[SPARK] Add createTableLike API to TableCatalog for connector-delegat…
viirya Mar 19, 2026
a71107d
[SPARK] Implement createTableLike in InMemoryTableCatalog with tests
viirya Mar 19, 2026
5687e8b
[SPARK] Redesign createTableLike API: overrides-only TableInfo, throw…
viirya Mar 20, 2026
6fefd98
[SPARK] Add test verifying V2->session catalog LIKE throws UnsupportedOperationException
viirya Mar 20, 2026
628cc31
[SPARK] Fix Javadoc contradiction in createTableLike default method
viirya Mar 21, 2026
3b94076
[SPARK] Add CHAR/VARCHAR preservation in InMemoryTableCatalog and fix…
viirya Mar 21, 2026
9fdf2fa
[SPARK] Address gengliangwang review: rename tableInfo param, qualify…
viirya Mar 23, 2026
f5d3d25
[SPARK] Make userSpecifiedOverrides strictly user-overrides-only; rem…
viirya Mar 24, 2026
74a7ff1
[SPARK] Remove owner from userSpecifiedOverrides; connector is respon…
viirya Mar 24, 2026
f4a1cd3
[SPARK] Replace CatalogStorageFormat in CreateTableLike with location…
viirya Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ statement
| createTableHeader (LEFT_PAREN tableElementList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=identifierReference
LIKE source=identifierReference
(tableProvider |
rowFormat |
createFileFormat |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,41 @@ default Table createTable(Identifier ident, TableInfo tableInfo)
return createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
}

/**
 * Creates a table in this catalog whose metadata is copied from an existing source table,
 * implementing {@code CREATE TABLE ... LIKE ...} for this catalog.
 * <p>
 * {@code userSpecifiedOverrides} holds only what the user explicitly wrote on the statement:
 * TBLPROPERTIES, LOCATION, and the USING provider (when given). Nothing is pre-copied from
 * the source: schema, partitioning, properties, constraints, and owner are all absent from
 * it. Implementations must therefore read source metadata directly from {@code sourceTable}
 * — columns via {@link Table#columns()}, partitioning via {@link Table#partitioning()},
 * constraints via {@link Table#constraints()}, and format-specific properties via
 * {@link Table#properties()} — and are responsible for assigning the new table's owner
 * (e.g. with {@code org.apache.spark.sql.catalyst.CurrentUserContext#getCurrentUser}).
 * <p>
 * By default this throws {@link UnsupportedOperationException}; connectors must override it
 * in order to support {@code CREATE TABLE ... LIKE ...}.
 *
 * @param ident identifier of the table to create
 * @param sourceTable the resolved source table from which schema, partitioning, constraints,
 *                    properties, and any format-specific metadata are read
 * @param userSpecifiedOverrides only the user's explicit overrides (TBLPROPERTIES, LOCATION,
 *                               USING provider if given); never source schema, partitioning,
 *                               provider, constraints, or owner
 * @return metadata for the newly created table
 *
 * @throws TableAlreadyExistsException If a table or view already exists for the identifier
 * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
 * @throws UnsupportedOperationException If the catalog does not support CREATE TABLE LIKE
 * @since 4.2.0
 */
default Table createTableLike(
    Identifier ident, Table sourceTable, TableInfo userSpecifiedOverrides)
    throws TableAlreadyExistsException, NoSuchNamespaceException {
  throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE");
}

/**
* If true, mark all the fields of the query schema as nullable when executing
* CREATE/REPLACE TABLE ... AS SELECT ... and creating the table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Transform[] partitions() {
public Constraint[] constraints() { return constraints; }

public static class Builder {
private Column[] columns;
private Column[] columns = new Column[0];
private Map<String, String> properties = new HashMap<>();
private Transform[] partitions = new Transform[0];
private Constraint[] constraints = new Constraint[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns)
r.copy(name = resolvedIdentifier)

case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _, _) =>
val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, Nil)
c.copy(name = resolvedIdentifier)

case UnresolvedIdentifier(nameParts, allowTemp) =>
resolveIdentifier(nameParts, allowTemp, Nil)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,41 @@ case class CreateTable(
}
}

/**
 * Create a new table with the same schema/partitioning as an existing table or view,
 * for use with a v2 catalog.
 *
 * @param name Target table identifier. Starts as UnresolvedIdentifier, resolved to
 *             ResolvedIdentifier by ResolveCatalogs.
 * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to
 *               ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations.
 * @param location User-specified LOCATION. None if not specified.
 * @param provider User-specified USING provider. None if not specified.
 * @param serdeInfo User-specified STORED AS / ROW FORMAT (Hive-style). None if not specified.
 *                  Kept separate from [[location]] so that [[ResolveSessionCatalog]] can
 *                  reconstruct the full
 *                  [[org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat]]
 *                  for the V1 fallback path without pulling V1 catalog types into this plan.
 * @param properties User-specified TBLPROPERTIES.
 * @param ifNotExists IF NOT EXISTS flag.
 */
// NOTE(review): a reviewer asked whether this could instead be a single UnaryRunnableCommand
// to reduce plan complexity across analysis stages — confirm the design before finalizing.
case class CreateTableLike(
    name: LogicalPlan,
    source: LogicalPlan,
    location: Option[String],
    provider: Option[String],
    serdeInfo: Option[SerdeInfo],
    properties: Map[String, String],
    ifNotExists: Boolean) extends BinaryCommand {

  // BinaryCommand children: the unresolved/resolved target identifier and the source relation.
  override def left: LogicalPlan = name
  override def right: LogicalPlan = source

  override protected def withNewChildrenInternal(
      newLeft: LogicalPlan, newRight: LogicalPlan): CreateTableLike =
    copy(name = newLeft, source = newRight)
}

/**
* Create a new table from a select query with a v2 catalog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1377,4 +1377,122 @@ class CatalogSuite extends SparkFunSuite {
catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3"))
assert(catalog.loadTable(testIdent).version() == "4")
}

// ---- createTableLike tests ----

test("createTableLike: user-specified properties in tableInfo are applied to target") {
  val testCatalog = newCatalog()
  val source = Identifier.of(Array("ns"), "src")
  val target = Identifier.of(Array("ns"), "dst")

  // Seed the source table with one property of its own.
  testCatalog.createTable(
    source, columns, emptyTrans, Map("source.key" -> "source.value").asJava)
  val loadedSource = testCatalog.loadTable(source)

  // The TableInfo passed to createTableLike carries only what the user wrote;
  // schema and partitioning are read from the source table by the connector.
  val info = new TableInfo.Builder()
    .withProperties(Map("user.key" -> "user.value").asJava)
    .build()
  testCatalog.createTableLike(target, loadedSource, info)

  val created = testCatalog.loadTable(target)
  assert(created.properties.asScala("user.key") == "user.value",
    "user-specified properties should be applied to the target")
}

test("createTableLike: source properties are copied to target by connector implementation") {
  val testCatalog = newCatalog()
  val source = Identifier.of(Array("ns"), "src")
  val target = Identifier.of(Array("ns"), "dst")

  val formatProps =
    Map("format.version" -> "2", "format.feature" -> "deletion-vectors").asJava
  testCatalog.createTable(source, columns, emptyTrans, formatProps)
  val loadedSource = testCatalog.loadTable(source)

  // No user overrides at all: everything on the target must come from the source table.
  val info = new TableInfo.Builder()
    .withProperties(emptyProps)
    .build()
  testCatalog.createTableLike(target, loadedSource, info)

  val createdProps = testCatalog.loadTable(target).properties.asScala
  assert(createdProps("format.version") == "2",
    "connector should copy source properties from sourceTable")
  assert(createdProps("format.feature") == "deletion-vectors",
    "connector should copy source properties from sourceTable")
}

test("createTableLike: user-specified properties override source properties") {
  val testCatalog = newCatalog()
  val source = Identifier.of(Array("ns"), "src")
  val target = Identifier.of(Array("ns"), "dst")

  testCatalog.createTable(
    source, columns, emptyTrans,
    Map("format.version" -> "1", "source.only" -> "yes").asJava)
  val loadedSource = testCatalog.loadTable(source)

  // The user explicitly overrides format.version; source.only has no override.
  val info = new TableInfo.Builder()
    .withProperties(Map("format.version" -> "2").asJava)
    .build()
  testCatalog.createTableLike(target, loadedSource, info)

  val createdProps = testCatalog.loadTable(target).properties.asScala
  assert(createdProps("format.version") == "2",
    "user-specified properties should override source properties")
  assert(createdProps("source.only") == "yes",
    "non-overridden source properties should still be copied")
}

test("createTableLike: source constraints are copied to target by connector implementation") {
  val testCatalog = newCatalog()
  val source = Identifier.of(Array("ns"), "src")
  val target = Identifier.of(Array("ns"), "dst")

  // Build the source with constraints attached so the connector has something to copy.
  val sourceInfo = new TableInfo.Builder()
    .withColumns(columns)
    .withPartitions(emptyTrans)
    .withProperties(emptyProps)
    .withConstraints(constraints)
    .build()
  testCatalog.createTable(source, sourceInfo)
  val loadedSource = testCatalog.loadTable(source)

  val info = new TableInfo.Builder()
    .withProperties(emptyProps)
    .build()
  testCatalog.createTableLike(target, loadedSource, info)

  val created = testCatalog.loadTable(target)
  assert(created.constraints().toSet == constraints.toSet,
    "connector should copy source constraints from sourceTable.constraints()")
}

test("createTableLike: catalog without createTableLike override throws " +
  "UnsupportedOperationException") {
  // BasicInMemoryTableCatalog inherits TableCatalog's default createTableLike, which throws
  // to force connectors to opt in to CREATE TABLE LIKE support explicitly.
  val basicCatalog = new BasicInMemoryTableCatalog
  basicCatalog.initialize("basic", CaseInsensitiveStringMap.empty())

  val source = Identifier.of(Array("ns"), "src")
  val target = Identifier.of(Array("ns"), "dst")

  basicCatalog.createTable(source, columns, emptyTrans, Map.empty[String, String].asJava)
  val loadedSource = basicCatalog.loadTable(source)

  val info = new TableInfo.Builder()
    .withProperties(Map.empty[String, String].asJava)
    .build()
  val thrown = intercept[UnsupportedOperationException] {
    basicCatalog.createTableLike(target, loadedSource, info)
  }
  assert(thrown.getMessage.contains("basic"),
    "Exception should mention the catalog name")
  assert(thrown.getMessage.contains("CREATE TABLE LIKE"),
    "Exception should mention the unsupported operation")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
Expand Down Expand Up @@ -238,6 +239,28 @@ class BasicInMemoryTableCatalog extends TableCatalog {
class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces
with ProcedureCatalog {

override def createTableLike(
    ident: Identifier,
    sourceTable: Table,
    userSpecifiedOverrides: TableInfo): Table = {
  // Derive the target schema from the source. A V1Table source goes through CharVarcharUtils
  // so declared CHAR/VARCHAR types survive instead of being collapsed to StringType.
  val targetColumns = sourceTable match {
    case v1Source: V1Table =>
      val rawSchema = CharVarcharUtils.getRawSchema(v1Source.catalogTable.schema)
      CatalogV2Util.structTypeToV2Columns(rawSchema)
    case other =>
      other.columns()
  }
  // Property precedence (low -> high): source properties, user overrides, owner. The
  // connector — not Spark — records the current user as the new table's owner.
  val baseProps = sourceTable.properties().asScala
  val overrideProps = userSpecifiedOverrides.properties().asScala
  val ownerProp = TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser
  val targetProps = (baseProps ++ overrideProps + ownerProp).asJava
  createTable(ident, targetColumns, sourceTable.partitioning(), targetProps,
    Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints())
}

override def capabilities: java.util.Set[TableCatalogCapability] = {
Set(
TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}

// For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session
// catalog (or a V1-compatible catalog extension). If source is in a different catalog, fall
// through to the V2 execution path (CreateTableLikeExec via DataSourceV2Strategy).
case CreateTableLike(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean for DSv2 connectors that override the session catalog?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example is Iceberg session catalog.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, agree, we should add a test for sessionCatalog

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. When a connector like Iceberg overrides the session catalog, the target resolves through ResolvedV1Identifier (since supportsV1Command returns true for the session catalog), but the source — a native Iceberg Table — does NOT match ResolvedV1TableOrViewIdentifier (which requires V1Table). So ResolveSessionCatalog falls through and CreateTableLikeExec handles it, passing the Iceberg Table directly. The target createTable call goes to V2SessionCatalog, which delegates to the Iceberg catalog extension. This should work, but deserves a test. I can add one if you'd like.

ResolvedV1Identifier(targetIdent),
ResolvedV1TableOrViewIdentifier(sourceIdent),
location, provider, serdeInfo, properties, ifNotExists) =>
val fileFormat = buildStorageFormatFromSerdeInfo(location, serdeInfo)
CreateTableLikeCommand(
targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists)

case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command =>
DropTableCommand(ident, ifExists, isView = false, purge = purge)

Expand Down Expand Up @@ -626,6 +637,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
CreateTableV1(tableDesc, mode, query)
}

// Builds a CatalogStorageFormat for the V1 fallback path from the user's LOCATION and
// Hive-style serde clauses, delegating to the shared HiveSerDe helper. An unrecognized
// STORED AS file format surfaces as a compilation error on this path.
private def buildStorageFormatFromSerdeInfo(
    location: Option[String],
    maybeSerdeInfo: Option[SerdeInfo]): CatalogStorageFormat = {
  val onBadFormat = (si: SerdeInfo) => QueryCompilationErrors.invalidFileFormatForStoredAsError(si)
  HiveSerDe.buildStorageFormat(location, maybeSerdeInfo, onBadFormat)
}

private def getStorageFormatAndProvider(
provider: Option[String],
options: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,35 +1113,15 @@ class SparkSqlAstBuilder extends AstBuilder {
location: Option[String],
maybeSerdeInfo: Option[SerdeInfo],
ctx: ParserRuleContext): CatalogStorageFormat = {
if (maybeSerdeInfo.isEmpty) {
CatalogStorageFormat.empty.copy(locationUri = location.map(CatalogUtils.stringToURI))
} else {
val serdeInfo = maybeSerdeInfo.get
if (serdeInfo.storedAs.isEmpty) {
CatalogStorageFormat.empty.copy(
locationUri = location.map(CatalogUtils.stringToURI),
inputFormat = serdeInfo.formatClasses.map(_.input),
outputFormat = serdeInfo.formatClasses.map(_.output),
serde = serdeInfo.serde,
properties = serdeInfo.serdeProperties)
} else {
HiveSerDe.sourceToSerDe(serdeInfo.storedAs.get) match {
case Some(hiveSerde) =>
CatalogStorageFormat.empty.copy(
locationUri = location.map(CatalogUtils.stringToURI),
inputFormat = hiveSerde.inputFormat,
outputFormat = hiveSerde.outputFormat,
serde = serdeInfo.serde.orElse(hiveSerde.serde),
properties = serdeInfo.serdeProperties)
case _ =>
operationNotAllowed(s"STORED AS with file format '${serdeInfo.storedAs.get}'", ctx)
}
}
}
HiveSerDe.buildStorageFormat(
location,
maybeSerdeInfo,
si => QueryParsingErrors.operationNotAllowedError(
s"STORED AS with file format '${si.storedAs.get}'", ctx))
}

/**
* Create a [[CreateTableLikeCommand]] command.
* Create a [[CreateTableLike]] logical plan.
*
* For example:
* {{{
Expand All @@ -1158,8 +1138,6 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitTableIdentifier(ctx.target)
val sourceTable = visitTableIdentifier(ctx.source)
checkDuplicateClauses(ctx.tableProvider, "PROVIDER", ctx)
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
Expand All @@ -1185,11 +1163,16 @@ class SparkSqlAstBuilder extends AstBuilder {
case _ =>
}

val storage = toStorageFormat(location, serdeInfo, ctx)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val cleanedProperties = cleanTableProperties(ctx, properties)
CreateTableLikeCommand(
targetTable, sourceTable, storage, provider, cleanedProperties, ctx.EXISTS != null)
CreateTableLike(
name = withIdentClause(ctx.target, UnresolvedIdentifier(_)),
source = createUnresolvedTableOrView(ctx.source, "CREATE TABLE LIKE", allowTempView = true),
location = location,
provider = provider,
serdeInfo = serdeInfo,
properties = cleanedProperties,
ifNotExists = ctx.EXISTS != null)
}

/**
Expand Down
Loading