Commit fd95218

[feat] Introduce high-level key-value (KV) interface (#28)
## Summary

This PR introduces a **High-Level Key-Value (KV) Interface** to TransferQueue, offering a Redis-style API that exposes most of the advanced features provided by TransferQueue.

## Background

In previous versions of TransferQueue, the learning curve was steep for new users. To perform basic operations, users had to:

1. Understand the `BatchMeta`, `SampleMeta`, and `FieldMeta` design (as illustrated in [tutorial/02_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_metadata_concepts.py)).
2. Navigate the flexible but complex [`TransferQueueClient`](https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py) API.

Although PR #26 simplified the initialization process, the core interaction still exposed low-level details. This PR bridges that gap by providing a familiar, easy-to-use KV abstraction.

## TransferQueue API Architecture

With this PR, TransferQueue now supports a two-level API architecture to satisfy different user needs.
| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple-Backends |
|---|---|---|---|---|---|---|
| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |

### High-Level API

#### Key-Value based API (This PR)

**Methods**

- **(async_)kv_put**: Insert or update a multi-column sample by key, with an optional metadata tag
- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in a batch
- **(async_)kv_batch_get**: Retrieve samples (by keys), supporting column selection (by fields)
- **(async_)kv_list**: List keys and tags (metadata) in a partition
- **(async_)kv_clear**: Remove key-value pairs from storage

**Key Features**

- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) with essentially zero learning curve
- **Fine-grained Access**: Update or retrieve specific fields (columns) within a key (row) without a full read/write
- **Partition Isolation**: Logical separation of storage namespaces
- **Metadata Tags**: Lightweight metadata for status tracking
- **Pluggable Backends**: Supports multiple storage backends

#### StreamingDataLoader API

Refer to our [Roadmap](#1) and the related PRs (#23). A usage example can be found in [tutorial/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py).

### Low-Level API

Directly manipulate the `TransferQueueClient`. Refer to [tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py), [tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py) and [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py) for details.
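To make the KV semantics above concrete, here is a toy in-memory model written with plain Python dicts. It is **not** the actual TransferQueue implementation (which is distributed and backend-pluggable); the class and its internals are illustrative assumptions that only sketch the intended behavior of partition isolation, tag tracking, and field-level access:

```python
# Toy in-memory model of the KV semantics -- NOT the real TransferQueue
# implementation, just a sketch of the intended behavior.

class ToyKVStore:
    def __init__(self):
        # partition_id -> key -> {field_name: value}; tags kept alongside
        self._data = {}
        self._tags = {}

    def kv_put(self, key, partition_id, fields, tag=None):
        part = self._data.setdefault(partition_id, {})
        # fine-grained update: merge fields instead of overwriting the row
        part.setdefault(key, {}).update(fields)
        if tag is not None:
            self._tags.setdefault(partition_id, {})[key] = tag

    def kv_batch_get(self, keys, partition_id, fields=None):
        part = self._data.get(partition_id, {})
        rows = []
        for k in keys:
            row = part[k]
            # column selection: return only the requested fields
            rows.append({f: row[f] for f in fields} if fields else dict(row))
        return rows

    def kv_list(self, partition_id):
        part = self._data.get(partition_id, {})
        tags = self._tags.get(partition_id, {})
        return list(part), [tags.get(k) for k in part]

    def kv_clear(self, partition_id):
        self._data.pop(partition_id, None)
        self._tags.pop(partition_id, None)


store = ToyKVStore()
store.kv_put("1_0", "train", {"input_ids": [4, 5, 6]}, tag={"status": "running"})
# a second put to the same key touches only the given column
store.kv_put("1_0", "train", {"reward": 0.7})
print(store.kv_batch_get(["1_0"], "train", fields=["reward"]))  # [{'reward': 0.7}]
```

Note how the second `kv_put` leaves `input_ids` intact; this is the "update a reward score for a specific prompt" pattern that the real fine-grained interface is designed for.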
## Usage Example

Please refer to [tutorial/02_kv_interface.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_kv_interface.py) and [tests/e2e/test_kv_interface_e2e.py](https://github.com/Ascend/TransferQueue/blob/main/tests/e2e/test_kv_interface_e2e.py) for details.

```python
import torch
from tensordict import TensorDict

import transfer_queue as tq

# initialize TQ
tq.init()

# prepare data
batch_input_ids = torch.tensor(
    [
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
    ]
)
batch_attention_mask = torch.ones_like(batch_input_ids)
data_batch = TensorDict(
    {
        "input_ids": batch_input_ids,
        "attention_mask": batch_attention_mask,
    },
    batch_size=batch_input_ids.size(0),
)

keys = ["1_0", "1_1", "1_2", "2_0"]  # 4 keys for 4 samples
tags = [{"global_steps": 1, "status": "running", "model_version": 1} for _ in range(len(keys))]
partition_id = "test"

# use the KV interface to put data into TQ
tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=data_batch, tags=tags)

# list all keys and tags
all_keys, all_tags = tq.kv_list(partition_id=partition_id)
for k, t in zip(all_keys, all_tags, strict=False):
    print(f"  - key='{k}' | tag={t}")

# retrieve all data
retrieved_all = tq.kv_batch_get(keys=all_keys, partition_id=partition_id)
print(f"  Fields: {list(retrieved_all.keys())}")
```

## Use Cases & Limitations

**Best For:**

- Scenarios requiring fine-grained data access (e.g., updating a reward score for a specific prompt).
- Integration with external ReplayBuffers or single-controller architectures that manage the sample-dispatching logic.

**Limitations (vs. Streaming/Low-Level APIs):**

- No built-in production/consumption tracking: users must manually check status via tags or manage this logic externally.
- No built-in Sampler: data dispatch must be implemented externally, e.g., by a ReplayBuffer or a single controller.
- Not fully streaming: consumers typically wait for a controller to dispatch `keys` before fetching, rather than consuming a continuous stream.
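Because there is no built-in consumption tracking, a common pattern is to filter the output of `kv_list` by tag before fetching. The sketch below is plain Python over a `(keys, tags)` pair shaped like the one in the example above; the `status` field and its `"finished"`/`"running"` values are illustrative assumptions, not a schema defined by TransferQueue:

```python
# Sketch: select keys that are ready to consume by inspecting their tags.
# The (keys, tags) pair mirrors what kv_list returns; the "status" values
# used here are illustrative assumptions.

def ready_keys(keys, tags, wanted_status="finished"):
    """Return the keys whose tag marks them as ready to consume."""
    return [k for k, t in zip(keys, tags) if t and t.get("status") == wanted_status]


all_keys = ["1_0", "1_1", "1_2", "2_0"]
all_tags = [
    {"global_steps": 1, "status": "finished"},
    {"global_steps": 1, "status": "running"},
    {"global_steps": 1, "status": "finished"},
    None,  # a key that was put without a tag
]

print(ready_keys(all_keys, all_tags))  # ['1_0', '1_2']
```

The selected keys would then be passed to a call such as `tq.kv_batch_get(keys=..., partition_id=...)`, with a dispatching controller or ReplayBuffer deciding when to re-check the tags.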
--------- Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
1 parent 7c9e970 commit fd95218

23 files changed

Lines changed: 3117 additions & 865 deletions

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -16,7 +16,7 @@ on:
 jobs:
   build:
     runs-on: ubuntu-latest
-    timeout-minutes: 10
+    timeout-minutes: 15
     strategy:
       fail-fast: false
       matrix:
```

.github/workflows/tutorial-check.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -36,4 +36,5 @@ jobs:
       - name: Run tutorials
         run: |
           export TQ_NUM_THREADS=2
+          export RAY_DEDUP_LOGS=0
           for file in tutorial/*.py; do python3 "$file"; done
```

README.md

Lines changed: 47 additions & 32 deletions
````diff
@@ -31,7 +31,8 @@ TransferQueue offers **fine-grained, sub-sample-level** data management and **lo
 
 <h2 id="updates">🔄 Updates</h2>
 
-- **Jan 28, 2026**: We experimentally introduce `StreamingDataloader` interface for fully-streamed production-consumption pipeline. Refer to our [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for details.
+- **Feb 8, 2026**: 🔥 The initialization and usage is greatly simplified by high-level APIs [PR#26](https://github.com/Ascend/TransferQueue/pull/26), [PR#28](https://github.com/Ascend/TransferQueue/pull/28). You can now use a Redis-style API to take advantage of most of the advanced features provided by TransferQueue!
+- **Jan 28, 2026**: We experimentally introduce `StreamingDataLoader` interface for fully-streamed production-consumption pipeline. Refer to our [tutorials/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py) for details.
 - **Dec 30, 2025**: **TransferQueue x verl** integration is tested with the DAPO algorithm at scale **(64 nodes, 1024 cards)**. It significantly optimizes host memory utilization and accelerates data transfers. Stay tuned for more details!
 - **Dec 20, 2025**: 🔥 The official [tutorial](https://github.com/Ascend/TransferQueue/tree/main/tutorial) is released! Feel free to check it out.
 - **Nov 10, 2025**: We disentangle the data retrieval logic from TransferQueueController [PR#101](https://github.com/TransferQueue/TransferQueue/pull/101). Now you can implement your own `Sampler` to control how to consume the data.
@@ -91,26 +92,55 @@ This data structure design is motivated by the computational characteristics of
 <img src="https://github.com/TransferQueue/community_doc/blob/main/docs/data_plane.png?raw=true" width="70%">
 </p>
 
-### User Interface: Asynchronous & Synchronous Client
-To simplify the usage of TransferQueue, we have encapsulated this process into `TransferQueueClient`. The client provides both asynchronous and synchronous interfaces for data transfer, allowing users to easily integrate TransferQueue into their framework.
+### User Interface: High-Level & Low-Level APIs
 
-We also experimentally provide a `StreamingDataLoader` interface as a standard PyTorch DataLoader. Leveraging this abstraction, each rank can automatically get its own data like `DataLoader` in PyTorch. The TransferQueue system will handle the underlying data scheduling and transfer logic caused by different parallelism strategies, significantly simplifying the design of disaggregated frameworks.
-This interface simplifies TransferQueue's integration, ensuring seamless compatibility with existing training workflows. Please refer to our [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for more details.
+| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple-Backends |
+|---|---|---|---|---|---|---|
+| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
+| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
+| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |
 
-<h2 id="show-cases">🔥 Showcases</h2>
 
-### General Usage
+#### Key-Value based API
+
+To simplify the usage of TransferQueue, we provide a Redis-style high-level API that exposes most of the advanced features of TransferQueue ([PR#28](https://github.com/Ascend/TransferQueue/pull/28)).
+
+**Methods**
+
+- **(async_)kv_put**: Insert/Update a multi-column sample by key, with optional metadata tag
+- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in batch
+- **(async_)kv_batch_get**: Retrieve samples (by keys), supporting column selection (by fields)
+- **(async_)kv_list**: List keys and tags (metadata) in a partition
+- **(async_)kv_clear**: Remove key-value pairs from storage
+
+**Key Features**
+
+- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) for zero learning curve
+- **Fine-grained Access**: Update or retrieve specific fields (columns) within a key (row) without a full read/write
+- **Partition Isolation**: Logical separation of storage namespaces
+- **Metadata Tags**: Lightweight metadata for status tracking
+- **Pluggable Backends**: Supports multiple backends
+
+#### StreamingDataLoader API
 
-The primary interaction points are `AsyncTransferQueueClient` and `TransferQueueClient`, serving as the communication interface with the TransferQueue system.
+Designed as a drop-in replacement for the standard PyTorch `DataLoader`, this API allows each rank to automatically consume data without single-controller intervention.
 
-Core interfaces:
+In this scenario, `TransferQueueController` serves as a side-controller for data dispatching, with a user-defined `Sampler` class to organize the dataflow.
+It encapsulates the complex scheduling and data transfer logic required for various parallelism strategies, seamlessly integrating TransferQueue into existing training workflows and simplifying the development of disaggregated frameworks.
 
-- `(async_)get_meta(data_fields: list[str], batch_size: int, partition_id: str, mode: str, task_name: str, sampling_config: Optional[dict[str, Any]]) -> BatchMeta`
-- `(async_)get_data(metadata: BatchMeta) -> TensorDict`
-- `(async_)put(data: TensorDict, metadata: Optional[BatchMeta], partition_id: Optional[str])`
-- `(async_)clear_partition(partition_id: str)` and `(async_)clear_samples(metadata: BatchMeta)`
+See [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py) for more details.
 
-<span style="color: #FF0000;">**Refer to our [tutorial](https://github.com/Ascend/TransferQueue/tree/main/tutorial) for detailed examples.**</span>
+#### Low-Level Native API
+
+The native interface of TransferQueue is implemented in `TransferQueueClient`. It offers maximum flexibility through native, atomic operations.
+
+Developers can leverage `TransferQueueClient` directly to implement advanced features that require fine-grained control and fully streamed data scheduling, as illustrated in the following tutorials:
+- [tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py)
+- [tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py)
+- [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py)
+
+
+<h2 id="show-cases">🔥 Showcases</h2>
 
 ### Collocated Example
 
@@ -131,7 +161,7 @@ You may refer to the [recipe](https://github.com/Ascend/TransferQueue/tree/dev/r
 
 ### Disaggregated Example
 
-We have implemented a series of PRs ([#4](https://github.com/Ascend/TransferQueue/pull/4), [#7](https://github.com/Ascend/TransferQueue/pull/7), [#9](https://github.com/Ascend/TransferQueue/pull/9), [#16](https://github.com/Ascend/TransferQueue/pull/16)) to establish a **standardized, fully-streamed distributed** workflow via TransferQueue.
+We have experimentally implemented a **standardized, fully-streamed distributed** workflow via TransferQueue.
 
 By leveraging the `RankAwareSampler` and `StreamingDataLoader` interfaces, we achieve a **streamlined micro-batch-level producer-consumer pipeline**. This design eliminates the need to manually determine data dispatching logic across varying parallelism strategies—a typical complexity in the single-controller paradigm—thereby greatly simplifying framework design.
 
@@ -186,7 +216,7 @@ pip install TransferQueue
 <img src="https://github.com/TransferQueue/community_doc/blob/main/docs/performance_0.1.1.dev2.png?raw=true" width="100%">
 </p>
 
-> Note: The above benchmark for TransferQueue is based on our naive `SimpleStorageUnit` backend. By introducing high-performance storage backends and optimizing serialization/deserialization, we expect to achieve even better performance. Warmly welcome contributions from the community!
+> Note: The above benchmark for TransferQueue is based on our naive `SimpleStorage` backend. By introducing high-performance storage backends and optimizing serialization/deserialization, we expect to achieve even better performance. Warmly welcome contributions from the community!
 
 For detailed performance benchmarks, please refer to [this blog](https://www.yuque.com/haomingzi-lfse7/hlx5g0/tml8ke0zkgn6roey?singleDoc#).
 
@@ -250,7 +280,7 @@ batch_meta = client.get_meta(
 )
 ```
 
-<span style="color: #FF0000;">**Refer to [tutorial/04_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_custom_sampler.py) for more details.**</span>
+<span style="color: #FF0000;">**Refer to [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py) for more details.**</span>
 
 
 ### How to integrate a new storage backend
@@ -299,21 +329,6 @@ pip install pre-commit
 pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always
 ```
 
-<h2 id="roadmap"> 🛣️ Roadmap</h2>
-
-- [x] Support data rewrite for partial rollout & agentic post-training
-- [x] Provide a general storage abstraction layer `TransferQueueStorageManager` to manage distributed storage units, which simplifies `Client` design and makes it possible to introduce different storage backends ([PR#66](https://github.com/TransferQueue/TransferQueue/pull/66), [issue#72](https://github.com/TransferQueue/TransferQueue/issues/72))
-- [x] Implement `AsyncSimpleStorageManager` as the default storage backend based on the `TransferQueueStorageManager` abstraction
-- [x] Provide a `KVStorageManager` to cover all the KV based storage backends ([PR#96](https://github.com/TransferQueue/TransferQueue/pull/96))
-- [x] Support topic-based data partitioning to maintain train/val/test data simultaneously ([PR#98](https://github.com/TransferQueue/TransferQueue/pull/98))
-- [x] Release the first stable version through PyPI
-- [ ] Support disaggregated framework (each rank retrieves its own data without going through a centralized node)
-- [ ] Provide a `StreamingDataLoader` interface for disaggregated framework
-- [ ] Support load-balancing and dynamic batching
-- [x] Support high-performance storage backends for RDMA transmission (e.g., [Mooncake Store](https://github.com/kvcache-ai/Mooncake), [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html)...)
-- [x] High-performance serialization and deserialization
-- [ ] More documentation, examples and tutorials
-
 <h2 id="citation">📑 Citation</h2>
 Please kindly cite our paper if you find this repo is useful:
````
