diff --git a/redisvl/cli/main.py b/redisvl/cli/main.py index 1353192fc..6b3287535 100644 --- a/redisvl/cli/main.py +++ b/redisvl/cli/main.py @@ -2,6 +2,7 @@ import sys from redisvl.cli.index import Index +from redisvl.cli.migrate import Migrate from redisvl.cli.stats import Stats from redisvl.cli.version import Version from redisvl.utils.log import get_logger @@ -16,6 +17,7 @@ def _usage(): "\tindex Index manipulation (create, delete, etc.)", "\tversion Obtain the version of RedisVL", "\tstats Obtain statistics about an index", + "\tmigrate Index migration planning and execution", ] return "\n".join(usage) + "\n" @@ -49,3 +51,6 @@ def version(self): def stats(self): Stats() exit(0) + + def migrate(self): + Migrate() diff --git a/redisvl/cli/migrate.py b/redisvl/cli/migrate.py new file mode 100644 index 000000000..d111a93a8 --- /dev/null +++ b/redisvl/cli/migrate.py @@ -0,0 +1,384 @@ +import argparse +import sys +from pathlib import Path +from typing import Optional + +from redisvl.cli.utils import add_redis_connection_options, create_redis_url +from redisvl.migration import MigrationExecutor, MigrationPlanner, MigrationValidator +from redisvl.migration.utils import ( + detect_aof_enabled, + estimate_disk_space, + list_indexes, + load_migration_plan, + load_yaml, + write_benchmark_report, + write_migration_report, +) +from redisvl.redis.connection import RedisConnectionFactory +from redisvl.utils.log import get_logger + +logger = get_logger("[RedisVL]") + + +class Migrate: + usage = "\n".join( + [ + "rvl migrate []\n", + "Commands:", + "\thelper Show migration guidance and supported capabilities", + "\tlist List all available indexes", + "\tplan Generate a migration plan for a document-preserving drop/recreate migration", + "\tapply Execute a reviewed drop/recreate migration plan", + "\testimate Estimate disk space required for a migration plan (dry-run, no mutations)", + "\tvalidate Validate a completed migration plan against the live index", + "\n", + ] + ) + + 
def __init__(self): + parser = argparse.ArgumentParser(usage=self.usage) + parser.add_argument("command", help="Subcommand to run") + + args = parser.parse_args(sys.argv[2:3]) + # Convert dashes to underscores for method lookup (e.g., batch-plan -> batch_plan) + command = args.command.replace("-", "_") + if not hasattr(self, command): + print(f"Unknown subcommand: {args.command}") + parser.print_help() + exit(1) + + try: + getattr(self, command)() + except Exception as e: + logger.error(e) + exit(1) + + def helper(self): + parser = argparse.ArgumentParser( + usage="rvl migrate helper [--host --port | --url ]" + ) + parser = add_redis_connection_options(parser) + args = parser.parse_args(sys.argv[3:]) + redis_url = create_redis_url(args) + indexes = list_indexes(redis_url=redis_url) + + print( + """RedisVL Index Migrator + +Available indexes:""" + ) + if indexes: + for position, index_name in enumerate(indexes, start=1): + print(f" {position}. {index_name}") + else: + print(" (none found)") + + print( + """ +Supported changes: + - Adding or removing non-vector fields (text, tag, numeric, geo) + - Changing field options (sortable, separator, weight) + - Changing vector algorithm (FLAT, HNSW, SVS-VAMANA) + - Changing distance metric (COSINE, L2, IP) + - Tuning algorithm parameters (M, EF_CONSTRUCTION, EF_RUNTIME, EPSILON) + - Quantizing vectors (float32 to float16/bfloat16/int8/uint8) + - Changing key prefix (renames all keys) + - Renaming fields (updates all documents) + - Renaming the index + +Not yet supported: + - Changing vector dimensions + - Changing storage type (hash to JSON) + +Commands: + rvl migrate list List all indexes + rvl migrate plan --index --schema-patch + rvl migrate apply --plan + rvl migrate validate --plan """ + ) + + def list(self): + parser = argparse.ArgumentParser( + usage="rvl migrate list [--host --port | --url ]" + ) + parser = add_redis_connection_options(parser) + args = parser.parse_args(sys.argv[3:]) + redis_url = 
create_redis_url(args) + indexes = list_indexes(redis_url=redis_url) + print("Available indexes:") + for position, index_name in enumerate(indexes, start=1): + print(f"{position}. {index_name}") + + def plan(self): + parser = argparse.ArgumentParser( + usage=( + "rvl migrate plan --index " + "(--schema-patch | --target-schema )" + ) + ) + parser.add_argument("-i", "--index", help="Source index name", required=True) + parser.add_argument("--schema-patch", help="Path to a schema patch file") + parser.add_argument("--target-schema", help="Path to a target schema file") + parser.add_argument( + "--plan-out", + help="Path to write migration_plan.yaml", + default="migration_plan.yaml", + ) + parser.add_argument( + "--key-sample-limit", + help="Maximum number of keys to sample from the index keyspace", + type=int, + default=10, + ) + parser = add_redis_connection_options(parser) + + args = parser.parse_args(sys.argv[3:]) + redis_url = create_redis_url(args) + planner = MigrationPlanner(key_sample_limit=args.key_sample_limit) + plan = planner.create_plan( + args.index, + redis_url=redis_url, + schema_patch_path=args.schema_patch, + target_schema_path=args.target_schema, + ) + planner.write_plan(plan, args.plan_out) + self._print_plan_summary(args.plan_out, plan) + + def apply(self): + parser = argparse.ArgumentParser( + usage=( + "rvl migrate apply --plan " + "[--report-out ]" + ) + ) + parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True) + parser.add_argument( + "--report-out", + help="Path to write migration_report.yaml", + default="migration_report.yaml", + ) + parser.add_argument( + "--benchmark-out", + help="Optional path to write benchmark_report.yaml", + default=None, + ) + parser.add_argument( + "--query-check-file", + help="Optional YAML file containing fetch_ids and keys_exist checks", + default=None, + ) + parser = add_redis_connection_options(parser) + args = parser.parse_args(sys.argv[3:]) + + redis_url = create_redis_url(args) + 
plan = load_migration_plan(args.plan) + + report = self._apply_sync(plan, redis_url, args.query_check_file) + + write_migration_report(report, args.report_out) + if args.benchmark_out: + write_benchmark_report(report, args.benchmark_out) + self._print_report_summary(args.report_out, report, args.benchmark_out) + + def estimate(self): + """Estimate disk space required for a migration plan (dry-run).""" + parser = argparse.ArgumentParser( + usage="rvl migrate estimate --plan " + ) + parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True) + parser.add_argument( + "--aof-enabled", + action="store_true", + help="Include AOF growth in the disk space estimate", + ) + args = parser.parse_args(sys.argv[3:]) + + plan = load_migration_plan(args.plan) + disk_estimate = estimate_disk_space(plan, aof_enabled=args.aof_enabled) + print(disk_estimate.summary()) + + @staticmethod + def _make_progress_callback(): + """Create a progress callback for migration apply.""" + step_labels = { + "enumerate": "[1/8] Enumerate keys", + "bgsave": "[2/8] BGSAVE snapshot", + "drop": "[3/8] Drop index", + "quantize": "[4/8] Quantize vectors", + "field_rename": "[5/8] Rename fields", + "key_rename": "[6/8] Rename keys", + "create": "[7/8] Create index", + "index": "[8/8] Re-indexing", + "validate": "Validate", + } + + def progress_callback(step: str, detail: Optional[str]) -> None: + label = step_labels.get(step, step) + if detail and not detail.startswith("done"): + print(f" {label}: {detail} ", end="\r", flush=True) + else: + print(f" {label}: {detail} ") + + return progress_callback + + def _apply_sync( + self, + plan, + redis_url: str, + query_check_file: Optional[str], + ): + """Execute migration synchronously.""" + executor = MigrationExecutor() + + print(f"\nApplying migration to '{plan.source.index_name}'...") + + report = executor.apply( + plan, + redis_url=redis_url, + query_check_file=query_check_file, + progress_callback=self._make_progress_callback(), + ) + + 
self._print_apply_result(report) + return report + + def _print_apply_result(self, report) -> None: + """Print the result summary after migration apply.""" + if report.result == "succeeded": + total_time = report.timings.total_migration_duration_seconds or 0 + downtime = report.timings.downtime_duration_seconds or 0 + print(f"\nMigration completed in {total_time}s (downtime: {downtime}s)") + else: + print(f"\nMigration {report.result}") + if report.validation.errors: + for error in report.validation.errors: + print(f" ERROR: {error}") + + def validate(self): + parser = argparse.ArgumentParser( + usage=( + "rvl migrate validate --plan " + "[--report-out ]" + ) + ) + parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True) + parser.add_argument( + "--report-out", + help="Path to write migration_report.yaml", + default="migration_report.yaml", + ) + parser.add_argument( + "--benchmark-out", + help="Optional path to write benchmark_report.yaml", + default=None, + ) + parser.add_argument( + "--query-check-file", + help="Optional YAML file containing fetch_ids and keys_exist checks", + default=None, + ) + parser = add_redis_connection_options(parser) + args = parser.parse_args(sys.argv[3:]) + + redis_url = create_redis_url(args) + plan = load_migration_plan(args.plan) + validator = MigrationValidator() + + # Local import to avoid circular dependency with migration module + from redisvl.migration.utils import timestamp_utc + + started_at = timestamp_utc() + validation, target_info, validation_duration = validator.validate( + plan, + redis_url=redis_url, + query_check_file=args.query_check_file, + ) + finished_at = timestamp_utc() + + # Local import to avoid circular dependency with migration module + from redisvl.migration.models import ( + MigrationBenchmarkSummary, + MigrationReport, + MigrationTimings, + ) + + source_size = float( + plan.source.stats_snapshot.get("vector_index_sz_mb", 0) or 0 + ) + target_size = 
float(target_info.get("vector_index_sz_mb", 0) or 0) + + report = MigrationReport( + source_index=plan.source.index_name, + target_index=plan.merged_target_schema["index"]["name"], + result="succeeded" if not validation.errors else "failed", + started_at=started_at, + finished_at=finished_at, + timings=MigrationTimings(validation_duration_seconds=validation_duration), + validation=validation, + benchmark_summary=MigrationBenchmarkSummary( + source_index_size_mb=round(source_size, 3), + target_index_size_mb=round(target_size, 3), + index_size_delta_mb=round(target_size - source_size, 3), + ), + warnings=list(plan.warnings), + manual_actions=( + ["Review validation errors before proceeding."] + if validation.errors + else [] + ), + ) + write_migration_report(report, args.report_out) + if args.benchmark_out: + write_benchmark_report(report, args.benchmark_out) + self._print_report_summary(args.report_out, report, args.benchmark_out) + + def _print_plan_summary(self, plan_out: str, plan) -> None: + import os + + abs_path = os.path.abspath(plan_out) + print( + f"""Migration plan written to {abs_path} +Mode: {plan.mode} +Supported: {plan.diff_classification.supported}""" + ) + if plan.warnings: + print("Warnings:") + for warning in plan.warnings: + print(f"- {warning}") + if plan.diff_classification.blocked_reasons: + print("Blocked reasons:") + for reason in plan.diff_classification.blocked_reasons: + print(f"- {reason}") + + print( + f"""\nNext steps: + Review the plan: cat {plan_out} + Apply the migration: rvl migrate apply --plan {plan_out} + Validate the result: rvl migrate validate --plan {plan_out} + To cancel: rm {plan_out}""" + ) + + def _print_report_summary( + self, + report_out: str, + report, + benchmark_out: Optional[str], + ) -> None: + print( + f"""Migration report written to {report_out} +Result: {report.result} +Schema match: {report.validation.schema_match} +Doc count match: {report.validation.doc_count_match} +Key sample exists: 
{report.validation.key_sample_exists} +Indexing failures delta: {report.validation.indexing_failures_delta}""" + ) + if report.validation.errors: + print("Errors:") + for error in report.validation.errors: + print(f"- {error}") + if report.manual_actions: + print("Manual actions:") + for action in report.manual_actions: + print(f"- {action}") + if benchmark_out: + print(f"Benchmark report written to {benchmark_out}") diff --git a/redisvl/cli/utils.py b/redisvl/cli/utils.py index 5d76a1842..9b19a126c 100644 --- a/redisvl/cli/utils.py +++ b/redisvl/cli/utils.py @@ -14,9 +14,10 @@ def create_redis_url(args: Namespace) -> str: elif args.url: return args.url else: - url = "redis://" if args.ssl: - url += "rediss://" + url = "rediss://" + else: + url = "redis://" if args.user: url += args.user if args.password: @@ -26,11 +27,7 @@ def create_redis_url(args: Namespace) -> str: return url -def add_index_parsing_options(parser: ArgumentParser) -> ArgumentParser: - parser.add_argument("-i", "--index", help="Index name", type=str, required=False) - parser.add_argument( - "-s", "--schema", help="Path to schema file", type=str, required=False - ) +def add_redis_connection_options(parser: ArgumentParser) -> ArgumentParser: parser.add_argument("-u", "--url", help="Redis URL", type=str, required=False) parser.add_argument("--host", help="Redis host", type=str, default="localhost") parser.add_argument("-p", "--port", help="Redis port", type=int, default=6379) @@ -38,3 +35,11 @@ def add_index_parsing_options(parser: ArgumentParser) -> ArgumentParser: parser.add_argument("--ssl", help="Use SSL", action="store_true") parser.add_argument("-a", "--password", help="Redis password", type=str, default="") return parser + + +def add_index_parsing_options(parser: ArgumentParser) -> ArgumentParser: + parser.add_argument("-i", "--index", help="Index name", type=str, required=False) + parser.add_argument( + "-s", "--schema", help="Path to schema file", type=str, required=False + ) + return 
add_redis_connection_options(parser) diff --git a/redisvl/migration/__init__.py b/redisvl/migration/__init__.py new file mode 100644 index 000000000..267ad4c31 --- /dev/null +++ b/redisvl/migration/__init__.py @@ -0,0 +1,23 @@ +from redisvl.migration.executor import MigrationExecutor +from redisvl.migration.models import ( + DiskSpaceEstimate, + FieldRename, + MigrationPlan, + MigrationReport, + RenameOperations, + SchemaPatch, +) +from redisvl.migration.planner import MigrationPlanner +from redisvl.migration.validation import MigrationValidator + +__all__ = [ + "DiskSpaceEstimate", + "FieldRename", + "MigrationExecutor", + "MigrationPlan", + "MigrationPlanner", + "MigrationReport", + "MigrationValidator", + "RenameOperations", + "SchemaPatch", +] diff --git a/redisvl/migration/executor.py b/redisvl/migration/executor.py new file mode 100644 index 000000000..565f9e10f --- /dev/null +++ b/redisvl/migration/executor.py @@ -0,0 +1,692 @@ +from __future__ import annotations + +import time +from typing import Any, Callable, Dict, Generator, List, Optional + +from redis.exceptions import ResponseError + +from redisvl.index import SearchIndex +from redisvl.migration.models import ( + MigrationBenchmarkSummary, + MigrationPlan, + MigrationReport, + MigrationTimings, + MigrationValidation, +) +from redisvl.migration.planner import MigrationPlanner +from redisvl.migration.utils import ( + build_scan_match_patterns, + current_source_matches_snapshot, + detect_aof_enabled, + estimate_disk_space, + get_schema_field_path, + normalize_keys, + timestamp_utc, + wait_for_index_ready, +) +from redisvl.migration.validation import MigrationValidator +from redisvl.types import SyncRedisClient +from redisvl.utils.log import get_logger + +logger = get_logger(__name__) + + +class MigrationExecutor: + def __init__(self, validator: Optional[MigrationValidator] = None): + self.validator = validator or MigrationValidator() + + def _enumerate_indexed_keys( + self, + client: SyncRedisClient, + 
index_name: str, + batch_size: int = 1000, + key_separator: str = ":", + ) -> Generator[str, None, None]: + """Enumerate document keys using FT.AGGREGATE with SCAN fallback. + + Uses FT.AGGREGATE WITHCURSOR for efficient enumeration when the index + has no indexing failures. Falls back to SCAN if: + - Index has hash_indexing_failures > 0 (would miss failed docs) + - FT.AGGREGATE command fails for any reason + + Args: + client: Redis client + index_name: Name of the index to enumerate + batch_size: Number of keys per batch + key_separator: Separator between prefix and key ID + + Yields: + Document keys as strings + """ + # Check for indexing failures - if any, fall back to SCAN + try: + info = client.ft(index_name).info() + failures = int(info.get("hash_indexing_failures", 0) or 0) + if failures > 0: + logger.warning( + f"Index '{index_name}' has {failures} indexing failures. " + "Using SCAN for complete enumeration." + ) + yield from self._enumerate_with_scan( + client, index_name, batch_size, key_separator + ) + return + except Exception as e: + logger.warning(f"Failed to check index info: {e}. Using SCAN fallback.") + yield from self._enumerate_with_scan( + client, index_name, batch_size, key_separator + ) + return + + # Try FT.AGGREGATE enumeration + try: + yield from self._enumerate_with_aggregate(client, index_name, batch_size) + except ResponseError as e: + logger.warning( + f"FT.AGGREGATE failed: {e}. Falling back to SCAN enumeration." + ) + yield from self._enumerate_with_scan( + client, index_name, batch_size, key_separator + ) + + def _enumerate_with_aggregate( + self, + client: SyncRedisClient, + index_name: str, + batch_size: int = 1000, + ) -> Generator[str, None, None]: + """Enumerate keys using FT.AGGREGATE WITHCURSOR. + + More efficient than SCAN for sparse indexes (only returns indexed docs). + Requires LOAD 1 __key to retrieve document keys. + """ + cursor_id: Optional[int] = None + + try: + # Initial aggregate call with LOAD 1 __key (not LOAD 0!) 
+ result = client.execute_command( + "FT.AGGREGATE", + index_name, + "*", + "LOAD", + "1", + "__key", + "WITHCURSOR", + "COUNT", + str(batch_size), + ) + + while True: + results_data, cursor_id = result + + # Extract keys from results (skip first element which is count) + for item in results_data[1:]: + if isinstance(item, (list, tuple)) and len(item) >= 2: + key = item[1] + yield key.decode() if isinstance(key, bytes) else str(key) + + # Check if done (cursor_id == 0) + if cursor_id == 0: + break + + # Read next batch + result = client.execute_command( + "FT.CURSOR", + "READ", + index_name, + str(cursor_id), + "COUNT", + str(batch_size), + ) + finally: + # Clean up cursor if interrupted + if cursor_id and cursor_id != 0: + try: + client.execute_command( + "FT.CURSOR", "DEL", index_name, str(cursor_id) + ) + except Exception: + pass # Cursor may have expired + + def _enumerate_with_scan( + self, + client: SyncRedisClient, + index_name: str, + batch_size: int = 1000, + key_separator: str = ":", + ) -> Generator[str, None, None]: + """Enumerate keys using SCAN with prefix matching. + + Fallback method that scans all keys matching the index prefix. + Less efficient but more complete (includes failed-to-index docs). 
+ """ + # Get prefix from index info + try: + info = client.ft(index_name).info() + # Handle both dict and list formats from FT.INFO + if isinstance(info, dict): + prefixes = info.get("index_definition", {}).get("prefixes", []) + else: + # List format - find index_definition + prefixes = [] + for i, item in enumerate(info): + if item == b"index_definition" or item == "index_definition": + defn = info[i + 1] + if isinstance(defn, dict): + prefixes = defn.get("prefixes", []) + elif isinstance(defn, list): + for j, d in enumerate(defn): + if d in (b"prefixes", "prefixes") and j + 1 < len(defn): + prefixes = defn[j + 1] + break + normalized_prefixes = [ + p.decode() if isinstance(p, bytes) else str(p) for p in prefixes + ] + except Exception as e: + logger.warning(f"Failed to get prefix from index info: {e}") + normalized_prefixes = [] + + seen_keys: set[str] = set() + for match_pattern in build_scan_match_patterns( + normalized_prefixes, key_separator + ): + cursor = 0 + while True: + cursor, keys = client.scan( # type: ignore[misc] + cursor=cursor, + match=match_pattern, + count=batch_size, + ) + for key in keys: + key_str = key.decode() if isinstance(key, bytes) else str(key) + if key_str not in seen_keys: + seen_keys.add(key_str) + yield key_str + + if cursor == 0: + break + + def _rename_keys( + self, + client: SyncRedisClient, + keys: List[str], + old_prefix: str, + new_prefix: str, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> int: + """Rename keys from old prefix to new prefix. + + Uses RENAMENX to avoid overwriting existing destination keys. + Raises on collision to prevent silent data loss. 
+ + Args: + client: Redis client + keys: List of keys to rename + old_prefix: Current prefix (e.g., "doc:") + new_prefix: New prefix (e.g., "article:") + progress_callback: Optional callback(done, total) + + Returns: + Number of keys successfully renamed + """ + renamed = 0 + total = len(keys) + pipeline_size = 100 # Process in batches + collisions: List[str] = [] + + for i in range(0, total, pipeline_size): + batch = keys[i : i + pipeline_size] + pipe = client.pipeline(transaction=False) + batch_new_keys: List[str] = [] + + for key in batch: + # Compute new key name + if key.startswith(old_prefix): + new_key = new_prefix + key[len(old_prefix) :] + else: + # Key doesn't match expected prefix, skip + logger.warning( + f"Key '{key}' does not start with prefix '{old_prefix}'" + ) + continue + pipe.renamenx(key, new_key) + batch_new_keys.append(new_key) + + try: + results = pipe.execute() + for j, r in enumerate(results): + if r is True or r == 1: + renamed += 1 + else: + collisions.append(batch_new_keys[j]) + except Exception as e: + logger.warning(f"Error in rename batch: {e}") + raise + + # Fail fast on collisions to avoid partial renames across batches. + if collisions: + raise RuntimeError( + f"Prefix rename aborted after {renamed} successful rename(s): " + f"{len(collisions)} destination key(s) already exist " + f"(first 5: {collisions[:5]}). This would overwrite existing data. " + f"Remove conflicting keys or choose a different prefix." + ) + + if progress_callback: + progress_callback(min(i + pipeline_size, total), total) + + return renamed + + def _rename_field_in_hash( + self, + client: SyncRedisClient, + keys: List[str], + old_name: str, + new_name: str, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> int: + """Rename a field in hash documents. + + For each document: + 1. HGET key old_name -> value + 2. HSET key new_name value + 3. 
HDEL key old_name + """ + renamed = 0 + total = len(keys) + pipeline_size = 100 + + for i in range(0, total, pipeline_size): + batch = keys[i : i + pipeline_size] + + # First, get old field values AND check if destination exists + pipe = client.pipeline(transaction=False) + for key in batch: + pipe.hget(key, old_name) + pipe.hexists(key, new_name) + raw_results = pipe.execute() + # Interleaved: [hget_0, hexists_0, hget_1, hexists_1, ...] + values = raw_results[0::2] + dest_exists = raw_results[1::2] + + # Now set new field and delete old + pipe = client.pipeline(transaction=False) + batch_ops = 0 + for key, value, exists in zip(batch, values, dest_exists): + if value is not None: + if exists: + logger.warning( + "Field '%s' already exists in key '%s'; " + "overwriting with value from '%s'", + new_name, + key, + old_name, + ) + pipe.hset(key, new_name, value) + pipe.hdel(key, old_name) + batch_ops += 1 + + try: + pipe.execute() + renamed += batch_ops + except Exception as e: + logger.warning(f"Error in field rename batch: {e}") + raise + + if progress_callback: + progress_callback(min(i + pipeline_size, total), total) + + return renamed + + def _rename_field_in_json( + self, + client: SyncRedisClient, + keys: List[str], + old_path: str, + new_path: str, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> int: + """Rename a field in JSON documents. + + For each document: + 1. JSON.GET key old_path -> value + 2. JSON.SET key new_path value + 3. 
JSON.DEL key old_path + """ + renamed = 0 + total = len(keys) + pipeline_size = 100 + + for i in range(0, total, pipeline_size): + batch = keys[i : i + pipeline_size] + + # First, get all old field values + pipe = client.pipeline(transaction=False) + for key in batch: + pipe.json().get(key, old_path) + values = pipe.execute() + + # Now set new field and delete old + pipe = client.pipeline(transaction=False) + batch_ops = 0 + for key, value in zip(batch, values): + if value is None or value == []: + continue + if isinstance(value, list) and len(value) == 1: + value = value[0] + pipe.json().set(key, new_path, value) + pipe.json().delete(key, old_path) + batch_ops += 1 + try: + pipe.execute() + renamed += batch_ops + except Exception as e: + logger.warning(f"Error in JSON field rename batch: {e}") + raise + + if progress_callback: + progress_callback(min(i + pipeline_size, total), total) + + return renamed + + def apply( + self, + plan: MigrationPlan, + *, + redis_url: Optional[str] = None, + redis_client: Optional[Any] = None, + query_check_file: Optional[str] = None, + progress_callback: Optional[Callable[[str, Optional[str]], None]] = None, + ) -> MigrationReport: + """Apply a migration plan. + + Args: + plan: The migration plan to apply. + redis_url: Redis connection URL. + redis_client: Optional existing Redis client. + query_check_file: Optional file with query checks. + progress_callback: Optional callback(step, detail) for progress updates. 
+ step: Current step name (e.g., "drop", "create", "index", "validate") + detail: Optional detail string (e.g., "1000/5000 docs (20%)") + """ + started_at = timestamp_utc() + started = time.perf_counter() + + report = MigrationReport( + source_index=plan.source.index_name, + target_index=plan.merged_target_schema["index"]["name"], + result="failed", + started_at=started_at, + finished_at=started_at, + warnings=list(plan.warnings), + ) + + if not plan.diff_classification.supported: + report.validation.errors.extend(plan.diff_classification.blocked_reasons) + report.manual_actions.append( + "This change requires document migration, which is not yet supported." + ) + report.finished_at = timestamp_utc() + return report + + if not current_source_matches_snapshot( + plan.source.index_name, + plan.source.schema_snapshot, + redis_url=redis_url, + redis_client=redis_client, + ): + report.validation.errors.append( + "The current live source schema no longer matches the saved source snapshot." + ) + report.manual_actions.append( + "Re-run `rvl migrate plan` to refresh the migration plan before applying." 
+ ) + report.finished_at = timestamp_utc() + return report + + source_index = SearchIndex.from_existing( + plan.source.index_name, + redis_url=redis_url, + redis_client=redis_client, + ) + + target_index = SearchIndex.from_dict( + plan.merged_target_schema, + redis_url=redis_url, + redis_client=redis_client, + ) + + enumerate_duration = 0.0 + drop_duration = 0.0 + field_rename_duration = 0.0 + key_rename_duration = 0.0 + recreate_duration = 0.0 + indexing_duration = 0.0 + target_info: Dict[str, Any] = {} + keys_to_process: List[str] = [] + storage_type = plan.source.keyspace.storage_type + + # Check for rename operations + rename_ops = plan.rename_operations + has_prefix_change = rename_ops.change_prefix is not None + has_field_renames = bool(rename_ops.rename_fields) + needs_enumeration = has_prefix_change or has_field_renames + + def _notify(step: str, detail: Optional[str] = None) -> None: + if progress_callback: + progress_callback(step, detail) + + try: + client = source_index._redis_client + aof_enabled = detect_aof_enabled(client) + disk_estimate = estimate_disk_space(plan, aof_enabled=aof_enabled) + report.disk_space_estimate = disk_estimate + + # STEP 1: Enumerate keys BEFORE any modifications + # Needed for: prefix change or field renames + if needs_enumeration: + _notify("enumerate", "Enumerating indexed documents...") + enumerate_started = time.perf_counter() + keys_to_process = list( + self._enumerate_indexed_keys( + client, + plan.source.index_name, + batch_size=1000, + key_separator=plan.source.keyspace.key_separator, + ) + ) + keys_to_process = normalize_keys(keys_to_process) + enumerate_duration = round(time.perf_counter() - enumerate_started, 3) + _notify( + "enumerate", + f"found {len(keys_to_process):,} documents ({enumerate_duration}s)", + ) + + # STEP 2: Field renames (before dropping index) + if has_field_renames and keys_to_process: + _notify("field_rename", "Renaming fields in documents...") + field_rename_started = time.perf_counter() + 
for field_rename in rename_ops.rename_fields: + if storage_type == "json": + old_path = get_schema_field_path( + plan.source.schema_snapshot, field_rename.old_name + ) + new_path = get_schema_field_path( + plan.merged_target_schema, field_rename.new_name + ) + if not old_path or not new_path or old_path == new_path: + continue + self._rename_field_in_json( + client, + keys_to_process, + old_path, + new_path, + progress_callback=lambda done, total: _notify( + "field_rename", + f"{field_rename.old_name} -> {field_rename.new_name}: {done:,}/{total:,}", + ), + ) + else: + self._rename_field_in_hash( + client, + keys_to_process, + field_rename.old_name, + field_rename.new_name, + progress_callback=lambda done, total: _notify( + "field_rename", + f"{field_rename.old_name} -> {field_rename.new_name}: {done:,}/{total:,}", + ), + ) + field_rename_duration = round( + time.perf_counter() - field_rename_started, 3 + ) + _notify("field_rename", f"done ({field_rename_duration}s)") + + # STEP 3: Drop the index + _notify("drop", "Dropping index definition...") + drop_started = time.perf_counter() + source_index.delete(drop=False) + drop_duration = round(time.perf_counter() - drop_started, 3) + _notify("drop", f"done ({drop_duration}s)") + + # STEP 4: Key renames (after drop, before recreate) + if has_prefix_change and keys_to_process: + _notify("key_rename", "Renaming keys...") + key_rename_started = time.perf_counter() + old_prefix = plan.source.keyspace.prefixes[0] + new_prefix = rename_ops.change_prefix + assert new_prefix is not None # For type checker + renamed_count = self._rename_keys( + client, + keys_to_process, + old_prefix, + new_prefix, + progress_callback=lambda done, total: _notify( + "key_rename", f"{done:,}/{total:,} keys" + ), + ) + key_rename_duration = round(time.perf_counter() - key_rename_started, 3) + _notify( + "key_rename", + f"done ({renamed_count:,} keys in {key_rename_duration}s)", + ) + + _notify("create", "Creating index with new schema...") + 
recreate_started = time.perf_counter() + target_index.create() + recreate_duration = round(time.perf_counter() - recreate_started, 3) + _notify("create", f"done ({recreate_duration}s)") + + _notify("index", "Waiting for re-indexing...") + + def _index_progress(indexed: int, total: int, pct: float) -> None: + _notify("index", f"{indexed:,}/{total:,} docs ({pct:.0f}%)") + + target_info, indexing_duration = wait_for_index_ready( + target_index, progress_callback=_index_progress + ) + _notify("index", f"done ({indexing_duration}s)") + + _notify("validate", "Validating migration...") + validation, target_info, validation_duration = self.validator.validate( + plan, + redis_url=redis_url, + redis_client=redis_client, + query_check_file=query_check_file, + ) + _notify("validate", f"done ({validation_duration}s)") + report.validation = validation + total_duration = round(time.perf_counter() - started, 3) + report.timings = MigrationTimings( + total_migration_duration_seconds=total_duration, + drop_duration_seconds=drop_duration, + field_rename_duration_seconds=( + field_rename_duration if field_rename_duration else None + ), + key_rename_duration_seconds=( + key_rename_duration if key_rename_duration else None + ), + recreate_duration_seconds=recreate_duration, + initial_indexing_duration_seconds=indexing_duration, + validation_duration_seconds=validation_duration, + downtime_duration_seconds=round( + drop_duration + + field_rename_duration + + key_rename_duration + + recreate_duration + + indexing_duration, + 3, + ), + ) + report.benchmark_summary = self._build_benchmark_summary( + plan, + target_info, + report.timings, + ) + report.result = "succeeded" if not validation.errors else "failed" + if validation.errors: + report.manual_actions.append( + "Review validation errors before treating the migration as complete." 
+ ) + except Exception as exc: + total_duration = round(time.perf_counter() - started, 3) + report.timings = MigrationTimings( + total_migration_duration_seconds=total_duration, + drop_duration_seconds=drop_duration or None, + field_rename_duration_seconds=field_rename_duration or None, + key_rename_duration_seconds=key_rename_duration or None, + recreate_duration_seconds=recreate_duration or None, + initial_indexing_duration_seconds=indexing_duration or None, + downtime_duration_seconds=( + round( + drop_duration + + field_rename_duration + + key_rename_duration + + recreate_duration + + indexing_duration, + 3, + ) + if drop_duration + or field_rename_duration + or key_rename_duration + or recreate_duration + or indexing_duration + else None + ), + ) + report.validation = MigrationValidation( + errors=[f"Migration execution failed: {exc}"] + ) + report.manual_actions.extend( + [ + "Inspect the Redis index state before retrying.", + "If the source index was dropped, recreate it from the saved migration plan.", + ] + ) + finally: + report.finished_at = timestamp_utc() + + return report + + def _build_benchmark_summary( + self, + plan: MigrationPlan, + target_info: dict, + timings: MigrationTimings, + ) -> MigrationBenchmarkSummary: + source_index_size = float( + plan.source.stats_snapshot.get("vector_index_sz_mb", 0) or 0 + ) + target_index_size = float(target_info.get("vector_index_sz_mb", 0) or 0) + source_num_docs = int(plan.source.stats_snapshot.get("num_docs", 0) or 0) + indexed_per_second = None + indexing_time = timings.initial_indexing_duration_seconds + if indexing_time and indexing_time > 0: + indexed_per_second = round(source_num_docs / indexing_time, 3) + + return MigrationBenchmarkSummary( + documents_indexed_per_second=indexed_per_second, + source_index_size_mb=round(source_index_size, 3), + target_index_size_mb=round(target_index_size, 3), + index_size_delta_mb=round(target_index_size - source_index_size, 3), + ) diff --git 
class FieldUpdate(BaseModel):
    """Partial field update for schema patch inputs.

    ``options`` is accepted as an alias for ``attrs``: after validation its
    entries are folded into ``attrs`` (``options`` wins on key collisions)
    and ``options`` is emptied, so downstream code only reads ``attrs``.
    """

    name: str
    type: Optional[str] = None  # new field type; None keeps the source type
    path: Optional[str] = None  # new JSON path; None keeps the source path
    attrs: Dict[str, Any] = Field(default_factory=dict)
    options: Dict[str, Any] = Field(default_factory=dict)

    @model_validator(mode="after")
    def merge_options_into_attrs(self) -> "FieldUpdate":
        # Normalize the two equivalent spellings into a single `attrs` dict.
        if self.options:
            merged_attrs = dict(self.attrs)
            merged_attrs.update(self.options)
            self.attrs = merged_attrs
            self.options = {}
        return self


class FieldRename(BaseModel):
    """Field rename specification for schema patch inputs."""

    old_name: str
    new_name: str


class SchemaPatchChanges(BaseModel):
    """The individual change lists that make up a schema patch."""

    add_fields: List[Dict[str, Any]] = Field(default_factory=list)
    remove_fields: List[str] = Field(default_factory=list)
    update_fields: List[FieldUpdate] = Field(default_factory=list)
    rename_fields: List[FieldRename] = Field(default_factory=list)
    index: Dict[str, Any] = Field(default_factory=dict)  # index-level setting overrides


class SchemaPatch(BaseModel):
    """Versioned container for a set of schema changes."""

    version: int = 1
    changes: SchemaPatchChanges = Field(default_factory=SchemaPatchChanges)


class KeyspaceSnapshot(BaseModel):
    """Snapshot of how an index's keys are laid out in Redis."""

    storage_type: str
    prefixes: List[str]
    key_separator: str
    key_sample: List[str] = Field(default_factory=list)  # sampled live keys for validation


