@@ -488,50 +488,51 @@ private void processKvRecords(
488488 KvRecordReadContext .createReadContext (kvFormat , schemaGetter );
489489 ValueDecoder valueDecoder = new ValueDecoder (schemaGetter , kvFormat );
490490
491- for (KvRecord kvRecord : kvRecords .records (readContext )) {
492- byte [] keyBytes = BytesUtils .toArray (kvRecord .getKey ());
493- KvPreWriteBuffer .Key key = KvPreWriteBuffer .Key .of (keyBytes );
494- BinaryRow row = kvRecord .getRow ();
495- BinaryValue currentValue = row == null ? null : new BinaryValue (schemaIdOfNewData , row );
496-
497- // Align incoming row to latest schema if it was written with an older schema.
498- if (currentValue != null && schemaIdOfNewData != latestSchemaId ) {
499- currentValue = alignToLatestSchema (currentValue , latestSchemaId , latestSchema );
500- }
491+ try (SchemaAlignmentContext alignmentContext =
492+ new SchemaAlignmentContext (latestSchemaId , latestSchema , kvFormat )) {
493+ for (KvRecord kvRecord : kvRecords .records (readContext )) {
494+ byte [] keyBytes = BytesUtils .toArray (kvRecord .getKey ());
495+ KvPreWriteBuffer .Key key = KvPreWriteBuffer .Key .of (keyBytes );
496+ BinaryRow row = kvRecord .getRow ();
497+ BinaryValue currentValue =
498+ row == null ? null : new BinaryValue (schemaIdOfNewData , row );
499+
500+ // Align incoming row to latest schema if it was written with an older schema.
501+ if (currentValue != null && schemaIdOfNewData != latestSchemaId ) {
502+ currentValue = alignToLatestSchema (currentValue , alignmentContext );
503+ }
501504
502- if (currentValue == null ) {
503- logOffset =
504- processDeletion (
505- key ,
506- latestSchemaId ,
507- latestSchema ,
508- currentMerger ,
509- valueDecoder ,
510- walBuilder ,
511- latestSchemaRow ,
512- logOffset );
513- } else {
514- logOffset =
515- processUpsert (
516- key ,
517- currentValue ,
518- latestSchemaId ,
519- latestSchema ,
520- currentMerger ,
521- autoIncrementUpdater ,
522- valueDecoder ,
523- walBuilder ,
524- latestSchemaRow ,
525- logOffset );
505+ if (currentValue == null ) {
506+ logOffset =
507+ processDeletion (
508+ key ,
509+ currentMerger ,
510+ alignmentContext ,
511+ valueDecoder ,
512+ walBuilder ,
513+ latestSchemaRow ,
514+ logOffset );
515+ } else {
516+ logOffset =
517+ processUpsert (
518+ key ,
519+ currentValue ,
520+ currentMerger ,
521+ alignmentContext ,
522+ autoIncrementUpdater ,
523+ valueDecoder ,
524+ walBuilder ,
525+ latestSchemaRow ,
526+ logOffset );
527+ }
526528 }
527529 }
528530 }
529531
530532 private long processDeletion (
531533 KvPreWriteBuffer .Key key ,
532- short latestSchemaId ,
533- Schema latestSchema ,
534534 RowMerger currentMerger ,
535+ SchemaAlignmentContext alignmentContext ,
535536 ValueDecoder valueDecoder ,
536537 WalBuilder walBuilder ,
537538 PaddingRow latestSchemaRow ,
@@ -557,8 +558,8 @@ private long processDeletion(
557558
558559 BinaryValue oldValue = valueDecoder .decodeValue (oldValueBytes );
559560 // Align old KV row to latest schema if it was stored with an older schema.
560- if (oldValue .schemaId != latestSchemaId ) {
561- oldValue = alignToLatestSchema (oldValue , latestSchemaId , latestSchema );
561+ if (oldValue .schemaId != alignmentContext . latestSchemaId ) {
562+ oldValue = alignToLatestSchema (oldValue , alignmentContext );
562563 }
563564 BinaryValue newValue = currentMerger .delete (oldValue );
564565
@@ -573,9 +574,8 @@ private long processDeletion(
573574 private long processUpsert (
574575 KvPreWriteBuffer .Key key ,
575576 BinaryValue currentValue ,
576- short latestSchemaId ,
577- Schema latestSchema ,
578577 RowMerger currentMerger ,
578+ SchemaAlignmentContext alignmentContext ,
579579 AutoIncrementUpdater autoIncrementUpdater ,
580580 ValueDecoder valueDecoder ,
581581 WalBuilder walBuilder ,
@@ -605,8 +605,8 @@ private long processUpsert(
605605
606606 BinaryValue oldValue = valueDecoder .decodeValue (oldValueBytes );
607607 // Align old KV row to latest schema if it was stored with an older schema.
608- if (oldValue .schemaId != latestSchemaId ) {
609- oldValue = alignToLatestSchema (oldValue , latestSchemaId , latestSchema );
608+ if (oldValue .schemaId != alignmentContext . latestSchemaId ) {
609+ oldValue = alignToLatestSchema (oldValue , alignmentContext );
610610 }
611611 BinaryValue newValue = currentMerger .merge (oldValue , currentValue );
612612
@@ -664,40 +664,67 @@ private long applyUpdate(
664664 }
665665 }
666666
667- /**
668- * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
669- * using column IDs to map positions. New columns (present in latest but not in source) are
670- * filled with null. This only runs when schemas differ; the common case short-circuits.
671- */
672- private BinaryValue alignToLatestSchema (
673- BinaryValue value , short latestSchemaId , Schema latestSchema ) {
674- if (value .schemaId == latestSchemaId ) {
675- return value ;
667+ /** Batch-constant state for aligning rows to the latest schema. */
668+ private static class SchemaAlignmentContext implements AutoCloseable {
669+ final short latestSchemaId ;
670+ final List <Integer > targetColIds ;
671+ final RowEncoder encoder ;
672+ final Map <Short , SourceSchemaMapping > cache = new HashMap <>();
673+
674+ SchemaAlignmentContext (short latestSchemaId , Schema latestSchema , KvFormat kvFormat ) {
675+ this .latestSchemaId = latestSchemaId ;
676+ this .targetColIds = latestSchema .getColumnIds ();
677+ this .encoder = RowEncoder .create (kvFormat , latestSchema .getRowType ());
676678 }
677679
678- Schema sourceSchema = schemaGetter .getSchema (value .schemaId );
679- List <Integer > sourceColIds = sourceSchema .getColumnIds ();
680- List <Integer > targetColIds = latestSchema .getColumnIds ();
680+ @ Override
681+ public void close () throws Exception {
682+ encoder .close ();
683+ }
681684
682- Map <Integer , Integer > sourceIdToPos = new HashMap <>();
683- for (int i = 0 ; i < sourceColIds .size (); i ++) {
684- sourceIdToPos .put (sourceColIds .get (i ), i );
685+ /** Cached field getters and column-id→position map for a single source schema. */
686+ private static class SourceSchemaMapping {
687+ final Map <Integer , Integer > idToPos ;
688+ final InternalRow .FieldGetter [] getters ;
689+
690+ SourceSchemaMapping (Schema sourceSchema ) {
691+ List <Integer > sourceColIds = sourceSchema .getColumnIds ();
692+ this .idToPos = new HashMap <>();
693+ for (int i = 0 ; i < sourceColIds .size (); i ++) {
694+ idToPos .put (sourceColIds .get (i ), i );
695+ }
696+ this .getters = InternalRow .createFieldGetters (sourceSchema .getRowType ());
697+ }
685698 }
699+ }
686700
687- InternalRow .FieldGetter [] sourceGetters =
688- InternalRow .createFieldGetters (sourceSchema .getRowType ());
689- RowEncoder encoder = RowEncoder .create (kvFormat , latestSchema .getRowType ());
690- encoder .startNewRow ();
691- for (int targetPos = 0 ; targetPos < targetColIds .size (); targetPos ++) {
692- Integer sourcePos = sourceIdToPos .get (targetColIds .get (targetPos ));
701+ /**
702+ * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
703+ * using column IDs to map positions. New columns (present in latest but not in source) are
704+ * filled with null. Only call when {@code value.schemaId != latestSchemaId}.
705+ */
706+ private BinaryValue alignToLatestSchema (BinaryValue value , SchemaAlignmentContext ctx ) {
707+ SchemaAlignmentContext .SourceSchemaMapping mapping =
708+ ctx .cache .computeIfAbsent (
709+ value .schemaId ,
710+ id ->
711+ new SchemaAlignmentContext .SourceSchemaMapping (
712+ schemaGetter .getSchema (id )));
713+
714+ ctx .encoder .startNewRow ();
715+ for (int targetPos = 0 ; targetPos < ctx .targetColIds .size (); targetPos ++) {
716+ Integer sourcePos = mapping .idToPos .get (ctx .targetColIds .get (targetPos ));
693717 if (sourcePos == null ) {
694718 // Column added after the source schema — fill with null.
695- encoder .encodeField (targetPos , null );
719+ ctx . encoder .encodeField (targetPos , null );
696720 } else {
697- encoder .encodeField (targetPos , sourceGetters [sourcePos ].getFieldOrNull (value .row ));
721+ ctx .encoder .encodeField (
722+ targetPos , mapping .getters [sourcePos ].getFieldOrNull (value .row ));
698723 }
699724 }
700- return new BinaryValue (latestSchemaId , encoder .finishRow ());
725+ // copy() is required: the encoder reuses its internal buffer, so the next
726+ // startNewRow() would overwrite the row returned here.
727+ return new BinaryValue (ctx .latestSchemaId , ctx .encoder .finishRow ().copy ());
701728 }
702729
703730 /**
0 commit comments