From 041a61fca7cf5835367adedbdd99e642198552c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 1 Jun 2026 17:38:28 +0800 Subject: [PATCH 1/3] [core] Validate schema after applying schema changes --- .../apache/paimon/schema/SchemaManager.java | 19 +++++---- .../paimon/schema/SchemaManagerTest.java | 41 +++++++++++++++++++ 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 95c6f04514db..24eb99570414 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -609,14 +609,17 @@ protected void updateLastColumn( applyRenameColumnsToOptions(newOptions, changes), newComment); - return new TableSchema( - oldTableSchema.id() + 1, - newSchema.fields(), - highestFieldId.get(), - newSchema.partitionKeys(), - newSchema.primaryKeys(), - newSchema.options(), - newSchema.comment()); + TableSchema newTableSchema = + new TableSchema( + oldTableSchema.id() + 1, + newSchema.fields(), + highestFieldId.get(), + newSchema.partitionKeys(), + newSchema.primaryKeys(), + newSchema.options(), + newSchema.comment()); + SchemaValidation.validateTableSchema(newTableSchema); + return newTableSchema; } // gets the rootType at the defined depth diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9a15cab36159..0c9470514c0a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -168,6 +168,47 @@ public void testUpdateOptions() throws Exception { assertThat(latest.get().options()).containsEntry("new_k", "new_v"); } + @Test + public void testResetSequenceGroupForAggregateFunction() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update"); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put("fields.f2.aggregate-function", "sum"); + options.put("fields.f1.sequence-group", "f2"); + Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); + + retryArtificialException(() -> manager.createTable(schema)); + + assertThatThrownBy( + () -> + retryArtificialException( + () -> + manager.commitChanges( + SchemaChange.removeOption( + "fields.f1.sequence-group")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Must use sequence group for aggregation functions but not found for field f2."); + } + + @Test + public void testResetSequenceGroupForLastNonNullAggregateFunction() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update"); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put("fields.f2.aggregate-function", "last_non_null_value"); + options.put("fields.f1.sequence-group", "f2"); + Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); + + retryArtificialException(() -> manager.createTable(schema)); + retryArtificialException( + () -> manager.commitChanges(SchemaChange.removeOption("fields.f1.sequence-group"))); + + Optional latest = retryArtificialException(() -> manager.latest()); + assertThat(latest.isPresent()).isTrue(); + assertThat(latest.get().options()).doesNotContainKey("fields.f1.sequence-group"); + } + @Test public void testConcurrentCommit() throws Exception { retryArtificialException( From c404b1375a0af93cdfc70341547d71b454a9f3a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 8 Jun 2026 15:12:00 +0800 Subject: [PATCH 2/3] [core] Fix Spark conflict option assertion --- .../apache/paimon/spark/SparkReadITCase.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java index 5ed50328ec29..fbc2f660d42f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java @@ -51,6 +51,9 @@ /** ITCase for spark reader. */ public class SparkReadITCase extends SparkReadTestBase { + private static final String CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS = + "Can not set changelog-producer on table without primary keys"; + @Test public void testNormal() { innerTestSimpleType(spark.table("t1")); @@ -311,10 +314,7 @@ public void testConflictOption() { () -> spark.sql( "CREATE TABLE T (a INT) TBLPROPERTIES ('changelog-producer' = 'input')")) - .rootCause() - .isInstanceOf(RuntimeException.class) - .hasMessageContaining( - "Can not set changelog-producer on table without primary keys"); + .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys); spark.sql("CREATE TABLE T (a INT)"); @@ -322,10 +322,21 @@ public void testConflictOption() { () -> spark.sql( "ALTER TABLE T SET TBLPROPERTIES('changelog-producer' 'input')")) - .rootCause() - .isInstanceOf(RuntimeException.class) - .hasMessageContaining( - "Can not set changelog-producer on table without primary keys"); + .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys); + } + + private static void assertChangelogProducerWithoutPrimaryKeys(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof RuntimeException + && current.getMessage() != null + && current.getMessage().contains(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS)) { + return; + } + current = current.getCause(); + } + + assertThat(throwable).hasMessageContaining(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS); } @Test From 03a618bd23495ad262f421eb74d2d4f7b614684c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 8 Jun 2026 18:08:34 +0800 Subject: [PATCH 3/3] [core] Fix sequence group schema tests --- .../paimon/schema/SchemaManagerTest.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 0c9470514c0a..7e8616db9df8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -92,6 +92,8 @@ public class SchemaManagerTest { private final List primaryKeys = Arrays.asList("f0", "f1"); private final Map options = Collections.singletonMap("key", "value"); private final RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType()); + private final RowType rowTypeWithSequenceField = + RowType.of(new IntType(), new BigIntType(), new VarCharType(), new BigIntType()); private final Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); @@ -174,8 +176,10 @@ public void testResetSequenceGroupForAggregateFunction() throws Exception { options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update"); options.put(CoreOptions.BUCKET.key(), "1"); options.put("fields.f2.aggregate-function", "sum"); - options.put("fields.f1.sequence-group", "f2"); - Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); + options.put("fields.f3.sequence-group", "f2"); + Schema schema = + new Schema( + rowTypeWithSequenceField.getFields(), partitionKeys, primaryKeys, options, ""); retryArtificialException(() -> manager.createTable(schema)); @@ -185,7 +189,7 @@ public void testResetSequenceGroupForAggregateFunction() throws Exception { () -> manager.commitChanges( SchemaChange.removeOption( - "fields.f1.sequence-group")))) + "fields.f3.sequence-group")))) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining( "Must use sequence group for aggregation functions but not found for field f2."); @@ -197,16 +201,18 @@ public void testResetSequenceGroupForLastNonNullAggregateFunction() throws Excep options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update"); options.put(CoreOptions.BUCKET.key(), "1"); options.put("fields.f2.aggregate-function", "last_non_null_value"); - options.put("fields.f1.sequence-group", "f2"); - Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""); + options.put("fields.f3.sequence-group", "f2"); + Schema schema = + new Schema( + rowTypeWithSequenceField.getFields(), partitionKeys, primaryKeys, options, ""); retryArtificialException(() -> manager.createTable(schema)); retryArtificialException( - () -> manager.commitChanges(SchemaChange.removeOption("fields.f1.sequence-group"))); + () -> manager.commitChanges(SchemaChange.removeOption("fields.f3.sequence-group"))); Optional latest = retryArtificialException(() -> manager.latest()); assertThat(latest.isPresent()).isTrue(); - assertThat(latest.get().options()).doesNotContainKey("fields.f1.sequence-group"); + assertThat(latest.get().options()).doesNotContainKey("fields.f3.sequence-group"); } @Test