Skip to content

Commit 42aa7bb

Browse files
committed
fix: address review round 3 for migrate-executor (#561)
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
1 parent c693635 commit 42aa7bb

1 file changed

Lines changed: 35 additions & 8 deletions

File tree

redisvl/migration/executor.py

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -464,14 +464,29 @@ def apply(
464464
plan.source.index_name,
465465
)
466466
elif existing_checkpoint.status == "completed":
467-
# Quantization completed before the crash -- still need
468-
# to resume from post-drop state (index recreation).
469-
resuming_from_checkpoint = True
470-
logger.info(
471-
"Checkpoint at %s is already completed; resuming "
472-
"index recreation from post-drop state",
473-
checkpoint_path,
467+
# Quantization completed previously. Only resume if
468+
# the source index is actually gone (post-drop crash).
469+
# If the source still exists, this is a fresh run and
470+
# the stale checkpoint should be ignored.
471+
source_still_exists = current_source_matches_snapshot(
472+
plan.source.index_name,
473+
plan.source.schema_snapshot,
474+
redis_url=redis_url,
475+
redis_client=redis_client,
474476
)
477+
if source_still_exists:
478+
logger.info(
479+
"Checkpoint at %s is completed and source index "
480+
"still exists; treating as fresh run",
481+
checkpoint_path,
482+
)
483+
else:
484+
resuming_from_checkpoint = True
485+
logger.info(
486+
"Checkpoint at %s is already completed; resuming "
487+
"index recreation from post-drop state",
488+
checkpoint_path,
489+
)
475490
else:
476491
resuming_from_checkpoint = True
477492
logger.info(
@@ -742,9 +757,21 @@ def _notify(step: str, detail: Optional[str] = None) -> None:
742757
for k in keys_to_process
743758
]
744759
keys_to_process = normalize_keys(keys_to_process)
760+
# Remap datatype_changes keys from source to target field
761+
# names when field renames exist, since quantization runs
762+
# after field renames (step 2).
763+
effective_changes = datatype_changes
764+
if has_field_renames and not resuming_from_checkpoint:
765+
field_rename_map = {
766+
fr.old_name: fr.new_name for fr in rename_ops.rename_fields
767+
}
768+
effective_changes = {
769+
field_rename_map.get(k, k): v
770+
for k, v in datatype_changes.items()
771+
}
745772
docs_quantized = self._quantize_vectors(
746773
source_index,
747-
datatype_changes,
774+
effective_changes,
748775
keys_to_process,
749776
progress_callback=lambda done, total: _notify(
750777
"quantize", f"{done:,}/{total:,} docs"

0 commit comments

Comments
 (0)