4242import org .apache .fluss .record .KvRecordBatch ;
4343import org .apache .fluss .record .KvRecordReadContext ;
4444import org .apache .fluss .row .BinaryRow ;
45+ import org .apache .fluss .row .InternalRow ;
4546import org .apache .fluss .row .PaddingRow ;
4647import org .apache .fluss .row .arrow .ArrowWriterPool ;
4748import org .apache .fluss .row .arrow .ArrowWriterProvider ;
49+ import org .apache .fluss .row .encode .RowEncoder ;
4850import org .apache .fluss .row .encode .ValueDecoder ;
4951import org .apache .fluss .rpc .protocol .MergeMode ;
5052import org .apache .fluss .server .kv .autoinc .AutoIncIDRange ;
8688import java .io .File ;
8789import java .io .IOException ;
8890import java .util .Collection ;
91+ import java .util .HashMap ;
8992import java .util .List ;
9093import java .util .Map ;
9194import java .util .concurrent .Executor ;
@@ -374,23 +377,31 @@ public LogAppendInfo putAsLeader(
374377 short latestSchemaId = (short ) schemaInfo .getSchemaId ();
375378 validateSchemaId (kvRecords .schemaId (), latestSchemaId );
376379
380+ // Convert target column positions from client's schema to latest
381+ // schema positions using column IDs if schemas differ.
382+ int [] resolvedTargetColumns = targetColumns ;
383+ if (targetColumns != null && kvRecords .schemaId () != latestSchemaId ) {
384+ Schema clientSchema = schemaGetter .getSchema (kvRecords .schemaId ());
385+ resolvedTargetColumns =
386+ convertTargetColumns (targetColumns , clientSchema , latestSchema );
387+ }
388+
377389 AutoIncrementUpdater currentAutoIncrementUpdater =
378390 autoIncrementManager .getUpdaterForSchema (kvFormat , latestSchemaId );
379391
380392 // Validate targetColumns doesn't contain auto-increment column
381- currentAutoIncrementUpdater .validateTargetColumns (targetColumns );
393+ currentAutoIncrementUpdater .validateTargetColumns (resolvedTargetColumns );
382394
383395 // Determine the row merger based on mergeMode:
384396 // - DEFAULT: Use the configured merge engine (rowMerger)
385397 // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger
386398 // to directly replace values (for undo recovery scenarios)
387- // We only support ADD COLUMN, so targetColumns is fine to be used directly.
388399 RowMerger currentMerger =
389400 (mergeMode == MergeMode .OVERWRITE )
390401 ? overwriteRowMerger .configureTargetColumns (
391- targetColumns , latestSchemaId , latestSchema )
402+ resolvedTargetColumns , latestSchemaId , latestSchema )
392403 : rowMerger .configureTargetColumns (
393- targetColumns , latestSchemaId , latestSchema );
404+ resolvedTargetColumns , latestSchemaId , latestSchema );
394405
395406 RowType latestRowType = latestSchema .getRowType ();
396407 WalBuilder walBuilder = createWalBuilder (latestSchemaId , latestRowType );
@@ -406,6 +417,8 @@ public LogAppendInfo putAsLeader(
406417 processKvRecords (
407418 kvRecords ,
408419 kvRecords .schemaId (),
420+ latestSchemaId ,
421+ latestSchema ,
409422 currentMerger ,
410423 currentAutoIncrementUpdater ,
411424 walBuilder ,
@@ -460,6 +473,8 @@ private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) {
460473 private void processKvRecords (
461474 KvRecordBatch kvRecords ,
462475 short schemaIdOfNewData ,
476+ short latestSchemaId ,
477+ Schema latestSchema ,
463478 RowMerger currentMerger ,
464479 AutoIncrementUpdater autoIncrementUpdater ,
465480 WalBuilder walBuilder ,
@@ -473,39 +488,50 @@ private void processKvRecords(
473488 KvRecordReadContext .createReadContext (kvFormat , schemaGetter );
474489 ValueDecoder valueDecoder = new ValueDecoder (schemaGetter , kvFormat );
475490
476- for (KvRecord kvRecord : kvRecords .records (readContext )) {
477- byte [] keyBytes = BytesUtils .toArray (kvRecord .getKey ());
478- KvPreWriteBuffer .Key key = KvPreWriteBuffer .Key .of (keyBytes );
479- BinaryRow row = kvRecord .getRow ();
480- BinaryValue currentValue = row == null ? null : new BinaryValue (schemaIdOfNewData , row );
491+ try (SchemaAlignmentContext alignmentContext =
492+ new SchemaAlignmentContext (latestSchemaId , latestSchema , kvFormat )) {
493+ for (KvRecord kvRecord : kvRecords .records (readContext )) {
494+ byte [] keyBytes = BytesUtils .toArray (kvRecord .getKey ());
495+ KvPreWriteBuffer .Key key = KvPreWriteBuffer .Key .of (keyBytes );
496+ BinaryRow row = kvRecord .getRow ();
497+ BinaryValue currentValue =
498+ row == null ? null : new BinaryValue (schemaIdOfNewData , row );
499+
500+ if (currentValue != null && schemaIdOfNewData != latestSchemaId ) {
501+ currentValue = alignToLatestSchema (currentValue , alignmentContext );
502+ }
481503
482- if (currentValue == null ) {
483- logOffset =
484- processDeletion (
485- key ,
486- currentMerger ,
487- valueDecoder ,
488- walBuilder ,
489- latestSchemaRow ,
490- logOffset );
491- } else {
492- logOffset =
493- processUpsert (
494- key ,
495- currentValue ,
496- currentMerger ,
497- autoIncrementUpdater ,
498- valueDecoder ,
499- walBuilder ,
500- latestSchemaRow ,
501- logOffset );
504+ if (currentValue == null ) {
505+ logOffset =
506+ processDeletion (
507+ key ,
508+ currentMerger ,
509+ alignmentContext ,
510+ valueDecoder ,
511+ walBuilder ,
512+ latestSchemaRow ,
513+ logOffset );
514+ } else {
515+ logOffset =
516+ processUpsert (
517+ key ,
518+ currentValue ,
519+ currentMerger ,
520+ alignmentContext ,
521+ autoIncrementUpdater ,
522+ valueDecoder ,
523+ walBuilder ,
524+ latestSchemaRow ,
525+ logOffset );
526+ }
502527 }
503528 }
504529 }
505530
506531 private long processDeletion (
507532 KvPreWriteBuffer .Key key ,
508533 RowMerger currentMerger ,
534+ SchemaAlignmentContext alignmentContext ,
509535 ValueDecoder valueDecoder ,
510536 WalBuilder walBuilder ,
511537 PaddingRow latestSchemaRow ,
@@ -530,6 +556,9 @@ private long processDeletion(
530556 }
531557
532558 BinaryValue oldValue = valueDecoder .decodeValue (oldValueBytes );
559+ if (oldValue .schemaId != alignmentContext .latestSchemaId ) {
560+ oldValue = alignToLatestSchema (oldValue , alignmentContext );
561+ }
533562 BinaryValue newValue = currentMerger .delete (oldValue );
534563
535564 // if newValue is null, it means the row should be deleted
@@ -544,6 +573,7 @@ private long processUpsert(
544573 KvPreWriteBuffer .Key key ,
545574 BinaryValue currentValue ,
546575 RowMerger currentMerger ,
576+ SchemaAlignmentContext alignmentContext ,
547577 AutoIncrementUpdater autoIncrementUpdater ,
548578 ValueDecoder valueDecoder ,
549579 WalBuilder walBuilder ,
@@ -572,6 +602,9 @@ private long processUpsert(
572602 }
573603
574604 BinaryValue oldValue = valueDecoder .decodeValue (oldValueBytes );
605+ if (oldValue .schemaId != alignmentContext .latestSchemaId ) {
606+ oldValue = alignToLatestSchema (oldValue , alignmentContext );
607+ }
575608 BinaryValue newValue = currentMerger .merge (oldValue , currentValue );
576609
577610 if (newValue == oldValue ) {
@@ -628,6 +661,102 @@ private long applyUpdate(
628661 }
629662 }
630663
664+ /** Batch-constant state for aligning rows to the latest schema. */
665+ private static class SchemaAlignmentContext implements AutoCloseable {
666+ final short latestSchemaId ;
667+ final List <Integer > targetColIds ;
668+ final RowEncoder encoder ;
669+ final Map <Short , SourceSchemaMapping > cache = new HashMap <>();
670+
671+ SchemaAlignmentContext (short latestSchemaId , Schema latestSchema , KvFormat kvFormat ) {
672+ this .latestSchemaId = latestSchemaId ;
673+ this .targetColIds = latestSchema .getColumnIds ();
674+ this .encoder = RowEncoder .create (kvFormat , latestSchema .getRowType ());
675+ }
676+
677+ @ Override
678+ public void close () throws Exception {
679+ encoder .close ();
680+ }
681+
682+ /** Cached field getters and column-id→position map for a single source schema. */
683+ private static class SourceSchemaMapping {
684+ final Map <Integer , Integer > idToPos ;
685+ final InternalRow .FieldGetter [] getters ;
686+
687+ SourceSchemaMapping (Schema sourceSchema ) {
688+ List <Integer > sourceColIds = sourceSchema .getColumnIds ();
689+ this .idToPos = new HashMap <>();
690+ for (int i = 0 ; i < sourceColIds .size (); i ++) {
691+ idToPos .put (sourceColIds .get (i ), i );
692+ }
693+ this .getters = InternalRow .createFieldGetters (sourceSchema .getRowType ());
694+ }
695+ }
696+ }
697+
698+ /**
699+ * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
700+ * using column IDs to map positions. New columns (present in latest but not in source) are
701+ * filled with null. Only call when {@code value.schemaId != latestSchemaId}.
702+ */
703+ private BinaryValue alignToLatestSchema (BinaryValue value , SchemaAlignmentContext ctx ) {
704+ SchemaAlignmentContext .SourceSchemaMapping mapping =
705+ ctx .cache .computeIfAbsent (
706+ value .schemaId ,
707+ id ->
708+ new SchemaAlignmentContext .SourceSchemaMapping (
709+ schemaGetter .getSchema (id )));
710+
711+ ctx .encoder .startNewRow ();
712+ for (int targetPos = 0 ; targetPos < ctx .targetColIds .size (); targetPos ++) {
713+ Integer sourcePos = mapping .idToPos .get (ctx .targetColIds .get (targetPos ));
714+ if (sourcePos == null ) {
715+ // Column added after the source schema — fill with null.
716+ ctx .encoder .encodeField (targetPos , null );
717+ } else {
718+ ctx .encoder .encodeField (
719+ targetPos , mapping .getters [sourcePos ].getFieldOrNull (value .row ));
720+ }
721+ }
722+ // copy() is required: the encoder reuses its internal buffer, so the next
723+ // startNewRow() would overwrite the row returned here.
724+ return new BinaryValue (ctx .latestSchemaId , ctx .encoder .finishRow ().copy ());
725+ }
726+
727+ /**
728+ * Converts target column positions from the client's (source) schema to the latest (target)
729+ * schema using column IDs. This is needed when a client sends a partial update using an older
730+ * schema whose positions need to be remapped to the latest schema layout.
731+ */
732+ @ VisibleForTesting
733+ static int [] convertTargetColumns (int [] positions , Schema sourceSchema , Schema targetSchema ) {
734+ List <Integer > sourceColIds = sourceSchema .getColumnIds ();
735+ List <Integer > targetColIds = targetSchema .getColumnIds ();
736+
737+ Map <Integer , Integer > targetIdToPos = new HashMap <>();
738+ for (int i = 0 ; i < targetColIds .size (); i ++) {
739+ targetIdToPos .put (targetColIds .get (i ), i );
740+ }
741+
742+ int resultCount = 0 ;
743+ int [] buffer = new int [positions .length ];
744+ for (int position : positions ) {
745+ int colId = sourceColIds .get (position );
746+ Integer targetPos = targetIdToPos .get (colId );
747+ if (targetPos != null ) {
748+ buffer [resultCount ++] = targetPos ;
749+ }
750+ // Column was dropped in the latest schema — skip it.
751+ }
752+ if (resultCount == positions .length ) {
753+ return buffer ;
754+ }
755+ int [] result = new int [resultCount ];
756+ System .arraycopy (buffer , 0 , result , 0 , resultCount );
757+ return result ;
758+ }
759+
631760 private WalBuilder createWalBuilder (int schemaId , RowType rowType ) throws Exception {
632761 switch (logFormat ) {
633762 case INDEXED :
0 commit comments