Skip to content

Commit b7876af

Browse files
author
gonabavi
committed
[FLINK-38830][runtime] Handle duplicate AddColumnEvent gracefully to prevent pipeline crashes
1 parent 2f59f35 commit b7876af

5 files changed

Lines changed: 527 additions & 0 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
120120

121121
private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
122122
LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
123+
Set<String> existingColumnNames =
124+
columns.stream()
125+
.map(Column::getName)
126+
.collect(Collectors.toCollection(HashSet::new));
123127
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
128+
// Skip columns that already exist in the schema to handle duplicate AddColumnEvents
129+
// (e.g., from gh-ost online schema migrations)
130+
if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
131+
continue;
132+
}
124133
switch (columnWithPosition.getPosition()) {
125134
case FIRST:
126135
{
@@ -165,10 +174,34 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
165174
break;
166175
}
167176
}
177+
existingColumnNames.add(columnWithPosition.getAddColumn().getName());
168178
}
169179
return oldSchema.copy(columns);
170180
}
171181

182+
/**
183+
* Filters out redundant columns from an {@link AddColumnEvent} that already exist in the
184+
* current schema. Returns {@link Optional#empty()} if all columns are redundant.
185+
*
186+
* <p>This handles cases like gh-ost online schema migrations where duplicate ADD COLUMN events
187+
* may be emitted for the same column.
188+
*/
189+
public static Optional<AddColumnEvent> filterRedundantAddColumns(
190+
Schema currentSchema, AddColumnEvent event) {
191+
Set<String> existingColumns = new HashSet<>(currentSchema.getColumnNames());
192+
List<AddColumnEvent.ColumnWithPosition> nonRedundant =
193+
event.getAddedColumns().stream()
194+
.filter(cwp -> !existingColumns.contains(cwp.getAddColumn().getName()))
195+
.collect(Collectors.toList());
196+
if (nonRedundant.isEmpty()) {
197+
return Optional.empty();
198+
}
199+
if (nonRedundant.size() == event.getAddedColumns().size()) {
200+
return Optional.of(event); // No filtering needed
201+
}
202+
return Optional.of(new AddColumnEvent(event.tableId(), nonRedundant));
203+
}
204+
172205
private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
173206
List<Column> columns =
174207
oldSchema.getColumns().stream()

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232

3333
import java.util.ArrayList;
3434
import java.util.Arrays;
35+
import java.util.Collections;
3536
import java.util.HashMap;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.Optional;
3840

3941
/** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */
4042
class SchemaUtilsTest {
@@ -484,4 +486,212 @@ void testInferWiderSchema() {
484486
.build()))
485487
.isExactlyInstanceOf(IllegalStateException.class);
486488
}
489+
490+
// ========================== Tests for duplicate AddColumnEvent handling
491+
// ==========================
492+
493+
@Test
494+
void testFilterRedundantAddColumns_allDuplicates() {
495+
TableId tableId = TableId.parse("default.default.table1");
496+
Schema schema =
497+
Schema.newBuilder()
498+
.physicalColumn("id", DataTypes.INT())
499+
.physicalColumn("name", DataTypes.STRING())
500+
.physicalColumn("age", DataTypes.INT())
501+
.build();
502+
503+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
504+
addedColumns.add(
505+
new AddColumnEvent.ColumnWithPosition(
506+
Column.physicalColumn("name", DataTypes.STRING())));
507+
addedColumns.add(
508+
new AddColumnEvent.ColumnWithPosition(
509+
Column.physicalColumn("age", DataTypes.INT())));
510+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
511+
512+
Optional<AddColumnEvent> result =
513+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
514+
Assertions.assertThat(result).isEmpty();
515+
}
516+
517+
@Test
518+
void testFilterRedundantAddColumns_noDuplicates() {
519+
TableId tableId = TableId.parse("default.default.table1");
520+
Schema schema =
521+
Schema.newBuilder()
522+
.physicalColumn("id", DataTypes.INT())
523+
.physicalColumn("name", DataTypes.STRING())
524+
.build();
525+
526+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
527+
addedColumns.add(
528+
new AddColumnEvent.ColumnWithPosition(
529+
Column.physicalColumn("age", DataTypes.INT())));
530+
addedColumns.add(
531+
new AddColumnEvent.ColumnWithPosition(
532+
Column.physicalColumn("email", DataTypes.STRING())));
533+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
534+
535+
Optional<AddColumnEvent> result =
536+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
537+
Assertions.assertThat(result).isPresent();
538+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(2);
539+
Assertions.assertThat(result.get()).isEqualTo(addColumnEvent);
540+
}
541+
542+
@Test
543+
void testFilterRedundantAddColumns_partialDuplicates() {
544+
TableId tableId = TableId.parse("default.default.table1");
545+
Schema schema =
546+
Schema.newBuilder()
547+
.physicalColumn("id", DataTypes.INT())
548+
.physicalColumn("name", DataTypes.STRING())
549+
.build();
550+
551+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
552+
addedColumns.add(
553+
new AddColumnEvent.ColumnWithPosition(
554+
Column.physicalColumn("name", DataTypes.STRING())));
555+
addedColumns.add(
556+
new AddColumnEvent.ColumnWithPosition(
557+
Column.physicalColumn("age", DataTypes.INT())));
558+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
559+
560+
Optional<AddColumnEvent> result =
561+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
562+
Assertions.assertThat(result).isPresent();
563+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(1);
564+
Assertions.assertThat(result.get().getAddedColumns().get(0).getAddColumn().getName())
565+
.isEqualTo("age");
566+
}
567+
568+
@Test
569+
void testFilterRedundantAddColumns_emptyAddedColumns() {
570+
TableId tableId = TableId.parse("default.default.table1");
571+
Schema schema =
572+
Schema.newBuilder()
573+
.physicalColumn("id", DataTypes.INT())
574+
.physicalColumn("name", DataTypes.STRING())
575+
.build();
576+
577+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, Collections.emptyList());
578+
579+
Optional<AddColumnEvent> result =
580+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
581+
Assertions.assertThat(result).isEmpty();
582+
}
583+
584+
@Test
585+
void testApplyAddColumnEvent_idempotent() {
586+
TableId tableId = TableId.parse("default.default.table1");
587+
Schema schema =
588+
Schema.newBuilder()
589+
.physicalColumn("id", DataTypes.INT())
590+
.physicalColumn("name", DataTypes.STRING())
591+
.build();
592+
593+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
594+
addedColumns.add(
595+
new AddColumnEvent.ColumnWithPosition(
596+
Column.physicalColumn("name", DataTypes.STRING())));
597+
addedColumns.add(
598+
new AddColumnEvent.ColumnWithPosition(
599+
Column.physicalColumn("age", DataTypes.INT())));
600+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
601+
602+
Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
603+
Assertions.assertThat(result)
604+
.isEqualTo(
605+
Schema.newBuilder()
606+
.physicalColumn("id", DataTypes.INT())
607+
.physicalColumn("name", DataTypes.STRING())
608+
.physicalColumn("age", DataTypes.INT())
609+
.build());
610+
}
611+
612+
@Test
613+
void testApplyAddColumnEvent_allDuplicates() {
614+
TableId tableId = TableId.parse("default.default.table1");
615+
Schema schema =
616+
Schema.newBuilder()
617+
.physicalColumn("id", DataTypes.INT())
618+
.physicalColumn("name", DataTypes.STRING())
619+
.physicalColumn("age", DataTypes.INT())
620+
.build();
621+
622+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
623+
addedColumns.add(
624+
new AddColumnEvent.ColumnWithPosition(
625+
Column.physicalColumn("name", DataTypes.STRING())));
626+
addedColumns.add(
627+
new AddColumnEvent.ColumnWithPosition(
628+
Column.physicalColumn("age", DataTypes.INT())));
629+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
630+
631+
Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
632+
Assertions.assertThat(result)
633+
.isEqualTo(
634+
Schema.newBuilder()
635+
.physicalColumn("id", DataTypes.INT())
636+
.physicalColumn("name", DataTypes.STRING())
637+
.physicalColumn("age", DataTypes.INT())
638+
.build());
639+
}
640+
641+
@Test
642+
void testFilterRedundantAddColumns_withPositions() {
643+
TableId tableId = TableId.parse("default.default.table1");
644+
Schema schema =
645+
Schema.newBuilder()
646+
.physicalColumn("id", DataTypes.INT())
647+
.physicalColumn("name", DataTypes.STRING())
648+
.build();
649+
650+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
651+
addedColumns.add(
652+
new AddColumnEvent.ColumnWithPosition(
653+
Column.physicalColumn("name", DataTypes.STRING()),
654+
AddColumnEvent.ColumnPosition.AFTER,
655+
"id"));
656+
addedColumns.add(
657+
new AddColumnEvent.ColumnWithPosition(
658+
Column.physicalColumn("age", DataTypes.INT()),
659+
AddColumnEvent.ColumnPosition.AFTER,
660+
"name"));
661+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
662+
663+
Optional<AddColumnEvent> result =
664+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
665+
Assertions.assertThat(result).isPresent();
666+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(1);
667+
668+
AddColumnEvent.ColumnWithPosition remaining = result.get().getAddedColumns().get(0);
669+
Assertions.assertThat(remaining.getAddColumn().getName()).isEqualTo("age");
670+
Assertions.assertThat(remaining.getPosition())
671+
.isEqualTo(AddColumnEvent.ColumnPosition.AFTER);
672+
Assertions.assertThat(remaining.getExistedColumnName()).isEqualTo("name");
673+
}
674+
675+
@Test
676+
void testApplyAddColumnEvent_duplicateWithinSameEvent() {
677+
TableId tableId = TableId.parse("default.default.table1");
678+
Schema schema = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();
679+
680+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
681+
addedColumns.add(
682+
new AddColumnEvent.ColumnWithPosition(
683+
Column.physicalColumn("name", DataTypes.STRING())));
684+
addedColumns.add(
685+
new AddColumnEvent.ColumnWithPosition(
686+
Column.physicalColumn("name", DataTypes.STRING())));
687+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
688+
689+
Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
690+
Assertions.assertThat(result)
691+
.isEqualTo(
692+
Schema.newBuilder()
693+
.physicalColumn("id", DataTypes.INT())
694+
.physicalColumn("name", DataTypes.STRING())
695+
.build());
696+
}
487697
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.converter.JavaObjectConverter;
2424
import org.apache.flink.cdc.common.data.RecordData;
2525
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
26+
import org.apache.flink.cdc.common.event.AddColumnEvent;
2627
import org.apache.flink.cdc.common.event.ChangeEvent;
2728
import org.apache.flink.cdc.common.event.CreateTableEvent;
2829
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -46,6 +47,9 @@
4647
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
4748
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
4849

