Skip to content

Commit 8fc8be7

Browse files
committed
fix: executor bug fixes and improvements (#561)
- Fix partial key renames: fail fast on collision instead of continuing - Fix completed checkpoint: resume from post-drop state for index recreation - Pass rename_operations to get_vector_datatype_changes for renamed vectors - Add warning when SCAN falls back to '*' (full keyspace scan) - Warn when field rename overwrites existing destination field - Fix has_prefix_change falsy check for empty string prefixes - Add logger to migration utils
1 parent 7cd35a6 commit 8fc8be7

2 files changed

Lines changed: 48 additions & 13 deletions

File tree

redisvl/migration/executor.py

Lines changed: 35 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -271,16 +271,20 @@ def _rename_keys(
271271
logger.warning(f"Error in rename batch: {e}")
272272
raise
273273

274+
# Fail fast on collisions to avoid partial renames across batches.
275+
# Keys already renamed in THIS batch are not rolled back -- caller
276+
# can inspect the error to understand which keys moved.
277+
if collisions:
278+
raise RuntimeError(
279+
f"Prefix rename aborted after {renamed} successful rename(s): "
280+
f"{len(collisions)} destination key(s) already exist "
281+
f"(first 5: {collisions[:5]}). This would overwrite existing data. "
282+
f"Remove conflicting keys or choose a different prefix."
283+
)
284+
274285
if progress_callback:
275286
progress_callback(min(i + pipeline_size, total), total)
276287

277-
if collisions:
278-
raise RuntimeError(
279-
f"Prefix rename aborted: {len(collisions)} destination key(s) already exist "
280-
f"(first 5: {collisions[:5]}). This would overwrite existing data. "
281-
f"Remove conflicting keys or choose a different prefix."
282-
)
283-
284288
return renamed
285289

286290
def _rename_field_in_hash(
@@ -305,17 +309,29 @@ def _rename_field_in_hash(
305309
for i in range(0, total, pipeline_size):
306310
batch = keys[i : i + pipeline_size]
307311

308-
# First, get all old field values
312+
# First, get old field values AND check if destination exists
309313
pipe = client.pipeline(transaction=False)
310314
for key in batch:
311315
pipe.hget(key, old_name)
312-
values = pipe.execute()
316+
pipe.hexists(key, new_name)
317+
raw_results = pipe.execute()
318+
# Interleaved: [hget_0, hexists_0, hget_1, hexists_1, ...]
319+
values = raw_results[0::2]
320+
dest_exists = raw_results[1::2]
313321

314322
# Now set new field and delete old
315323
pipe = client.pipeline(transaction=False)
316324
batch_ops = 0
317-
for key, value in zip(batch, values):
325+
for key, value, exists in zip(batch, values, dest_exists):
318326
if value is not None:
327+
if exists:
328+
logger.warning(
329+
"Field '%s' already exists in key '%s'; "
330+
"overwriting with value from '%s'",
331+
new_name,
332+
key,
333+
old_name,
334+
)
319335
pipe.hset(key, new_name, value)
320336
pipe.hdel(key, old_name)
321337
batch_ops += 1
@@ -448,8 +464,12 @@ def apply(
448464
plan.source.index_name,
449465
)
450466
elif existing_checkpoint.status == "completed":
467+
# Quantization completed before the crash -- still need
468+
# to resume from post-drop state (index recreation).
469+
resuming_from_checkpoint = True
451470
logger.info(
452-
"Checkpoint at %s is already completed, ignoring",
471+
"Checkpoint at %s is already completed; resuming "
472+
"index recreation from post-drop state",
453473
checkpoint_path,
454474
)
455475
else:
@@ -510,12 +530,14 @@ def apply(
510530

511531
# Check if we need to re-encode vectors for datatype changes
512532
datatype_changes = MigrationPlanner.get_vector_datatype_changes(
513-
plan.source.schema_snapshot, plan.merged_target_schema
533+
plan.source.schema_snapshot,
534+
plan.merged_target_schema,
535+
rename_operations=plan.rename_operations,
514536
)
515537

516538
# Check for rename operations
517539
rename_ops = plan.rename_operations
518-
has_prefix_change = bool(rename_ops.change_prefix)
540+
has_prefix_change = rename_ops.change_prefix is not None
519541
has_field_renames = bool(rename_ops.rename_fields)
520542
needs_quantization = bool(datatype_changes) and storage_type != "json"
521543
needs_enumeration = needs_quantization or has_prefix_change or has_field_renames

redisvl/migration/utils.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,17 @@
11
from __future__ import annotations
22

33
import json
4+
import logging
45
import time
56
from pathlib import Path
67
from typing import Any, Callable, Dict, List, Optional, Tuple
78

89
import yaml
910

11+
from redisvl.utils.log import get_logger
12+
13+
logger = get_logger(__name__)
14+
1015
from redisvl.index import SearchIndex
1116
from redisvl.migration.models import (
1217
AOF_HSET_OVERHEAD_BYTES,
@@ -83,11 +88,19 @@ def normalize_keys(keys: List[str]) -> List[str]:
8388
def build_scan_match_patterns(prefixes: List[str], key_separator: str) -> List[str]:
8489
"""Build SCAN patterns for all configured prefixes."""
8590
if not prefixes:
91+
logger.warning(
92+
"No prefixes provided for SCAN pattern. "
93+
"Using '*' which will scan the entire keyspace."
94+
)
8695
return ["*"]
8796

8897
patterns = set()
8998
for prefix in prefixes:
9099
if not prefix:
100+
logger.warning(
101+
"Empty prefix in prefix list. "
102+
"Using '*' which will scan the entire keyspace."
103+
)
91104
return ["*"]
92105
if key_separator and not prefix.endswith(key_separator):
93106
patterns.add(f"{prefix}{key_separator}*")

0 commit comments

Comments (0)