class SourceSnapshot(BaseModel):
    """Point-in-time capture of the source index before migration."""

    index_name: str
    schema_snapshot: Dict[str, Any]
    stats_snapshot: Dict[str, Any]
    keyspace: KeyspaceSnapshot


class DiffClassification(BaseModel):
    """Whether a requested schema diff is supported by drop_recreate."""

    supported: bool
    blocked_reasons: List[str] = Field(default_factory=list)


class ValidationPolicy(BaseModel):
    """Which post-migration checks are required to pass."""

    require_doc_count_match: bool = True
    require_schema_match: bool = True
class RenameOperations(BaseModel):
    """Tracks which rename operations are required for a migration."""

    rename_index: Optional[str] = None  # New index name if renaming
    change_prefix: Optional[str] = None  # New prefix if changing
    rename_fields: List[FieldRename] = Field(default_factory=list)

    @property
    def has_operations(self) -> bool:
        # True when at least one rename-style operation must run.
        return bool(
            self.rename_index is not None
            or self.change_prefix is not None
            or self.rename_fields
        )


class MigrationPlan(BaseModel):
    """Reviewable plan for a drop/recreate migration of a single index."""

    version: int = 1
    mode: str = "drop_recreate"
    source: SourceSnapshot
    requested_changes: Dict[str, Any]
    merged_target_schema: Dict[str, Any]
    diff_classification: DiffClassification
    rename_operations: RenameOperations = Field(default_factory=RenameOperations)
    warnings: List[str] = Field(default_factory=list)
    validation: ValidationPolicy = Field(default_factory=ValidationPolicy)


