Skip to content

Commit 8962d4f

Browse files
VinaySagarGonabavigonabavi
authored andcommitted
[FLINK-38830][runtime] Handle duplicate AddColumnEvent gracefully to prevent pipeline crashes
1 parent 2f59f35 commit 8962d4f

5 files changed

Lines changed: 555 additions & 0 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
120120

121121
private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
122122
LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
123+
Set<String> existingColumnNames =
124+
columns.stream()
125+
.map(Column::getName)
126+
.collect(Collectors.toCollection(HashSet::new));
123127
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
128+
// Skip columns that already exist in the schema to handle duplicate AddColumnEvents
129+
// (e.g., from gh-ost online schema migrations)
130+
if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
131+
continue;
132+
}
124133
switch (columnWithPosition.getPosition()) {
125134
case FIRST:
126135
{
@@ -165,10 +174,43 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
165174
break;
166175
}
167176
}
177+
existingColumnNames.add(columnWithPosition.getAddColumn().getName());
168178
}
169179
return oldSchema.copy(columns);
170180
}
171181

182+
/**
183+
* Filters out redundant columns from an {@link AddColumnEvent} that already exist in the
184+
* current schema, and deduplicates columns within the same event. Returns {@link
185+
* Optional#empty()} if all columns are redundant.
186+
*
187+
* <p>This handles cases like gh-ost online schema migrations where duplicate ADD COLUMN events
188+
* may be emitted for the same column.
189+
*
190+
* <p><b>Note:</b> Duplicate detection is based on column name only. If a duplicate
191+
* AddColumnEvent arrives with a different type for an existing column name, it will be silently
192+
* skipped. This is the expected behavior for online schema migration tools (gh-ost, pt-osc)
193+
* where duplicate events are always exact copies.
194+
*/
195+
public static Optional<AddColumnEvent> filterRedundantAddColumns(
196+
Schema currentSchema, AddColumnEvent event) {
197+
Set<String> seenColumns = new HashSet<>(currentSchema.getColumnNames());
198+
List<AddColumnEvent.ColumnWithPosition> nonRedundant = new ArrayList<>();
199+
for (AddColumnEvent.ColumnWithPosition cwp : event.getAddedColumns()) {
200+
String colName = cwp.getAddColumn().getName();
201+
if (seenColumns.add(colName)) {
202+
nonRedundant.add(cwp);
203+
}
204+
}
205+
if (nonRedundant.isEmpty()) {
206+
return Optional.empty();
207+
}
208+
if (nonRedundant.size() == event.getAddedColumns().size()) {
209+
return Optional.of(event);
210+
}
211+
return Optional.of(new AddColumnEvent(event.tableId(), nonRedundant));
212+
}
213+
172214
private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
173215
List<Column> columns =
174216
oldSchema.getColumns().stream()

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232

3333
import java.util.ArrayList;
3434
import java.util.Arrays;
35+
import java.util.Collections;
3536
import java.util.HashMap;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.Optional;
3840

