From 01606ff36cb8cac6130717a84cf7a2c23811b5e9 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Mon, 26 Jan 2026 15:27:57 +0800 Subject: [PATCH 01/11] add ds zero copy in CPU Tensor Signed-off-by: Evelynn-V --- .github/workflows/python-package.yml | 2 +- tests/test_yuanrong_storage_manager.py | 8 ++++---- transfer_queue/storage/clients/yuanrong_client.py | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2917c78..cadce43 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -31,7 +31,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest build pytest_asyncio + python -m pip install flake8 pytest build pytest_asyncio pytest-mock python -m build --wheel pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu pip install dist/*.whl diff --git a/tests/test_yuanrong_storage_manager.py b/tests/test_yuanrong_storage_manager.py index 141d9dc..4c38b7c 100644 --- a/tests/test_yuanrong_storage_manager.py +++ b/tests/test_yuanrong_storage_manager.py @@ -33,7 +33,7 @@ class MockBuffer: def __init__(self, size): self.data = bytearray(size) - def mutable_data(self): + def MutableData(self): return self.data @@ -69,15 +69,15 @@ def mock_deserialization(items): except UnicodeDecodeError: return data - mocker.patch("transfer_queue.storage.clients.yuanrong_client.serialization", side_effect=mock_serialization) - mocker.patch("transfer_queue.storage.clients.yuanrong_client.deserialization", side_effect=mock_deserialization) + mocker.patch("transfer_queue.storage.clients.yuanrong_client._encoder.encode", side_effect=mock_serialization) + mocker.patch("transfer_queue.storage.clients.yuanrong_client._decoder.decode", side_effect=mock_deserialization) stored_raw_buffers = [] def side_effect_mcreate(keys, sizes): buffers = [MockBuffer(size) for size in sizes] for b in buffers: - stored_raw_buffers.append(b.mutable_data()) + stored_raw_buffers.append(b.MutableData()) return 0, buffers storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 5fa5284..c233472 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -185,6 +185,12 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): return tensors def mset_zcopy(self, keys: list[str], objs: list[Any]): + """Store multiple objects in zero-copy mode using parallel serialization and buffer packing. + + Args: + keys (list[str]): List of string keys under which the objects will be stored. + objs (list[Any]): List of Python objects to store (e.g., tensors, strings). + """ items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs] packed_sizes = [calc_packed_size(items) for items in items_list] status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes) @@ -194,6 +200,14 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]): self._cpu_ds_client.mset_buffer(buffers) def mget_zcopy(self, keys: list[str]) -> list[Any]: + """Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers. + + Args: + keys (list[str]): List of string keys to retrieve from storage. + + Returns: + list[Any]: List of deserialized objects corresponding to the input keys. + """ status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500) return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers] From 6f340795e534f7714af241c9153423c8259c69b4 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 15:08:16 +0800 Subject: [PATCH 02/11] add ray_storage_manager to backend Signed-off-by: Evelynn-V --- transfer_queue/storage/__init__.py | 2 + .../storage/managers/ray_storage_manager.py | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 transfer_queue/storage/managers/ray_storage_manager.py diff --git a/transfer_queue/storage/__init__.py b/transfer_queue/storage/__init__.py index bac9a4c..cdef95b 100644 --- a/transfer_queue/storage/__init__.py +++ b/transfer_queue/storage/__init__.py @@ -19,6 +19,7 @@ TransferQueueStorageManager, TransferQueueStorageManagerFactory, YuanrongStorageManager, + RayStorageManager, ) from .simple_backend import SimpleStorageUnit, StorageMetaGroup, StorageUnitData @@ -31,4 +32,5 @@ "AsyncSimpleStorageManager", "MooncakeStorageManager", "YuanrongStorageManager", + "RayStorageManager", ] diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py new file mode 100644 index 0000000..999f0e2 --- /dev/null +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -0,0 +1,39 @@ +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +from typing import Any + +from transfer_queue.storage.managers.base import KVStorageManager +from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory +from transfer_queue.utils.zmq_utils import ZMQServerInfo + +logger = logging.getLogger(__name__) +logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) + +@TransferQueueStorageManagerFactory.register("RayStore") +class RayStorageManager(KVStorageManager): + """Storage manager for Ray-RDT backend.""" + + def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): + client_name = config.get("client_name", None) + + if client_name is None: + logger.info("Missing 'client_name' in config, using default value('RayStorageClient')") + config["client_name"] = "RayStorageClient" + elif client_name != "RayStorageClient": + raise ValueError(f"Invalid 'client_name': {client_name} in config. Expecting 'RayStorageClient'") + super().__init__(controller_info, config) From ab7b406d8bc9b6664781611d1d21d8db1fdb765e Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 15:26:14 +0800 Subject: [PATCH 03/11] fix the ci Signed-off-by: Evelynn-V --- transfer_queue/storage/managers/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transfer_queue/storage/managers/__init__.py b/transfer_queue/storage/managers/__init__.py index 9702bb1..48d4073 100644 --- a/transfer_queue/storage/managers/__init__.py +++ b/transfer_queue/storage/managers/__init__.py @@ -18,6 +18,7 @@ from .mooncake_manager import MooncakeStorageManager from .simple_backend_manager import AsyncSimpleStorageManager from .yuanrong_manager import YuanrongStorageManager +from .ray_storage_manager import RayStorageManager __all__ = [ "TransferQueueStorageManager", @@ -25,4 +26,5 @@ "AsyncSimpleStorageManager", "YuanrongStorageManager", "MooncakeStorageManager", + "RayStorageManager", ] From f1e3c9658ad2aa0e1519ee03bbd5189e5536e71b Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 15:44:06 +0800 Subject: [PATCH 04/11] fix pre-commit Signed-off-by: Evelynn-V --- transfer_queue/storage/__init__.py | 2 +- transfer_queue/storage/managers/__init__.py | 2 +- transfer_queue/storage/managers/ray_storage_manager.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/transfer_queue/storage/__init__.py b/transfer_queue/storage/__init__.py index cdef95b..04b0745 100644 --- a/transfer_queue/storage/__init__.py +++ b/transfer_queue/storage/__init__.py @@ -16,10 +16,10 @@ from .managers import ( AsyncSimpleStorageManager, MooncakeStorageManager, + RayStorageManager, TransferQueueStorageManager, TransferQueueStorageManagerFactory, YuanrongStorageManager, - RayStorageManager, ) from .simple_backend import SimpleStorageUnit, StorageMetaGroup, StorageUnitData diff --git a/transfer_queue/storage/managers/__init__.py b/transfer_queue/storage/managers/__init__.py index 48d4073..77954bb 100644 --- a/transfer_queue/storage/managers/__init__.py +++ b/transfer_queue/storage/managers/__init__.py @@ -16,9 +16,9 @@ from .base import TransferQueueStorageManager from .factory import TransferQueueStorageManagerFactory from .mooncake_manager import MooncakeStorageManager +from .ray_storage_manager import RayStorageManager from .simple_backend_manager import AsyncSimpleStorageManager from .yuanrong_manager import YuanrongStorageManager -from .ray_storage_manager import RayStorageManager __all__ = [ "TransferQueueStorageManager", diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index 999f0e2..79d37f3 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -24,6 +24,7 @@ logger = logging.getLogger(__name__) logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) + @TransferQueueStorageManagerFactory.register("RayStore") class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend.""" From 0261eeddef39d5d6de79a1bfe08325e309f48f07 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 16:31:11 +0800 Subject: [PATCH 05/11] add RDT to config.yaml Signed-off-by: Evelynn-V --- transfer_queue/config.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 9503afa..98983e5 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -24,6 +24,9 @@ backend: # ZMQ Server IP & Ports (automatically generated during init) zmq_info: null + RayStore: + client_name: RayStorageClient + # For Yuanrong: # TODO From 2a2f48e7c300e7d958507b8e1454c91db7c5353e Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Tue, 10 Feb 2026 19:24:59 +0800 Subject: [PATCH 06/11] fix Signed-off-by: Evelynn-V --- transfer_queue/config.yaml | 1 - .../storage/managers/ray_storage_manager.py | 21 ++++++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 98983e5..c0ddfe7 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -25,7 +25,6 @@ backend: zmq_info: null RayStore: - client_name: RayStorageClient # For Yuanrong: # TODO diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index 79d37f3..f7b713b 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -30,11 +30,16 @@ class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend.""" def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): - client_name = config.get("client_name", None) - - if client_name is None: - logger.info("Missing 'client_name' in config, using default value('RayStorageClient')") - config["client_name"] = "RayStorageClient" - elif client_name != "RayStorageClient": - raise ValueError(f"Invalid 'client_name': {client_name} in config. Expecting 'RayStorageClient'") - super().__init__(controller_info, config) + client_name = config.get("client_name") + if client_name is not None and client_name != "RayStorageClient": + raise ValueError( + f"Invalid 'client_name': {client_name} in config. " + f"RayStorageManager only supports 'RayStorageClient'" + ) + + ray_storage_client_name = "RayStorageClient" + + super().__init__( + controller_info, + {**config, "client_name": ray_storage_client_name} + ) From 1eecf0a6801af0c150bea423b07a1ad3e57318fd Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Tue, 10 Feb 2026 19:25:31 +0800 Subject: [PATCH 07/11] fix Signed-off-by: Evelynn-V --- .../storage/managers/ray_storage_manager.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index f7b713b..3c795d7 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -33,13 +33,9 @@ def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): client_name = config.get("client_name") if client_name is not None and client_name != "RayStorageClient": raise ValueError( - f"Invalid 'client_name': {client_name} in config. " - f"RayStorageManager only supports 'RayStorageClient'" + f"Invalid 'client_name': {client_name} in config. RayStorageManager only supports 'RayStorageClient'" ) - + ray_storage_client_name = "RayStorageClient" - - super().__init__( - controller_info, - {**config, "client_name": ray_storage_client_name} - ) + + super().__init__(controller_info, {**config, "client_name": ray_storage_client_name}) From ff74015f0e3a3581a5e0e0f4528bf9f2b25042fc Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Tue, 10 Feb 2026 20:08:48 +0800 Subject: [PATCH 08/11] fix Signed-off-by: Evelynn-V --- .../storage/managers/ray_storage_manager.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index 3c795d7..a7c3f8f 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -30,12 +30,9 @@ class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend.""" def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): - client_name = config.get("client_name") - if client_name is not None and client_name != "RayStorageClient": + config = (config or {}).copy() + if config.get("client_name") not in (None, "RayStorageClient"): raise ValueError( - f"Invalid 'client_name': {client_name} in config. RayStorageManager only supports 'RayStorageClient'" + f"RayStorageManager only supports 'RayStorageClient', got: {config.get('client_name')}" ) - - ray_storage_client_name = "RayStorageClient" - - super().__init__(controller_info, {**config, "client_name": ray_storage_client_name}) + super().__init__(controller_info, {**config, "client_name": "RayStorageClient"}) From d401e761634925656d4195dfd438abf3d7ede972 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Tue, 10 Feb 2026 20:09:01 +0800 Subject: [PATCH 09/11] fix Signed-off-by: Evelynn-V --- transfer_queue/storage/managers/ray_storage_manager.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index a7c3f8f..67b1f9e 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -30,9 +30,7 @@ class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend.""" def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): - config = (config or {}).copy() + config = (config or {}).copy() if config.get("client_name") not in (None, "RayStorageClient"): - raise ValueError( - f"RayStorageManager only supports 'RayStorageClient', got: {config.get('client_name')}" - ) + raise ValueError(f"RayStorageManager only supports 'RayStorageClient', got: {config.get('client_name')}") super().__init__(controller_info, {**config, "client_name": "RayStorageClient"}) From f77be1482465da45ae75edd1e5d803333dc5ab83 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Wed, 11 Feb 2026 09:11:48 +0800 Subject: [PATCH 10/11] fix Signed-off-by: Evelynn-V --- transfer_queue/storage/managers/ray_storage_manager.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index 67b1f9e..b8b84a7 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -21,10 +21,6 @@ from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory from transfer_queue.utils.zmq_utils import ZMQServerInfo -logger = logging.getLogger(__name__) -logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) - - @TransferQueueStorageManagerFactory.register("RayStore") class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend.""" From 585f159c5a93c5c829326a73afa10ea7aa733ccf Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Wed, 11 Feb 2026 09:12:03 +0800 Subject: [PATCH 11/11] fix Signed-off-by: Evelynn-V --- transfer_queue/storage/managers/ray_storage_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index b8b84a7..78069c2 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -13,14 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import os from typing import Any from transfer_queue.storage.managers.base import KVStorageManager from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory from transfer_queue.utils.zmq_utils import ZMQServerInfo + @TransferQueueStorageManagerFactory.register("RayStore") class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend."""