Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 271 additions & 0 deletions redisvl/cli/migrate.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import argparse
import asyncio
import sys
from pathlib import Path
from typing import Optional

from redisvl.cli.utils import add_redis_connection_options, create_redis_url
from redisvl.migration import (
AsyncMigrationExecutor,
BatchMigrationExecutor,
BatchMigrationPlanner,
MigrationExecutor,
MigrationPlanner,
MigrationValidator,
Expand All @@ -16,6 +19,7 @@
estimate_disk_space,
list_indexes,
load_migration_plan,
load_yaml,
write_benchmark_report,
write_migration_report,
)
Expand All @@ -37,6 +41,10 @@ class Migrate:
"\tapply Execute a reviewed drop/recreate migration plan (use --async for large migrations)",
"\testimate Estimate disk space required for a migration plan (dry-run, no mutations)",
"\tvalidate Validate a completed migration plan against the live index",
"\tbatch-plan Generate a batch migration plan for multiple indexes",
"\tbatch-apply Execute a batch migration plan with checkpointing",
"\tbatch-resume Resume an interrupted batch migration",
"\tbatch-status Show status of an in-progress or completed batch migration",
"\n",
]
)
Expand Down Expand Up @@ -495,3 +503,266 @@ def _print_report_summary(
print(f"- {action}")
if benchmark_out:
print(f"Benchmark report written to {benchmark_out}")

def batch_plan(self):
"""Generate a batch migration plan for multiple indexes."""
parser = argparse.ArgumentParser(
usage=(
"rvl migrate batch-plan --schema-patch <patch.yaml> "
"(--pattern <glob> | --indexes <name1,name2> | --indexes-file <file>)"
)
)
parser.add_argument(
"--schema-patch", help="Path to shared schema patch file", required=True
)
parser.add_argument(
"--pattern", help="Glob pattern to match index names (e.g., '*_idx')"
)
parser.add_argument("--indexes", help="Comma-separated list of index names")
parser.add_argument(
"--indexes-file", help="File with index names (one per line)"
)
parser.add_argument(
"--failure-policy",
help="How to handle failures: fail_fast or continue_on_error",
choices=["fail_fast", "continue_on_error"],
default="fail_fast",
)
parser.add_argument(
"--plan-out",
help="Path to write batch_plan.yaml",
default="batch_plan.yaml",
)
parser = add_redis_connection_options(parser)
args = parser.parse_args(sys.argv[3:])

redis_url = create_redis_url(args)
indexes = (
[idx.strip() for idx in args.indexes.split(",") if idx.strip()]
if args.indexes
else None
)

planner = BatchMigrationPlanner()
batch_plan = planner.create_batch_plan(
indexes=indexes,
pattern=args.pattern,
indexes_file=args.indexes_file,
schema_patch_path=args.schema_patch,
redis_url=redis_url,
failure_policy=args.failure_policy,
)

planner.write_batch_plan(batch_plan, args.plan_out)
self._print_batch_plan_summary(args.plan_out, batch_plan)

def batch_apply(self):
"""Execute a batch migration plan with checkpointing."""
parser = argparse.ArgumentParser(
usage=(
"rvl migrate batch-apply --plan <batch_plan.yaml> "
"[--state <batch_state.yaml>] [--report-dir <./reports>]"
)
)
parser.add_argument("--plan", help="Path to batch_plan.yaml", required=True)
parser.add_argument(
"--accept-data-loss",
help="Acknowledge that quantization is lossy and cannot be reverted",
action="store_true",
)
parser.add_argument(
"--state",
help="Path to checkpoint state file",
default="batch_state.yaml",
)
parser.add_argument(
"--report-dir",
help="Directory for per-index migration reports",
default="./reports",
)
parser = add_redis_connection_options(parser)
args = parser.parse_args(sys.argv[3:])

from redisvl.migration.models import BatchPlan

plan_data = load_yaml(args.plan)
batch_plan = BatchPlan.model_validate(plan_data)

if batch_plan.requires_quantization and not args.accept_data_loss:
print(
"""WARNING: This batch migration includes quantization (e.g., float32 -> float16).
Vector data will be modified. Original precision cannot be recovered.
To proceed, add --accept-data-loss flag.

If you need to preserve original vectors, backup your data first:
redis-cli BGSAVE"""
)
sys.exit(1)

redis_url = create_redis_url(args)
executor = BatchMigrationExecutor()

def progress_callback(
index_name: str, position: int, total: int, status: str
) -> None:
print(f"[{position}/{total}] {index_name}: {status}")

report = executor.apply(
batch_plan,
batch_plan_path=args.plan,
state_path=args.state,
report_dir=args.report_dir,
redis_url=redis_url,
progress_callback=progress_callback,
)

self._print_batch_report_summary(report)

def batch_resume(self):
"""Resume an interrupted batch migration."""
parser = argparse.ArgumentParser(
usage=(
"rvl migrate batch-resume --state <batch_state.yaml> "
"[--plan <batch_plan.yaml>] [--retry-failed]"
)
)
parser.add_argument(
"--state", help="Path to checkpoint state file", required=True
)
parser.add_argument(
"--plan", help="Path to batch_plan.yaml (optional, uses state.plan_path)"
)
parser.add_argument(
"--retry-failed",
help="Retry previously failed indexes",
action="store_true",
)
parser.add_argument(
"--report-dir",
help="Directory for per-index migration reports",
default="./reports",
)
parser = add_redis_connection_options(parser)
args = parser.parse_args(sys.argv[3:])

redis_url = create_redis_url(args)
executor = BatchMigrationExecutor()

def progress_callback(
index_name: str, position: int, total: int, status: str
) -> None:
print(f"[{position}/{total}] {index_name}: {status}")

report = executor.resume(
args.state,
batch_plan_path=args.plan,
retry_failed=args.retry_failed,
report_dir=args.report_dir,
Comment on lines +656 to +660
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Enforce data-loss acknowledgement on batch resume

batch-resume starts execution immediately via executor.resume(...) but never performs the quantization safety gate used by batch-apply (requires_quantization + --accept-data-loss). This allows a quantizing migration to run without explicit acknowledgement if a user resumes from an existing/handcrafted state file, which bypasses the intended irreversible-data-loss confirmation path.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — batch-resume now has quantization safety gate with --accept-data-loss.

redis_url=redis_url,
progress_callback=progress_callback,
)

self._print_batch_report_summary(report)

def batch_status(self):
"""Show status of an in-progress or completed batch migration."""
parser = argparse.ArgumentParser(
usage="rvl migrate batch-status --state <batch_state.yaml>"
)
parser.add_argument(
"--state", help="Path to checkpoint state file", required=True
)
args = parser.parse_args(sys.argv[3:])

state_path = Path(args.state).resolve()
if not state_path.exists():
print(f"State file not found: {args.state}")
sys.exit(1)

from redisvl.migration.models import BatchState

state_data = load_yaml(args.state)
state = BatchState.model_validate(state_data)

print(
f"""Batch ID: {state.batch_id}
Started at: {state.started_at}
Updated at: {state.updated_at}
Current index: {state.current_index or '(none)'}
Remaining: {len(state.remaining)}
Completed: {len(state.completed)}
- Succeeded: {state.success_count}
- Failed: {state.failed_count}
- Skipped: {state.skipped_count}"""
)

if state.completed:
print("\nCompleted indexes:")
for idx in state.completed:
if idx.status == "success":
status_icon = "[OK]"
elif idx.status == "skipped":
status_icon = "[SKIP]"
else:
status_icon = "[FAIL]"
print(f" {status_icon} {idx.name}")
if idx.error:
print(f" Error: {idx.error}")

if state.remaining:
print(f"\nRemaining indexes ({len(state.remaining)}):")
for name in state.remaining[:10]:
print(f" - {name}")
if len(state.remaining) > 10:
print(f" ... and {len(state.remaining) - 10} more")

def _print_batch_plan_summary(self, plan_out: str, batch_plan) -> None:
"""Print summary after generating batch plan."""
import os

abs_path = os.path.abspath(plan_out)
print(
f"""Batch plan written to {abs_path}
Batch ID: {batch_plan.batch_id}
Mode: {batch_plan.mode}
Failure policy: {batch_plan.failure_policy}
Requires quantization: {batch_plan.requires_quantization}
Total indexes: {len(batch_plan.indexes)}
- Applicable: {batch_plan.applicable_count}
- Skipped: {batch_plan.skipped_count}"""
)

if batch_plan.skipped_count > 0:
print("\nSkipped indexes:")
for idx in batch_plan.indexes:
if not idx.applicable:
print(f" - {idx.name}: {idx.skip_reason}")

print(
f"""
Next steps:
Review the plan: cat {plan_out}
Apply the migration: rvl migrate batch-apply --plan {plan_out}"""
)

if batch_plan.requires_quantization:
print(" (add --accept-data-loss for quantization)")

def _print_batch_report_summary(self, report) -> None:
"""Print summary after batch migration completes."""
print(
f"""
Batch migration {report.status}
Batch ID: {report.batch_id}
Duration: {report.summary.total_duration_seconds}s
Total: {report.summary.total_indexes}
- Succeeded: {report.summary.successful}
- Failed: {report.summary.failed}
- Skipped: {report.summary.skipped}"""
)

if report.summary.failed > 0:
print("\nFailed indexes:")
for idx in report.indexes:
if idx.status == "failed":
print(f" - {idx.name}: {idx.error}")
8 changes: 8 additions & 0 deletions redisvl/migration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from redisvl.migration.async_executor import AsyncMigrationExecutor
from redisvl.migration.async_planner import AsyncMigrationPlanner
from redisvl.migration.async_validation import AsyncMigrationValidator
from redisvl.migration.batch_executor import BatchMigrationExecutor
from redisvl.migration.batch_planner import BatchMigrationPlanner
from redisvl.migration.executor import MigrationExecutor
from redisvl.migration.models import BatchPlan, BatchState, SchemaPatch
from redisvl.migration.planner import MigrationPlanner
from redisvl.migration.validation import MigrationValidator
from redisvl.migration.wizard import MigrationWizard
Expand All @@ -10,8 +13,13 @@
"AsyncMigrationExecutor",
"AsyncMigrationPlanner",
"AsyncMigrationValidator",
"BatchMigrationExecutor",
"BatchMigrationPlanner",
"BatchPlan",
"BatchState",
"MigrationExecutor",
"MigrationPlanner",
"MigrationValidator",
"MigrationWizard",
"SchemaPatch",
]
Loading
Loading