class QueryCheckResult(BaseModel):
    """Outcome of one post-migration query check."""

    name: str
    passed: bool
    details: Optional[str] = None


class MigrationValidation(BaseModel):
    """Results of validating a completed migration against its plan."""

    schema_match: bool = False
    doc_count_match: bool = False
    key_sample_exists: bool = False
    indexing_failures_delta: int = 0
    query_checks: List[QueryCheckResult] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)


class MigrationTimings(BaseModel):
    """Per-phase durations in seconds; None means the phase did not run."""

    total_migration_duration_seconds: Optional[float] = None
    drop_duration_seconds: Optional[float] = None
    quantize_duration_seconds: Optional[float] = None
    field_rename_duration_seconds: Optional[float] = None
    key_rename_duration_seconds: Optional[float] = None
    recreate_duration_seconds: Optional[float] = None
    initial_indexing_duration_seconds: Optional[float] = None
    validation_duration_seconds: Optional[float] = None
    downtime_duration_seconds: Optional[float] = None
class MigrationBenchmarkSummary(BaseModel):
    """Indexing throughput and index-size deltas observed during a migration."""

    documents_indexed_per_second: Optional[float] = None
    source_index_size_mb: Optional[float] = None
    target_index_size_mb: Optional[float] = None
    index_size_delta_mb: Optional[float] = None


class MigrationReport(BaseModel):
    """Full record of one migration run: result, timings, validation, actions."""

    version: int = 1
    mode: str = "drop_recreate"
    source_index: str
    target_index: str
    result: str
    started_at: str
    finished_at: str
    timings: MigrationTimings = Field(default_factory=MigrationTimings)
    validation: MigrationValidation = Field(default_factory=MigrationValidation)
    benchmark_summary: MigrationBenchmarkSummary = Field(
        default_factory=MigrationBenchmarkSummary
    )
    # Forward reference: DiskSpaceEstimate is defined later in this module.
    disk_space_estimate: Optional["DiskSpaceEstimate"] = None
    warnings: List[str] = Field(default_factory=list)
    manual_actions: List[str] = Field(default_factory=list)


# -----------------------------------------------------------------------------
# Disk Space Estimation
# -----------------------------------------------------------------------------

# Bytes per element for each vector datatype
DTYPE_BYTES: Dict[str, int] = {
    "float64": 8,
    "float32": 4,
    "float16": 2,
    "bfloat16": 2,
    "int8": 1,
    "uint8": 1,
}

# AOF protocol overhead per HSET command (RESP framing)
AOF_HSET_OVERHEAD_BYTES = 114
# JSON.SET has slightly larger RESP framing
AOF_JSON_SET_OVERHEAD_BYTES = 140
# RDB compression ratio for pseudo-random vector data (compresses poorly)
RDB_COMPRESSION_RATIO = 0.95


class VectorFieldEstimate(BaseModel):
    """Per-field disk space breakdown for a single vector field."""

    field_name: str
    dims: int
    source_dtype: str
    target_dtype: str
    source_bytes_per_doc: int
    target_bytes_per_doc: int
+ """ + + # Index metadata + index_name: str + doc_count: int + storage_type: str = "hash" + + # Per-field breakdowns + vector_fields: List[VectorFieldEstimate] = Field(default_factory=list) + + # Aggregate vector data sizes + total_source_vector_bytes: int = 0 + total_target_vector_bytes: int = 0 + + # RDB snapshot cost (BGSAVE before migration) + rdb_snapshot_disk_bytes: int = 0 + rdb_cow_memory_if_concurrent_bytes: int = 0 + + # AOF growth cost (only if aof_enabled is True) + aof_enabled: bool = False + aof_growth_bytes: int = 0 + + # Totals + total_new_disk_bytes: int = 0 + memory_savings_after_bytes: int = 0 + + @property + def has_quantization(self) -> bool: + return len(self.vector_fields) > 0 + + def summary(self) -> str: + """Human-readable summary for CLI output.""" + if not self.has_quantization: + return "No vector quantization in this migration. No additional disk space required." + + lines = [ + "Pre-migration disk space estimate:", + f" Index: {self.index_name} ({self.doc_count:,} documents)", + ] + for vf in self.vector_fields: + lines.append( + f" Vector field '{vf.field_name}': {vf.dims} dims, " + f"{vf.source_dtype} -> {vf.target_dtype}" + ) + + lines.append("") + lines.append( + f" RDB snapshot (BGSAVE): ~{_format_bytes(self.rdb_snapshot_disk_bytes)}" + ) + if self.aof_enabled: + lines.append( + f" AOF growth (appendonly=yes): ~{_format_bytes(self.aof_growth_bytes)}" + ) + else: + lines.append( + " AOF growth: not estimated (pass aof_enabled=True if AOF is on)" + ) + lines.append( + f" Total new disk required: ~{_format_bytes(self.total_new_disk_bytes)}" + ) + lines.append("") + lines.append( + f" Post-migration memory delta: ~{_format_bytes(abs(self.memory_savings_after_bytes))} " + f"({'reduction' if self.memory_savings_after_bytes >= 0 else 'increase'}, " + f"{abs(self._savings_pct())}%)" + ) + return "\n".join(lines) + + def _savings_pct(self) -> int: + if self.total_source_vector_bytes == 0: + return 0 + return round( + 100 * 
self.memory_savings_after_bytes / self.total_source_vector_bytes + ) + + +def _format_bytes(n: int) -> str: + """Format byte count as human-readable string.""" + if n >= 1_073_741_824: + return f"{n / 1_073_741_824:.2f} GB" + if n >= 1_048_576: + return f"{n / 1_048_576:.1f} MB" + if n >= 1024: + return f"{n / 1024:.1f} KB" + return f"{n} bytes" + + +# ----------------------------------------------------------------------------- +# Batch Migration Models +# ----------------------------------------------------------------------------- + + +class BatchIndexEntry(BaseModel): + """Entry for a single index in a batch migration plan.""" + + name: str + applicable: bool = True + skip_reason: Optional[str] = None + + +class BatchPlan(BaseModel): + """Plan for migrating multiple indexes with a shared patch.""" + + version: int = 1 + batch_id: str + mode: str = "drop_recreate" + failure_policy: str = "fail_fast" # or "continue_on_error" + requires_quantization: bool = False + shared_patch: SchemaPatch + indexes: List[BatchIndexEntry] = Field(default_factory=list) + created_at: str + + @property + def applicable_count(self) -> int: + return sum(1 for idx in self.indexes if idx.applicable) + + @property + def skipped_count(self) -> int: + return sum(1 for idx in self.indexes if not idx.applicable) + + +class BatchIndexState(BaseModel): + """State of a single index in batch execution.""" + + name: str + status: str # pending, in_progress, success, failed, skipped + started_at: Optional[str] = None + completed_at: Optional[str] = None + failed_at: Optional[str] = None + error: Optional[str] = None + report_path: Optional[str] = None + + +class BatchState(BaseModel): + """Checkpoint state for batch migration execution.""" + + batch_id: str + plan_path: str + started_at: str + updated_at: str + completed: List[BatchIndexState] = Field(default_factory=list) + current_index: Optional[str] = None + remaining: List[str] = Field(default_factory=list) + + @property + def 
# -----------------------------------------------------------------------------
# Batch Migration Models
# -----------------------------------------------------------------------------


class BatchIndexEntry(BaseModel):
    """Entry for a single index in a batch migration plan."""

    name: str
    applicable: bool = True
    skip_reason: Optional[str] = None  # populated when applicable is False


class BatchPlan(BaseModel):
    """Plan for migrating multiple indexes with a shared patch."""

    version: int = 1
    batch_id: str
    mode: str = "drop_recreate"
    failure_policy: str = "fail_fast"  # or "continue_on_error"
    requires_quantization: bool = False
    shared_patch: SchemaPatch
    indexes: List[BatchIndexEntry] = Field(default_factory=list)
    created_at: str

    @property
    def applicable_count(self) -> int:
        # Indexes the shared patch will actually be applied to.
        return sum(1 for idx in self.indexes if idx.applicable)

    @property
    def skipped_count(self) -> int:
        # Indexes excluded from the batch (see each entry's skip_reason).
        return sum(1 for idx in self.indexes if not idx.applicable)


class BatchIndexState(BaseModel):
    """State of a single index in batch execution."""

    name: str
    status: str  # pending, in_progress, success, failed, skipped
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    failed_at: Optional[str] = None
    error: Optional[str] = None
    report_path: Optional[str] = None


class BatchState(BaseModel):
    """Checkpoint state for batch migration execution."""

    batch_id: str
    plan_path: str
    started_at: str
    updated_at: str
    completed: List[BatchIndexState] = Field(default_factory=list)
    current_index: Optional[str] = None
    remaining: List[str] = Field(default_factory=list)

    @property
    def success_count(self) -> int:
        return sum(1 for idx in self.completed if idx.status == "success")

    @property
    def failed_count(self) -> int:
        return sum(1 for idx in self.completed if idx.status == "failed")

    @property
    def skipped_count(self) -> int:
        return sum(1 for idx in self.completed if idx.status == "skipped")

    @property
    def is_complete(self) -> bool:
        # Complete only when nothing is queued and nothing is mid-flight.
        return len(self.remaining) == 0 and self.current_index is None


class BatchReportSummary(BaseModel):
    """Summary statistics for batch migration."""

    total_indexes: int = 0
    successful: int = 0
    failed: int = 0
    skipped: int = 0
    total_duration_seconds: float = 0.0


class BatchIndexReport(BaseModel):
    """Report for a single index in batch execution."""

    name: str
    status: str  # success, failed, skipped
    duration_seconds: Optional[float] = None
    docs_migrated: Optional[int] = None
    report_path: Optional[str] = None
    error: Optional[str] = None


class BatchReport(BaseModel):
    """Final report for batch migration execution."""

    version: int = 1
    batch_id: str
    status: str  # completed, partial_failure, failed
    summary: BatchReportSummary = Field(default_factory=BatchReportSummary)
    indexes: List[BatchIndexReport] = Field(default_factory=list)
    started_at: str
    completed_at: str
"""Migration planner for drop/recreate-based index migrations. + + The `drop_recreate` mode drops the index definition and recreates it with + a new schema. By default, documents are preserved in Redis. When possible, + the planner/executor can apply transformations so the preserved documents + remain compatible with the new index schema. + + This means: + - Index-only changes are always safe (algorithm, distance metric, tuning + params, quantization, etc.) + - Some document-dependent changes are supported via explicit migration + operations in the migration plan + + Supported document-dependent changes: + - Prefix/keyspace changes: keys are renamed via RENAME command + - Field renames: documents are updated to use new field names + - Index renaming: the new index is created with a different name + + Document-dependent changes that remain unsupported: + - Vector dimensions: stored vectors have wrong number of dimensions + - Storage type: documents are in hash format but index expects JSON + """ + + def __init__(self, key_sample_limit: int = 10): + self.key_sample_limit = key_sample_limit + + def create_plan( + self, + index_name: str, + *, + redis_url: Optional[str] = None, + schema_patch_path: Optional[str] = None, + target_schema_path: Optional[str] = None, + redis_client: Optional[Any] = None, + ) -> MigrationPlan: + if not schema_patch_path and not target_schema_path: + raise ValueError( + "Must provide either --schema-patch or --target-schema for migration planning" + ) + if schema_patch_path and target_schema_path: + raise ValueError( + "Provide only one of --schema-patch or --target-schema for migration planning" + ) + + snapshot = self.snapshot_source( + index_name, + redis_url=redis_url, + redis_client=redis_client, + ) + source_schema = IndexSchema.from_dict(snapshot.schema_snapshot) + + if schema_patch_path: + schema_patch = self.load_schema_patch(schema_patch_path) + else: + # target_schema_path is guaranteed non-None here due to validation above + 
assert target_schema_path is not None + schema_patch = self.normalize_target_schema_to_patch( + source_schema, target_schema_path + ) + + return self.create_plan_from_patch( + index_name, + schema_patch=schema_patch, + redis_url=redis_url, + redis_client=redis_client, + _snapshot=snapshot, + ) + + def create_plan_from_patch( + self, + index_name: str, + *, + schema_patch: SchemaPatch, + redis_url: Optional[str] = None, + redis_client: Optional[Any] = None, + _snapshot: Optional[Any] = None, + ) -> MigrationPlan: + if _snapshot is None: + _snapshot = self.snapshot_source( + index_name, + redis_url=redis_url, + redis_client=redis_client, + ) + snapshot = _snapshot + source_schema = IndexSchema.from_dict(snapshot.schema_snapshot) + merged_target_schema = self.merge_patch(source_schema, schema_patch) + + # Extract rename operations first + rename_operations, rename_warnings = self._extract_rename_operations( + source_schema, schema_patch + ) + + # Classify diff with awareness of rename operations + diff_classification = self.classify_diff( + source_schema, schema_patch, merged_target_schema, rename_operations + ) + + # Build warnings list + warnings = ["Index downtime is required"] + warnings.extend(rename_warnings) + + # Check for SVS-VAMANA in target schema and add appropriate warnings + svs_warnings = self._check_svs_vamana_requirements( + merged_target_schema, + redis_url=redis_url, + redis_client=redis_client, + ) + warnings.extend(svs_warnings) + + return MigrationPlan( + source=snapshot, + requested_changes=schema_patch.model_dump(exclude_none=True), + merged_target_schema=merged_target_schema.to_dict(), + diff_classification=diff_classification, + rename_operations=rename_operations, + warnings=warnings, + ) + + def snapshot_source( + self, + index_name: str, + *, + redis_url: Optional[str] = None, + redis_client: Optional[Any] = None, + ) -> SourceSnapshot: + index = SearchIndex.from_existing( + index_name, + redis_url=redis_url, + redis_client=redis_client, + 
    def snapshot_source(
        self,
        index_name: str,
        *,
        redis_url: Optional[str] = None,
        redis_client: Optional[Any] = None,
    ) -> SourceSnapshot:
        """Capture the live index's schema, stats, and a keyspace sample."""
        index = SearchIndex.from_existing(
            index_name,
            redis_url=redis_url,
            redis_client=redis_client,
        )
        schema_dict = index.schema.to_dict()
        stats_snapshot = index.info()
        prefixes = index.schema.index.prefix
        # The schema's prefix may be a single string or a list; normalize to list.
        prefix_list = prefixes if isinstance(prefixes, list) else [prefixes]

        return SourceSnapshot(
            index_name=index_name,
            schema_snapshot=schema_dict,
            stats_snapshot=stats_snapshot,
            keyspace=KeyspaceSnapshot(
                storage_type=index.schema.index.storage_type.value,
                prefixes=prefix_list,
                key_separator=index.schema.index.key_separator,
                key_sample=self._sample_keys(
                    client=index.client,
                    prefixes=prefix_list,
                    key_separator=index.schema.index.key_separator,
                ),
            ),
        )

    def load_schema_patch(self, schema_patch_path: str) -> SchemaPatch:
        """Load and validate a schema patch YAML file.

        Raises:
            FileNotFoundError: if the patch file does not exist.
        """
        patch_path = Path(schema_patch_path).resolve()
        if not patch_path.exists():
            raise FileNotFoundError(
                f"Schema patch file {schema_patch_path} does not exist"
            )

        with open(patch_path, "r") as f:
            # An empty YAML file yields None; treat it as an empty patch.
            patch_data = yaml.safe_load(f) or {}
        return SchemaPatch.model_validate(patch_data)

    def normalize_target_schema_to_patch(
        self, source_schema: IndexSchema, target_schema_path: str
    ) -> SchemaPatch:
        """Diff a full target schema file against the source schema and
        express the difference as an equivalent SchemaPatch."""
        target_schema = IndexSchema.from_yaml(target_schema_path)
        source_dict = source_schema.to_dict()
        target_dict = target_schema.to_dict()

        changes: Dict[str, Any] = {
            "add_fields": [],
            "remove_fields": [],
            "update_fields": [],
            "index": {},
        }

        source_fields = {field["name"]: field for field in source_dict["fields"]}
        target_fields = {field["name"]: field for field in target_dict["fields"]}

        # Fields only in the target are additions; changed definitions are updates.
        for field_name, target_field in target_fields.items():
            if field_name not in source_fields:
                changes["add_fields"].append(target_field)
            elif source_fields[field_name] != target_field:
                changes["update_fields"].append(target_field)

        # Fields only in the source are removals.
        for field_name in source_fields:
            if field_name not in target_fields:
                changes["remove_fields"].append(field_name)

        # Index-level settings that differ become index overrides.
        for index_key, target_value in target_dict["index"].items():
            source_value = source_dict["index"].get(index_key)
            if source_value != target_value:
                changes["index"][index_key] = target_value

        return SchemaPatch.model_validate({"version": 1, "changes": changes})
    def merge_patch(
        self, source_schema: IndexSchema, schema_patch: SchemaPatch
    ) -> IndexSchema:
        """Apply a schema patch to a copy of the source schema.

        Order matters: renames run first so later remove/update/add steps
        (and the executor's renamed documents) see the new field names.

        Raises:
            ValueError: on renames/updates of missing fields, rename
                collisions, or adding a field that already exists.
        """
        schema_dict = deepcopy(source_schema.to_dict())
        changes = schema_patch.changes
        fields_by_name = {
            field["name"]: deepcopy(field) for field in schema_dict["fields"]
        }

        # Apply field renames first (before other modifications)
        # This ensures the merged schema's field names match the executor's renamed fields
        for rename in changes.rename_fields:
            if rename.old_name not in fields_by_name:
                raise ValueError(
                    f"Cannot rename field '{rename.old_name}' because it does not exist in the source schema"
                )
            if rename.new_name in fields_by_name and rename.new_name != rename.old_name:
                raise ValueError(
                    f"Cannot rename field '{rename.old_name}' to '{rename.new_name}' because a field with the new name already exists"
                )
            if rename.new_name == rename.old_name:
                continue  # No-op rename
            field_def = fields_by_name.pop(rename.old_name)
            field_def["name"] = rename.new_name
            fields_by_name[rename.new_name] = field_def

        for field_name in changes.remove_fields:
            fields_by_name.pop(field_name, None)

        # Build a mapping from old field names to new names so that
        # update_fields entries referencing pre-rename names still resolve.
        rename_map = {
            rename.old_name: rename.new_name
            for rename in changes.rename_fields
            if rename.old_name != rename.new_name
        }

        for field_update in changes.update_fields:
            # Resolve through renames: if the update references the old name,
            # look up the field under its new name.
            resolved_name = rename_map.get(field_update.name, field_update.name)
            if resolved_name not in fields_by_name:
                raise ValueError(
                    f"Cannot update field '{field_update.name}' because it does not exist in the source schema"
                )
            existing_field = fields_by_name[resolved_name]
            if field_update.type is not None:
                existing_field["type"] = field_update.type
            if field_update.path is not None:
                existing_field["path"] = field_update.path
            if field_update.attrs:
                # Shallow-merge attrs; patch values win over existing ones.
                merged_attrs = dict(existing_field.get("attrs", {}))
                merged_attrs.update(field_update.attrs)
                existing_field["attrs"] = merged_attrs

        for field in changes.add_fields:
            field_name = field["name"]
            if field_name in fields_by_name:
                raise ValueError(
                    f"Cannot add field '{field_name}' because it already exists in the source schema"
                )
            fields_by_name[field_name] = deepcopy(field)

        schema_dict["fields"] = list(fields_by_name.values())
        schema_dict["index"].update(changes.index)
        return IndexSchema.from_dict(schema_dict)
    def _extract_rename_operations(
        self,
        source_schema: IndexSchema,
        schema_patch: SchemaPatch,
    ) -> Tuple[RenameOperations, List[str]]:
        """Extract rename operations from the patch and generate warnings.

        Only single-prefix indexes are supported; multi-prefix inputs
        (on either side) raise rather than silently migrating部分 keys.

        Returns:
            Tuple of (RenameOperations, warnings list)

        Raises:
            ValueError: for multi-prefix targets or multi-prefix source indexes.
        """
        source_dict = source_schema.to_dict()
        changes = schema_patch.changes
        warnings: List[str] = []

        # Index rename
        rename_index: Optional[str] = None
        if "name" in changes.index:
            new_name = changes.index["name"]
            old_name = source_dict["index"].get("name")
            if new_name != old_name:
                rename_index = new_name
                warnings.append(
                    f"Index rename: '{old_name}' -> '{new_name}' (index-only change, no document migration needed)"
                )

        # Prefix change
        change_prefix: Optional[str] = None
        if "prefix" in changes.index:
            new_prefix = changes.index["prefix"]
            # Normalize list-type prefix to a single string (local copy only)
            if isinstance(new_prefix, list):
                if len(new_prefix) != 1:
                    raise ValueError(
                        f"Target prefix must be a single string, got list: {new_prefix}. "
                        f"Multi-prefix migrations are not supported."
                    )
                new_prefix = new_prefix[0]
            old_prefix = source_dict["index"].get("prefix")
            # Normalize single-element list to string for comparison
            if isinstance(old_prefix, list) and len(old_prefix) == 1:
                old_prefix = old_prefix[0]
            if new_prefix != old_prefix:
                # Block multi-prefix migrations - we only support single prefix
                if isinstance(old_prefix, list) and len(old_prefix) > 1:
                    raise ValueError(
                        f"Cannot change prefix for multi-prefix indexes. "
                        f"Source index has multiple prefixes: {old_prefix}. "
                        f"Multi-prefix migrations are not supported."
                    )
                change_prefix = new_prefix
                warnings.append(
                    f"Prefix change: '{old_prefix}' -> '{new_prefix}' "
                    "(requires RENAME for all keys, may be slow for large datasets)"
                )

        # Field renames from explicit rename_fields
        rename_fields: List[FieldRename] = list(changes.rename_fields)
        for field_rename in rename_fields:
            warnings.append(
                f"Field rename: '{field_rename.old_name}' -> '{field_rename.new_name}' "
                "(requires read/write for all documents, may be slow for large datasets)"
            )

        return (
            RenameOperations(
                rename_index=rename_index,
                change_prefix=change_prefix,
                rename_fields=rename_fields,
            ),
            warnings,
        )
