@@ -123,22 +123,31 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
123123 public static void validateAlterTableProperties (
124124 TableInfo currentTable , Set <String > tableKeysToChange ) {
125125 TableConfig currentConfig = currentTable .getTableConfig ();
126- tableKeysToChange .forEach (
127- k -> {
128- if (isTableStorageConfig (k ) && !isAlterableTableOption (k )) {
129- throw new InvalidAlterTableException (
130- "The option '" + k + "' is not supported to alter yet." );
131- }
132126
133- if (!currentConfig .getDataLakeFormat ().isPresent ()
134- && ConfigOptions .TABLE_DATALAKE_ENABLED .key ().equals (k )) {
135- throw new InvalidAlterTableException (
136- String .format (
137- "The option '%s' cannot be altered for tables that were"
138- + " created before the Fluss cluster enabled datalake." ,
139- ConfigOptions .TABLE_DATALAKE_ENABLED .key ()));
140- }
141- });
127+ List <String > unsupportedKeys =
128+ tableKeysToChange .stream ()
129+ .filter (k -> isTableStorageConfig (k ) && !isAlterableTableOption (k ))
130+ .collect (Collectors .toList ());
131+ if (!unsupportedKeys .isEmpty ()) {
132+ throw new InvalidAlterTableException (
133+ String .format (
134+ "The following options are not supported to alter yet: %s." ,
135+ String .join (", " , unsupportedKeys )));
136+ }
137+
138+ if (!currentConfig .getDataLakeFormat ().isPresent ()) {
139+ List <String > datalakeKeys =
140+ tableKeysToChange .stream ()
141+ .filter (k -> k .startsWith ("table.datalake." ))
142+ .collect (Collectors .toList ());
143+ if (!datalakeKeys .isEmpty ()) {
144+ throw new InvalidAlterTableException (
145+ String .format (
146+ "The following options cannot be altered for tables that were"
147+ + " created before the Fluss cluster enabled datalake: %s." ,
148+ String .join (", " , datalakeKeys )));
149+ }
150+ }
142151 }
143152
144153 private static void checkSystemColumns (RowType schema ) {
0 commit comments