Validate the table schema produced by a schema change before returning it.

* SchemaManager: the newly built TableSchema is passed through
  SchemaValidation.validateTableSchema(...) before it is returned.
* SchemaManagerTest: removing 'fields.f3.sequence-group' must fail while f2 uses
  the 'sum' aggregate function, and must succeed for 'last_non_null_value'.
* SparkReadITCase: the changelog-producer error is now accepted anywhere in the
  cause chain instead of only at the root cause.

NOTE(review): this patch was recovered from a copy whose newlines were collapsed
and whose generic type arguments were stripped. Leading indentation and the
type arguments (List<String>, Map<String, String>, Optional<TableSchema>) are
reconstructed -- confirm against the target files before applying.

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 95c6f04514db..24eb99570414 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -609,14 +609,17 @@ protected void updateLastColumn(
                         applyRenameColumnsToOptions(newOptions, changes),
                         newComment);
 
-        return new TableSchema(
-                oldTableSchema.id() + 1,
-                newSchema.fields(),
-                highestFieldId.get(),
-                newSchema.partitionKeys(),
-                newSchema.primaryKeys(),
-                newSchema.options(),
-                newSchema.comment());
+        TableSchema newTableSchema =
+                new TableSchema(
+                        oldTableSchema.id() + 1,
+                        newSchema.fields(),
+                        highestFieldId.get(),
+                        newSchema.partitionKeys(),
+                        newSchema.primaryKeys(),
+                        newSchema.options(),
+                        newSchema.comment());
+        SchemaValidation.validateTableSchema(newTableSchema);
+        return newTableSchema;
     }
 
     // gets the rootType at the defined depth
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 9a15cab36159..7e8616db9df8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -92,6 +92,8 @@ public class SchemaManagerTest {
     private final List<String> primaryKeys = Arrays.asList("f0", "f1");
     private final Map<String, String> options = Collections.singletonMap("key", "value");
     private final RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+    private final RowType rowTypeWithSequenceField =
+            RowType.of(new IntType(), new BigIntType(), new VarCharType(), new BigIntType());
 
     private final Schema schema =
             new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, "");
@@ -168,6 +170,51 @@ public void testUpdateOptions() throws Exception {
         assertThat(latest.get().options()).containsEntry("new_k", "new_v");
     }
 
+    @Test
+    public void testResetSequenceGroupForAggregateFunction() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put("fields.f2.aggregate-function", "sum");
+        options.put("fields.f3.sequence-group", "f2");
+        Schema schema =
+                new Schema(
+                        rowTypeWithSequenceField.getFields(), partitionKeys, primaryKeys, options, "");
+
+        retryArtificialException(() -> manager.createTable(schema));
+
+        assertThatThrownBy(
+                        () ->
+                                retryArtificialException(
+                                        () ->
+                                                manager.commitChanges(
+                                                        SchemaChange.removeOption(
+                                                                "fields.f3.sequence-group"))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Must use sequence group for aggregation functions but not found for field f2.");
+    }
+
+    @Test
+    public void testResetSequenceGroupForLastNonNullAggregateFunction() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put("fields.f2.aggregate-function", "last_non_null_value");
+        options.put("fields.f3.sequence-group", "f2");
+        Schema schema =
+                new Schema(
+                        rowTypeWithSequenceField.getFields(), partitionKeys, primaryKeys, options, "");
+
+        retryArtificialException(() -> manager.createTable(schema));
+        retryArtificialException(
+                () -> manager.commitChanges(SchemaChange.removeOption("fields.f3.sequence-group")));
+
+        Optional<TableSchema> latest = retryArtificialException(() -> manager.latest());
+        assertThat(latest.isPresent()).isTrue();
+        assertThat(latest.get().options()).doesNotContainKey("fields.f3.sequence-group");
+    }
+
     @Test
     public void testConcurrentCommit() throws Exception {
         retryArtificialException(
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 5ed50328ec29..fbc2f660d42f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -51,6 +51,9 @@
 /** ITCase for spark reader. */
 public class SparkReadITCase extends SparkReadTestBase {
 
+    private static final String CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS =
+            "Can not set changelog-producer on table without primary keys";
+
     @Test
     public void testNormal() {
         innerTestSimpleType(spark.table("t1"));
@@ -311,10 +314,7 @@ public void testConflictOption() {
                         () ->
                                 spark.sql(
                                         "CREATE TABLE T (a INT) TBLPROPERTIES ('changelog-producer' = 'input')"))
-                .rootCause()
-                .isInstanceOf(RuntimeException.class)
-                .hasMessageContaining(
-                        "Can not set changelog-producer on table without primary keys");
+                .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys);
 
         spark.sql("CREATE TABLE T (a INT)");
 
@@ -322,10 +322,21 @@ public void testConflictOption() {
                         () ->
                                 spark.sql(
                                         "ALTER TABLE T SET TBLPROPERTIES('changelog-producer' 'input')"))
-                .rootCause()
-                .isInstanceOf(RuntimeException.class)
-                .hasMessageContaining(
-                        "Can not set changelog-producer on table without primary keys");
+                .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys);
+    }
+
+    private static void assertChangelogProducerWithoutPrimaryKeys(Throwable throwable) {
+        Throwable current = throwable;
+        while (current != null) {
+            if (current instanceof RuntimeException
+                    && current.getMessage() != null
+                    && current.getMessage().contains(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS)) {
+                return;
+            }
+            current = current.getCause();
+        }
+
+        assertThat(throwable).hasMessageContaining(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS);
     }
 
     @Test