+ ) + change_prefix = new_prefix + warnings.append( + f"Prefix change: '{old_prefix}' -> '{new_prefix}' " + "(requires RENAME for all keys, may be slow for large datasets)" + ) + + # Field renames from explicit rename_fields + rename_fields: List[FieldRename] = list(changes.rename_fields) + for field_rename in rename_fields: + warnings.append( + f"Field rename: '{field_rename.old_name}' -> '{field_rename.new_name}' " + "(requires read/write for all documents, may be slow for large datasets)" + ) + + return ( + RenameOperations( + rename_index=rename_index, + change_prefix=change_prefix, + rename_fields=rename_fields, + ), + warnings, + ) + + def _check_svs_vamana_requirements( + self, + target_schema: IndexSchema, + *, + redis_url: Optional[str] = None, + redis_client: Optional[Any] = None, + ) -> List[str]: + """Check SVS-VAMANA requirements and return warnings. + + Checks: + 1. If target uses SVS-VAMANA, verify Redis version supports it + 2. Add Intel hardware warning for LVQ/LeanVec optimizations + """ + warnings: List[str] = [] + target_dict = target_schema.to_dict() + + # Check if any vector field uses SVS-VAMANA + uses_svs = False + uses_compression = False + compression_type = None + + for field in target_dict.get("fields", []): + if field.get("type") != "vector": + continue + attrs = field.get("attrs", {}) + algo = attrs.get("algorithm", "").upper() + if algo == "SVS-VAMANA": + uses_svs = True + compression = attrs.get("compression", "") + if compression: + uses_compression = True + compression_type = compression + + if not uses_svs: + return warnings + + # Check Redis version support + created_client = None + try: + if redis_client: + client = redis_client + elif redis_url: + from redis import Redis + + client = Redis.from_url(redis_url) + created_client = client + else: + client = None + + if client and not supports_svs(client): + warnings.append( + "SVS-VAMANA requires Redis >= 8.2.0 and Redis Search >= 2.8.10. 
" + "The target Redis instance may not support this algorithm. " + "Migration will fail at apply time if requirements are not met." + ) + except Exception: + # If we can't check, add a general warning + warnings.append( + "SVS-VAMANA requires Redis >= 8.2.0 and Redis Search >= 2.8.10. " + "Verify your Redis instance supports this algorithm before applying." + ) + finally: + if created_client: + created_client.close() + + # Intel hardware warning for compression + if uses_compression: + warnings.append( + f"SVS-VAMANA with {compression_type} compression: " + "LVQ and LeanVec optimizations require Intel hardware with AVX-512 support. " + "On non-Intel platforms or Redis Open Source, these fall back to basic " + "8-bit scalar quantization with reduced performance benefits." + ) + else: + warnings.append( + "SVS-VAMANA: For optimal performance, Intel hardware with AVX-512 support " + "is recommended. LVQ/LeanVec compression options provide additional memory " + "savings on supported hardware." + ) + + return warnings + + def classify_diff( + self, + source_schema: IndexSchema, + schema_patch: SchemaPatch, + merged_target_schema: IndexSchema, + rename_operations: Optional[RenameOperations] = None, + ) -> DiffClassification: + blocked_reasons: List[str] = [] + changes = schema_patch.changes + source_dict = source_schema.to_dict() + target_dict = merged_target_schema.to_dict() + + # Check which rename operations are being handled + has_index_rename = rename_operations and rename_operations.rename_index + has_prefix_change = ( + rename_operations and rename_operations.change_prefix is not None + ) + has_field_renames = ( + rename_operations and len(rename_operations.rename_fields) > 0 + ) + + for index_key, target_value in changes.index.items(): + source_value = source_dict["index"].get(index_key) + # Normalize single-element list prefixes for comparison so that + # e.g. source ``["docs"]`` and target ``"docs"`` are treated as equal. 
    def classify_diff(
        self,
        source_schema: IndexSchema,
        schema_patch: SchemaPatch,
        merged_target_schema: IndexSchema,
        rename_operations: Optional[RenameOperations] = None,
    ) -> DiffClassification:
        """Classify the requested schema changes as supported or blocked.

        Collects human-readable ``blocked_reasons`` for every change that
        drop_recreate cannot carry out; the plan is ``supported`` only when
        none were found. Changes covered by explicit ``rename_operations``
        (index rename, prefix change, field renames) are not blocked.
        """
        blocked_reasons: List[str] = []
        changes = schema_patch.changes
        source_dict = source_schema.to_dict()
        target_dict = merged_target_schema.to_dict()

        # Check which rename operations are being handled
        has_index_rename = rename_operations and rename_operations.rename_index
        has_prefix_change = (
            rename_operations and rename_operations.change_prefix is not None
        )
        has_field_renames = (
            rename_operations and len(rename_operations.rename_fields) > 0
        )

        for index_key, target_value in changes.index.items():
            source_value = source_dict["index"].get(index_key)
            # Normalize single-element list prefixes for comparison so that
            # e.g. source ``["docs"]`` and target ``"docs"`` are treated as equal.
            sv, tv = source_value, target_value
            if index_key == "prefix":
                if isinstance(sv, list) and len(sv) == 1:
                    sv = sv[0]
                if isinstance(tv, list) and len(tv) == 1:
                    tv = tv[0]
            if sv == tv:
                continue
            if index_key == "name":
                # Index rename is now supported - skip blocking if we have rename_operations
                if not has_index_rename:
                    blocked_reasons.append(
                        "Changing the index name requires document migration (not yet supported)."
                    )
            elif index_key == "prefix":
                # Prefix change is now supported
                if not has_prefix_change:
                    blocked_reasons.append(
                        "Changing index prefixes requires document migration (not yet supported)."
                    )
            elif index_key == "key_separator":
                blocked_reasons.append(
                    "Changing the key separator requires document migration (not yet supported)."
                )
            elif index_key == "storage_type":
                blocked_reasons.append(
                    "Changing the storage type requires document migration (not yet supported)."
                )

        source_fields = {field["name"]: field for field in source_dict["fields"]}
        target_fields = {field["name"]: field for field in target_dict["fields"]}

        for field in changes.add_fields:
            if field["type"] == "vector":
                blocked_reasons.append(
                    f"Adding vector field '{field['name']}' requires document migration (not yet supported)."
                )

        # Build rename mappings: old->new and new->old so update_fields
        # can reference either the pre-rename or post-rename name
        classify_rename_map = {
            rename.old_name: rename.new_name
            for rename in changes.rename_fields
            if rename.old_name != rename.new_name
        }
        reverse_rename_map = {v: k for k, v in classify_rename_map.items()}

        for field_update in changes.update_fields:
            # Resolve through renames: update_fields may use old or new name
            if field_update.name in classify_rename_map:
                # update references old name -> look up source by old, target by new
                source_name = field_update.name
                target_name = classify_rename_map[field_update.name]
            elif field_update.name in reverse_rename_map:
                # update references new name -> look up source by old, target by new
                source_name = reverse_rename_map[field_update.name]
                target_name = field_update.name
            else:
                # no rename involved
                source_name = field_update.name
                target_name = field_update.name
            source_field = source_fields.get(source_name)
            target_field = target_fields.get(target_name)
            if source_field is None or target_field is None:
                # Field not found in source or target; skip classification
                continue
            source_type = source_field["type"]
            target_type = target_field["type"]

            if source_type != target_type:
                blocked_reasons.append(
                    f"Changing field '{field_update.name}' type from {source_type} to {target_type} is not supported by drop_recreate."
                )
                continue

            source_path = source_field.get("path")
            target_path = target_field.get("path")
            if source_path != target_path:
                blocked_reasons.append(
                    f"Changing field '{field_update.name}' path from {source_path} to {target_path} is not supported by drop_recreate."
                )
                continue

            if target_type == "vector" and source_field != target_field:
                # Check for document-dependent changes that are not yet supported
                vector_blocked = self._classify_vector_field_change(
                    source_field, target_field
                )
                blocked_reasons.extend(vector_blocked)

        # Detect possible undeclared field renames. When explicit renames
        # exist, exclude those fields from heuristic detection so we still
        # catch additional add/remove pairs that look like renames.
        detect_source = dict(source_fields)
        detect_target = dict(target_fields)
        if has_field_renames and rename_operations:
            for fr in rename_operations.rename_fields:
                detect_source.pop(fr.old_name, None)
                detect_target.pop(fr.new_name, None)
        blocked_reasons.extend(
            self._detect_possible_field_renames(detect_source, detect_target)
        )

        return DiffClassification(
            supported=len(blocked_reasons) == 0,
            blocked_reasons=self._dedupe(blocked_reasons),
        )

    def write_plan(self, plan: MigrationPlan, plan_out: str) -> None:
        """Serialize the plan to a YAML file at ``plan_out``."""
        plan_path = Path(plan_out).resolve()
        with open(plan_path, "w") as f:
            yaml.safe_dump(plan.model_dump(exclude_none=True), f, sort_keys=False)

    def _sample_keys(
        self, *, client: Any, prefixes: List[str], key_separator: str
    ) -> List[str]:
        """SCAN up to ``key_sample_limit`` live keys matching the index prefixes.

        Returns an empty list when no client is available or sampling is
        disabled (limit <= 0). Keys are de-duplicated across prefixes.
        """
        key_sample: List[str] = []
        if client is None or self.key_sample_limit <= 0:
            return key_sample

        for prefix in prefixes:
            if len(key_sample) >= self.key_sample_limit:
                break
            # Build a match pattern; avoid doubling the separator when the
            # prefix already ends with it.
            if prefix == "":
                match_pattern = "*"
            elif prefix.endswith(key_separator):
                match_pattern = f"{prefix}*"
            else:
                match_pattern = f"{prefix}{key_separator}*"
            cursor = 0
            while True:
                cursor, keys = client.scan(
                    cursor=cursor,
                    match=match_pattern,
                    count=max(self.key_sample_limit, 10),
                )
                for key in keys:
                    decoded_key = key.decode() if isinstance(key, bytes) else str(key)
                    if decoded_key not in key_sample:
                        key_sample.append(decoded_key)
                        if len(key_sample) >= self.key_sample_limit:
                            return key_sample
                # SCAN returns cursor 0 when the iteration is complete.
                if cursor == 0:
                    break
        return key_sample
                break
        return key_sample

    def _detect_possible_field_renames(
        self,
        source_fields: Dict[str, Dict[str, Any]],
        target_fields: Dict[str, Dict[str, Any]],
    ) -> List[str]:
        # Heuristic rename detection: a field that was added to the target
        # and is identical (except for its name) to a field removed from the
        # source is most likely an undeclared rename. drop_recreate would
        # silently lose that field's data, so each such pair is blocked and
        # the user must declare the rename explicitly instead.
        blocked_reasons: List[str] = []
        added_fields = [
            field for name, field in target_fields.items() if name not in source_fields
        ]
        removed_fields = [
            field for name, field in source_fields.items() if name not in target_fields
        ]

        # O(removed x added) pairwise scan; field counts are small in
        # practice, so no indexing structure is needed here.
        for removed_field in removed_fields:
            for added_field in added_fields:
                if self._fields_match_except_name(removed_field, added_field):
                    blocked_reasons.append(
                        f"Possible field rename from '{removed_field['name']}' to '{added_field['name']}' is not supported by drop_recreate."
                    )
        return blocked_reasons

    @staticmethod
    def _classify_vector_field_change(
        source_field: Dict[str, Any], target_field: Dict[str, Any]
    ) -> List[str]:
        """Classify vector field changes as supported or blocked for drop_recreate.

        Index-only changes (allowed with drop_recreate):
        - algorithm (FLAT -> HNSW -> SVS-VAMANA)
        - distance_metric (COSINE, L2, IP)
        - initial_cap
        - Algorithm tuning: m, ef_construction, ef_runtime, epsilon, block_size,
          graph_max_degree, construction_window_size, search_window_size, etc.

        Quantization changes (allowed with drop_recreate, requires vector re-encoding):
        - datatype (float32 -> float16, etc.)
- executor will re-encode vectors + + Document-dependent changes (blocked, not yet supported): + - dims (vectors stored with wrong number of dimensions) + """ + blocked_reasons: List[str] = [] + field_name = source_field.get("name", "unknown") + source_attrs = source_field.get("attrs", {}) + target_attrs = target_field.get("attrs", {}) + + # Document-dependent properties (not yet supported) + if source_attrs.get("dims") != target_attrs.get("dims"): + blocked_reasons.append( + f"Changing vector field '{field_name}' dims from {source_attrs.get('dims')} " + f"to {target_attrs.get('dims')} requires document migration (not yet supported). " + "Vectors are stored with incompatible dimensions." + ) + + # Datatype changes are now ALLOWED - executor will re-encode vectors + # before recreating the index + + # All other vector changes are index-only and allowed + return blocked_reasons + + @staticmethod + def get_vector_datatype_changes( + source_schema: Dict[str, Any], + target_schema: Dict[str, Any], + rename_operations: Optional[Any] = None, + ) -> Dict[str, Dict[str, Any]]: + """Identify vector fields that need datatype conversion (quantization). + + Handles renamed vector fields by using rename_operations to map + source field names to their target counterparts. 

        Returns:
            Dict mapping source_field_name -> {
                "source": source_dtype,
                "target": target_dtype,
                "dims": int  # vector dimensions for idempotent detection
            }
        """
        changes: Dict[str, Dict[str, Any]] = {}
        source_fields = {f["name"]: f for f in source_schema.get("fields", [])}
        target_fields = {f["name"]: f for f in target_schema.get("fields", [])}

        # Build rename map: source_name -> target_name
        # hasattr guard: rename_operations is typed as Any, so duck-type it.
        field_rename_map: Dict[str, str] = {}
        if rename_operations and hasattr(rename_operations, "rename_fields"):
            for fr in rename_operations.rename_fields:
                field_rename_map[fr.old_name] = fr.new_name

        for name, source_field in source_fields.items():
            if source_field.get("type") != "vector":
                continue
            # Look up target by renamed name if applicable
            target_name = field_rename_map.get(name, name)
            target_field = target_fields.get(target_name)
            # Skip when the field disappeared or stopped being a vector;
            # other checks handle those cases.
            if not target_field or target_field.get("type") != "vector":
                continue

            # A missing datatype attr is treated as float32 on both sides.
            source_dtype = source_field.get("attrs", {}).get("datatype", "float32")
            target_dtype = target_field.get("attrs", {}).get("datatype", "float32")
            dims = source_field.get("attrs", {}).get("dims", 0)

            if source_dtype != target_dtype:
                changes[name] = {
                    "source": source_dtype,
                    "target": target_dtype,
                    "dims": dims,
                }

        return changes

    @staticmethod
    def _fields_match_except_name(
        source_field: Dict[str, Any], target_field: Dict[str, Any]
    ) -> bool:
        # Two field definitions count as "the same field" for rename
        # detection when everything except the "name" key matches exactly.
        comparable_source = {k: v for k, v in source_field.items() if k != "name"}
        comparable_target = {k: v for k, v in target_field.items() if k != "name"}
        return comparable_source == comparable_target

    @staticmethod
    def _dedupe(values: List[str]) -> List[str]:
        # Order-preserving de-duplication (a plain set() would lose the
        # original ordering of blocked reasons).
        deduped: List[str] = []
        for value in values:
            if value not in deduped:
                deduped.append(value)
        return deduped
diff --git a/redisvl/migration/utils.py b/redisvl/migration/utils.py
new file mode 100644
index 000000000..da339f4c4
--- /dev/null
+++ b/redisvl/migration/utils.py
@@ -0,0 +1,466 @@
from
__future__ import annotations + +import json +import time +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple + +import yaml + +from redisvl.utils.log import get_logger + +logger = get_logger(__name__) + +from redisvl.index import SearchIndex +from redisvl.migration.models import ( + AOF_HSET_OVERHEAD_BYTES, + AOF_JSON_SET_OVERHEAD_BYTES, + DTYPE_BYTES, + RDB_COMPRESSION_RATIO, + DiskSpaceEstimate, + MigrationPlan, + MigrationReport, + VectorFieldEstimate, +) +from redisvl.redis.connection import RedisConnectionFactory +from redisvl.schema.schema import IndexSchema + + +def list_indexes( + *, redis_url: Optional[str] = None, redis_client: Optional[Any] = None +): + if redis_client is None: + if not redis_url: + raise ValueError("Must provide either redis_url or redis_client") + redis_client = RedisConnectionFactory.get_redis_connection(redis_url=redis_url) + index = SearchIndex.from_dict( + {"index": {"name": "__redisvl_migration_helper__"}, "fields": []}, + redis_client=redis_client, + ) + return index.listall() + + +def load_yaml(path: str) -> Dict[str, Any]: + resolved = Path(path).resolve() + with open(resolved, "r") as f: + return yaml.safe_load(f) or {} + + +def write_yaml(data: Dict[str, Any], path: str) -> None: + resolved = Path(path).resolve() + with open(resolved, "w") as f: + yaml.safe_dump(data, f, sort_keys=False) + + +def load_migration_plan(path: str) -> MigrationPlan: + return MigrationPlan.model_validate(load_yaml(path)) + + +def write_migration_report(report: MigrationReport, path: str) -> None: + write_yaml(report.model_dump(exclude_none=True), path) + + +def write_benchmark_report(report: MigrationReport, path: str) -> None: + benchmark_report = { + "version": report.version, + "mode": report.mode, + "source_index": report.source_index, + "target_index": report.target_index, + "result": report.result, + "timings": report.timings.model_dump(exclude_none=True), + "benchmark_summary": 
            report.benchmark_summary.model_dump(exclude_none=True),
        "validation": {
            "schema_match": report.validation.schema_match,
            "doc_count_match": report.validation.doc_count_match,
            "indexing_failures_delta": report.validation.indexing_failures_delta,
            "key_sample_exists": report.validation.key_sample_exists,
        },
    }
    write_yaml(benchmark_report, path)


def normalize_keys(keys: List[str]) -> List[str]:
    """Deduplicate and sort keys for deterministic resume behavior."""
    return sorted(set(keys))


def build_scan_match_patterns(prefixes: List[str], key_separator: str) -> List[str]:
    """Build SCAN patterns for all configured prefixes."""
    if not prefixes:
        logger.warning(
            "No prefixes provided for SCAN pattern. "
            "Using '*' which will scan the entire keyspace."
        )
        return ["*"]

    patterns = set()
    for prefix in prefixes:
        if not prefix:
            logger.warning(
                "Empty prefix in prefix list. "
                "Using '*' which will scan the entire keyspace."
            )
            # One empty prefix already matches every key, so the remaining
            # prefixes are redundant -- return the wildcard immediately.
            return ["*"]
        # Append the separator unless the prefix already ends with it.
        if key_separator and not prefix.endswith(key_separator):
            patterns.add(f"{prefix}{key_separator}*")
        else:
            patterns.add(f"{prefix}*")
    # Sorted for deterministic output regardless of set iteration order.
    return sorted(patterns)


def detect_aof_enabled(client: Any) -> bool:
    """Best-effort detection of whether AOF is enabled on the live Redis."""
    # Primary probe: INFO persistence reports aof_enabled as 0/1.
    try:
        info = client.info("persistence")
        if isinstance(info, dict) and "aof_enabled" in info:
            return bool(int(info["aof_enabled"]))
    except Exception:
        pass

    # Fallback probe: CONFIG GET appendonly. Each probe swallows errors
    # deliberately -- either command may be unavailable on the server.
    try:
        config = client.config_get("appendonly")
        if isinstance(config, dict):
            value = config.get("appendonly")
            if value is not None:
                return str(value).lower() in {"yes", "1", "true", "on"}
    except Exception:
        pass

    # Neither probe succeeded; assume AOF is off.
    return False


def get_schema_field_path(schema: Dict[str, Any], field_name: str) -> Optional[str]:
    """Return the JSON path configured for a field, if present."""
    for field in schema.get("fields", []):
        if field.get("name") != field_name:
            continue
        # The path may live at the field top level or nested under attrs.
        path = field.get("path")
        if path is None:
            path =
field.get("attrs", {}).get("path") + return str(path) if path is not None else None + return None + + +# Attributes excluded from schema validation comparison. +# These are query-time or creation-hint parameters that FT.INFO does not return +# and are not relevant for index structure validation (confirmed by RediSearch team). +# - ef_runtime, epsilon: query-time tuning knobs, not index definition attributes +# - initial_cap: creation-time memory pre-allocation hint, diverges after indexing +EXCLUDED_VECTOR_ATTRS = {"ef_runtime", "epsilon", "initial_cap"} +# phonetic_matcher: the matcher string (e.g. "dm:en") is not stored server-side, +# only a boolean flag is kept, so it cannot be read back. +# withsuffixtrie: returned as a flag in FT.INFO but not as a KV attribute, +# so RedisVL's parser does not capture it yet. +EXCLUDED_TEXT_ATTRS = {"phonetic_matcher", "withsuffixtrie"} +EXCLUDED_TAG_ATTRS = {"withsuffixtrie"} + + +def _strip_excluded_attrs(field: Dict[str, Any]) -> Dict[str, Any]: + """Remove attributes not relevant for index validation comparison. + + These are either query-time parameters, creation-time hints, or attributes + whose server-side representation differs from the schema definition. 
+ + Also normalizes attributes that have implicit behavior: + - For NUMERIC + SORTABLE, Redis auto-applies UNF, so we normalize to unf=True + """ + field = field.copy() + attrs = field.get("attrs", {}) + if not attrs: + return field + + attrs = attrs.copy() + field_type = field.get("type", "").lower() + + if field_type == "vector": + for attr in EXCLUDED_VECTOR_ATTRS: + attrs.pop(attr, None) + elif field_type == "text": + for attr in EXCLUDED_TEXT_ATTRS: + attrs.pop(attr, None) + # Normalize weight to int for comparison (FT.INFO may return float) + if "weight" in attrs and isinstance(attrs["weight"], float): + if attrs["weight"] == int(attrs["weight"]): + attrs["weight"] = int(attrs["weight"]) + elif field_type == "tag": + for attr in EXCLUDED_TAG_ATTRS: + attrs.pop(attr, None) + elif field_type == "numeric": + # Redis auto-applies UNF when SORTABLE is set on NUMERIC fields. + # Normalize unf to True when sortable is True to avoid false mismatches. + if attrs.get("sortable"): + attrs["unf"] = True + + field["attrs"] = attrs + return field + + +def canonicalize_schema( + schema_dict: Dict[str, Any], + *, + strip_unreliable: bool = False, + strip_excluded: bool = False, +) -> Dict[str, Any]: + """Canonicalize schema for comparison. + + Args: + schema_dict: The schema dictionary to canonicalize. + strip_unreliable: Deprecated alias for strip_excluded. Kept for + backward compatibility. + strip_excluded: If True, remove query-time and creation-hint attributes + that are not part of index structure validation. 
+ """ + schema = IndexSchema.from_dict(schema_dict).to_dict() + + should_strip = strip_excluded or strip_unreliable + fields = schema.get("fields", []) + if should_strip: + fields = [_strip_excluded_attrs(f) for f in fields] + + schema["fields"] = sorted(fields, key=lambda field: field["name"]) + prefixes = schema["index"].get("prefix") + if isinstance(prefixes, list): + schema["index"]["prefix"] = sorted(prefixes) + stopwords = schema["index"].get("stopwords") + if isinstance(stopwords, list): + schema["index"]["stopwords"] = sorted(stopwords) + return schema + + +def schemas_equal( + left: Dict[str, Any], + right: Dict[str, Any], + *, + strip_unreliable: bool = False, + strip_excluded: bool = False, +) -> bool: + """Compare two schemas for equality. + + Args: + left: First schema dictionary. + right: Second schema dictionary. + strip_unreliable: Deprecated alias for strip_excluded. Kept for + backward compatibility. + strip_excluded: If True, exclude query-time and creation-hint attributes + (ef_runtime, epsilon, initial_cap, phonetic_matcher) from comparison. + """ + should_strip = strip_excluded or strip_unreliable + return json.dumps( + canonicalize_schema(left, strip_excluded=should_strip), sort_keys=True + ) == json.dumps( + canonicalize_schema(right, strip_excluded=should_strip), sort_keys=True + ) + + +def wait_for_index_ready( + index: SearchIndex, + *, + timeout_seconds: int = 1800, + poll_interval_seconds: float = 0.5, + progress_callback: Optional[Callable[[int, int, float], None]] = None, +) -> Tuple[Dict[str, Any], float]: + """Wait for index to finish indexing all documents. + + Args: + index: The SearchIndex to monitor. + timeout_seconds: Maximum time to wait. + poll_interval_seconds: How often to check status. + progress_callback: Optional callback(indexed_docs, total_docs, percent). 
    """
    start = time.perf_counter()
    deadline = start + timeout_seconds
    latest_info = index.info()

    # NOTE(review): despite its name, stable_ready_checks holds the last
    # observed num_docs baseline for the fallback readiness check, not a
    # counter of checks. None means no baseline captured yet.
    stable_ready_checks: Optional[int] = None
    while time.perf_counter() < deadline:
        ready = False
        latest_info = index.info()
        indexing = latest_info.get("indexing")
        percent_indexed = latest_info.get("percent_indexed")

        if percent_indexed is not None or indexing is not None:
            pct = float(percent_indexed) if percent_indexed is not None else None
            is_indexing = bool(indexing)
            if pct is not None:
                # Fully indexed AND the indexing flag has cleared.
                ready = pct >= 1.0 and not is_indexing
            else:
                # percent_indexed missing but indexing flag present:
                # treat as ready when indexing flag is falsy (0 / False).
                ready = not is_indexing
            if progress_callback:
                total_docs = int(latest_info.get("num_docs", 0))
                display_pct = pct if pct is not None else (1.0 if ready else 0.0)
                indexed_docs = int(total_docs * display_pct)
                progress_callback(indexed_docs, total_docs, display_pct * 100)
        else:
            # FT.INFO exposed neither indicator: fall back to watching
            # num_docs and declaring ready once it is unchanged between
            # two consecutive polls.
            current_docs = latest_info.get("num_docs")
            if current_docs is None:
                ready = True
            else:
                if stable_ready_checks is None:
                    # First observation: record baseline and poll again.
                    stable_ready_checks = int(current_docs)
                    time.sleep(poll_interval_seconds)
                    continue
                current = int(current_docs)
                if current == stable_ready_checks:
                    ready = True
                else:
                    # num_docs changed; update baseline and keep waiting
                    stable_ready_checks = current

        if ready:
            return latest_info, round(time.perf_counter() - start, 3)

        time.sleep(poll_interval_seconds)

    raise TimeoutError(
        f"Index {index.schema.index.name} did not become ready within {timeout_seconds} seconds"
    )


def current_source_matches_snapshot(
    index_name: str,
    expected_schema: Dict[str, Any],
    *,
    redis_url: Optional[str] = None,
    redis_client: Optional[Any] = None,
) -> bool:
    # Returns True only when the live index still matches the schema
    # snapshot taken at plan time; False when it drifted or is gone.
    try:
        current_index = SearchIndex.from_existing(
            index_name,
            redis_url=redis_url,
            redis_client=redis_client,
        )
    except Exception:
        # Index no longer exists (e.g.
already dropped during migration) + return False + return schemas_equal(current_index.schema.to_dict(), expected_schema) + + +def timestamp_utc() -> str: + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + +def estimate_disk_space( + plan: MigrationPlan, + *, + aof_enabled: bool = False, +) -> DiskSpaceEstimate: + """Estimate disk space required for a migration with quantization. + + This is a pure calculation based on the migration plan. No Redis + operations are performed. + + Args: + plan: The migration plan containing source/target schemas. + aof_enabled: Whether AOF persistence is active on the Redis instance. + + Returns: + DiskSpaceEstimate with projected costs. + """ + doc_count = int(plan.source.stats_snapshot.get("num_docs", 0) or 0) + storage_type = plan.source.keyspace.storage_type + index_name = plan.source.index_name + + # Find vector fields with datatype changes + source_fields = { + f["name"]: f for f in plan.source.schema_snapshot.get("fields", []) + } + target_fields = {f["name"]: f for f in plan.merged_target_schema.get("fields", [])} + + # Build rename map: source_name -> target_name + field_rename_map: Dict[str, str] = {} + rename_ops = plan.rename_operations + if rename_ops and rename_ops.rename_fields: + for fr in rename_ops.rename_fields: + field_rename_map[fr.old_name] = fr.new_name + + vector_field_estimates: list[VectorFieldEstimate] = [] + total_source_bytes = 0 + total_target_bytes = 0 + total_aof_growth = 0 + + aof_overhead = ( + AOF_JSON_SET_OVERHEAD_BYTES + if storage_type == "json" + else AOF_HSET_OVERHEAD_BYTES + ) + + for name, source_field in source_fields.items(): + if source_field.get("type") != "vector": + continue + # Look up target by renamed name if applicable + target_name = field_rename_map.get(name, name) + target_field = target_fields.get(target_name) + if not target_field or target_field.get("type") != "vector": + continue + + source_attrs = source_field.get("attrs", {}) + target_attrs = 
            target_field.get("attrs", {})
        # Missing datatype attrs default to float32 on both sides.
        source_dtype = source_attrs.get("datatype", "float32").lower()
        target_dtype = target_attrs.get("datatype", "float32").lower()

        if source_dtype == target_dtype:
            continue

        # Validate dtypes before any sizing math so an unknown datatype
        # fails loudly instead of producing a bogus estimate. This runs
        # even for JSON storage (skipped below) so bad plans are rejected.
        if source_dtype not in DTYPE_BYTES:
            raise ValueError(
                f"Unknown source vector datatype '{source_dtype}' for field '{name}'. "
                f"Supported datatypes: {', '.join(sorted(DTYPE_BYTES.keys()))}"
            )
        if target_dtype not in DTYPE_BYTES:
            raise ValueError(
                f"Unknown target vector datatype '{target_dtype}' for field '{name}'. "
                f"Supported datatypes: {', '.join(sorted(DTYPE_BYTES.keys()))}"
            )

        if storage_type == "json":
            # JSON-backed migrations do not rewrite per-document vector payloads
            # during apply(); they rely on recreate + re-index instead.
            continue

        # Per-document vector payload sizes: dims * bytes-per-element.
        dims = int(source_attrs.get("dims", 0))
        source_bpe = DTYPE_BYTES[source_dtype]
        target_bpe = DTYPE_BYTES[target_dtype]

        source_vec_size = dims * source_bpe
        target_vec_size = dims * target_bpe

        vector_field_estimates.append(
            VectorFieldEstimate(
                field_name=name,
                dims=dims,
                source_dtype=source_dtype,
                target_dtype=target_dtype,
                source_bytes_per_doc=source_vec_size,
                target_bytes_per_doc=target_vec_size,
            )
        )

        field_source_total = doc_count * source_vec_size
        field_target_total = doc_count * target_vec_size
        total_source_bytes += field_source_total
        total_target_bytes += field_target_total

        if aof_enabled:
            # Every rewritten vector is appended to the AOF, plus the
            # per-command protocol overhead chosen above (HSET vs JSON.SET).
            total_aof_growth += doc_count * (target_vec_size + aof_overhead)

    # Projected RDB snapshot cost scales the source vector bytes by
    # RDB_COMPRESSION_RATIO; copy-on-write memory during a concurrent
    # snapshot is bounded by the full uncompressed source size.
    rdb_snapshot_disk = int(total_source_bytes * RDB_COMPRESSION_RATIO)
    rdb_cow_memory = total_source_bytes
    total_new_disk = rdb_snapshot_disk + total_aof_growth
    # Negative when the target dtype is larger than the source dtype.
    memory_savings = total_source_bytes - total_target_bytes

    return DiskSpaceEstimate(
        index_name=index_name,
        doc_count=doc_count,
        storage_type=storage_type,
        vector_fields=vector_field_estimates,
        total_source_vector_bytes=total_source_bytes,
        total_target_vector_bytes=total_target_bytes,
rdb_snapshot_disk_bytes=rdb_snapshot_disk, + rdb_cow_memory_if_concurrent_bytes=rdb_cow_memory, + aof_enabled=aof_enabled, + aof_growth_bytes=total_aof_growth, + total_new_disk_bytes=total_new_disk, + memory_savings_after_bytes=memory_savings, + ) diff --git a/redisvl/migration/validation.py b/redisvl/migration/validation.py new file mode 100644 index 000000000..99b6911cc --- /dev/null +++ b/redisvl/migration/validation.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +import time +from typing import Any, Dict, List, Optional + +from redis.commands.search.query import Query + +from redisvl.index import SearchIndex +from redisvl.migration.models import ( + MigrationPlan, + MigrationValidation, + QueryCheckResult, +) +from redisvl.migration.utils import load_yaml, schemas_equal + + +class MigrationValidator: + def validate( + self, + plan: MigrationPlan, + *, + redis_url: Optional[str] = None, + redis_client: Optional[Any] = None, + query_check_file: Optional[str] = None, + ) -> tuple[MigrationValidation, Dict[str, Any], float]: + started = time.perf_counter() + target_index = SearchIndex.from_existing( + plan.merged_target_schema["index"]["name"], + redis_url=redis_url, + redis_client=redis_client, + ) + target_info = target_index.info() + validation = MigrationValidation() + + live_schema = target_index.schema.to_dict() + # Exclude query-time and creation-hint attributes (ef_runtime, epsilon, + # initial_cap, phonetic_matcher) that are not part of index structure + # validation. Confirmed by RediSearch team as not relevant for this check. 
+ validation.schema_match = schemas_equal( + live_schema, plan.merged_target_schema, strip_excluded=True + ) + + source_num_docs = int(plan.source.stats_snapshot.get("num_docs", 0) or 0) + target_num_docs = int(target_info.get("num_docs", 0) or 0) + validation.doc_count_match = source_num_docs == target_num_docs + + source_failures = int( + plan.source.stats_snapshot.get("hash_indexing_failures", 0) or 0 + ) + target_failures = int(target_info.get("hash_indexing_failures", 0) or 0) + validation.indexing_failures_delta = target_failures - source_failures + + key_sample = plan.source.keyspace.key_sample + if not key_sample: + validation.key_sample_exists = True + else: + # Handle prefix change: transform key_sample to use new prefix. + # Must match the executor's RENAME logic exactly: + # new_key = new_prefix + key[len(old_prefix):] + keys_to_check = key_sample + if plan.rename_operations.change_prefix is not None: + old_prefix = plan.source.keyspace.prefixes[0] + new_prefix = plan.rename_operations.change_prefix + key_sep = plan.source.keyspace.key_separator + # Normalize prefixes: strip trailing separator for consistent slicing + old_base = old_prefix.rstrip(key_sep) if old_prefix else "" + new_base = new_prefix.rstrip(key_sep) if new_prefix else "" + keys_to_check = [] + for k in key_sample: + if old_base and k.startswith(old_base): + keys_to_check.append(new_base + k[len(old_base) :]) + else: + keys_to_check.append(k) + existing_count = target_index.client.exists(*keys_to_check) + validation.key_sample_exists = existing_count == len(keys_to_check) + + # Run automatic functional checks (always) + functional_checks = self._run_functional_checks(target_index, source_num_docs) + validation.query_checks.extend(functional_checks) + + # Run user-provided query checks (if file provided) + if query_check_file: + user_checks = self._run_query_checks(target_index, query_check_file) + validation.query_checks.extend(user_checks) + + if not validation.schema_match and 
plan.validation.require_schema_match: + validation.errors.append("Live schema does not match merged_target_schema.") + if not validation.doc_count_match and plan.validation.require_doc_count_match: + validation.errors.append( + "Live document count does not match source num_docs." + ) + if validation.indexing_failures_delta > 0: + validation.errors.append("Indexing failures increased during migration.") + if not validation.key_sample_exists: + validation.errors.append( + "One or more sampled source keys is missing after migration." + ) + if any(not query_check.passed for query_check in validation.query_checks): + validation.errors.append("One or more query checks failed.") + + return validation, target_info, round(time.perf_counter() - started, 3) + + def _run_query_checks( + self, + target_index: SearchIndex, + query_check_file: str, + ) -> list[QueryCheckResult]: + query_checks = load_yaml(query_check_file) + results: list[QueryCheckResult] = [] + + for doc_id in query_checks.get("fetch_ids", []): + fetched = target_index.fetch(doc_id) + results.append( + QueryCheckResult( + name=f"fetch:{doc_id}", + passed=fetched is not None, + details=( + "Document fetched successfully" + if fetched is not None + else "Document not found" + ), + ) + ) + + for key in query_checks.get("keys_exist", []): + client = target_index.client + if client is None: + raise ValueError("Redis client not connected") + exists = bool(client.exists(key)) + results.append( + QueryCheckResult( + name=f"key:{key}", + passed=exists, + details="Key exists" if exists else "Key not found", + ) + ) + + return results + + def _run_functional_checks( + self, target_index: SearchIndex, expected_doc_count: int + ) -> List[QueryCheckResult]: + """Run automatic functional checks to verify the index is operational. + + These checks run automatically after every migration to prove the index + actually works, not just that the schema looks correct. 
+ """ + results: List[QueryCheckResult] = [] + + # Check 1: Wildcard search - proves the index responds and returns docs + try: + search_result = target_index.search(Query("*").paging(0, 1)) + total_found = search_result.total + # When expected_doc_count is 0 (empty index), a successful + # search returning 0 docs is correct behaviour, not a failure. + if expected_doc_count == 0: + passed = total_found == 0 + else: + passed = total_found > 0 + results.append( + QueryCheckResult( + name="functional:wildcard_search", + passed=passed, + details=( + f"Wildcard search returned {total_found} docs " + f"(expected {expected_doc_count})" + ), + ) + ) + except Exception as e: + results.append( + QueryCheckResult( + name="functional:wildcard_search", + passed=False, + details=f"Wildcard search failed: {str(e)}", + ) + ) + + return results diff --git a/redisvl/redis/connection.py b/redisvl/redis/connection.py index 6c435c547..cf5a38193 100644 --- a/redisvl/redis/connection.py +++ b/redisvl/redis/connection.py @@ -304,6 +304,19 @@ def parse_vector_attrs(attrs): # Default to float32 if missing normalized["datatype"] = "float32" + # Handle HNSW-specific parameters + if "m" in vector_attrs: + try: + normalized["m"] = int(vector_attrs["m"]) + except (ValueError, TypeError): + pass + + if "ef_construction" in vector_attrs: + try: + normalized["ef_construction"] = int(vector_attrs["ef_construction"]) + except (ValueError, TypeError): + pass + # Handle SVS-VAMANA specific parameters # Compression - Redis uses different internal names, so we need to map them if "compression" in vector_attrs: diff --git a/tests/integration/test_field_modifier_ordering_integration.py b/tests/integration/test_field_modifier_ordering_integration.py index b26463df0..b9d609674 100644 --- a/tests/integration/test_field_modifier_ordering_integration.py +++ b/tests/integration/test_field_modifier_ordering_integration.py @@ -399,6 +399,241 @@ def test_indexmissing_enables_ismissing_query(self, client, redis_url, 
worker_id index.delete(drop=True) +class TestIndexEmptyIntegration: + """Integration tests for INDEXEMPTY functionality.""" + + def test_text_field_index_empty_creates_successfully( + self, client, redis_url, worker_id + ): + """Test that INDEXEMPTY on text field allows index creation.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_text_empty_{worker_id}", + "prefix": f"textempty_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "description", + "type": "text", + "attrs": {"index_empty": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + index.create(overwrite=True) + + # Verify index was created + info = client.execute_command("FT.INFO", f"test_text_empty_{worker_id}") + assert info is not None + + # Create documents with empty and non-empty values + client.hset(f"textempty_{worker_id}:1", "description", "has content") + client.hset(f"textempty_{worker_id}:2", "description", "") + client.hset(f"textempty_{worker_id}:3", "description", "more content") + + # Search should work, empty string doc should be indexed + result = client.execute_command( + "FT.SEARCH", + f"test_text_empty_{worker_id}", + "*", + ) + # All 3 docs should be found + assert result[0] == 3 + + # Cleanup + client.delete( + f"textempty_{worker_id}:1", + f"textempty_{worker_id}:2", + f"textempty_{worker_id}:3", + ) + index.delete(drop=True) + + def test_tag_field_index_empty_creates_successfully( + self, client, redis_url, worker_id + ): + """Test that INDEXEMPTY on tag field allows index creation.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_tag_empty_{worker_id}", + "prefix": f"tagempty_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "category", + "type": "tag", + "attrs": {"index_empty": True}, + } + ], + } + + schema = 
IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + index.create(overwrite=True) + + # Verify index was created + info = client.execute_command("FT.INFO", f"test_tag_empty_{worker_id}") + assert info is not None + + # Create documents with empty and non-empty values + client.hset(f"tagempty_{worker_id}:1", "category", "electronics") + client.hset(f"tagempty_{worker_id}:2", "category", "") + client.hset(f"tagempty_{worker_id}:3", "category", "books") + + # Search should work + result = client.execute_command( + "FT.SEARCH", + f"test_tag_empty_{worker_id}", + "*", + ) + # All 3 docs should be found + assert result[0] == 3 + + # Cleanup + client.delete( + f"tagempty_{worker_id}:1", + f"tagempty_{worker_id}:2", + f"tagempty_{worker_id}:3", + ) + index.delete(drop=True) + + +class TestUnfModifierIntegration: + """Integration tests for UNF (un-normalized form) modifier.""" + + def test_text_field_unf_requires_sortable(self, client, redis_url, worker_id): + """Test that UNF on text field works only when sortable is also True.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_text_unf_{worker_id}", + "prefix": f"textunf_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "title", + "type": "text", + "attrs": {"sortable": True, "unf": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + + # Should create successfully + index.create(overwrite=True) + + info = client.execute_command("FT.INFO", f"test_text_unf_{worker_id}") + assert info is not None + + index.delete(drop=True) + + def test_numeric_field_unf_with_sortable(self, client, redis_url, worker_id): + """Test that UNF on numeric field works when sortable is True.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_num_unf_{worker_id}", + "prefix": f"numunf_{worker_id}:", 
+ "storage_type": "hash", + }, + "fields": [ + { + "name": "price", + "type": "numeric", + "attrs": {"sortable": True, "unf": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + + # Should create successfully + index.create(overwrite=True) + + info = client.execute_command("FT.INFO", f"test_num_unf_{worker_id}") + assert info is not None + + index.delete(drop=True) + + +class TestNoIndexModifierIntegration: + """Integration tests for NOINDEX modifier.""" + + def test_noindex_with_sortable_allows_sorting_not_searching( + self, client, redis_url, worker_id + ): + """Test that NOINDEX field can be sorted but not searched.""" + schema_dict = { + "index": { + "name": f"test_noindex_{worker_id}", + "prefix": f"noindex_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "searchable", + "type": "text", + }, + { + "name": "sort_only", + "type": "numeric", + "attrs": {"sortable": True, "no_index": True}, + }, + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + index.create(overwrite=True) + + # Add test documents + client.hset( + f"noindex_{worker_id}:1", mapping={"searchable": "hello", "sort_only": 10} + ) + client.hset( + f"noindex_{worker_id}:2", mapping={"searchable": "world", "sort_only": 5} + ) + client.hset( + f"noindex_{worker_id}:3", mapping={"searchable": "test", "sort_only": 15} + ) + + # Sorting by no_index field should work + result = client.execute_command( + "FT.SEARCH", + f"test_noindex_{worker_id}", + "*", + "SORTBY", + "sort_only", + "ASC", + ) + assert result[0] == 3 + + # Filtering by NOINDEX field should return no results + filter_result = client.execute_command( + "FT.SEARCH", + f"test_noindex_{worker_id}", + "@sort_only:[5 10]", + ) + assert filter_result[0] == 0 + + # Cleanup + client.delete( + f"noindex_{worker_id}:1", + f"noindex_{worker_id}:2", + f"noindex_{worker_id}:3", + ) + 
index.delete(drop=True) + + class TestFieldTypeModifierSupport: """Test that field types only support their documented modifiers.""" diff --git a/tests/integration/test_migration_comprehensive.py b/tests/integration/test_migration_comprehensive.py new file mode 100644 index 000000000..c9d26e90f --- /dev/null +++ b/tests/integration/test_migration_comprehensive.py @@ -0,0 +1,1336 @@ +""" +Comprehensive integration tests for all 38 supported migration operations. + +This test suite validates migrations against real Redis with a tiered validation approach: +- L1: Execution (plan.supported == True) +- L2: Data Integrity (doc_count_match == True) +- L3: Key Existence (key_sample_exists == True) +- L4: Schema Match (schema_match == True) + +Test Categories: +1. Index-Level (2): rename index, change prefix +2. Field Add (4): text, tag, numeric, geo +3. Field Remove (5): text, tag, numeric, geo, vector +4. Field Rename (5): text, tag, numeric, geo, vector +5. Base Attrs (3): sortable, no_index, index_missing +6. Text Attrs (5): weight, no_stem, phonetic_matcher, index_empty, unf +7. Tag Attrs (3): separator, case_sensitive, index_empty +8. Numeric Attrs (1): unf +9. Vector Attrs (8): algorithm, distance_metric, initial_cap, m, ef_construction, + ef_runtime, epsilon, datatype +10. 
"""
Comprehensive integration tests for all 38 supported migration operations.

This test suite validates migrations against real Redis with a tiered validation approach:
- L1: Execution (plan.supported == True)
- L2: Data Integrity (doc_count_match == True)
- L3: Key Existence (key_sample_exists == True)
- L4: Schema Match (schema_match == True)

Test Categories:
1. Index-Level (2): rename index, change prefix
2. Field Add (4): text, tag, numeric, geo
3. Field Remove (5): text, tag, numeric, geo, vector
4. Field Rename (5): text, tag, numeric, geo, vector
5. Base Attrs (3): sortable, no_index, index_missing
6. Text Attrs (5): weight, no_stem, phonetic_matcher, index_empty, unf
7. Tag Attrs (3): separator, case_sensitive, index_empty
8. Numeric Attrs (1): unf
9. Vector Attrs (8): algorithm, distance_metric, initial_cap, m, ef_construction,
   ef_runtime, epsilon, datatype
10. JSON Storage (2): add field, rename field

Some tests use L2-only validation due to Redis FT.INFO limitations:
- prefix change (keys renamed), HNSW params, initial_cap, phonetic_matcher, numeric unf

Run: pytest tests/integration/test_migration_comprehensive.py -v
Spec: nitin_docs/index_migrator/32_integration_test_spec.md
"""

import uuid
from typing import Any, Dict, List

import pytest
import yaml

from redisvl.index import SearchIndex
from redisvl.migration import MigrationExecutor, MigrationPlanner
from redisvl.migration.utils import load_migration_plan, schemas_equal
from redisvl.redis.utils import array_to_buffer

# ==============================================================================
# Fixtures
# ==============================================================================


@pytest.fixture
def unique_ids(worker_id):
    """Generate unique identifiers for test isolation."""
    # First 8 hex digits of a random UUID are enough to avoid collisions
    # across parallel pytest-xdist workers.
    token = uuid.uuid4().hex[:8]
    return {
        "name": f"mig_test_{worker_id}_{token}",
        "prefix": f"mig_test:{worker_id}:{token}",
    }


@pytest.fixture
def base_schema(unique_ids):
    """Base schema with all field types for testing."""
    vector_attrs = {
        "algorithm": "hnsw",
        "dims": 4,
        "distance_metric": "cosine",
        "datatype": "float32",
    }
    return {
        "index": {
            "name": unique_ids["name"],
            "prefix": unique_ids["prefix"],
            "storage_type": "hash",
        },
        "fields": [
            {"name": "doc_id", "type": "tag"},
            {"name": "title", "type": "text"},
            {"name": "description", "type": "text"},
            {"name": "category", "type": "tag"},
            {"name": "price", "type": "numeric"},
            {"name": "location", "type": "geo"},
            {"name": "embedding", "type": "vector", "attrs": vector_attrs},
        ],
    }


@pytest.fixture
def sample_docs():
    """Sample documents covering every field type in base_schema."""
    return [
        {
            "doc_id": "1",
            "title": "Alpha Product",
            "description": "First product description",
            "category": "electronics",
            "price": 99.99,
            "location": "-122.4194,37.7749",  # SF coordinates
            "embedding": array_to_buffer([0.1, 0.2, 0.3, 0.4], "float32"),
        },
        {
            "doc_id": "2",
            "title": "Beta Service",
            "description": "Second service description",
            "category": "software",
            "price": 149.99,
            "location": "-73.9857,40.7484",  # NYC coordinates
            "embedding": array_to_buffer([0.2, 0.3, 0.4, 0.5], "float32"),
        },
        {
            "doc_id": "3",
            "title": "Gamma Item",
            "description": "",  # Empty for index_empty tests
            "category": "",  # Empty for index_empty tests
            "price": 0,
            "location": "-118.2437,34.0522",  # LA coordinates
            "embedding": array_to_buffer([0.3, 0.4, 0.5, 0.6], "float32"),
        },
    ]


def run_migration(
    redis_url: str,
    tmp_path,
    index_name: str,
    patch: Dict[str, Any],
) -> Dict[str, Any]:
    """Plan and apply the migration described by *patch*; return a result summary."""
    # Write the schema patch to disk, as the planner consumes a file path.
    patch_file = tmp_path / "patch.yaml"
    patch_file.write_text(yaml.safe_dump(patch, sort_keys=False))

    planner = MigrationPlanner()
    plan = planner.create_plan(
        index_name,
        redis_url=redis_url,
        schema_patch_path=str(patch_file),
    )
    plan_file = tmp_path / "plan.yaml"
    planner.write_plan(plan, str(plan_file))

    # Round-trip the plan through YAML exactly as the CLI would.
    report = MigrationExecutor().apply(
        load_migration_plan(str(plan_file)),
        redis_url=redis_url,
    )

    validation = report.validation
    return {
        "plan": plan,
        "report": report,
        "supported": plan.diff_classification.supported,
        "succeeded": report.result == "succeeded",
        # Additional validation fields for granular checks
        "doc_count_match": validation.doc_count_match,
        "schema_match": validation.schema_match,
        "key_sample_exists": validation.key_sample_exists,
        "validation_errors": validation.errors,
    }


def setup_index(redis_url: str, schema: Dict, docs: List[Dict]) -> SearchIndex:
    """Create index and load documents."""
    idx = SearchIndex.from_dict(schema, redis_url=redis_url)
    idx.create(overwrite=True)
    idx.load(docs, id_field="doc_id")
    return idx
def cleanup_index(index: SearchIndex) -> None:
    """Best-effort teardown: drop the index and its documents.

    Failures are deliberately swallowed — the index may already be gone
    (e.g. it was renamed by the migration under test), and teardown must
    never mask the real test result.
    """
    try:
        index.delete(drop=True)
    except Exception:
        pass


# ==============================================================================
# 1. Index-Level Changes
# ==============================================================================


class TestIndexLevelChanges:
    """Tests for index-level migration operations."""

    def test_rename_index(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test renaming an index."""
        index = setup_index(redis_url, base_schema, sample_docs)
        old_name = base_schema["index"]["name"]
        new_name = f"{old_name}_renamed"

        try:
            result = run_migration(
                redis_url,
                tmp_path,
                old_name,
                {"version": 1, "changes": {"index": {"name": new_name}}},
            )

            assert result["supported"], "Rename index should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"

            # Verify the index exists (and is cleaned up) under its NEW name.
            live_index = SearchIndex.from_existing(new_name, redis_url=redis_url)
            assert live_index.schema.index.name == new_name
            cleanup_index(live_index)
        except Exception:
            # On failure the index may still exist under the old name.
            cleanup_index(index)
            raise

    def test_change_prefix(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test changing the key prefix (requires key renames)."""
        index = setup_index(redis_url, base_schema, sample_docs)
        old_prefix = base_schema["index"]["prefix"]
        new_prefix = f"{old_prefix}_newprefix"

        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                {"version": 1, "changes": {"index": {"prefix": new_prefix}}},
            )

            assert result["supported"], "Change prefix should be supported"
            # Validation now handles prefix change by transforming key_sample to new prefix
            assert result["succeeded"], f"Migration failed: {result['report']}"

            # Verify keys were renamed
            live_index = SearchIndex.from_existing(
                base_schema["index"]["name"], redis_url=redis_url
            )
            assert live_index.schema.index.prefix == new_prefix
            cleanup_index(live_index)
        except Exception:
            cleanup_index(index)
            raise


# ==============================================================================
# 2. Field Operations - Add Fields
# ==============================================================================


def _minimal_schema(unique_ids: Dict[str, str]) -> Dict[str, Any]:
    """Hash schema with only the doc_id tag field; base for add-field tests."""
    return {
        "index": {
            "name": unique_ids["name"],
            "prefix": unique_ids["prefix"],
            "storage_type": "hash",
        },
        "fields": [{"name": "doc_id", "type": "tag"}],
    }


class TestAddFields:
    """Tests for adding fields of different types."""

    def _assert_add_field(
        self,
        redis_url,
        tmp_path,
        unique_ids,
        sample_docs,
        field: Dict[str, Any],
        label: str,
    ):
        """Create a minimal index, apply an add-field patch, and assert success."""
        index = setup_index(redis_url, _minimal_schema(unique_ids), sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                unique_ids["name"],
                {"version": 1, "changes": {"add_fields": [field]}},
            )
            assert result["supported"], f"Add {label} field should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            # try/finally guarantees cleanup on both success and failure.
            cleanup_index(index)

    def test_add_text_field(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding a text field."""
        self._assert_add_field(
            redis_url,
            tmp_path,
            unique_ids,
            sample_docs,
            {"name": "title", "type": "text"},
            "text",
        )

    def test_add_tag_field(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding a tag field."""
        self._assert_add_field(
            redis_url,
            tmp_path,
            unique_ids,
            sample_docs,
            {"name": "category", "type": "tag", "attrs": {"separator": ","}},
            "tag",
        )

    def test_add_numeric_field(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding a numeric field."""
        self._assert_add_field(
            redis_url,
            tmp_path,
            unique_ids,
            sample_docs,
            {"name": "price", "type": "numeric"},
            "numeric",
        )

    def test_add_geo_field(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding a geo field."""
        self._assert_add_field(
            redis_url,
            tmp_path,
            unique_ids,
            sample_docs,
            {"name": "location", "type": "geo"},
            "geo",
        )
# ==============================================================================
# 2. Field Operations - Remove Fields
# ==============================================================================


class TestRemoveFields:
    """Tests for removing fields of different types."""

    def _assert_remove_field(
        self, redis_url, tmp_path, base_schema, sample_docs, field_name: str, label: str
    ):
        """Apply a remove-field patch against the full base schema; assert success."""
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                {"version": 1, "changes": {"remove_fields": [field_name]}},
            )
            assert result["supported"], f"Remove {label} field should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            # try/finally guarantees cleanup on both success and failure.
            cleanup_index(index)

    def test_remove_text_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test removing a text field."""
        self._assert_remove_field(
            redis_url, tmp_path, base_schema, sample_docs, "description", "text"
        )

    def test_remove_tag_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test removing a tag field."""
        self._assert_remove_field(
            redis_url, tmp_path, base_schema, sample_docs, "category", "tag"
        )

    def test_remove_numeric_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test removing a numeric field."""
        self._assert_remove_field(
            redis_url, tmp_path, base_schema, sample_docs, "price", "numeric"
        )

    def test_remove_geo_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test removing a geo field."""
        self._assert_remove_field(
            redis_url, tmp_path, base_schema, sample_docs, "location", "geo"
        )

    def test_remove_vector_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test removing a vector field (allowed but warned)."""
        self._assert_remove_field(
            redis_url, tmp_path, base_schema, sample_docs, "embedding", "vector"
        )
# ==============================================================================
# 2. Field Operations - Rename Fields
# ==============================================================================


class TestRenameFields:
    """Tests for renaming fields of different types."""

    def _assert_rename_field(
        self,
        redis_url,
        tmp_path,
        base_schema,
        sample_docs,
        old_name: str,
        new_name: str,
        label: str,
    ):
        """Apply a rename-field patch against the full base schema; assert success."""
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                {
                    "version": 1,
                    "changes": {
                        "rename_fields": [
                            {"old_name": old_name, "new_name": new_name}
                        ],
                    },
                },
            )
            assert result["supported"], f"Rename {label} field should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            # try/finally guarantees cleanup on both success and failure.
            cleanup_index(index)

    def test_rename_text_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test renaming a text field."""
        self._assert_rename_field(
            redis_url, tmp_path, base_schema, sample_docs, "title", "headline", "text"
        )

    def test_rename_tag_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test renaming a tag field."""
        self._assert_rename_field(
            redis_url, tmp_path, base_schema, sample_docs, "category", "tags", "tag"
        )

    def test_rename_numeric_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test renaming a numeric field."""
        self._assert_rename_field(
            redis_url, tmp_path, base_schema, sample_docs, "price", "cost", "numeric"
        )

    def test_rename_geo_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test renaming a geo field."""
        self._assert_rename_field(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            "location",
            "coordinates",
            "geo",
        )

    def test_rename_vector_field(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test renaming a vector field."""
        self._assert_rename_field(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            "embedding",
            "vector",
            "vector",
        )
# ==============================================================================
# 3. Base Attributes (All Non-Vector Types)
# ==============================================================================


def _update_patch(field_name: str, attrs: Dict[str, Any]) -> Dict[str, Any]:
    """Build a v1 schema patch that updates the attrs of a single field."""
    return {
        "version": 1,
        "changes": {"update_fields": [{"name": field_name, "attrs": attrs}]},
    }


def _sortable_schema(
    unique_ids: Dict[str, str], field: Dict[str, Any]
) -> Dict[str, Any]:
    """Hash schema with doc_id plus one extra (sortable) field.

    Used by tests whose attribute under test (UNF, NOINDEX) requires the
    field to already be SORTABLE.
    """
    return {
        "index": {
            "name": unique_ids["name"],
            "prefix": unique_ids["prefix"],
            "storage_type": "hash",
        },
        "fields": [{"name": "doc_id", "type": "tag"}, field],
    }


class TestBaseAttributes:
    """Tests for base attributes: sortable, no_index, index_missing."""

    def test_add_sortable(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding sortable attribute to a field."""
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                _update_patch("title", {"sortable": True}),
            )
            assert result["supported"], "Add sortable should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)

    def test_add_no_index(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding no_index attribute (store only, no searching)."""
        # Need a sortable field first
        schema = _sortable_schema(
            unique_ids,
            {"name": "title", "type": "text", "attrs": {"sortable": True}},
        )
        index = setup_index(redis_url, schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                unique_ids["name"],
                _update_patch("title", {"no_index": True}),
            )
            assert result["supported"], "Add no_index should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)

    def test_add_index_missing(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding index_missing attribute."""
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                _update_patch("title", {"index_missing": True}),
            )
            assert result["supported"], "Add index_missing should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)


# ==============================================================================
# 4. Text Field Attributes
# ==============================================================================


class TestTextAttributes:
    """Tests for text field attributes: weight, no_stem, phonetic_matcher, etc."""

    def _assert_title_update(
        self, redis_url, tmp_path, base_schema, sample_docs, attrs, message
    ):
        """Update attrs of the 'title' field on the full base schema; assert success."""
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                _update_patch("title", attrs),
            )
            assert result["supported"], message
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)

    def test_change_weight(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test changing text field weight."""
        self._assert_title_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"weight": 2.0},
            "Change weight should be supported",
        )

    def test_add_no_stem(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding no_stem attribute."""
        self._assert_title_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"no_stem": True},
            "Add no_stem should be supported",
        )

    def test_add_phonetic_matcher(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding phonetic_matcher attribute."""
        # phonetic_matcher is stripped from schema comparison (FT.INFO doesn't return it)
        self._assert_title_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"phonetic_matcher": "dm:en"},
            "Add phonetic_matcher should be supported",
        )

    def test_add_index_empty_text(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding index_empty to text field."""
        self._assert_title_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"index_empty": True},
            "Add index_empty should be supported",
        )

    def test_add_unf_text(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding unf (un-normalized form) to text field."""
        # UNF requires sortable
        schema = _sortable_schema(
            unique_ids,
            {"name": "title", "type": "text", "attrs": {"sortable": True}},
        )
        index = setup_index(redis_url, schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                unique_ids["name"],
                _update_patch("title", {"unf": True}),
            )
            assert result["supported"], "Add UNF should be supported"
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)


# ==============================================================================
# 5. Tag Field Attributes
# ==============================================================================


class TestTagAttributes:
    """Tests for tag field attributes: separator, case_sensitive, index_empty."""

    def _assert_category_update(
        self, redis_url, tmp_path, base_schema, sample_docs, attrs, message
    ):
        """Update attrs of the 'category' field on the full base schema; assert success."""
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                _update_patch("category", attrs),
            )
            assert result["supported"], message
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)

    def test_change_separator(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test changing tag separator."""
        self._assert_category_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"separator": "|"},
            "Change separator should be supported",
        )

    def test_add_case_sensitive(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding case_sensitive attribute."""
        self._assert_category_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"case_sensitive": True},
            "Add case_sensitive should be supported",
        )

    def test_add_index_empty_tag(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test adding index_empty to tag field."""
        self._assert_category_update(
            redis_url,
            tmp_path,
            base_schema,
            sample_docs,
            {"index_empty": True},
            "Add index_empty should be supported",
        )


# ==============================================================================
# 6. Numeric Field Attributes
# ==============================================================================


class TestNumericAttributes:
    """Tests for numeric field attributes: unf."""

    def test_add_unf_numeric(self, redis_url, tmp_path, unique_ids, sample_docs):
        """Test adding unf (un-normalized form) to numeric field."""
        # UNF requires sortable
        schema = _sortable_schema(
            unique_ids,
            {"name": "price", "type": "numeric", "attrs": {"sortable": True}},
        )
        index = setup_index(redis_url, schema, sample_docs)
        try:
            result = run_migration(
                redis_url,
                tmp_path,
                unique_ids["name"],
                _update_patch("price", {"unf": True}),
            )
            assert result["supported"], "Add UNF to numeric should be supported"
            # Redis auto-applies UNF with SORTABLE on numeric fields, so both should match
            assert result["succeeded"], f"Migration failed: {result['report']}"
        finally:
            cleanup_index(index)
# ==============================================================================
# 7. Vector Field Attributes (Index-Only Changes)
# ==============================================================================


class TestVectorAttributes:
    """Tests for vector field attributes: algorithm, distance_metric, HNSW params, etc."""

    def _migrate_embedding(
        self, redis_url, tmp_path, base_schema, sample_docs, attrs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Apply an attrs patch to the 'embedding' field; return the migration result.

        Cleanup runs unconditionally (try/finally), so the caller may assert on
        the returned result without leaking indexes on failure.
        """
        index = setup_index(redis_url, base_schema, sample_docs)
        try:
            return run_migration(
                redis_url,
                tmp_path,
                base_schema["index"]["name"],
                {
                    "version": 1,
                    "changes": {
                        "update_fields": [{"name": "embedding", "attrs": attrs}]
                    },
                },
            )
        finally:
            cleanup_index(index)

    def test_change_algorithm_hnsw_to_flat(
        self, redis_url, tmp_path, base_schema, sample_docs
    ):
        """Test changing vector algorithm from HNSW to FLAT."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"algorithm": "flat"}
        )
        assert result["supported"], "Change algorithm should be supported"
        assert result["succeeded"], f"Migration failed: {result['report']}"

    def test_change_distance_metric(
        self, redis_url, tmp_path, base_schema, sample_docs
    ):
        """Test changing distance metric."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"distance_metric": "l2"}
        )
        assert result["supported"], "Change distance_metric should be supported"
        assert result["succeeded"], f"Migration failed: {result['report']}"

    def test_change_initial_cap(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test changing initial_cap."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"initial_cap": 1000}
        )
        assert result["supported"], "Change initial_cap should be supported"
        # Redis may not return initial_cap accurately in FT.INFO.
        # Check doc_count_match to confirm the migration executed successfully.
        assert result[
            "doc_count_match"
        ], f"Migration failed: {result['validation_errors']}"

    def test_change_hnsw_m(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test changing HNSW m parameter."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"m": 32}
        )
        assert result["supported"], "Change HNSW m should be supported"
        # Redis may not return m accurately in FT.INFO.
        # Check doc_count_match to confirm the migration executed successfully.
        assert result[
            "doc_count_match"
        ], f"Migration failed: {result['validation_errors']}"

    def test_change_hnsw_ef_construction(
        self, redis_url, tmp_path, base_schema, sample_docs
    ):
        """Test changing HNSW ef_construction parameter."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"ef_construction": 400}
        )
        assert result["supported"], "Change ef_construction should be supported"
        # Redis may not return ef_construction accurately in FT.INFO.
        # Check doc_count_match to confirm the migration executed successfully.
        assert result[
            "doc_count_match"
        ], f"Migration failed: {result['validation_errors']}"

    def test_change_hnsw_ef_runtime(
        self, redis_url, tmp_path, base_schema, sample_docs
    ):
        """Test changing HNSW ef_runtime parameter."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"ef_runtime": 20}
        )
        assert result["supported"], "Change ef_runtime should be supported"
        # Redis may not return ef_runtime accurately in FT.INFO (often returns defaults).
        # Check doc_count_match to confirm the migration executed successfully.
        assert result[
            "doc_count_match"
        ], f"Migration failed: {result['validation_errors']}"

    def test_change_hnsw_epsilon(self, redis_url, tmp_path, base_schema, sample_docs):
        """Test changing HNSW epsilon parameter."""
        result = self._migrate_embedding(
            redis_url, tmp_path, base_schema, sample_docs, {"epsilon": 0.05}
        )
        assert result["supported"], "Change epsilon should be supported"
        # Redis may not return epsilon accurately in FT.INFO (often returns defaults).
        # Check doc_count_match to confirm the migration executed successfully.
        assert result[
            "doc_count_match"
        ], f"Migration failed: {result['validation_errors']}"
+ assert result[ + "doc_count_match" + ], f"Migration failed: {result['validation_errors']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +class TestJsonStorageType: + """Tests for migrations with JSON storage type.""" + + @pytest.fixture + def json_schema(self, unique_ids): + """Schema using JSON storage type.""" + return { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "json", + }, + "fields": [ + {"name": "doc_id", "type": "tag", "attrs": {"path": "$.doc_id"}}, + {"name": "title", "type": "text", "attrs": {"path": "$.title"}}, + {"name": "category", "type": "tag", "attrs": {"path": "$.category"}}, + {"name": "price", "type": "numeric", "attrs": {"path": "$.price"}}, + { + "name": "embedding", + "type": "vector", + "attrs": { + "path": "$.embedding", + "algorithm": "hnsw", + "dims": 4, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + } + + @pytest.fixture + def json_sample_docs(self): + """Sample JSON documents (as dicts for RedisJSON).""" + return [ + { + "doc_id": "1", + "title": "Alpha Product", + "category": "electronics", + "price": 99.99, + "embedding": [0.1, 0.2, 0.3, 0.4], + }, + { + "doc_id": "2", + "title": "Beta Service", + "category": "software", + "price": 149.99, + "embedding": [0.2, 0.3, 0.4, 0.5], + }, + ] + + def test_json_add_field( + self, redis_url, tmp_path, unique_ids, json_schema, json_sample_docs, client + ): + """Test adding a field with JSON storage.""" + index = SearchIndex.from_dict(json_schema, redis_url=redis_url) + index.create(overwrite=True) + + # Load JSON docs directly + for i, doc in enumerate(json_sample_docs): + key = f"{unique_ids['prefix']}:{i+1}" + client.json().set(key, "$", json_sample_docs[i]) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "add_fields": [ + { + "name": "status", + "type": "tag", + "attrs": {"path": "$.status"}, + } + ], + }, + }, 
+ ) + + assert result["supported"], "Add field with JSON should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_json_rename_field( + self, redis_url, tmp_path, unique_ids, json_schema, json_sample_docs, client + ): + """Test renaming a field with JSON storage.""" + index = SearchIndex.from_dict(json_schema, redis_url=redis_url) + index.create(overwrite=True) + + # Load JSON docs + for i, doc in enumerate(json_sample_docs): + key = f"{unique_ids['prefix']}:{i+1}" + client.json().set(key, "$", doc) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "rename_fields": [ + {"old_name": "title", "new_name": "headline"} + ], + }, + }, + ) + + assert result["supported"], "Rename field with JSON should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise diff --git a/tests/integration/test_migration_v1.py b/tests/integration/test_migration_v1.py new file mode 100644 index 000000000..88720cb94 --- /dev/null +++ b/tests/integration/test_migration_v1.py @@ -0,0 +1,129 @@ +import uuid + +import yaml + +from redisvl.index import SearchIndex +from redisvl.migration import MigrationExecutor, MigrationPlanner, MigrationValidator +from redisvl.migration.utils import load_migration_plan, schemas_equal +from redisvl.redis.utils import array_to_buffer + + +def test_drop_recreate_plan_apply_validate_flow(redis_url, worker_id, tmp_path): + unique_id = str(uuid.uuid4())[:8] + index_name = f"migration_v1_{worker_id}_{unique_id}" + prefix = f"migration_v1:{worker_id}:{unique_id}" + + source_index = SearchIndex.from_dict( + { + "index": { + "name": index_name, + "prefix": prefix, + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_id", "type": "tag"}, + {"name": "title", "type": "text"}, + 
{"name": "price", "type": "numeric"}, + { + "name": "embedding", + "type": "vector", + "attrs": { + "algorithm": "hnsw", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + }, + redis_url=redis_url, + ) + + docs = [ + { + "doc_id": "1", + "title": "alpha", + "price": 1, + "category": "news", + "embedding": array_to_buffer([0.1, 0.2, 0.3], "float32"), + }, + { + "doc_id": "2", + "title": "beta", + "price": 2, + "category": "sports", + "embedding": array_to_buffer([0.2, 0.1, 0.4], "float32"), + }, + ] + + source_index.create(overwrite=True) + source_index.load(docs, id_field="doc_id") + + patch_path = tmp_path / "schema_patch.yaml" + patch_path.write_text( + yaml.safe_dump( + { + "version": 1, + "changes": { + "add_fields": [ + { + "name": "category", + "type": "tag", + "attrs": {"separator": ","}, + } + ], + "remove_fields": ["price"], + "update_fields": [{"name": "title", "attrs": {"sortable": True}}], + }, + }, + sort_keys=False, + ) + ) + + plan_path = tmp_path / "migration_plan.yaml" + planner = MigrationPlanner() + plan = planner.create_plan( + index_name, + redis_url=redis_url, + schema_patch_path=str(patch_path), + ) + assert plan.diff_classification.supported is True + planner.write_plan(plan, str(plan_path)) + + query_check_path = tmp_path / "query_checks.yaml" + query_check_path.write_text( + yaml.safe_dump({"fetch_ids": ["1", "2"]}, sort_keys=False) + ) + + executor = MigrationExecutor() + report = executor.apply( + load_migration_plan(str(plan_path)), + redis_url=redis_url, + query_check_file=str(query_check_path), + ) + + try: + assert report.result == "succeeded" + assert report.validation.schema_match is True + assert report.validation.doc_count_match is True + assert report.validation.key_sample_exists is True + assert report.validation.indexing_failures_delta == 0 + assert not report.validation.errors + assert report.benchmark_summary.documents_indexed_per_second is not None + + live_index = 
SearchIndex.from_existing(index_name, redis_url=redis_url) + assert schemas_equal(live_index.schema.to_dict(), plan.merged_target_schema) + + validator = MigrationValidator() + validation, _target_info, _duration = validator.validate( + load_migration_plan(str(plan_path)), + redis_url=redis_url, + query_check_file=str(query_check_path), + ) + assert validation.schema_match is True + assert validation.doc_count_match is True + assert validation.key_sample_exists is True + assert not validation.errors + finally: + live_index = SearchIndex.from_existing(index_name, redis_url=redis_url) + live_index.delete(drop=True) diff --git a/tests/unit/test_migration_planner.py b/tests/unit/test_migration_planner.py new file mode 100644 index 000000000..b63a297cd --- /dev/null +++ b/tests/unit/test_migration_planner.py @@ -0,0 +1,890 @@ +from fnmatch import fnmatch + +import yaml + +from redisvl.migration import MigrationPlanner +from redisvl.schema.schema import IndexSchema + + +class DummyClient: + def __init__(self, keys): + self.keys = keys + + def scan(self, cursor=0, match=None, count=None): + matched = [] + for key in self.keys: + decoded_key = key.decode() if isinstance(key, bytes) else str(key) + if match is None or fnmatch(decoded_key, match): + matched.append(key) + return 0, matched + + +class DummyIndex: + def __init__(self, schema, stats, keys): + self.schema = schema + self._stats = stats + self._client = DummyClient(keys) + + @property + def client(self): + return self._client + + def info(self): + return self._stats + + +def _make_source_schema(): + return IndexSchema.from_dict( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "text", + "path": "$.title", + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": 
"$.embedding", + "attrs": { + "algorithm": "flat", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + } + ) + + +def test_create_plan_from_schema_patch_preserves_unspecified_config( + monkeypatch, tmp_path +): + source_schema = _make_source_schema() + dummy_index = DummyIndex( + source_schema, + {"num_docs": 2, "indexing": False}, + [b"docs:1", b"docs:2", b"docs:3"], + ) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + patch_path = tmp_path / "schema_patch.yaml" + patch_path.write_text( + yaml.safe_dump( + { + "version": 1, + "changes": { + "add_fields": [ + { + "name": "category", + "type": "tag", + "path": "$.category", + "attrs": {"separator": ","}, + } + ], + "remove_fields": ["price"], + "update_fields": [ + { + "name": "title", + "options": {"sortable": True}, + } + ], + }, + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner(key_sample_limit=2) + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + schema_patch_path=str(patch_path), + ) + + assert plan.diff_classification.supported is True + assert plan.source.index_name == "docs" + assert plan.source.keyspace.storage_type == "json" + assert plan.source.keyspace.prefixes == ["docs"] + assert plan.source.keyspace.key_separator == ":" + assert plan.source.keyspace.key_sample == ["docs:1", "docs:2"] + assert plan.warnings == ["Index downtime is required"] + + merged_fields = { + field["name"]: field for field in plan.merged_target_schema["fields"] + } + assert plan.merged_target_schema["index"]["prefix"] == "docs" + assert merged_fields["title"]["attrs"]["sortable"] is True + assert "price" not in merged_fields + assert merged_fields["category"]["type"] == "tag" + + plan_path = tmp_path / "migration_plan.yaml" + planner.write_plan(plan, str(plan_path)) + written_plan = yaml.safe_load(plan_path.read_text()) + assert written_plan["mode"] == "drop_recreate" + assert 
written_plan["validation"]["require_doc_count_match"] is True + assert written_plan["diff_classification"]["supported"] is True + + +def test_target_schema_vector_datatype_change_is_allowed(monkeypatch, tmp_path): + """Changing vector datatype (quantization) is allowed - executor will re-encode.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "text", + "path": "$.title", + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "flat", # Same algorithm + "dims": 3, + "distance_metric": "cosine", + "datatype": "float16", # Changed from float32 + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + # Datatype change (quantization) should now be ALLOWED + assert plan.diff_classification.supported is True + assert len(plan.diff_classification.blocked_reasons) == 0 + + # Verify datatype changes are detected for the executor + datatype_changes = MigrationPlanner.get_vector_datatype_changes( + plan.source.schema_snapshot, plan.merged_target_schema + ) + assert "embedding" in datatype_changes + assert datatype_changes["embedding"]["source"] == "float32" + assert datatype_changes["embedding"]["target"] == "float16" + + +def test_target_schema_vector_algorithm_change_is_allowed(monkeypatch, 
tmp_path): + """Changing vector algorithm is allowed (index-only change).""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "text", + "path": "$.title", + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "hnsw", # Changed from flat + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", # Same datatype + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + # Algorithm change should be ALLOWED + assert plan.diff_classification.supported is True + assert len(plan.diff_classification.blocked_reasons) == 0 + + +# ============================================================================= +# BLOCKED CHANGES (Document-Dependent) - require iterative_shadow +# ============================================================================= + + +def test_target_schema_prefix_change_is_supported(monkeypatch, tmp_path): + """Prefix change is now supported via key rename operations.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + 
target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs_v2", + "key_separator": ":", + "storage_type": "json", + }, + "fields": source_schema.to_dict()["fields"], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + # Prefix change is now supported + assert plan.diff_classification.supported is True + # Verify rename operation is populated + assert plan.rename_operations.change_prefix == "docs_v2" + # Verify warning is present + assert any("Prefix change" in w for w in plan.warnings) + + +def test_key_separator_change_is_blocked(monkeypatch, tmp_path): + """Key separator change is blocked: document keys don't match new pattern.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": "/", # Changed from ":" + "storage_type": "json", + }, + "fields": source_schema.to_dict()["fields"], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is False + assert any( + "key_separator" in reason.lower() or "separator" in reason.lower() + for reason in plan.diff_classification.blocked_reasons + ) + + +def test_storage_type_change_is_blocked(monkeypatch, tmp_path): + """Storage type change is blocked: documents are in wrong format.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + 
monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "hash", # Changed from "json" + }, + "fields": [ + {"name": "title", "type": "text", "attrs": {"sortable": False}}, + {"name": "price", "type": "numeric", "attrs": {"sortable": True}}, + { + "name": "embedding", + "type": "vector", + "attrs": { + "algorithm": "flat", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is False + assert any( + "storage" in reason.lower() + for reason in plan.diff_classification.blocked_reasons + ) + + +def test_vector_dimension_change_is_blocked(monkeypatch, tmp_path): + """Vector dimension change is blocked: stored vectors have wrong size.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "text", + "path": "$.title", + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "flat", + "dims": 768, # Changed from 3 + 
"distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is False + assert any( + "dims" in reason and "document migration" in reason + for reason in plan.diff_classification.blocked_reasons + ) + + +def test_field_path_change_is_blocked(monkeypatch, tmp_path): + """JSON path change is blocked: stored data is at wrong path.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "text", + "path": "$.metadata.title", # Changed from $.title + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "flat", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is False + assert any( + "path" in reason.lower() for reason in plan.diff_classification.blocked_reasons + ) + + +def test_field_type_change_is_blocked(monkeypatch, tmp_path): + """Field type change is blocked: index expects different data format.""" + source_schema = 
_make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "tag", # Changed from text + "path": "$.title", + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "flat", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is False + assert any( + "type" in reason.lower() for reason in plan.diff_classification.blocked_reasons + ) + + +def test_field_rename_is_detected_and_blocked(monkeypatch, tmp_path): + """Field rename is blocked: stored data uses old field name.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "document_title", # Renamed from "title" + "type": "text", + "path": "$.title", + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": 
{"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "flat", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is False + assert any( + "rename" in reason.lower() + for reason in plan.diff_classification.blocked_reasons + ) + + +# ============================================================================= +# ALLOWED CHANGES (Index-Only) +# ============================================================================= + + +def test_add_non_vector_field_is_allowed(monkeypatch, tmp_path): + """Adding a non-vector field is allowed.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + patch_path = tmp_path / "schema_patch.yaml" + patch_path.write_text( + yaml.safe_dump( + { + "version": 1, + "changes": { + "add_fields": [ + {"name": "category", "type": "tag", "path": "$.category"} + ] + }, + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + schema_patch_path=str(patch_path), + ) + + assert plan.diff_classification.supported is True + + +def test_remove_field_is_allowed(monkeypatch, tmp_path): + """Removing a field from the index is allowed.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + patch_path = tmp_path / "schema_patch.yaml" + patch_path.write_text( + 
yaml.safe_dump( + {"version": 1, "changes": {"remove_fields": ["price"]}}, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + schema_patch_path=str(patch_path), + ) + + assert plan.diff_classification.supported is True + + +def test_change_field_sortable_is_allowed(monkeypatch, tmp_path): + """Changing field sortable option is allowed.""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + patch_path = tmp_path / "schema_patch.yaml" + patch_path.write_text( + yaml.safe_dump( + { + "version": 1, + "changes": { + "update_fields": [{"name": "title", "options": {"sortable": True}}] + }, + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + schema_patch_path=str(patch_path), + ) + + assert plan.diff_classification.supported is True + + +def test_change_vector_distance_metric_is_allowed(monkeypatch, tmp_path): + """Changing vector distance metric is allowed (index-only).""" + source_schema = _make_source_schema() + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "title", + "type": "text", + "path": "$.title", + "attrs": {"sortable": False}, + }, + { + "name": "price", + "type": "numeric", + "path": "$.price", + "attrs": {"sortable": True}, + }, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { 
+ "algorithm": "flat", + "dims": 3, + "distance_metric": "L2", # Changed from cosine + "datatype": "float32", + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is True + assert len(plan.diff_classification.blocked_reasons) == 0 + + +def test_change_hnsw_tuning_params_is_allowed(monkeypatch, tmp_path): + """Changing HNSW tuning parameters is allowed (index-only).""" + source_schema = IndexSchema.from_dict( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "hnsw", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + "m": 16, + "ef_construction": 200, + }, + }, + ], + } + ) + dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"]) + monkeypatch.setattr( + "redisvl.migration.planner.SearchIndex.from_existing", + lambda *args, **kwargs: dummy_index, + ) + + target_schema_path = tmp_path / "target_schema.yaml" + target_schema_path.write_text( + yaml.safe_dump( + { + "index": { + "name": "docs", + "prefix": "docs", + "key_separator": ":", + "storage_type": "json", + }, + "fields": [ + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "hnsw", + "dims": 3, + "distance_metric": "cosine", + "datatype": "float32", + "m": 32, # Changed from 16 + "ef_construction": 400, # Changed from 200 + }, + }, + ], + }, + sort_keys=False, + ) + ) + + planner = MigrationPlanner() + plan = planner.create_plan( + "docs", + redis_url="redis://localhost:6379", + target_schema_path=str(target_schema_path), + ) + + assert plan.diff_classification.supported is True + assert len(plan.diff_classification.blocked_reasons) == 0