Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions src/memos/api/handlers/cube_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
Cube handler for memory cube management operations.

This module handles cube creation and registration through the HTTP API.
"""

from fastapi import HTTPException

from memos.api.handlers.base_handler import BaseHandler
from memos.api.product_models import (
CreateCubeRequest,
CreateCubeResponse,
CreateCubeResponseData,
RegisterCubeRequest,
RegisterCubeResponse,
RegisterCubeResponseData,
)
from memos.log import get_logger
from memos.mem_user.user_manager import UserManager


logger = get_logger(__name__)


class CubeHandler(BaseHandler):
"""Handler for memory cube management operations."""

def __init__(self, *args, **kwargs):
"""Initialize CubeHandler with dependencies."""
super().__init__(*args, **kwargs)
# Initialize UserManager for cube operations
# Use graph_db as the backend for user/cube management
self.user_manager = UserManager()

async def create_cube(self, request: CreateCubeRequest) -> CreateCubeResponse:
"""Create a new memory cube for a user.

Args:
request: Cube creation request

Returns:
CreateCubeResponse with created cube details

Raises:
HTTPException: If cube creation fails
"""
try:
# Validate owner exists
if not self.user_manager.validate_user(request.owner_id):
raise ValueError(f"Owner user '{request.owner_id}' does not exist or is inactive")

# Create cube via UserManager
created_cube_id = self.user_manager.create_cube(
cube_name=request.cube_name,
owner_id=request.owner_id,
cube_path=request.cube_path,
cube_id=request.cube_id,
)

logger.info(f"Created cube: {created_cube_id} for owner: {request.owner_id}")

return CreateCubeResponse(
code=200,
message="Cube created successfully",
data=CreateCubeResponseData(
cube_id=created_cube_id,
cube_name=request.cube_name,
owner_id=request.owner_id,
),
)

except ValueError as e:
logger.error(f"Validation error creating cube: {e}")
raise HTTPException(status_code=400, detail=str(e)) from e
except Exception as e:
logger.error(f"Failed to create cube: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to create cube: {e!s}") from e

async def register_cube(self, request: RegisterCubeRequest) -> RegisterCubeResponse:
"""Register an existing memory cube with the system.

Note: This endpoint currently validates the request but the actual registration
logic requires integration with MOSCore.register_mem_cube(), which is not
directly available in the API server context. This is a placeholder that
validates inputs and could be enhanced when the architecture supports it.

Args:
request: Cube registration request

Returns:
RegisterCubeResponse with registration details

Raises:
HTTPException: If registration fails
"""
try:
# Validate user exists if provided
if request.user_id and not self.user_manager.validate_user(request.user_id):
raise ValueError(f"User '{request.user_id}' does not exist or is inactive")

# Note: Full registration logic requires MOSCore which isn't available
# in the current API architecture. This validates the request.
# Future work: integrate with MOSCore.register_mem_cube()

# Use provided cube_id or fall back to name_or_path as identifier
final_cube_id = request.mem_cube_id or request.mem_cube_name_or_path

logger.info(f"Cube registration validated: {final_cube_id}")
logger.warning(
"register_cube endpoint validates inputs but full registration "
"requires MOSCore integration (not yet available in API context)"
)

return RegisterCubeResponse(
code=200,
message="Cube registration request validated (full registration pending architecture support)",
data=RegisterCubeResponseData(
cube_id=final_cube_id,
cube_name_or_path=request.mem_cube_name_or_path,
),
)

except ValueError as e:
logger.error(f"Validation error registering cube: {e}")
raise HTTPException(status_code=400, detail=str(e)) from e
except Exception as e:
logger.error(f"Failed to register cube: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to register cube: {e!s}") from e
63 changes: 63 additions & 0 deletions src/memos/api/product_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,69 @@ class MemCubeRegister(BaseRequest):
mem_cube_id: str | None = Field(None, description="ID for the MemCube")


class CreateCubeRequest(BaseRequest):
"""Request model for creating a new memory cube."""

cube_name: str = Field(..., description="Human-readable name for the memory cube")
owner_id: str = Field(..., description="User ID of the cube owner")
cube_path: str | None = Field(None, description="File system path where cube data will be stored")
cube_id: str | None = Field(
None,
description=(
"Custom unique identifier for the cube. If not provided, one will be generated. "
"Note: cube_id is also referred to as mem_cube_id throughout the API."
),
)


class CreateCubeResponseData(BaseModel):
"""Data model for cube creation response."""

cube_id: str = Field(..., description="The created cube ID (also called mem_cube_id)")
cube_name: str = Field(..., description="Name of the created cube")
owner_id: str = Field(..., description="Owner user ID")


class CreateCubeResponse(BaseResponse[CreateCubeResponseData]):
"""Response model for cube creation."""

message: str = "Cube created successfully"


class RegisterCubeRequest(BaseRequest):
"""Request model for registering an existing memory cube.