3941
/** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */
4042
class SchemaUtilsTest {
@@ -484,4 +486,231 @@ void testInferWiderSchema() {
484486
.build()))
485487
.isExactlyInstanceOf(IllegalStateException.class);
486488
}
489+
490+
// ========================== Tests for duplicate AddColumnEvent handling
491+
// ==========================
492+
493+
@Test
494+
void testFilterRedundantAddColumns_allDuplicates() {
495+
TableId tableId = TableId.parse("default.default.table1");
496+
Schema schema =
497+
Schema.newBuilder()
498+
.physicalColumn("id", DataTypes.INT())
499+
.physicalColumn("name", DataTypes.STRING())
500+
.physicalColumn("age", DataTypes.INT())
501+
.build();
502+
503+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
504+
addedColumns.add(
505+
new AddColumnEvent.ColumnWithPosition(
506+
Column.physicalColumn("name", DataTypes.STRING())));
507+
addedColumns.add(
508+
new AddColumnEvent.ColumnWithPosition(
509+
Column.physicalColumn("age", DataTypes.INT())));
510+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
511+
512+
Optional<AddColumnEvent> result =
513+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
514+
Assertions.assertThat(result).isEmpty();
515+
}
516+
517+
@Test
518+
void testFilterRedundantAddColumns_noDuplicates() {
519+
TableId tableId = TableId.parse("default.default.table1");
520+
Schema schema =
521+
Schema.newBuilder()
522+
.physicalColumn("id", DataTypes.INT())
523+
.physicalColumn("name", DataTypes.STRING())
524+
.build();
525+
526+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
527+
addedColumns.add(
528+
new AddColumnEvent.ColumnWithPosition(
529+
Column.physicalColumn("age", DataTypes.INT())));
530+
addedColumns.add(
531+
new AddColumnEvent.ColumnWithPosition(
532+
Column.physicalColumn("email", DataTypes.STRING())));
533+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
534+
535+
Optional<AddColumnEvent> result =
536+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
537+
Assertions.assertThat(result).isPresent();
538+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(2);
539+
Assertions.assertThat(result.get()).isEqualTo(addColumnEvent);
540+
}
541+
542+
@Test
543+
void testFilterRedundantAddColumns_partialDuplicates() {
544+
TableId tableId = TableId.parse("default.default.table1");
545+
Schema schema =
546+
Schema.newBuilder()
547+
.physicalColumn("id", DataTypes.INT())
548+
.physicalColumn("name", DataTypes.STRING())
549+
.build();
550+
551+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
552+
addedColumns.add(
553+
new AddColumnEvent.ColumnWithPosition(
554+
Column.physicalColumn("name", DataTypes.STRING())));
555+
addedColumns.add(
556+
new AddColumnEvent.ColumnWithPosition(
557+
Column.physicalColumn("age", DataTypes.INT())));
558+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
559+
560+
Optional<AddColumnEvent> result =
561+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
562+
Assertions.assertThat(result).isPresent();
563+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(1);
564+
Assertions.assertThat(result.get().getAddedColumns().get(0).getAddColumn().getName())
565+
.isEqualTo("age");
566+
}
567+
568+
@Test
569+
void testFilterRedundantAddColumns_emptyAddedColumns() {
570+
TableId tableId = TableId.parse("default.default.table1");
571+
Schema schema =
572+
Schema.newBuilder()
573+
.physicalColumn("id", DataTypes.INT())
574+
.physicalColumn("name", DataTypes.STRING())
575+
.build();
576+
577+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, Collections.emptyList());
578+
579+
Optional<AddColumnEvent> result =
580+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
581+
Assertions.assertThat(result).isEmpty();
582+
}
583+
584+
@Test
585+
void testApplyAddColumnEvent_idempotent() {
586+
TableId tableId = TableId.parse("default.default.table1");
587+
Schema schema =
588+
Schema.newBuilder()
589+
.physicalColumn("id", DataTypes.INT())
590+
.physicalColumn("name", DataTypes.STRING())
591+
.build();
592+
593+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
594+
addedColumns.add(
595+
new AddColumnEvent.ColumnWithPosition(
596+
Column.physicalColumn("name", DataTypes.STRING())));
597+
addedColumns.add(
598+
new AddColumnEvent.ColumnWithPosition(
599+
Column.physicalColumn("age", DataTypes.INT())));
600+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
601+
602+
Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
603+
Assertions.assertThat(result)
604+
.isEqualTo(
605+
Schema.newBuilder()
606+
.physicalColumn("id", DataTypes.INT())
607+
.physicalColumn("name", DataTypes.STRING())
608+
.physicalColumn("age", DataTypes.INT())
609+
.build());
610+
}
611+
612+
@Test
613+
void testApplyAddColumnEvent_allDuplicates() {
614+
TableId tableId = TableId.parse("default.default.table1");
615+
Schema schema =
616+
Schema.newBuilder()
617+
.physicalColumn("id", DataTypes.INT())
618+
.physicalColumn("name", DataTypes.STRING())
619+
.physicalColumn("age", DataTypes.INT())
620+
.build();
621+
622+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
623+
addedColumns.add(
624+
new AddColumnEvent.ColumnWithPosition(
625+
Column.physicalColumn("name", DataTypes.STRING())));
626+
addedColumns.add(
627+
new AddColumnEvent.ColumnWithPosition(
628+
Column.physicalColumn("age", DataTypes.INT())));
629+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
630+
631+
Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
632+
Assertions.assertThat(result)
633+
.isEqualTo(
634+
Schema.newBuilder()
635+
.physicalColumn("id", DataTypes.INT())
636+
.physicalColumn("name", DataTypes.STRING())
637+
.physicalColumn("age", DataTypes.INT())
638+
.build());
639+
}
640+
641+
@Test
642+
void testFilterRedundantAddColumns_withPositions() {
643+
TableId tableId = TableId.parse("default.default.table1");
644+
Schema schema =
645+
Schema.newBuilder()
646+
.physicalColumn("id", DataTypes.INT())
647+
.physicalColumn("name", DataTypes.STRING())
648+
.build();
649+
650+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
651+
addedColumns.add(
652+
new AddColumnEvent.ColumnWithPosition(
653+
Column.physicalColumn("name", DataTypes.STRING()),
654+
AddColumnEvent.ColumnPosition.AFTER,
655+
"id"));
656+
addedColumns.add(
657+
new AddColumnEvent.ColumnWithPosition(
658+
Column.physicalColumn("age", DataTypes.INT()),
659+
AddColumnEvent.ColumnPosition.AFTER,
660+
"name"));
661+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
662+
663+
Optional<AddColumnEvent> result =
664+
SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent);
665+
Assertions.assertThat(result).isPresent();
666+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(1);
667+
668+
AddColumnEvent.ColumnWithPosition remaining = result.get().getAddedColumns().get(0);
669+
Assertions.assertThat(remaining.getAddColumn().getName()).isEqualTo("age");
670+
Assertions.assertThat(remaining.getPosition())
671+
.isEqualTo(AddColumnEvent.ColumnPosition.AFTER);
672+
Assertions.assertThat(remaining.getExistedColumnName()).isEqualTo("name");
673+
}
674+
675+
@Test
676+
void testFilterRedundantAddColumns_intraEventDuplicates() {
677+
Schema schema = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();
678+
TableId tableId = TableId.tableId("default", "schema", "table");
679+
AddColumnEvent event =
680+
new AddColumnEvent(
681+
tableId,
682+
Arrays.asList(
683+
new AddColumnEvent.ColumnWithPosition(
684+
Column.physicalColumn("name", DataTypes.STRING())),
685+
new AddColumnEvent.ColumnWithPosition(
686+
Column.physicalColumn("name", DataTypes.STRING()))));
687+
Optional<AddColumnEvent> result = SchemaUtils.filterRedundantAddColumns(schema, event);
688+
Assertions.assertThat(result).isPresent();
689+
Assertions.assertThat(result.get().getAddedColumns()).hasSize(1);
690+
Assertions.assertThat(result.get().getAddedColumns().get(0).getAddColumn().getName())
691+
.isEqualTo("name");
692+
}
693+
694+
@Test
695+
void testApplyAddColumnEvent_duplicateWithinSameEvent() {
696+
TableId tableId = TableId.parse("default.default.table1");
697+
Schema schema = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();
698+
699+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
700+
addedColumns.add(
701+
new AddColumnEvent.ColumnWithPosition(
702+
Column.physicalColumn("name", DataTypes.STRING())));
703+
addedColumns.add(
704+
new AddColumnEvent.ColumnWithPosition(
705+
Column.physicalColumn("name", DataTypes.STRING())));
706+
AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);
707+
708+
Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
709+
Assertions.assertThat(result)
710+
.isEqualTo(
711+
Schema.newBuilder()
712+
.physicalColumn("id", DataTypes.INT())
713+
.physicalColumn("name", DataTypes.STRING())
714+
.build());
715+
}
487716
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.converter.JavaObjectConverter;
2424
import org.apache.flink.cdc.common.data.RecordData;
2525
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
26+
import org.apache.flink.cdc.common.event.AddColumnEvent;
2627
import org.apache.flink.cdc.common.event.ChangeEvent;
2728
import org.apache.flink.cdc.common.event.CreateTableEvent;
2829
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -46,6 +47,9 @@
4647
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
4748
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
4849

