@@ -488,50 +488,50 @@ private void processKvRecords(
488488 KvRecordReadContext .createReadContext (kvFormat , schemaGetter );
489489 ValueDecoder valueDecoder = new ValueDecoder (schemaGetter , kvFormat );
490490
491- for (KvRecord kvRecord : kvRecords .records (readContext )) {
492- byte [] keyBytes = BytesUtils .toArray (kvRecord .getKey ());
493- KvPreWriteBuffer .Key key = KvPreWriteBuffer .Key .of (keyBytes );
494- BinaryRow row = kvRecord .getRow ();
495- BinaryValue currentValue = row == null ? null : new BinaryValue (schemaIdOfNewData , row );
496-
497- // Align incoming row to latest schema if it was written with an older schema.
498- if (currentValue != null && schemaIdOfNewData != latestSchemaId ) {
499- currentValue = alignToLatestSchema (currentValue , latestSchemaId , latestSchema );
500- }
491+ try (SchemaAlignmentContext alignmentContext =
492+ new SchemaAlignmentContext (latestSchemaId , latestSchema , kvFormat )) {
493+ for (KvRecord kvRecord : kvRecords .records (readContext )) {
494+ byte [] keyBytes = BytesUtils .toArray (kvRecord .getKey ());
495+ KvPreWriteBuffer .Key key = KvPreWriteBuffer .Key .of (keyBytes );
496+ BinaryRow row = kvRecord .getRow ();
497+ BinaryValue currentValue =
498+ row == null ? null : new BinaryValue (schemaIdOfNewData , row );
499+
500+ if (currentValue != null && schemaIdOfNewData != latestSchemaId ) {
501+ currentValue = alignToLatestSchema (currentValue , alignmentContext );
502+ }
501503
502- if (currentValue == null ) {
503- logOffset =
504- processDeletion (
505- key ,
506- latestSchemaId ,
507- latestSchema ,
508- currentMerger ,
509- valueDecoder ,
510- walBuilder ,
511- latestSchemaRow ,
512- logOffset );
513- } else {
514- logOffset =
515- processUpsert (
516- key ,
517- currentValue ,
518- latestSchemaId ,
519- latestSchema ,
520- currentMerger ,
521- autoIncrementUpdater ,
522- valueDecoder ,
523- walBuilder ,
524- latestSchemaRow ,
525- logOffset );
504+ if (currentValue == null ) {
505+ logOffset =
506+ processDeletion (
507+ key ,
508+ currentMerger ,
509+ alignmentContext ,
510+ valueDecoder ,
511+ walBuilder ,
512+ latestSchemaRow ,
513+ logOffset );
514+ } else {
515+ logOffset =
516+ processUpsert (
517+ key ,
518+ currentValue ,
519+ currentMerger ,
520+ alignmentContext ,
521+ autoIncrementUpdater ,
522+ valueDecoder ,
523+ walBuilder ,
524+ latestSchemaRow ,
525+ logOffset );
526+ }
526527 }
527528 }
528529 }
529530
530531 private long processDeletion (
531532 KvPreWriteBuffer .Key key ,
532- short latestSchemaId ,
533- Schema latestSchema ,
534533 RowMerger currentMerger ,
534+ SchemaAlignmentContext alignmentContext ,
535535 ValueDecoder valueDecoder ,
536536 WalBuilder walBuilder ,
537537 PaddingRow latestSchemaRow ,
@@ -556,9 +556,8 @@ private long processDeletion(
556556 }
557557
558558 BinaryValue oldValue = valueDecoder .decodeValue (oldValueBytes );
559- // Align old KV row to latest schema if it was stored with an older schema.
560- if (oldValue .schemaId != latestSchemaId ) {
561- oldValue = alignToLatestSchema (oldValue , latestSchemaId , latestSchema );
559+ if (oldValue .schemaId != alignmentContext .latestSchemaId ) {
560+ oldValue = alignToLatestSchema (oldValue , alignmentContext );
562561 }
563562 BinaryValue newValue = currentMerger .delete (oldValue );
564563
@@ -573,9 +572,8 @@ private long processDeletion(
573572 private long processUpsert (
574573 KvPreWriteBuffer .Key key ,
575574 BinaryValue currentValue ,
576- short latestSchemaId ,
577- Schema latestSchema ,
578575 RowMerger currentMerger ,
576+ SchemaAlignmentContext alignmentContext ,
579577 AutoIncrementUpdater autoIncrementUpdater ,
580578 ValueDecoder valueDecoder ,
581579 WalBuilder walBuilder ,
@@ -604,9 +602,8 @@ private long processUpsert(
604602 }
605603
606604 BinaryValue oldValue = valueDecoder .decodeValue (oldValueBytes );
607- // Align old KV row to latest schema if it was stored with an older schema.
608- if (oldValue .schemaId != latestSchemaId ) {
609- oldValue = alignToLatestSchema (oldValue , latestSchemaId , latestSchema );
605+ if (oldValue .schemaId != alignmentContext .latestSchemaId ) {
606+ oldValue = alignToLatestSchema (oldValue , alignmentContext );
610607 }
611608 BinaryValue newValue = currentMerger .merge (oldValue , currentValue );
612609
@@ -664,40 +661,67 @@ private long applyUpdate(
664661 }
665662 }
666663
667- /**
668- * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
669- * using column IDs to map positions. New columns (present in latest but not in source) are
670- * filled with null. This only runs when schemas differ; the common case short-circuits.
671- */
672- private BinaryValue alignToLatestSchema (
673- BinaryValue value , short latestSchemaId , Schema latestSchema ) {
674- if (value .schemaId == latestSchemaId ) {
675- return value ;
664+ /** Batch-constant state for aligning rows to the latest schema. */
665+ private static class SchemaAlignmentContext implements AutoCloseable {
666+ final short latestSchemaId ;
667+ final List <Integer > targetColIds ;
668+ final RowEncoder encoder ;
669+ final Map <Short , SourceSchemaMapping > cache = new HashMap <>();
670+
671+ SchemaAlignmentContext (short latestSchemaId , Schema latestSchema , KvFormat kvFormat ) {
672+ this .latestSchemaId = latestSchemaId ;
673+ this .targetColIds = latestSchema .getColumnIds ();
674+ this .encoder = RowEncoder .create (kvFormat , latestSchema .getRowType ());
676675 }
677676
678- Schema sourceSchema = schemaGetter .getSchema (value .schemaId );
679- List <Integer > sourceColIds = sourceSchema .getColumnIds ();
680- List <Integer > targetColIds = latestSchema .getColumnIds ();
677+ @ Override
678+ public void close () throws Exception {
679+ encoder .close ();
680+ }
681681
682- Map <Integer , Integer > sourceIdToPos = new HashMap <>();
683- for (int i = 0 ; i < sourceColIds .size (); i ++) {
684- sourceIdToPos .put (sourceColIds .get (i ), i );
682+ /** Cached field getters and column-id→position map for a single source schema. */
683+ private static class SourceSchemaMapping {
684+ final Map <Integer , Integer > idToPos ;
685+ final InternalRow .FieldGetter [] getters ;
686+
687+ SourceSchemaMapping (Schema sourceSchema ) {
688+ List <Integer > sourceColIds = sourceSchema .getColumnIds ();
689+ this .idToPos = new HashMap <>();
690+ for (int i = 0 ; i < sourceColIds .size (); i ++) {
691+ idToPos .put (sourceColIds .get (i ), i );
692+ }
693+ this .getters = InternalRow .createFieldGetters (sourceSchema .getRowType ());
694+ }
685695 }
696+ }
686697
687- InternalRow .FieldGetter [] sourceGetters =
688- InternalRow .createFieldGetters (sourceSchema .getRowType ());
689- RowEncoder encoder = RowEncoder .create (kvFormat , latestSchema .getRowType ());
690- encoder .startNewRow ();
691- for (int targetPos = 0 ; targetPos < targetColIds .size (); targetPos ++) {
692- Integer sourcePos = sourceIdToPos .get (targetColIds .get (targetPos ));
698+ /**
699+ * Converts a {@link BinaryValue} from its source schema layout to the latest schema layout
700+ * using column IDs to map positions. New columns (present in latest but not in source) are
701+ * filled with null. Only call when {@code value.schemaId != latestSchemaId}.
702+ */
703+ private BinaryValue alignToLatestSchema (BinaryValue value , SchemaAlignmentContext ctx ) {
704+ SchemaAlignmentContext .SourceSchemaMapping mapping =
705+ ctx .cache .computeIfAbsent (
706+ value .schemaId ,
707+ id ->
708+ new SchemaAlignmentContext .SourceSchemaMapping (
709+ schemaGetter .getSchema (id )));
710+
711+ ctx .encoder .startNewRow ();
712+ for (int targetPos = 0 ; targetPos < ctx .targetColIds .size (); targetPos ++) {
713+ Integer sourcePos = mapping .idToPos .get (ctx .targetColIds .get (targetPos ));
693714 if (sourcePos == null ) {
694715 // Column added after the source schema — fill with null.
695- encoder .encodeField (targetPos , null );
716+ ctx . encoder .encodeField (targetPos , null );
696717 } else {
697- encoder .encodeField (targetPos , sourceGetters [sourcePos ].getFieldOrNull (value .row ));
718+ ctx .encoder .encodeField (
719+ targetPos , mapping .getters [sourcePos ].getFieldOrNull (value .row ));
698720 }
699721 }
700- return new BinaryValue (latestSchemaId , encoder .finishRow ());
722+ // copy() is required: the encoder reuses its internal buffer, so the next
723+ // startNewRow() would overwrite the row returned here.
724+ return new BinaryValue (ctx .latestSchemaId , ctx .encoder .finishRow ().copy ());
701725 }
702726
703727 /**
0 commit comments