3 changes: 3 additions & 0 deletions CMakeLists.txt
100644 → 100755
@@ -1,6 +1,9 @@
cmake_minimum_required(VERSION 3.25)
project(light_mem LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(Threads REQUIRED)

add_subdirectory(src)
175 changes: 127 additions & 48 deletions README.md
100644 → 100755
@@ -12,22 +12,28 @@ LightMem serves as a storage optimization layer for LLM inference frameworks, of
- **Asynchronous I/O**: Non-blocking cache operations using multi-threaded task queues
- **Memory Efficiency**: Reduced GPU/CPU memory footprint by offloading KV cache to disk
- **Scalability**: Support for large-scale inference workloads with configurable storage sharding
- **Multi-Node Support**: Distributed shard ownership and cross-node deduplication via etcd + Redis

## Key Features

### Core Modules
| Module | Description |
|-----------------|-------------------------------------------------------------------------------------------------|
| **Storage** | Pluggable storage engine interface with local file system implementation |
| **Service** | Cache service layer managing read/write operations with task scheduling |
| **Task Queue** | Asynchronous task processing system with configurable worker threads |
| **Core** | Cache block management and task state tracking for reliable operations |
| Module | Description |
|---------------------|----------------------------------------------------------------------------------------------------|
| **Storage** | Pluggable storage engine interface with local file system implementation |
| **Service** | Cache service layer managing read/write operations with task scheduling |
| **Task Queue** | Asynchronous task processing system with configurable worker threads |
| **Core** | Cache block management and task state tracking for reliable operations |
| **Index (Redis)** | Global hash index for cross-node deduplication and fast cache lookup |
| **Coordinator (etcd)** | Distributed shard ownership via Rendezvous (HRW) hashing with lease-based failure detection |

### Architecture Highlights
- **Block-Level Management**: KV cache divided into fixed-size blocks for efficient I/O
- **Hash-Based Indexing**: Fast cache lookup using content-based hashing
- **Zero-Copy Design**: Direct memory mapping between PyTorch tensors and storage
- **Thread-Safe Operations**: Concurrent read/write support with fine-grained locking
- **HRW Shard Assignment**: Deterministic Rendezvous hashing minimizes shard movement on node join/leave
- **Epoch Fencing**: Write operations are bound to shard ownership epochs to prevent stale writes
- **Delayed Handoff (Drain)**: Ownership transfer waits for in-flight operations to complete before releasing
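
The HRW assignment highlighted above can be sketched in a few lines. This is an illustrative Python sketch of Rendezvous hashing, not LightMem's internal implementation; the choice of `blake2b` as the scoring hash is an assumption for the example.

```python
import hashlib

def hrw_owner(shard_id: int, nodes: list) -> str:
    """Owner = node with the highest score for this shard (Rendezvous / HRW)."""
    def score(node: str) -> int:
        digest = hashlib.blake2b(f"{shard_id}:{node}".encode(), digest_size=8).digest()
        return int.from_bytes(digest, "big")
    return max(nodes, key=score)

# Deterministic: every node computes the same owner with no coordination.
owners = {s: hrw_owner(s, ["node-a", "node-b", "node-c"]) for s in range(8)}
```

When a node leaves, only the shards that node owned are reassigned; every other shard keeps its owner, which is the minimal-movement property referred to above.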

## Installation

@@ -68,18 +74,39 @@ conda install -c conda-forge cmake cxx-compiler boost libboost-devel
pip install torch
```

#### Install optional dependencies for multi-node mode

**Docker (recommended — one command starts everything):**
- [Install Docker Engine](https://docs.docker.com/engine/install/) and the `docker compose` plugin
```bash
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] \
https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" \
| sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
sudo apt-get install docker-compose-plugin
```

**Local binaries (alternative to Docker):**

On Ubuntu/Debian:
```bash
sudo apt-get install redis-server etcd
```

On macOS:
```bash
brew install redis etcd
```

On Conda:
```bash
conda install -c conda-forge redis-server etcd
```

#### Using pip (Recommended)
```bash
pip install -v .
```

### Environment Variables
@@ -98,6 +125,48 @@ Controls the maximum size of each cache block in megabytes (MB).
- **Smaller blocks** (e.g., 16): More fine-grained control, better for random access, but higher overhead per operation
- Must be set before starting the cache service

#### Index Persistence (Optional)

LightMem can persist the hash index to an external Redis backend. When LightMem restarts (even after a process crash), as long as Redis is still running, LightMem can rebuild the in-memory hash index from it and continue using the existing local disk cache files.

It also writes recovery metadata to each shard's `meta` file using **SuperBlock + Journal (truncate mode)**. If Redis is not available (or index data is missing), LightMem falls back to replaying the local journal to rebuild the index.
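
As a toy illustration of the journal-replay fallback, an append-only journal can rebuild a hash index record by record. This sketch is not LightMem's on-disk format (which uses a binary SuperBlock + Journal); the JSON-lines layout and field names are assumptions for the example.

```python
import json
import os
import tempfile

def append_record(journal_path: str, key_hash: str, offset: int) -> None:
    # Append-only journal: one record per completed write
    with open(journal_path, "a") as f:
        f.write(json.dumps({"hash": key_hash, "offset": offset}) + "\n")

def replay_journal(journal_path: str) -> dict:
    # Rebuild the in-memory hash index by replaying records in order
    index = {}
    with open(journal_path) as f:
        for line in f:
            rec = json.loads(line)
            index[rec["hash"]] = rec["offset"]
    return index

path = os.path.join(tempfile.mkdtemp(), "journal.log")
append_record(path, "abc123", 0)
append_record(path, "def456", 4096)
index = replay_journal(path)  # {'abc123': 0, 'def456': 4096}
```

Because replay is ordered, the latest record for a hash wins, which is what makes truncate-mode journaling safe to replay after a crash.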

Enable by passing `index_endpoint` to `PyLocalCacheService`:

```python
# Single-host default ports (Redis: 6379, etcd: 2379)
svc = PyLocalCacheService(
kvcache_tensor=kvcache_tensor,
file=file,
index_endpoint="127.0.0.1", # host-only: auto-use default ports
)

# Explicit ports
svc = PyLocalCacheService(
kvcache_tensor=kvcache_tensor,
file=file,
index_endpoint="127.0.0.1:6379",
coord_endpoints="127.0.0.1:2379",
)
```

#### `lightmem_server` (one command)

After installing LightMem, start all dependency services with a single command:

```bash
# Start Redis (index) + etcd (coordinator) via Docker Compose
lightmem_server --mode docker --index-port 6379 --coord-port 2379 --coord-peer-port 2380

# Start Redis (index) + etcd (coordinator) using locally installed binaries
lightmem_server --mode local --index-port 6379 --coord-port 2379 --coord-peer-port 2380

# Stop services and remove persistent volumes
lightmem_server --stop --purge-volumes
```

`lightmem_server` prints a ready-to-use Python snippet for `PyLocalCacheService` clients once services are up.

## Quick Start

### Key Concepts
@@ -178,6 +247,7 @@ LightMem has a layered architecture with C++ core and Python bindings:
- **PyLocalCacheService**: Main Python interface for cache operations
- **PyTask**: Python wrapper for task management
- **PyState**: Enum for task state tracking (Initial, Working, Finished, Aborted)
- **EtcdShardCoordinator**: Background thread managing distributed shard ownership via etcd

### C++ Core (Internal)

@@ -194,6 +264,39 @@
- **CacheBlock**: Individual block within a task, processed independently
- **TaskQueue**: Thread pool managing asynchronous task execution

## Multi-Node Distributed Cache

Multiple nodes share the same storage infrastructure: each node independently manages local shard files, coordinates ownership through **etcd**, and deduplicates via a **Redis** global hash index.

- **Shard ownership**: Assigned via Rendezvous (HRW) hashing — deterministic, minimal movement on join/leave, single owner per shard at any time
- **Ownership lifecycle**: `FREE → CLAIMED → DRAINING → FREE`. Owner keys are lease-bound; a crashed node's shards are reassigned automatically after `coord_ttl` seconds
- **Epoch fencing**: Each claim generates a monotonic epoch stored in the shard WAL/superblock, preventing stale writes from a previous owner
- **Cross-node dedup**: On write, each node checks `lightmem:global:index` in Redis; if another node already wrote the same hash, the write is skipped
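
The dedup decision can be illustrated with a plain dict standing in for the Redis index; a real deployment would instead use an atomic Redis `SET ... NX` on keys under `lightmem:global:index`. This sketch is illustrative only, not LightMem's code.

```python
def try_claim_write(index: dict, key_hash: str, node_id: str) -> bool:
    # First writer wins; later writers skip the duplicate block
    if key_hash in index:
        return False
    index[key_hash] = node_id
    return True

global_index = {}
assert try_claim_write(global_index, "h1", "node-a")      # node-a stores the block
assert not try_claim_write(global_index, "h1", "node-b")  # node-b skips: already stored
```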

### Setup

```bash
# 1. Start Redis + etcd (on any accessible host)
lightmem_server --mode docker --index-port 6379 --coord-port 2379 --coord-peer-port 2380
```

```python
# 2. Start each node (num_shard must be identical across all nodes)
svc = PyLocalCacheService(
kvcache_tensor=kv_cache,
file="./cache_storage",
storage_size=100 * 1024**3,
num_shard=128,
index_endpoint="192.168.1.10", # Redis host (default port 6379)
index_prefix="light_mem",
)

# 3. Graceful shutdown
svc.close()
```

> Multiple processes on the **same machine** must use different `coord_node_id` values.
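
The epoch-fencing rule described above can be sketched as follows. This is an illustrative model of the fencing check, with hypothetical names, not LightMem's implementation.

```python
class Shard:
    def __init__(self):
        self.epoch = 0   # bumped on every ownership claim
        self.blocks = {}

    def claim(self) -> int:
        self.epoch += 1
        return self.epoch

    def write(self, epoch: int, key: str, value: bytes) -> bool:
        # Fence: reject writes stamped with an epoch older than the current owner's
        if epoch < self.epoch:
            return False
        self.blocks[key] = value
        return True

shard = Shard()
old_epoch = shard.claim()   # first owner claims the shard
new_epoch = shard.claim()   # shard reassigned to a new owner
assert shard.write(new_epoch, "k", b"fresh")
assert not shard.write(old_epoch, "k", b"stale")  # stale writer is fenced out
```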

## Configuration

### Block Size Configuration
@@ -221,49 +324,25 @@ num_shard=8 # Creates 8 separate storage files

### PyLocalCacheService

#### Constructor
```python
PyLocalCacheService(
    kvcache_tensor: torch.Tensor,      # 2D uint8 tensor [num_pages, page_size], CPU, contiguous
    file: str,                         # storage directory path
    storage_size: int = 32 * 1024**3,  # default: 32 GB
    num_shard: int = 32,
    num_worker: int = 16,
)
```
**Parameters:**
- `kvcache_tensor`: 2D uint8 tensor with shape `[num_pages, page_size]`; must be CPU-resident and contiguous
- `file`: Path to the storage directory
- `storage_size`: Total storage size in bytes (distributed across shards)
- `num_shard`: Number of storage file shards
- `num_worker`: Number of worker threads

#### Methods
- `query(hash_128s: List[int]) -> List[bool]`: Check whether caches exist on disk for the given 128-bit cumulative hashes. Returns one boolean per block (not per hash); data is grouped into blocks internally based on `block_size / page_size`
- `create(hash_128s: List[int], kv_page_indexer: torch.Tensor, mode: str, start_pos: int = 0) -> PyTask`: Submit an asynchronous read (`"r"`) or write (`"w"`) task. `kv_page_indexer` is an `int32` tensor of page indices that must have the same length as `hash_128s` (one-to-one mapping); `start_pos` is an optional starting position in the token list (default: 0)
- `abort(task: PyTask)`: Cancel a running task
- `active_threads(mode: str) -> int`: Get the count of active read (`"r"`) or write (`"w"`) tasks
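
For intuition, a cumulative hash chains each page's digest with its predecessor, so equal prefixes yield equal hashes and any divergence changes every later hash. The sketch below uses MD5 purely because it produces 128 bits; LightMem's actual hash function is not specified here, so treat this as an assumption.

```python
import hashlib

def cumulative_hashes(pages: list) -> list:
    # Chain each page's digest with the previous one: hash[i] depends on pages[0..i]
    hashes, prev = [], b""
    for page in pages:
        digest = hashlib.md5(prev + page).digest()  # 128-bit digest
        hashes.append(int.from_bytes(digest, "big"))
        prev = digest
    return hashes
```

This prefix property is what lets `query` reuse cached blocks for a shared prompt prefix while detecting the first page where two token sequences diverge.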

### PyTask

#### Methods
- `ready() -> bool`: Check whether all blocks are done (finished or aborted)
- `data_safe() -> bool`: Check whether the source tensor pages can be safely reused (write: data buffered; read: equivalent to `ready()`)
- `state() -> List[PyState]`: Get the per-block state
  - `PyState.Initial` (0): Task just created
  - `PyState.Working` (1): Task in progress
  - `PyState.Finished` (2): Task completed successfully
  - `PyState.Aborted` (3): Task aborted (possibly due to error)

#### Properties
- `page_already_list -> List[int]`: Page indices already on disk (write mode: pages found in cache via hash query)

## Contributing

5 changes: 4 additions & 1 deletion pyproject.toml
100644 → 100755
@@ -8,6 +8,9 @@ dependencies = [
'numpy>=1.25.1'
]

[project.scripts]
lightmem_server = "light_mem.server_cli:main"

[build-system]
requires = ["py-build-cmake~=0.4.3"]
build-backend = "py_build_cmake.build"
@@ -17,7 +20,7 @@ name = "light_mem"
directory = "python"

[tool.py-build-cmake.sdist]
include = ["CMakeLists.txt", "src"]
include = ["CMakeLists.txt", "src", "python"]

[tool.py-build-cmake.cmake]
minimum_version = "3.24"