Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 158 additions & 29 deletions fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.KvRecordReadContext;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.PaddingRow;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.ArrowWriterProvider;
import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.row.encode.ValueDecoder;
import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
Expand Down Expand Up @@ -86,6 +88,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -374,23 +377,31 @@ public LogAppendInfo putAsLeader(
short latestSchemaId = (short) schemaInfo.getSchemaId();
validateSchemaId(kvRecords.schemaId(), latestSchemaId);

// Convert target column positions from client's schema to latest
// schema positions using column IDs if schemas differ.
int[] resolvedTargetColumns = targetColumns;
if (targetColumns != null && kvRecords.schemaId() != latestSchemaId) {
Schema clientSchema = schemaGetter.getSchema(kvRecords.schemaId());
resolvedTargetColumns =
convertTargetColumns(targetColumns, clientSchema, latestSchema);
}

AutoIncrementUpdater currentAutoIncrementUpdater =
autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId);

// Validate targetColumns doesn't contain auto-increment column
currentAutoIncrementUpdater.validateTargetColumns(targetColumns);
currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns);

// Determine the row merger based on mergeMode:
// - DEFAULT: Use the configured merge engine (rowMerger)
// - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger
// to directly replace values (for undo recovery scenarios)
// We only support ADD COLUMN, so targetColumns is fine to be used directly.
RowMerger currentMerger =
(mergeMode == MergeMode.OVERWRITE)
? overwriteRowMerger.configureTargetColumns(
targetColumns, latestSchemaId, latestSchema)
resolvedTargetColumns, latestSchemaId, latestSchema)
: rowMerger.configureTargetColumns(
targetColumns, latestSchemaId, latestSchema);
resolvedTargetColumns, latestSchemaId, latestSchema);

