[fix] Add polling mechanism for kv_batch_get
#32
base: main
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass: 0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍
Pull request overview
Adds a polling mechanism to the high-level KV batch retrieval APIs to mitigate “partial readiness” cases where requested fields are not yet fully available for the requested key(s).
Changes:
- Introduce polling in kv_batch_get when fields is specified, retrying metadata retrieval until requested fields appear or a timeout is reached.
- Introduce equivalent async polling in async_kv_batch_get.
- Add env-configurable timeout/interval knobs for the polling behavior.
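The async variant is not shown in this conversation, so the following is only a minimal sketch of how such polling could look. The helper name `poll_until_fields_ready` and the `fetch_field_names` callable are hypothetical stand-ins for the real client call; the defaults mirror the env defaults quoted later in the review. It uses `asyncio.sleep` so the event loop is not blocked between retries.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterable, Set


async def poll_until_fields_ready(
    fetch_field_names: Callable[[], Awaitable[Iterable[str]]],  # hypothetical stand-in
    fields: Iterable[str],
    timeout: float = 10.0,
    interval: float = 0.5,
) -> Set[str]:
    """Await until every requested field is reported, or raise on timeout."""
    target = set(fields)
    # A monotonic clock is not affected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    current = set(await fetch_field_names())
    while not target <= current:
        if time.monotonic() >= deadline:
            raise RuntimeError(
                f"Timeout. Missing fields: {sorted(target - current)} after {timeout} seconds."
            )
        # Yield to the event loop instead of blocking the thread.
        await asyncio.sleep(interval)
        current = set(await fetch_field_names())
    return current
```

A caller would wrap the real metadata retrieval in a small coroutine and pass it as `fetch_field_names`.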
target_fields = set(fields)
current_fields = set(batch_meta.field_names)

not_ready_fields = target_fields - current_fields
begin_polling_time = time.time()
while not_ready_fields:
    if time.time() - begin_polling_time > TQ_KV_POLLING_METADATA_TIMEOUT:
        raise RuntimeError(
            f"Timeout for kv_batch_get. Missing fields: {not_ready_fields}"
            f" after {TQ_KV_POLLING_METADATA_TIMEOUT} seconds."
        )

    logger.warning(
        f"Fields {list(not_ready_fields)} are not ready yet! "
        f"Retry in {TQ_KV_POLLING_METADATA_CHECK_INTERVAL} seconds."
    )

    time.sleep(TQ_KV_POLLING_METADATA_CHECK_INTERVAL)
    batch_meta = tq_client.kv_retrieve_keys(keys=keys, partition_id=partition_id, create=False)
    current_fields = set(batch_meta.field_names)
    not_ready_fields = target_fields - current_fields
Copilot
AI
Feb 12, 2026
The polling loop treats any missing requested field as "not ready" and will wait until timeout. If a caller passes a field name that is not registered for the partition, this now blocks and raises a timeout, whereas BatchMeta.select_fields() intentionally ignores unknown field names. Consider adding an explicit validation step to distinguish "not yet produced" vs "unknown/unregistered" fields (e.g., via the controller production-status API) and fail fast (or preserve the previous ignore semantics) instead of polling until timeout.
Suggested change
- target_fields = set(fields)
- current_fields = set(batch_meta.field_names)
- not_ready_fields = target_fields - current_fields
- begin_polling_time = time.time()
- while not_ready_fields:
-     if time.time() - begin_polling_time > TQ_KV_POLLING_METADATA_TIMEOUT:
-         raise RuntimeError(
-             f"Timeout for kv_batch_get. Missing fields: {not_ready_fields}"
-             f" after {TQ_KV_POLLING_METADATA_TIMEOUT} seconds."
-         )
-     logger.warning(
-         f"Fields {list(not_ready_fields)} are not ready yet! "
-         f"Retry in {TQ_KV_POLLING_METADATA_CHECK_INTERVAL} seconds."
-     )
-     time.sleep(TQ_KV_POLLING_METADATA_CHECK_INTERVAL)
-     batch_meta = tq_client.kv_retrieve_keys(keys=keys, partition_id=partition_id, create=False)
-     current_fields = set(batch_meta.field_names)
-     not_ready_fields = target_fields - current_fields
+ # NOTE: BatchMeta.select_fields() is expected to ignore unknown field names.
+ # Avoid polling on missing fields here to prevent timeouts when callers
+ # request fields that are not registered for the partition.
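As an alternative to dropping the polling entirely, the distinction raised in the review comment above ("not yet produced" vs "unknown/unregistered") could be sketched as a small set computation. This is only an illustration: the helper `split_missing_fields` is hypothetical, and it assumes the caller can obtain the set of field names registered for the partition, which this conversation does not show how to do.

```python
from typing import Iterable, Set, Tuple


def split_missing_fields(
    requested: Iterable[str],
    available: Iterable[str],
    registered: Iterable[str],  # assumption: obtainable from the controller
) -> Tuple[Set[str], Set[str]]:
    """Return (pending, unknown) for the requested fields that are missing.

    pending: registered for the partition but not yet present in the metadata.
    unknown: not registered at all, so waiting for them cannot succeed.
    """
    missing = set(requested) - set(available)
    registered_set = set(registered)
    pending = missing & registered_set
    unknown = missing - registered_set
    return pending, unknown
```

With such a split, a caller could poll only while `pending` is non-empty and either fail fast or ignore the names in `unknown`, depending on which semantics are preferred.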
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 14 comments.
TQ_KV_POLLING_METADATA_TIMEOUT = int(os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", 10))
TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5))
Copilot
AI
Feb 12, 2026
The polling timeout/interval values come directly from env vars but aren’t validated. If TQ_KV_POLLING_METADATA_CHECK_INTERVAL is <= 0, time.sleep() will raise (negative) or the loop can become a busy-wait (0), and a negative timeout makes the behavior confusing. Consider validating/clamping these at import time (e.g., require timeout >= 0 and interval > 0, otherwise raise ValueError).
Suggested change
- TQ_KV_POLLING_METADATA_TIMEOUT = int(os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", 10))
- TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5))
+ # Validate polling configuration derived from environment variables at import time
+ _raw_timeout = os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", "10")
+ try:
+     TQ_KV_POLLING_METADATA_TIMEOUT = int(_raw_timeout)
+ except ValueError as exc:
+     raise ValueError(
+         f"Invalid value for TQ_KV_POLLING_METADATA_TIMEOUT: {_raw_timeout!r}. "
+         "Expected a non-negative integer."
+     ) from exc
+ if TQ_KV_POLLING_METADATA_TIMEOUT < 0:
+     raise ValueError(
+         f"TQ_KV_POLLING_METADATA_TIMEOUT must be >= 0, got {TQ_KV_POLLING_METADATA_TIMEOUT}."
+     )
+ _raw_interval = os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", "0.5")
+ try:
+     TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(_raw_interval)
+ except ValueError as exc:
+     raise ValueError(
+         f"Invalid value for TQ_KV_POLLING_METADATA_CHECK_INTERVAL: {_raw_interval!r}. "
+         "Expected a positive float."
+     ) from exc
+ if TQ_KV_POLLING_METADATA_CHECK_INTERVAL <= 0.0:
+     raise ValueError(
+         "TQ_KV_POLLING_METADATA_CHECK_INTERVAL must be > 0, "
+         f"got {TQ_KV_POLLING_METADATA_CHECK_INTERVAL}."
+     )
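The two validation blocks in the suggestion above follow the same pattern, so they could be folded into one helper. The sketch below is one possible shape, not the project's code: `read_env_number` and its parameters are hypothetical names, and only the two env variable names and their defaults come from this PR.

```python
import os
from typing import Callable, TypeVar

T = TypeVar("T", int, float)


def read_env_number(
    name: str,
    default: T,
    cast: Callable[[str], T],
    *,
    minimum: float,
    inclusive: bool,
) -> T:
    """Read a numeric env var, rejecting unparsable or out-of-range values."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = cast(raw)
    except ValueError as exc:
        raise ValueError(f"Invalid value for {name}: {raw!r}.") from exc
    # inclusive=True accepts value == minimum; inclusive=False rejects it.
    too_small = value < minimum if inclusive else value <= minimum
    if too_small:
        bound = ">=" if inclusive else ">"
        raise ValueError(f"{name} must be {bound} {minimum}, got {value}.")
    return value


TQ_KV_POLLING_METADATA_TIMEOUT = read_env_number(
    "TQ_KV_POLLING_METADATA_TIMEOUT", 10, int, minimum=0, inclusive=True
)
TQ_KV_POLLING_METADATA_CHECK_INTERVAL = read_env_number(
    "TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5, float, minimum=0, inclusive=False
)
```

Raising at import time keeps a misconfigured interval from surfacing later as a busy-wait or a `time.sleep()` error inside the polling loop.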
Background
Since TransferQueue does not treat each key as an atomic object, scenarios may arise where some fields of a key are ready while others are not (as shown below). This partial readiness issue can also occur when requesting multiple keys.
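To make the partial readiness case concrete, here is a toy illustration in plain Python. The key names, field names, and the helper `missing_fields_by_key` are all made up for this example and do not reflect TransferQueue's internal data structures.

```python
from typing import Dict, Set

# Hypothetical snapshot: which fields have been written for each key so far.
ready_fields_by_key: Dict[str, Set[str]] = {
    "sample_0": {"prompt", "response"},
    "sample_1": {"prompt"},  # "response" has not been produced yet
}


def missing_fields_by_key(
    ready_by_key: Dict[str, Set[str]], requested: Set[str]
) -> Dict[str, Set[str]]:
    """Map each key to the requested fields it is still missing (if any)."""
    return {
        key: requested - ready
        for key, ready in ready_by_key.items()
        if requested - ready
    }


missing = missing_fields_by_key(ready_fields_by_key, {"prompt", "response"})
print(missing)  # {'sample_1': {'response'}}
```

Here a request for both fields across both keys is only partially satisfiable: one key is complete while the other still lacks a field.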
Why this does not happen in the low-level APIs
In the low-level API workflow, consumers first call tq_client.get_meta() before tq_client.get_data(). get_meta() is designed to block until it identifies enough samples where all required data_fields are ready, acting as a dynamic sample router. This mechanism inherently guarantees that data is fully produced and written to the TransferQueue before retrieval.
However, in the high-level KV API, users provide specific keys directly. While these keys serve as the identifiers (similar to metadata), the new logic for resolving keys into BatchMeta lacked a validation step to ensure all requested fields were ready. This PR bridges that gap.
Changes
This PR introduces a polling mechanism for (async_)kv_batch_get to ensure data completeness when specific fields are requested.
Polling Mechanism: When user-specified fields are provided, kv_batch_get checks whether the retrieved metadata contains all requested fields. If not, it retries every TQ_KV_POLLING_METADATA_CHECK_INTERVAL seconds until the TQ_KV_POLLING_METADATA_TIMEOUT limit is reached.
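To see how the timeout and interval interact, the loop from the diff can be replayed against a fake clock. This is a simplified simulation under stated assumptions: `FakeClock`, `poll_fields`, and the never-ready fetcher are hypothetical, and metadata retrieval is treated as taking zero time. The loop structure follows the diff in this PR: the timeout check comes first, then the sleep, then the retry.

```python
from typing import Callable, Iterable


class FakeClock:
    """Deterministic clock: sleep() advances time instead of blocking."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def poll_fields(
    fetch: Callable[[], Iterable[str]],
    fields: Iterable[str],
    timeout: float,
    interval: float,
    clock: FakeClock,
) -> None:
    """Same control flow as the PR's loop, with the clock injected."""
    target = set(fields)
    not_ready = target - set(fetch())
    begin = clock.time()
    while not_ready:
        if clock.time() - begin > timeout:
            raise RuntimeError(f"Timeout. Missing fields: {sorted(not_ready)}")
        clock.sleep(interval)
        not_ready = target - set(fetch())


fetch_calls = 0


def never_ready() -> Iterable[str]:
    """Hypothetical fetcher whose metadata never gains field 'b'."""
    global fetch_calls
    fetch_calls += 1
    return ["a"]


clock = FakeClock()
try:
    poll_fields(never_ready, ["a", "b"], timeout=10, interval=0.5, clock=clock)
except RuntimeError:
    pass

retries = fetch_calls - 1  # the first fetch is not a retry
print(retries, clock.now)  # 21 10.5
```

Under this idealized model, the default settings allow 21 retries and the error is raised at 10.5 simulated seconds, because the strict `>` comparison lets one more sleep through at exactly 10 seconds. In practice the overshoot is up to one interval plus retrieval latency.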