-
Notifications
You must be signed in to change notification settings - Fork 523
[BugFix] Fix ArrayIndexOutOfBoundsException in partial updates after ADD COLUMN #2594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,9 +42,11 @@ | |
| import org.apache.fluss.record.KvRecordBatch; | ||
| import org.apache.fluss.record.KvRecordReadContext; | ||
| import org.apache.fluss.row.BinaryRow; | ||
| import org.apache.fluss.row.InternalRow; | ||
| import org.apache.fluss.row.PaddingRow; | ||
| import org.apache.fluss.row.arrow.ArrowWriterPool; | ||
| import org.apache.fluss.row.arrow.ArrowWriterProvider; | ||
| import org.apache.fluss.row.encode.RowEncoder; | ||
| import org.apache.fluss.row.encode.ValueDecoder; | ||
| import org.apache.fluss.rpc.protocol.MergeMode; | ||
| import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; | ||
|
|
@@ -86,6 +88,7 @@ | |
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Executor; | ||
|
|
@@ -374,23 +377,31 @@ public LogAppendInfo putAsLeader( | |
| short latestSchemaId = (short) schemaInfo.getSchemaId(); | ||
| validateSchemaId(kvRecords.schemaId(), latestSchemaId); | ||
|
|
||
| // Convert target column positions from client's schema to latest | ||
| // schema positions using column IDs if schemas differ. | ||
| int[] resolvedTargetColumns = targetColumns; | ||
| if (targetColumns != null && kvRecords.schemaId() != latestSchemaId) { | ||
| Schema clientSchema = schemaGetter.getSchema(kvRecords.schemaId()); | ||
| resolvedTargetColumns = | ||
| convertTargetColumns(targetColumns, clientSchema, latestSchema); | ||
| } | ||
|
|
||
| AutoIncrementUpdater currentAutoIncrementUpdater = | ||
| autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); | ||
|
|
||
| // Validate targetColumns doesn't contain auto-increment column | ||
| currentAutoIncrementUpdater.validateTargetColumns(targetColumns); | ||
| currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns); | ||
|
|
||
| // Determine the row merger based on mergeMode: | ||
| // - DEFAULT: Use the configured merge engine (rowMerger) | ||
| // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger | ||
| // to directly replace values (for undo recovery scenarios) | ||
| // We only support ADD COLUMN, so targetColumns is fine to be used directly. | ||
| RowMerger currentMerger = | ||
| (mergeMode == MergeMode.OVERWRITE) | ||
| ? overwriteRowMerger.configureTargetColumns( | ||
| targetColumns, latestSchemaId, latestSchema) | ||
| resolvedTargetColumns, latestSchemaId, latestSchema) | ||
| : rowMerger.configureTargetColumns( | ||
| targetColumns, latestSchemaId, latestSchema); | ||
| resolvedTargetColumns, latestSchemaId, latestSchema); | ||
|
|
||
| RowType latestRowType = latestSchema.getRowType(); | ||
| WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType); | ||
|
|
@@ -406,6 +417,8 @@ public LogAppendInfo putAsLeader( | |
| processKvRecords( | ||
| kvRecords, | ||
| kvRecords.schemaId(), | ||
| latestSchemaId, | ||
| latestSchema, | ||
| currentMerger, | ||
| currentAutoIncrementUpdater, | ||
| walBuilder, | ||
|
|
@@ -460,6 +473,8 @@ private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) { | |
| private void processKvRecords( | ||
| KvRecordBatch kvRecords, | ||
| short schemaIdOfNewData, | ||
| short latestSchemaId, | ||
| Schema latestSchema, | ||
| RowMerger currentMerger, | ||
| AutoIncrementUpdater autoIncrementUpdater, | ||
| WalBuilder walBuilder, | ||
|
|
@@ -473,39 +488,50 @@ private void processKvRecords( | |
| KvRecordReadContext.createReadContext(kvFormat, schemaGetter); | ||
| ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat); | ||
|
|
||
| for (KvRecord kvRecord : kvRecords.records(readContext)) { | ||
| byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); | ||
| KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); | ||
| BinaryRow row = kvRecord.getRow(); | ||
| BinaryValue currentValue = row == null ? null : new BinaryValue(schemaIdOfNewData, row); | ||
| try (SchemaAlignmentContext alignmentContext = | ||
| new SchemaAlignmentContext(latestSchemaId, latestSchema, kvFormat)) { | ||
| for (KvRecord kvRecord : kvRecords.records(readContext)) { | ||
| byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); | ||
| KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); | ||
| BinaryRow row = kvRecord.getRow(); | ||
| BinaryValue currentValue = | ||
| row == null ? null : new BinaryValue(schemaIdOfNewData, row); | ||
|
|
||
| if (currentValue != null && schemaIdOfNewData != latestSchemaId) { | ||
| currentValue = alignToLatestSchema(currentValue, alignmentContext); | ||
| } | ||
|
|
||
| if (currentValue == null) { | ||
| logOffset = | ||
| processDeletion( | ||
| key, | ||
| currentMerger, | ||
| valueDecoder, | ||
| walBuilder, | ||
| latestSchemaRow, | ||
| logOffset); | ||
| } else { | ||
| logOffset = | ||
| processUpsert( | ||
| key, | ||
| currentValue, | ||
| currentMerger, | ||
| autoIncrementUpdater, | ||
| valueDecoder, | ||
| walBuilder, | ||
| latestSchemaRow, | ||
| logOffset); | ||
| if (currentValue == null) { | ||
| logOffset = | ||
| processDeletion( | ||
| key, | ||
| currentMerger, | ||
| alignmentContext, | ||
| valueDecoder, | ||
| walBuilder, | ||
| latestSchemaRow, | ||
| logOffset); | ||
| } else { | ||
| logOffset = | ||
| processUpsert( | ||
| key, | ||
| currentValue, | ||
| currentMerger, | ||
| alignmentContext, | ||
| autoIncrementUpdater, | ||
| valueDecoder, | ||
| walBuilder, | ||
| latestSchemaRow, | ||
| logOffset); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private long processDeletion( | ||
| KvPreWriteBuffer.Key key, | ||
| RowMerger currentMerger, | ||
| SchemaAlignmentContext alignmentContext, | ||
| ValueDecoder valueDecoder, | ||
| WalBuilder walBuilder, | ||
| PaddingRow latestSchemaRow, | ||
|
|
@@ -530,6 +556,9 @@ private long processDeletion( | |
| } | ||
|
|
||
| BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes); | ||
| if (oldValue.schemaId != alignmentContext.latestSchemaId) { | ||
| oldValue = alignToLatestSchema(oldValue, alignmentContext); | ||
| } | ||
| BinaryValue newValue = currentMerger.delete(oldValue); | ||
|
|
||
| // if newValue is null, it means the row should be deleted | ||
|
|
@@ -544,6 +573,7 @@ private long processUpsert( | |
| KvPreWriteBuffer.Key key, | ||
| BinaryValue currentValue, | ||
| RowMerger currentMerger, | ||
| SchemaAlignmentContext alignmentContext, | ||
| AutoIncrementUpdater autoIncrementUpdater, | ||
| ValueDecoder valueDecoder, | ||
| WalBuilder walBuilder, | ||
|
|
@@ -572,6 +602,9 @@ private long processUpsert( | |
| } | ||
|
|
||
| BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes); | ||
| if (oldValue.schemaId != alignmentContext.latestSchemaId) { | ||
| oldValue = alignToLatestSchema(oldValue, alignmentContext); | ||
| } | ||
| BinaryValue newValue = currentMerger.merge(oldValue, currentValue); | ||
|
|
||
| if (newValue == oldValue) { | ||
|
|
@@ -628,6 +661,102 @@ private long applyUpdate( | |
| } | ||
| } | ||
|
|
||
| /** Batch-constant state for aligning rows to the latest schema. */ | ||
| private static class SchemaAlignmentContext implements AutoCloseable { | ||
| final short latestSchemaId; | ||
| final List<Integer> targetColIds; | ||
| final RowEncoder encoder; | ||
| final Map<Short, SourceSchemaMapping> cache = new HashMap<>(); | ||
|
|
||
| SchemaAlignmentContext(short latestSchemaId, Schema latestSchema, KvFormat kvFormat) { | ||
| this.latestSchemaId = latestSchemaId; | ||
| this.targetColIds = latestSchema.getColumnIds(); | ||
| this.encoder = RowEncoder.create(kvFormat, latestSchema.getRowType()); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws Exception { | ||
| encoder.close(); | ||
| } | ||
|
|
||
| /** Cached field getters and column-id→position map for a single source schema. */ | ||
| private static class SourceSchemaMapping { | ||
| final Map<Integer, Integer> idToPos; | ||
| final InternalRow.FieldGetter[] getters; | ||
|
|
||
| SourceSchemaMapping(Schema sourceSchema) { | ||
| List<Integer> sourceColIds = sourceSchema.getColumnIds(); | ||
| this.idToPos = new HashMap<>(); | ||
| for (int i = 0; i < sourceColIds.size(); i++) { | ||
| idToPos.put(sourceColIds.get(i), i); | ||
| } | ||
| this.getters = InternalRow.createFieldGetters(sourceSchema.getRowType()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout | ||
| * using column IDs to map positions. New columns (present in latest but not in source) are | ||
| * filled with null. Only call when {@code value.schemaId != latestSchemaId}. | ||
| */ | ||
| private BinaryValue alignToLatestSchema(BinaryValue value, SchemaAlignmentContext ctx) { | ||
| SchemaAlignmentContext.SourceSchemaMapping mapping = | ||
| ctx.cache.computeIfAbsent( | ||
| value.schemaId, | ||
| id -> | ||
| new SchemaAlignmentContext.SourceSchemaMapping( | ||
| schemaGetter.getSchema(id))); | ||
|
|
||
| ctx.encoder.startNewRow(); | ||
| for (int targetPos = 0; targetPos < ctx.targetColIds.size(); targetPos++) { | ||
| Integer sourcePos = mapping.idToPos.get(ctx.targetColIds.get(targetPos)); | ||
| if (sourcePos == null) { | ||
| // Column added after the source schema — fill with null. | ||
| ctx.encoder.encodeField(targetPos, null); | ||
| } else { | ||
| ctx.encoder.encodeField( | ||
| targetPos, mapping.getters[sourcePos].getFieldOrNull(value.row)); | ||
| } | ||
| } | ||
| // copy() is required: the encoder reuses its internal buffer, so the next | ||
| // startNewRow() would overwrite the row returned here. | ||
| return new BinaryValue(ctx.latestSchemaId, ctx.encoder.finishRow().copy()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After schema evolution, every |
||
| } | ||
|
|
||
| /** | ||
| * Converts target column positions from the client's (source) schema to the latest (target) | ||
| * schema using column IDs. This is needed when a client sends a partial update using an older | ||
| * schema whose positions need to be remapped to the latest schema layout. | ||
| */ | ||
| @VisibleForTesting | ||
| static int[] convertTargetColumns(int[] positions, Schema sourceSchema, Schema targetSchema) { | ||
| List<Integer> sourceColIds = sourceSchema.getColumnIds(); | ||
| List<Integer> targetColIds = targetSchema.getColumnIds(); | ||
|
|
||
| Map<Integer, Integer> targetIdToPos = new HashMap<>(); | ||
| for (int i = 0; i < targetColIds.size(); i++) { | ||
| targetIdToPos.put(targetColIds.get(i), i); | ||
| } | ||
|
|
||
| int resultCount = 0; | ||
| int[] buffer = new int[positions.length]; | ||
| for (int position : positions) { | ||
| int colId = sourceColIds.get(position); | ||
| Integer targetPos = targetIdToPos.get(colId); | ||
| if (targetPos != null) { | ||
| buffer[resultCount++] = targetPos; | ||
| } | ||
| // Column was dropped in the latest schema — skip it. | ||
| } | ||
| if (resultCount == positions.length) { | ||
| return buffer; | ||
| } | ||
| int[] result = new int[resultCount]; | ||
| System.arraycopy(buffer, 0, result, 0, resultCount); | ||
| return result; | ||
| } | ||
|
|
||
| private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { | ||
| switch (logFormat) { | ||
| case INDEXED: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
`SchemaAlignmentContext` is created unconditionally on every `processKvRecords` call, even when `schemaIdOfNewData == latestSchemaId` (the common case — no schema evolution). This allocates a `RowEncoder` + `HashMap` on the hot path for every batch, which will introduce a performance regression.