50+
import org.slf4j.Logger;
51+
import org.slf4j.LoggerFactory;
52+
4953
import javax.annotation.Nullable;
5054

5155
import java.io.Serializable;
@@ -68,6 +72,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
6872
implements OneInputStreamOperator<Event, Event>, Serializable {
6973

7074
private static final long serialVersionUID = 1L;
75+
private static final Logger LOG = LoggerFactory.getLogger(PostTransformOperator.class);
7176

7277
private final String timezone;
7378
private final List<TransformRule> transformRules;
@@ -240,6 +245,23 @@ private Optional<Event> processSchemaChangeEvent(
240245
TableId tableId = event.tableId();
241246
PostTransformChangeInfo info = checkNotNull(postTransformInfoMap.get(tableId));
242247

248+
// Filter out redundant AddColumnEvent columns that already exist in the schema
249+
// to handle duplicate events from tools like gh-ost online schema migrations
250+
if (event instanceof AddColumnEvent) {
251+
AddColumnEvent addColumnEvent = (AddColumnEvent) event;
252+
Schema currentSchema = info.getPreTransformedSchema();
253+
Optional<AddColumnEvent> filtered =
254+
SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
255+
if (!filtered.isPresent()) {
256+
LOG.info(
257+
"Skipping fully redundant AddColumnEvent for table {} "
258+
+ "- all columns already exist",
259+
tableId);
260+
return Optional.empty();
261+
}
262+
event = filtered.get();
263+
}
264+
243265
// Apply schema change event to the pre-transformed schema
244266
Schema prevPreSchema = info.getPreTransformedSchema();
245267
Schema nextPreSchema = SchemaUtils.applySchemaChangeEvent(prevPreSchema, event);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.java.tuple.Tuple2;
2121
import org.apache.flink.api.java.tuple.Tuple3;
2222
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
23+
import org.apache.flink.cdc.common.event.AddColumnEvent;
2324
import org.apache.flink.cdc.common.event.ChangeEvent;
2425
import org.apache.flink.cdc.common.event.CreateTableEvent;
2526
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -45,6 +46,9 @@
4546
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
4647
import org.apache.flink.streaming.runtime.tasks.StreamTask;
4748

49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
51+
4852
import javax.annotation.Nullable;
4953

5054
import java.io.Serializable;
@@ -64,6 +68,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
6468
implements OneInputStreamOperator<Event, Event>, Serializable {
6569

6670
private static final long serialVersionUID = 1L;
71+
private static final Logger LOG = LoggerFactory.getLogger(PreTransformOperator.class);
6772

6873
private final List<TransformRule> transformRules;
6974
private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap;
@@ -215,6 +220,24 @@ private CreateTableEvent cacheCreateTable(CreateTableEvent event) {
215220
private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
216221
TableId tableId = event.tableId();
217222
PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
223+
224+
// Filter out redundant AddColumnEvent columns that already exist in the schema
225+
// to handle duplicate events from tools like gh-ost online schema migrations
226+
if (event instanceof AddColumnEvent) {
227+
AddColumnEvent addColumnEvent = (AddColumnEvent) event;
228+
Schema currentSchema = tableChangeInfo.getSourceSchema();
229+
Optional<AddColumnEvent> filtered =
230+
SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
231+
if (!filtered.isPresent()) {
232+
LOG.info(
233+
"Skipping fully redundant AddColumnEvent for table {} "
234+
+ "- all columns already exist",
235+
tableId);
236+
return Optional.empty();
237+
}
238+
event = filtered.get();
239+
}
240+
218241
Schema originalSchema =
219242
SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
220243
Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();

0 commit comments

Comments
 (0)