50+
import org.slf4j.Logger;
51+
import org.slf4j.LoggerFactory;
52+
4953
import javax.annotation.Nullable;
5054

5155
import java.io.Serializable;
@@ -68,6 +72,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
6872
implements OneInputStreamOperator<Event, Event>, Serializable {
6973

7074
private static final long serialVersionUID = 1L;
75+
private static final Logger LOG = LoggerFactory.getLogger(PostTransformOperator.class);
7176

7277
private final String timezone;
7378
private final List<TransformRule> transformRules;
@@ -240,6 +245,23 @@ private Optional<Event> processSchemaChangeEvent(
240245
TableId tableId = event.tableId();
241246
PostTransformChangeInfo info = checkNotNull(postTransformInfoMap.get(tableId));
242247

248+
// Filter out redundant AddColumnEvent columns that already exist in the schema
249+
// to handle duplicate events from tools like gh-ost online schema migrations
250+
if (event instanceof AddColumnEvent) {
251+
AddColumnEvent addColumnEvent = (AddColumnEvent) event;
252+
Schema currentSchema = info.getPreTransformedSchema();
253+
Optional<AddColumnEvent> filtered =
254+
SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
255+
if (!filtered.isPresent()) {
256+
LOG.debug(
257+
"Skipping fully redundant AddColumnEvent for table {} "
258+
+ "- all columns already exist",
259+
tableId);
260+
return Optional.empty();
261+
}
262+
event = filtered.get();
263+
}
264+
243265
// Apply schema change event to the pre-transformed schema
244266
Schema prevPreSchema = info.getPreTransformedSchema();
245267
Schema nextPreSchema = SchemaUtils.applySchemaChangeEvent(prevPreSchema, event);

0 commit comments

Comments
 (0)