This allows loading a cube from disk or creating a new one if the path doesn't exist.
"""

mem_cube_name_or_path: str = Field(
..., description="File path to the memory cube or name for a new cube"
)
mem_cube_id: str | None = Field(
None,
description=(
"Custom identifier for the cube (also called cube_id). "
"If not provided, one will be generated."
),
)
user_id: str | None = Field(
None, description="User ID to associate with the cube. If not provided, uses default user"
)


class RegisterCubeResponseData(BaseModel):
"""Data model for cube registration response."""

cube_id: str = Field(..., description="The registered cube ID (also called mem_cube_id)")
cube_name_or_path: str = Field(..., description="Name or path of the registered cube")


class RegisterCubeResponse(BaseResponse[RegisterCubeResponseData]):
"""Response model for cube registration."""

message: str = "Cube registered successfully"


class ChatRequest(BaseRequest):
"""Request model for chat operations.

Expand Down
53 changes: 53 additions & 0 deletions src/memos/api/routers/server_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from memos.api.handlers.add_handler import AddHandler
from memos.api.handlers.base_handler import HandlerDependencies
from memos.api.handlers.chat_handler import ChatHandler
from memos.api.handlers.cube_handler import CubeHandler
from memos.api.handlers.feedback_handler import FeedbackHandler
from memos.api.handlers.search_handler import SearchHandler
from memos.api.product_models import (
Expand All @@ -32,6 +33,8 @@
ChatBusinessRequest,
ChatPlaygroundRequest,
ChatRequest,
CreateCubeRequest,
CreateCubeResponse,
DeleteMemoryByRecordIdRequest,
DeleteMemoryByRecordIdResponse,
DeleteMemoryRequest,
Expand All @@ -47,6 +50,8 @@
MemoryResponse,
RecoverMemoryByRecordIdRequest,
RecoverMemoryByRecordIdResponse,
RegisterCubeRequest,
RegisterCubeResponse,
SearchResponse,
StatusResponse,
SuggestionRequest,
Expand Down Expand Up @@ -87,6 +92,7 @@
else None
)
feedback_handler = FeedbackHandler(dependencies)
cube_handler = CubeHandler(dependencies)
# Extract commonly used components for function-based handlers
# (These can be accessed from the components dict without unpacking all of them)
mem_scheduler: BaseScheduler = components["mem_scheduler"]
Expand Down Expand Up @@ -128,6 +134,53 @@ def add_memories(add_req: APIADDRequest):
return add_handler.handle_add_memories(add_req)


# =============================================================================
# Cube Management API Endpoints
# =============================================================================


@router.post("/create_cube", summary="Create a new memory cube", response_model=CreateCubeResponse)
async def create_cube(request: CreateCubeRequest) -> CreateCubeResponse:
"""
Create a new memory cube for a user.

Memory cubes are containers that store different types of memories (textual, activation, parametric).
Each cube can be owned by a user and shared with other users.

**Note on cube_id vs mem_cube_id:**
These terms are used interchangeably throughout the API:
- `cube_id` is the canonical identifier for a cube
- `mem_cube_id` appears in many legacy endpoints and means the same thing
- When using other endpoints (search, add, chat), you can reference this cube using either term

**Semantic Clarification:**
- **Single mem_cube_id** (deprecated): Used in older endpoints to identify a single cube.
New code should use `readable_cube_ids` / `writable_cube_ids` lists instead.
- **readable_cube_ids**: List of cube IDs the user can read from (used in search/chat)
- **writable_cube_ids**: List of cube IDs the user can write to (used in add/chat)
"""
return await cube_handler.create_cube(request)


@router.post("/register_cube", summary="Register an existing memory cube", response_model=RegisterCubeResponse)
async def register_cube(request: RegisterCubeRequest) -> RegisterCubeResponse:
"""
Register an existing memory cube with the MOS system.

This method loads and registers a memory cube from a file path or creates a new one
if the path doesn't exist. The cube becomes available for memory operations.

**Note on cube_id vs mem_cube_id:**
These terms are used interchangeably throughout the API. The registered cube can then
be referenced by its cube_id/mem_cube_id in other endpoints.

**Current Status:**
This endpoint validates the registration request. Full registration functionality
requires architectural integration with MOSCore, which will be completed in a future update.
"""
return await cube_handler.register_cube(request)


# =============================================================================
# Scheduler API Endpoints
# =============================================================================
Expand Down
Loading
Loading