feat(migrate): [3/7] Async planner, executor, and validator #562
nkanu17 wants to merge 7 commits into feat/migrate-executor
Conversation
🛡️ Jit Security Scan Results: ✅ No security findings were detected in this PR.
Pull request overview
Adds an async migration surface (AsyncMigrationPlanner, AsyncMigrationExecutor, AsyncMigrationValidator) to enable non-blocking drop/recreate migrations, plus async readiness/index utilities and corresponding unit/integration tests.
Changes:
- Introduces async planner/executor/validator implementations mirroring the existing sync migration flow (including renames, vector re-encoding, and readiness polling).
- Adds async helper utilities for listing indexes, readiness polling, and snapshot comparison.
- Adds new async unit and integration tests for planning, execution, disk space estimation, and reliability helpers.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| redisvl/migration/async_planner.py | Async migration planning built on AsyncSearchIndex with sync planner delegation. |
| redisvl/migration/async_executor.py | Async migration apply flow incl. key/field renames, optional vector re-encoding, readiness wait, and validation. |
| redisvl/migration/async_validation.py | Async post-migration validation and query checks. |
| redisvl/migration/async_utils.py | Async index listing, readiness polling, and snapshot match helper. |
| redisvl/migration/__init__.py | Exposes new async migration APIs/utilities from the package. |
| tests/unit/test_async_migration_planner.py | Unit coverage for async planner parity with sync behavior. |
| tests/unit/test_async_migration_executor.py | Unit coverage for async executor + disk estimator + reliability helpers. |
| tests/integration/test_async_migration_v1.py | End-to-end integration coverage for async plan/apply/validate against Redis. |
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0087dcf33b
@codex review
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
@codex review
- Fix unbound 'ready' variable in async_utils.py and async_executor.py
- Fix completed checkpoint: resume from post-drop state
- Pass rename_operations to get_vector_datatype_changes
- Fix has_prefix_change falsy check for empty string prefixes
- Fix partial key renames: fail fast on collision
- Warn when field rename overwrites existing destination field
- Fix async_validation prefix handling and indexing failure delta
- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Close internally-created Redis client in async_list_indexes
Force-pushed from 50cff88 to 33f2a40.
Force-pushed from 0087dcf to 8642ec7.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8642ec7435
Force-pushed from 33f2a40 to 42aa7bb.
- Pass existing snapshot to create_plan_from_patch to avoid double Redis round-trip
- Use _get_client() instead of _redis_client for lazy async client initialization
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
Force-pushed from 8642ec7 to 7a1ef9a.
- Switch from import logging to redisvl.utils.log.get_logger in async_executor
- Honor ValidationPolicy flags (require_schema_match, require_doc_count_match) in async validator
- Handle missing source index in async current_source_matches_snapshot
- Always remap datatype_changes keys on resume in async executor
- Delete stale completed checkpoint on fresh run in async executor
Force-pushed from b96771b to ef8fca8.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ef8fca8e5f
```python
if percent_indexed is not None or indexing is not None:
    ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
    if progress_callback:
```
Treat missing percent_indexed as ready when indexing stops
The readiness check currently computes ready as float(percent_indexed or 0) >= 1.0 and not bool(indexing). When Redis returns an indexing flag but omits percent_indexed (which happens on some server/module versions), this expression can never become true even after indexing finishes, so apply() waits until timeout and fails the migration after up to 30 minutes.
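The fallback this comment asks for can be sketched as a pure function over an FT.INFO-style dict (illustrative only; the function name and dict shape are assumptions, not the library's implementation):

```python
def index_ready(info: dict) -> bool:
    """Decide index readiness from FT.INFO-style fields, falling back to
    the `indexing` flag when `percent_indexed` is absent."""
    percent_indexed = info.get("percent_indexed")
    indexing = info.get("indexing")
    if percent_indexed is not None:
        return float(percent_indexed) >= 1.0 and not bool(indexing)
    if indexing is not None:
        # Some server/module versions omit percent_indexed entirely:
        # treat "not indexing" as done instead of waiting forever.
        return not bool(indexing)
    return False  # no signal at all: keep polling
```

With this shape, an info dict of `{"indexing": 0}` reports ready instead of spinning until the timeout.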
```python
match_pattern = (
    f"{prefix}*"
    if prefix.endswith(key_separator)
    else f"{prefix}{key_separator}*"
)
```
Use wildcard scan when sampling keys for empty prefixes
For an empty index prefix, this logic builds the SCAN pattern as ":*" instead of "*", so _async_sample_keys returns no documents even when keys exist. That produces an empty key_sample, which then disables post-migration sampled-key existence validation and can hide key migration regressions for indexes configured without a prefix.
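A minimal sketch of a pattern builder that special-cases the empty prefix, as the comment suggests (the function name and signature here are illustrative, not the PR's actual helper):

```python
def build_match_pattern(prefix: str, key_separator: str = ":") -> str:
    """Build a SCAN MATCH pattern for an index prefix.

    An empty prefix must scan everything ("*"), never ":*",
    otherwise key sampling silently returns no documents.
    """
    if not prefix:
        return "*"
    if prefix.endswith(key_separator):
        return f"{prefix}*"
    return f"{prefix}{key_separator}*"
```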
```python
current_index = await AsyncSearchIndex.from_existing(
    index_name,
    redis_url=redis_url,
    redis_client=redis_client,
)
```
Return False when async snapshot source index is absent
This helper is typed/advertised as a boolean check but directly awaits AsyncSearchIndex.from_existing without handling missing-index errors. If the source index has already been dropped (a normal migration/resume scenario), callers get an exception instead of False, which can abort control flow that expects a simple match/non-match result.
…xecutor, validator, and readiness utilities

Async versions of planner, executor, and validator for non-blocking migration workflows. Async executor mirrors the sync drop/recreate flow with async key enumeration, prefix/field renames, vector re-encoding with checkpoint resume, and readiness polling. Includes async utilities for index listing, readiness polling, and source snapshot validation. Adds async unit and integration tests.
Force-pushed from 34c5f95 to 70520ba.
Force-pushed from ef8fca8 to 28fb826.
@codex review
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
```python
try:
    search_result = await target_index.search(Query("*").paging(0, 1))
    total_found = search_result.total
    passed = total_found == expected_doc_count
```
The functional wildcard search check is stricter than the sync validator: it requires total_found == expected_doc_count, but the sync version only verifies the index responds (total_found > 0 unless expected_doc_count == 0). This exact-equality check can fail during transient states (e.g., indexing in progress) and is redundant with the separate doc_count_match check above. Align the async behavior with MigrationValidator._run_functional_checks for consistency and resilience.
Suggested change:

```python
if expected_doc_count == 0:
    passed = total_found == 0
else:
    passed = total_found > 0
```
```python
if percent_indexed is not None or indexing is not None:
    ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
    if progress_callback:
        total_docs = int(latest_info.get("num_docs", 0))
        pct = float(percent_indexed or 0)
        indexed_docs = int(total_docs * pct)
        progress_callback(indexed_docs, total_docs, pct * 100)
```
wait-for-ready logic treats missing percent_indexed as 0.0 even when indexing is falsy, which can cause an infinite wait/timeout on Redis versions that return indexing but omit percent_indexed. The sync wait_for_index_ready() handles this by treating the index as ready when indexing is falsy if percent_indexed is absent. Mirror that logic here (and keep progress_callback percentages consistent).
Suggested change:

```python
if percent_indexed is not None:
    # When percent_indexed is reported, use it directly to determine readiness.
    pct = float(percent_indexed or 0)
    ready = pct >= 1.0 and not bool(indexing)
    if progress_callback:
        total_docs = int(latest_info.get("num_docs", 0))
        indexed_docs = int(total_docs * pct)
        progress_callback(indexed_docs, total_docs, pct * 100)
elif indexing is not None:
    # Some Redis versions report "indexing" without "percent_indexed".
    # In that case, treat the index as ready when indexing is falsy,
    # mirroring the synchronous wait_for_index_ready() behavior.
    ready = not bool(indexing)
    pct = 1.0 if ready else 0.0
    if progress_callback:
        total_docs = int(latest_info.get("num_docs", 0))
        indexed_docs = int(total_docs * pct)
        progress_callback(indexed_docs, total_docs, pct * 100)
```
```python
from pathlib import Path

from redisvl.utils.log import get_logger
import time
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional
```
Import order likely violates the repo’s isort/black expectations (stdlib imports should come before internal imports). Here, get_logger is imported before the stdlib 'time'. Re-run isort (per Makefile/pre-commit) so stdlib imports are grouped and ordered consistently.
Suggested change:

```python
import time
from pathlib import Path
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional

from redisvl.utils.log import get_logger
```
```python
try:
    if redis_client:
        client = redis_client
    elif redis_url:
        from redis.asyncio import Redis

        client = Redis.from_url(redis_url)
    else:
        client = None

    if client and not await supports_svs_async(client):
        warnings.append(
            "SVS-VAMANA requires Redis >= 8.2.0 and Redis Search >= 2.8.10. "
            "The target Redis instance may not support this algorithm. "
            "Migration will fail at apply time if requirements are not met."
        )
except Exception:
    warnings.append(
        "SVS-VAMANA requires Redis >= 8.2.0 and Redis Search >= 2.8.10. "
        "Verify your Redis instance supports this algorithm before applying."
    )
```
_check_svs_vamana_requirements creates a Redis client via Redis.from_url(redis_url) but never closes it. This can leak connections in repeated planning calls. Follow the sync planner’s pattern (track created_client and close in finally), or use RedisConnectionFactory to create/close the async client.
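The created-client pattern the comment refers to can be sketched as follows; `FakeAsyncRedis` is a stand-in for `redis.asyncio.Redis` so the sketch stays self-contained, and the function body is illustrative rather than the planner's actual code:

```python
import asyncio


class FakeAsyncRedis:
    """Stand-in for redis.asyncio.Redis, for illustration only."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def check_requirements(redis_client=None, redis_url=None):
    """Only close clients this function created, never a caller-supplied one."""
    created_client = None
    client = redis_client
    if client is None and redis_url:
        # Real code would call Redis.from_url(redis_url) here.
        client = created_client = FakeAsyncRedis()
    try:
        # ... run version/capability checks against `client` here ...
        return client
    finally:
        if created_client is not None:
            await created_client.aclose()
```

A caller-supplied client is left open, while a URL-created one is always closed on exit, even if the checks raise.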
```python
pipe = client.pipeline()
undo = BatchUndoBuffer()
keys_updated_in_batch: set[str] = set()

try:
    for key in batch:
        for field_name, change in datatype_changes.items():
            field_data: bytes | None = await client.hget(key, field_name)  # type: ignore[misc,assignment]
            if not field_data:
                continue

            # Idempotent: skip if already converted to target dtype
            dims = change.get("dims", 0)
            if dims and is_already_quantized(
                field_data, dims, change["source"], change["target"]
            ):
                skipped += 1
                continue

            undo.store(key, field_name, field_data)
            array = buffer_to_array(field_data, change["source"])
            new_bytes = array_to_buffer(array, change["target"])
            pipe.hset(key, field_name, new_bytes)  # type: ignore[arg-type]
            keys_updated_in_batch.add(key)
```
Quantization reads are done as sequential await client.hget() calls inside nested loops, resulting in (keys * vector_fields) round-trips per batch. This can be a major bottleneck for large indexes and undermines the “non-blocking” benefit. Consider pipelining the HGETs per batch (similar to field rename) or using asyncio.gather with bounded concurrency, then pipeline the HSET writes as you already do.
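Bounded-concurrency reads can be sketched with `asyncio.gather` and a semaphore; `FakeClient` is a hypothetical stand-in for an async Redis client (the `hget` signature is assumed to mirror `redis.asyncio`), and the helper name is invented for this sketch:

```python
import asyncio


async def fetch_fields(client, keys, field_names, max_concurrency=32):
    """Fetch every (key, field) pair concurrently, capped by a semaphore,
    instead of awaiting each HGET sequentially."""
    sem = asyncio.Semaphore(max_concurrency)

    async def one(key, field):
        async with sem:
            return key, field, await client.hget(key, field)

    tasks = [one(k, f) for k in keys for f in field_names]
    return await asyncio.gather(*tasks)


class FakeClient:
    """Illustrative stand-in for redis.asyncio.Redis."""

    async def hget(self, key, field):
        return f"{key}:{field}".encode()
```

`asyncio.gather` returns results in task order, so the fetched values can feed the existing pipelined HSET phase unchanged.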
```python
# The executor should raise an error internally when trying to connect
# but let's verify it doesn't crash before it tries to apply
# For a proper test, we'd need to mock AsyncSearchIndex.from_existing
# For now, we just verify the executor is created
assert executor is not None
```
This test doesn’t assert any behavior about redis_url validation (it only asserts the executor instance is not None) and includes comments indicating it’s a placeholder. Either remove it or mock AsyncSearchIndex.from_existing / connection creation to actually verify that apply() fails/returns an error report when neither redis_url nor redis_client is provided.
Suggested change:

```python
# Calling apply without redis_url or redis_client should fail fast
with pytest.raises(ValueError) as exc_info:
    await executor.apply(plan)
# Ensure the error message clearly indicates missing connection info
message = str(exc_info.value).lower()
assert "redis_url" in message or "redis client" in message
```
Closing in favor of restructured PR stack (Option A ordering).
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 28fb8261d6
```python
percent_indexed = latest_info.get("percent_indexed")

if percent_indexed is not None or indexing is not None:
    ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
```
Handle missing percent_indexed in readiness checks
When FT.INFO omits percent_indexed but still reports indexing (for example, indexing=0 on some Redis/RediSearch versions), this condition treats the missing percentage as 0 and never marks the index ready. In apply(), that causes migrations to wait until timeout and fail even though indexing is already complete. The readiness logic should fall back to the indexing flag when percent_indexed is absent.
```python
try:
    search_result = await target_index.search(Query("*").paging(0, 1))
    total_found = search_result.total
    passed = total_found == expected_doc_count
```
Decouple functional check from strict doc-count equality
This functional check requires wildcard search totals to exactly equal expected_doc_count, which effectively enforces document-count equality even when plan.validation.require_doc_count_match is disabled. Because failed query checks always add a validation error, migrations that intentionally allow count drift will still fail under async validation. The functional check should verify index operability without re-imposing strict count matching.
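The decoupled check the comment asks for reduces to a small decision function (a sketch; the `require_doc_count_match` parameter name mirrors the ValidationPolicy flag mentioned in this PR, and the function itself is illustrative):

```python
def functional_check_passed(
    total_found: int,
    expected_doc_count: int,
    require_doc_count_match: bool = False,
) -> bool:
    """Verify the index is operable; enforce exact count equality only
    when the validation policy explicitly requires it."""
    if require_doc_count_match:
        return total_found == expected_doc_count
    if expected_doc_count == 0:
        return total_found == 0
    return total_found > 0
```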
```python
current_index = await AsyncSearchIndex.from_existing(
    index_name,
    redis_url=redis_url,
    redis_client=redis_client,
)
```
Return False for missing source index in snapshot check
async_current_source_matches_snapshot is used as a boolean predicate, but it raises if the source index no longer exists because from_existing exceptions are not handled. That makes callers crash in drop/resume scenarios where a missing index should simply mean “does not match snapshot.” The sync equivalent returns False here, so this async utility currently has incompatible and brittle behavior.
Summary
Adds fully async versions of the migration planner, executor, and validator. These are designed for large-scale migrations where non-blocking I/O provides better throughput, especially during quantization of many keys.
Usage
```shell
# Apply a migration using the async executor
rvl migrate apply --plan migration_plan.yaml --async
```

From Python:
What is included
- redisvl/migration/async_planner.py: AsyncMigrationPlanner with async Redis introspection and schema diffing.
- redisvl/migration/async_executor.py: AsyncMigrationExecutor with the full 8-step pipeline using async Redis commands, including async quantization with checkpoint support.
- redisvl/migration/async_validation.py: AsyncMigrationValidator for post-migration checks.
- redisvl/migration/async_utils.py: Async versions of shared helpers (index listing, AOF detection, etc.).
- redisvl/migration/__init__.py: Exports for all async classes.

Details

The async implementations mirror their sync counterparts exactly but use redis.asyncio clients throughout. Key differences:

- await for all Redis operations
- Shared MigrationPlan and MigrationReport models

PR Stack

1. feat/migrate-core
2. feat/migrate-executor
3. feat/migrate-async
4. feat/migrate-batch
5. feat/migrate-wizard
6. feat/migrate-cli-docs
7. feat/migrate-benchmarks