RowType latestRowType = latestSchema.getRowType();
WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType);
Expand All @@ -406,6 +417,8 @@ public LogAppendInfo putAsLeader(
processKvRecords(
kvRecords,
kvRecords.schemaId(),
latestSchemaId,
latestSchema,
currentMerger,
currentAutoIncrementUpdater,
walBuilder,
Expand Down Expand Up @@ -460,6 +473,8 @@ private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) {
private void processKvRecords(
KvRecordBatch kvRecords,
short schemaIdOfNewData,
short latestSchemaId,
Schema latestSchema,
RowMerger currentMerger,
AutoIncrementUpdater autoIncrementUpdater,
WalBuilder walBuilder,
Expand All @@ -473,39 +488,50 @@ private void processKvRecords(
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);

for (KvRecord kvRecord : kvRecords.records(readContext)) {
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
BinaryRow row = kvRecord.getRow();
BinaryValue currentValue = row == null ? null : new BinaryValue(schemaIdOfNewData, row);
try (SchemaAlignmentContext alignmentContext =
new SchemaAlignmentContext(latestSchemaId, latestSchema, kvFormat)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaAlignmentContext is created unconditionally on every processKvRecords call, even when schemaIdOfNewData == latestSchemaId (the common case — no schema evolution). This allocates a RowEncoder and a HashMap on the hot path for every batch, which will introduce a performance regression.

for (KvRecord kvRecord : kvRecords.records(readContext)) {
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
BinaryRow row = kvRecord.getRow();
BinaryValue currentValue =
row == null ? null : new BinaryValue(schemaIdOfNewData, row);

if (currentValue != null && schemaIdOfNewData != latestSchemaId) {
currentValue = alignToLatestSchema(currentValue, alignmentContext);
}

if (currentValue == null) {
logOffset =
processDeletion(
key,
currentMerger,
valueDecoder,
walBuilder,
latestSchemaRow,
logOffset);
} else {
logOffset =
processUpsert(
key,
currentValue,
currentMerger,
autoIncrementUpdater,
valueDecoder,
walBuilder,
latestSchemaRow,
logOffset);
if (currentValue == null) {
logOffset =
processDeletion(
key,
currentMerger,
alignmentContext,
valueDecoder,
walBuilder,
latestSchemaRow,
logOffset);
} else {
logOffset =
processUpsert(
key,
currentValue,
currentMerger,
alignmentContext,
autoIncrementUpdater,
valueDecoder,
walBuilder,
latestSchemaRow,
logOffset);
}
}
}
}

private long processDeletion(
KvPreWriteBuffer.Key key,
RowMerger currentMerger,
SchemaAlignmentContext alignmentContext,
ValueDecoder valueDecoder,
WalBuilder walBuilder,
PaddingRow latestSchemaRow,
Expand All @@ -530,6 +556,9 @@ private long processDeletion(
}

BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
if (oldValue.schemaId != alignmentContext.latestSchemaId) {
oldValue = alignToLatestSchema(oldValue, alignmentContext);
}
BinaryValue newValue = currentMerger.delete(oldValue);

// if newValue is null, it means the row should be deleted
Expand All @@ -544,6 +573,7 @@ private long processUpsert(
KvPreWriteBuffer.Key key,
BinaryValue currentValue,
RowMerger currentMerger,
SchemaAlignmentContext alignmentContext,
AutoIncrementUpdater autoIncrementUpdater,
ValueDecoder valueDecoder,
WalBuilder walBuilder,
Expand Down Expand Up @@ -572,6 +602,9 @@ private long processUpsert(
}

BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
if (oldValue.schemaId != alignmentContext.latestSchemaId) {
oldValue = alignToLatestSchema(oldValue, alignmentContext);
}
BinaryValue newValue = currentMerger.merge(oldValue, currentValue);

if (newValue == oldValue) {
Expand Down Expand Up @@ -628,6 +661,102 @@ private long applyUpdate(
}
}

/**
 * Batch-constant state for aligning rows encoded with an older schema to the latest schema.
 *
 * <p>Holds the latest schema's column ids and a reusable {@link RowEncoder} targeting the
 * latest row type, plus a cache of per-source-schema mappings so the column-id → position
 * lookup table and field getters are built at most once per distinct source schema seen in
 * the batch.
 *
 * <p>Not thread-safe; intended to be created, used, and closed within a single batch.
 */
private static class SchemaAlignmentContext implements AutoCloseable {
    final short latestSchemaId;
    final List<Integer> targetColIds;
    // Reused for every aligned row in the batch; callers must copy() the encoded result
    // before the next startNewRow() call overwrites the internal buffer.
    final RowEncoder encoder;
    // Source schema id -> cached mapping; populated lazily as old-schema rows are seen.
    final Map<Short, SourceSchemaMapping> cache = new HashMap<>();

    SchemaAlignmentContext(short latestSchemaId, Schema latestSchema, KvFormat kvFormat) {
        this.latestSchemaId = latestSchemaId;
        this.targetColIds = latestSchema.getColumnIds();
        this.encoder = RowEncoder.create(kvFormat, latestSchema.getRowType());
    }

    @Override
    public void close() throws Exception {
        encoder.close();
    }

    /** Cached field getters and column-id→position map for a single source schema. */
    private static class SourceSchemaMapping {
        final Map<Integer, Integer> idToPos;
        final InternalRow.FieldGetter[] getters;

        SourceSchemaMapping(Schema sourceSchema) {
            List<Integer> sourceColIds = sourceSchema.getColumnIds();
            // Presize to the known entry count (doubled to stay under the default load
            // factor) so the map never rehashes while being built.
            this.idToPos = new HashMap<>(sourceColIds.size() * 2);
            for (int i = 0; i < sourceColIds.size(); i++) {
                idToPos.put(sourceColIds.get(i), i);
            }
            this.getters = InternalRow.createFieldGetters(sourceSchema.getRowType());
        }
    }
}

/**
* Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
* using column IDs to map positions. New columns (present in latest but not in source) are
* filled with null. Only call when {@code value.schemaId != latestSchemaId}.
*/
private BinaryValue alignToLatestSchema(BinaryValue value, SchemaAlignmentContext ctx) {
SchemaAlignmentContext.SourceSchemaMapping mapping =
ctx.cache.computeIfAbsent(
value.schemaId,
id ->
new SchemaAlignmentContext.SourceSchemaMapping(
schemaGetter.getSchema(id)));

ctx.encoder.startNewRow();
for (int targetPos = 0; targetPos < ctx.targetColIds.size(); targetPos++) {
Integer sourcePos = mapping.idToPos.get(ctx.targetColIds.get(targetPos));
if (sourcePos == null) {
// Column added after the source schema — fill with null.
ctx.encoder.encodeField(targetPos, null);
} else {
ctx.encoder.encodeField(
targetPos, mapping.getters[sourcePos].getFieldOrNull(value.row));
}
}
// copy() is required: the encoder reuses its internal buffer, so the next
// startNewRow() would overwrite the row returned here.
return new BinaryValue(ctx.latestSchemaId, ctx.encoder.finishRow().copy());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After schema evolution, every oldValue read from KV store with an older schema goes through alignToLatestSchema, which does a full field-by-field re-encode + copy(). The previous approach used PaddingRow as a zero-copy virtual view. This introduces a significant per-record overhead during the (potentially long) transition period when old-schema data still dominates the KV store.

}

/**
 * Converts target column positions from the client's (source) schema to the latest (target)
 * schema using column IDs. This is needed when a client sends a partial update using an older
 * schema whose positions need to be remapped to the latest schema layout.
 *
 * @param positions column positions expressed in the source schema's layout
 * @param sourceSchema the schema the client encoded {@code positions} against
 * @param targetSchema the latest schema to remap positions into
 * @return the remapped positions in the target schema's layout; columns that were dropped in
 *     the target schema are omitted, so the result may be shorter than {@code positions}
 */
@VisibleForTesting
static int[] convertTargetColumns(int[] positions, Schema sourceSchema, Schema targetSchema) {
    List<Integer> sourceColIds = sourceSchema.getColumnIds();
    List<Integer> targetColIds = targetSchema.getColumnIds();

    // Presize (doubled to stay under the default load factor) so the id -> position
    // index never rehashes while being built.
    Map<Integer, Integer> targetIdToPos = new HashMap<>(targetColIds.size() * 2);
    for (int i = 0; i < targetColIds.size(); i++) {
        targetIdToPos.put(targetColIds.get(i), i);
    }

    int resultCount = 0;
    int[] buffer = new int[positions.length];
    for (int position : positions) {
        int colId = sourceColIds.get(position);
        Integer targetPos = targetIdToPos.get(colId);
        if (targetPos != null) {
            buffer[resultCount++] = targetPos;
        }
        // else: column was dropped in the latest schema — skip it.
    }
    // Common case: nothing was dropped, so return the buffer without a second allocation.
    if (resultCount == positions.length) {
        return buffer;
    }
    int[] result = new int[resultCount];
    System.arraycopy(buffer, 0, result, 0, resultCount);
    return result;
}

private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception {
switch (logFormat) {
case INDEXED:
Expand Down
Loading