Skip to content

Conversation

@0oshowero0
Copy link
Collaborator

@0oshowero0 0oshowero0 commented Feb 12, 2026

Background

Since TransferQueue does not treat each key as an atomic object, scenarios may arise where some fields of a key are ready while others are not (as shown below). This partial readiness issue can also occur when requesting multiple keys.

image

Why not happen in low-level APIs

In the low-level API workflow, consumers first call tq_client.get_meta() before tq_client.get_data(). get_meta() is designed to block until it identifies enough samples where all required data_fields are ready, acting as a dynamic sample router. This mechanism inherently guarantees that data is fully produced and written to the TransferQueue before retrieval.

However, in the high-level KV API, users provide specific keys directly. While these keys serve as the identifiers (similar to metadata), the new logic for resolving keys into BatchMeta lacked a validation step to ensure all requested fields were ready. This PR bridges that gap.

Changes

This PR introduce a polling mechanism for (async_)kv_batch_get to ensure data completeness when specific fields are requested.

Polling Mechanism: When user-specified fields are provided, we checks if the retrieved metadata contains all requested fields. If not, it retries every TQ_KV_POLLING_METADATA_CHECK_INTERVAL seconds until the TQ_KV_POLLING_METADATA_TIMEOUT limit is reached.

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings February 12, 2026 01:46
@ascend-robot
Copy link

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a polling mechanism to the high-level KV batch retrieval APIs to mitigate “partial readiness” cases where requested fields are not yet fully available for the requested key(s).

Changes:

  • Introduce polling in kv_batch_get when fields is specified, retrying metadata retrieval until requested fields appear or a timeout is reached.
  • Introduce equivalent async polling in async_kv_batch_get.
  • Add env-configurable timeout/interval knobs for the polling behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 418 to 439
target_fields = set(fields)
current_fields = set(batch_meta.field_names)

not_ready_fields = target_fields - current_fields
begin_polling_time = time.time()
while not_ready_fields:
if time.time() - begin_polling_time > TQ_KV_POLLING_METADATA_TIMEOUT:
raise RuntimeError(
f"Timeout for kv_batch_get. Missing fields: {not_ready_fields}"
f" after {TQ_KV_POLLING_METADATA_TIMEOUT} seconds."
)

logger.warning(
f"Fields {list(not_ready_fields)} are not ready yet! "
f"Retry in {TQ_KV_POLLING_METADATA_CHECK_INTERVAL} seconds."
)

time.sleep(TQ_KV_POLLING_METADATA_CHECK_INTERVAL)
batch_meta = tq_client.kv_retrieve_keys(keys=keys, partition_id=partition_id, create=False)
current_fields = set(batch_meta.field_names)
not_ready_fields = target_fields - current_fields

Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The polling loop treats any missing requested field as "not ready" and will wait until timeout. If a caller passes a field name that is not registered for the partition, this now blocks and raises a timeout, whereas BatchMeta.select_fields() intentionally ignores unknown field names. Consider adding an explicit validation step to distinguish "not yet produced" vs "unknown/unregistered" fields (e.g., via the controller production-status API) and fail fast (or preserve the previous ignore semantics) instead of polling until timeout.

Suggested change
target_fields = set(fields)
current_fields = set(batch_meta.field_names)
not_ready_fields = target_fields - current_fields
begin_polling_time = time.time()
while not_ready_fields:
if time.time() - begin_polling_time > TQ_KV_POLLING_METADATA_TIMEOUT:
raise RuntimeError(
f"Timeout for kv_batch_get. Missing fields: {not_ready_fields}"
f" after {TQ_KV_POLLING_METADATA_TIMEOUT} seconds."
)
logger.warning(
f"Fields {list(not_ready_fields)} are not ready yet! "
f"Retry in {TQ_KV_POLLING_METADATA_CHECK_INTERVAL} seconds."
)
time.sleep(TQ_KV_POLLING_METADATA_CHECK_INTERVAL)
batch_meta = tq_client.kv_retrieve_keys(keys=keys, partition_id=partition_id, create=False)
current_fields = set(batch_meta.field_names)
not_ready_fields = target_fields - current_fields
# NOTE: BatchMeta.select_fields() is expected to ignore unknown field names.
# Avoid polling on missing fields here to prevent timeouts when callers
# request fields that are not registered for the partition.

Copilot uses AI. Check for mistakes.
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot
Copy link

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 14 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +44 to 47
TQ_KV_POLLING_METADATA_TIMEOUT = int(os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", 10))
TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5))


Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The polling timeout/interval values come directly from env vars but aren’t validated. If TQ_KV_POLLING_METADATA_CHECK_INTERVAL is <= 0, time.sleep() will raise (negative) or the loop can become a busy-wait (0), and a negative timeout makes the behavior confusing. Consider validating/clamping these at import time (e.g., require timeout >= 0 and interval > 0, otherwise raise ValueError).

Suggested change
TQ_KV_POLLING_METADATA_TIMEOUT = int(os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", 10))
TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5))
# Validate polling configuration derived from environment variables at import time
_raw_timeout = os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", "10")
try:
TQ_KV_POLLING_METADATA_TIMEOUT = int(_raw_timeout)
except ValueError as exc:
raise ValueError(
f"Invalid value for TQ_KV_POLLING_METADATA_TIMEOUT: {_raw_timeout!r}. "
"Expected a non-negative integer."
) from exc
if TQ_KV_POLLING_METADATA_TIMEOUT < 0:
raise ValueError(
f"TQ_KV_POLLING_METADATA_TIMEOUT must be >= 0, got {TQ_KV_POLLING_METADATA_TIMEOUT}."
)
_raw_interval = os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", "0.5")
try:
TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(_raw_interval)
except ValueError as exc:
raise ValueError(
f"Invalid value for TQ_KV_POLLING_METADATA_CHECK_INTERVAL: {_raw_interval!r}. "
"Expected a positive float."
) from exc
if TQ_KV_POLLING_METADATA_CHECK_INTERVAL <= 0.0:
raise ValueError(
"TQ_KV_POLLING_METADATA_CHECK_INTERVAL must be > 0, "
f"got {TQ_KV_POLLING_METADATA_CHECK_INTERVAL}."
)

Copilot uses AI. Check for mistakes.
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot
Copy link

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@ascend-robot
Copy link

CLA Signature Pass

0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants