From 6c480fd9e52147ef5468f126e94b41bf951b7eab Mon Sep 17 00:00:00 2001 From: zhangbaojun Date: Wed, 3 Jun 2026 18:30:50 +0800 Subject: [PATCH 1/3] refactor(storage): delete agfs http mode client --- Cargo.lock | 204 +--- crates/ragfs/Cargo.toml | 15 - crates/ragfs/src/lib.rs | 6 +- crates/ragfs/src/server/config.rs | 125 -- crates/ragfs/src/server/handlers.rs | 606 --------- crates/ragfs/src/server/main.rs | 89 -- crates/ragfs/src/server/mod.rs | 9 - crates/ragfs/src/server/router.rs | 79 -- docs/en/faq/faq.md | 19 +- docs/en/guides/01-configuration.md | 3 + docs/zh/faq/faq.md | 12 +- docs/zh/guides/01-configuration.md | 3 + openviking/__init__.py | 2 +- openviking/eval/recorder/recorder.py | 5 +- openviking/eval/recorder/recording_client.py | 10 +- openviking/pyagfs/__init__.py | 5 +- openviking/pyagfs/async_client.py | 15 +- openviking/pyagfs/client.py | 1088 ----------------- openviking/pyagfs/helpers.py | 55 +- openviking/pyagfs/protocols.py | 67 + openviking/storage/queuefs/named_queue.py | 9 +- .../storage/transaction/lock_manager.py | 6 +- openviking/storage/transaction/path_lock.py | 4 +- openviking/storage/transaction/redo_log.py | 4 +- openviking/storage/viking_fs.py | 62 +- tests/server/test_api_sessions.py | 34 +- tests/session/test_session_context.py | 34 +- tests/test_prompt_manager.py | 136 +-- tests/utils/mock_agfs.py | 23 +- 29 files changed, 289 insertions(+), 2440 deletions(-) delete mode 100644 crates/ragfs/src/server/config.rs delete mode 100644 crates/ragfs/src/server/handlers.rs delete mode 100644 crates/ragfs/src/server/main.rs delete mode 100644 crates/ragfs/src/server/mod.rs delete mode 100644 crates/ragfs/src/server/router.rs delete mode 100644 openviking/pyagfs/client.py create mode 100644 openviking/pyagfs/protocols.py diff --git a/Cargo.lock b/Cargo.lock index eed73bc2f6..d126de163e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -724,61 +724,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum" -version = "0.7.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" -dependencies = [ - "async-trait", - "axum-core", - "bytes", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "hyper 1.8.1", - "hyper-util", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "serde_json", - "serde_path_to_error", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "axum-core" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "mime", - "pin-project-lite", - "rustversion", - "sync_wrapper", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "base16ct" version = "0.1.1" @@ -2367,7 +2312,6 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "httparse", - "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -2963,21 +2907,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "matchers" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" -dependencies = [ - "regex-automata", -] - -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "maybe-rayon" version = "0.1.1" @@ -3130,15 +3059,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8" -[[package]] -name = "nu-ansi-term" -version = "0.50.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" -dependencies = [ - "windows-sys 0.61.2", -] - [[package]] name = "num-bigint" version = "0.4.6" @@ -3717,17 +3637,14 @@ dependencies = [ "aws-config", "aws-sdk-s3", "aws-types", - "axum", "bytes", "chrono", - "clap", "criterion", "futures", "grep", "grep-matcher", "grep-regex", "grep-searcher", - "hyper 1.8.1", "ignore", "lru", "path-clean", @@ -3736,15 +3653,11 @@ dependencies = [ "rusqlite", "serde", "serde_json", - "serde_yaml", "sqlx", "tempfile", "thiserror 1.0.69", "tokio", - "tower", - "tower-http 0.5.2", "tracing", - "tracing-subscriber", "uuid", ] @@ -4003,7 +3916,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tower", - "tower-http 0.6.8", + "tower-http", "tower-service", "url", "wasm-bindgen", @@ -4347,17 +4260,6 @@ dependencies = [ "zmij", ] -[[package]] -name = "serde_path_to_error" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" -dependencies = [ - "itoa", - "serde", - "serde_core", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4370,19 +4272,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_yaml" -version = "0.9.34+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" -dependencies = [ - "indexmap", - "itoa", - "ryu", - "serde", - "unsafe-libyaml", -] - [[package]] name = "sha1" version = "0.10.6" @@ -4405,15 +4294,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sharded-slab" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" -dependencies = [ - "lazy_static", -] - [[package]] name = "shlex" version = "1.3.0" @@ -4916,15 +4796,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" -dependencies = [ - "cfg-if", -] - [[package]] name = "tiff" version = "0.11.3" @@ -5089,24 +4960,6 @@ dependencies = [ "tokio", "tower-layer", "tower-service", - "tracing", -] - -[[package]] -name = "tower-http" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" -dependencies = [ - "bitflags", - "bytes", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "pin-project-lite", - "tower-layer", - "tower-service", - "tracing", ] [[package]] @@ -5169,49 +5022,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" -dependencies = [ - "matchers", - "nu-ansi-term", - "once_cell", - "regex-automata", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", ] [[package]] @@ -5306,12 +5116,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81e544489bf3d8ef66c953931f56617f423cd4b5494be343d9b9d3dda037b9a3" -[[package]] -name = "unsafe-libyaml" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" - [[package]] name = "untrusted" version = "0.9.0" @@ -5371,12 +5175,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "valuable" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/crates/ragfs/Cargo.toml b/crates/ragfs/Cargo.toml index 935589f954..790e0d5e43 100644 --- a/crates/ragfs/Cargo.toml +++ b/crates/ragfs/Cargo.toml @@ -14,10 +14,6 @@ categories = ["filesystem", "network-programming"] name = "ragfs" path = "src/lib.rs" -[[bin]] -name = "ragfs-server" -path = "src/server/main.rs" - [[bin]] name = "ragfs-shell" path = "src/shell/main.rs" @@ -27,23 +23,12 @@ path = "src/shell/main.rs" tokio = { version = "1.38", features = ["full"] } async-trait = "0.1" -# HTTP server -axum = "0.7" -tower = "0.5" -tower-http = { version = "0.5", features = ["trace", "cors"] } -hyper = "1.0" - # Serialization serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_yaml = "0.9" - -# Configuration -clap = { version = "4.5", features = ["derive", "env"] } # Logging tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } # Path handling and filesystem path-clean = "1.0" diff --git a/crates/ragfs/src/lib.rs b/crates/ragfs/src/lib.rs index 8c345276a9..07335c0d57 100644 --- a/crates/ragfs/src/lib.rs +++ b/crates/ragfs/src/lib.rs @@ -2,15 +2,12 @@ //! //! RAGFS provides a unified filesystem abstraction that allows multiple //! filesystem implementations (plugins) to be mounted at different paths. -//! It exposes these filesystems through a REST API, making them accessible -//! to AI agents and other clients. +//! It is consumed in-process through the Rust binding (`ragfs-python`). //! //! # Architecture //! //! - **Core**: Fundamental traits and types (FileSystem, ServicePlugin, etc.) //! - **Plugins**: Filesystem implementations (MemFS, KVFS, QueueFS, etc.) -//! - **Server**: HTTP API server for remote access -//! - **Shell**: Interactive command-line interface //! //! # Example //! @@ -34,7 +31,6 @@ pub mod core; pub mod plugins; -pub mod server; // Re-export core types for convenience pub use core::{ diff --git a/crates/ragfs/src/server/config.rs b/crates/ragfs/src/server/config.rs deleted file mode 100644 index f8aea2ddab..0000000000 --- a/crates/ragfs/src/server/config.rs +++ /dev/null @@ -1,125 +0,0 @@ -//! Server configuration module -//! -//! This module handles server configuration including address binding, -//! logging levels, and other runtime settings. - -use clap::Parser; -use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; - -/// Server configuration -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ServerConfig { - /// Server bind address - pub address: String, - - /// Log level (trace, debug, info, warn, error) - pub log_level: String, - - /// Enable CORS - pub enable_cors: bool, -} - -impl Default for ServerConfig { - fn default() -> Self { - Self { - address: "0.0.0.0:8080".to_string(), - log_level: "info".to_string(), - enable_cors: true, - } - } -} - -impl ServerConfig { - /// Parse server address into SocketAddr - pub fn socket_addr(&self) -> Result { - self.address.parse().map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!("Invalid address '{}': {}", self.address, e), - ) - }) - } -} - -/// Command-line arguments -#[derive(Debug, Parser)] -#[command(name = "ragfs-server")] -#[command(about = "RAGFS HTTP Server", long_about = None)] -pub struct Args { - /// Server bind address - #[arg(short, long, default_value = "0.0.0.0:8080", env = "RAGFS_ADDRESS")] - pub address: String, - - /// Log level - #[arg(short, long, default_value = "info", env = "RAGFS_LOG_LEVEL")] - pub log_level: String, - - /// Configuration file path (optional) - #[arg(short, long, env = "RAGFS_CONFIG")] - pub config: Option, - - /// Enable CORS - #[arg(long, default_value = "true", env = "RAGFS_ENABLE_CORS")] - pub enable_cors: bool, -} - -impl Args { - /// Convert Args to ServerConfig - pub fn to_config(&self) -> ServerConfig { - ServerConfig { - address: self.address.clone(), - log_level: self.log_level.clone(), - enable_cors: self.enable_cors, - } - } - - /// Load configuration from file if specified, otherwise use CLI args - pub fn load_config(&self) -> Result> { - if let Some(config_path) = &self.config { - // Load from YAML file - let content = std::fs::read_to_string(config_path)?; - let config: ServerConfig = serde_yaml::from_str(&content)?; - Ok(config) - } else { - // Use CLI args - Ok(self.to_config()) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_default_config() { - let config = ServerConfig::default(); - assert_eq!(config.address, "0.0.0.0:8080"); - assert_eq!(config.log_level, "info"); - assert!(config.enable_cors); - } - - #[test] - fn test_socket_addr_parsing() { - let config = ServerConfig { - address: "127.0.0.1:3000".to_string(), - log_level: "debug".to_string(), - enable_cors: false, - }; - - let addr = config.socket_addr().unwrap(); - assert_eq!(addr.port(), 3000); - } - - #[test] - fn test_invalid_socket_addr() { - let config = ServerConfig { - address: "invalid".to_string(), - log_level: "info".to_string(), - enable_cors: true, - }; - - assert!(config.socket_addr().is_err()); - } -} diff --git a/crates/ragfs/src/server/handlers.rs b/crates/ragfs/src/server/handlers.rs deleted file mode 100644 index 036c411802..0000000000 --- a/crates/ragfs/src/server/handlers.rs +++ /dev/null @@ -1,606 +0,0 @@ -//! HTTP handlers for RAGFS API -//! -//! This module implements all HTTP request handlers for the RAGFS REST API. - -use axum::{ - extract::{Query, State}, - http::StatusCode, - response::{IntoResponse, Response}, - Json, -}; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::core::{FileSystem, FilesystemStats, GrepResult, MountableFS, PluginConfig, WriteFlag}; - -/// Shared application state -#[derive(Clone)] -pub struct AppState { - /// The mounted filesystem - pub fs: Arc, -} - -/// Standard API response -#[derive(Debug, Serialize)] -pub struct ApiResponse { - /// Whether the operation succeeded - pub success: bool, - /// Response data (if successful) - #[serde(skip_serializing_if = "Option::is_none")] - pub data: Option, - /// Error message (if failed) - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, -} - -impl ApiResponse { - /// Create a successful response - pub fn success(data: T) -> Self { - Self { - success: true, - data: Some(data), - error: None, - } - } - - /// Create an error response - pub fn error(message: impl Into) -> ApiResponse<()> { - ApiResponse { - success: false, - data: None, - error: Some(message.into()), - } - } -} - -/// Query parameters for file operations -#[derive(Debug, Deserialize)] -pub struct FileQuery { - /// File path - pub path: String, - /// Read offset in bytes - #[serde(default)] - pub offset: u64, - /// Number of bytes to read (0 = all) - #[serde(default)] - pub size: u64, -} - -/// Query parameters for directory operations -#[derive(Debug, Deserialize)] -pub struct DirQuery { - /// Directory path - pub path: String, - /// Optional permission mode (octal string, e.g. "755") - pub mode: Option, -} - -/// Query parameters for statistics operations -#[derive(Debug, Deserialize)] -pub struct StatsQuery { - /// Mount path (optional, if not provided returns all stats) - pub path: Option, -} - -/// Query parameters for tree (recursive directory listing) operations -#[derive(Debug, Deserialize)] -pub struct TreeQuery { - /// Directory path to traverse - pub path: String, - /// Whether to include hidden entries (names starting with '.') - #[serde(default)] - pub show_hidden: bool, - /// Maximum number of nodes to return (None = unlimited) - #[serde(default)] - pub node_limit: Option, - /// Maximum depth relative to the query root (None = unlimited) - #[serde(default)] - pub level_limit: Option, -} - -/// Statistics response for a single mount -#[derive(Debug, Serialize)] -pub struct MountStats { - /// Mount path - pub path: String, - /// Plugin name - pub plugin: String, - /// Filesystem statistics - pub stats: FilesystemStats, -} - -/// Statistics response for all mounts -#[derive(Debug, Serialize)] -pub struct AllStatsResponse { - /// Statistics for all mounts - pub mounts: Vec, -} - -/// Request body for mount operation -#[derive(Debug, Deserialize)] -pub struct MountRequest { - /// Plugin name - pub plugin: String, - /// Mount path - pub path: String, - /// Plugin configuration parameters - #[serde(default)] - pub params: std::collections::HashMap, -} - -/// Request body for unmount operation -#[derive(Debug, Deserialize)] -pub struct UnmountRequest { - /// Mount path to unmount - pub path: String, -} - -/// Request body for grep operation -#[derive(Debug, Deserialize)] -pub struct GrepRequest { - /// File or directory path to search - pub path: String, - /// Regular expression pattern - pub pattern: String, - /// Whether to search recursively - #[serde(default)] - pub recursive: bool, - /// Whether to perform case-insensitive matching - #[serde(default)] - pub case_insensitive: bool, - /// Whether to stream results (currently unsupported by ragfs server) - #[serde(default)] - pub stream: bool, - /// Maximum number of matches to return - #[serde(default)] - pub node_limit: Option, - /// Optional path prefix to exclude from search - #[serde(default)] - pub exclude_path: Option, - /// Optional maximum depth relative to query root - #[serde(default)] - pub level_limit: Option, -} - -/// Health check response -#[derive(Debug, Serialize)] -pub struct HealthResponse { - /// Health status - pub status: String, - /// Server version - pub version: String, -} - -/// Mount info response -#[derive(Debug, Serialize)] -pub struct MountInfo { - /// Mount path - pub path: String, - /// Plugin name - pub plugin: String, -} - -// ============================================================================ -// File Operations Handlers -// ============================================================================ - -/// GET /api/v1/files - Read file -pub async fn read_file(State(state): State, Query(query): Query) -> Response { - match state.fs.read(&query.path, query.offset, query.size).await { - Ok(data) => (StatusCode::OK, data).into_response(), - Err(e) => ( - StatusCode::NOT_FOUND, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// PUT /api/v1/files - Write file -pub async fn write_file( - State(state): State, - Query(query): Query, - body: bytes::Bytes, -) -> Response { - match state - .fs - .write(&query.path, &body, query.offset, WriteFlag::None) - .await - { - Ok(written) => ( - StatusCode::OK, - Json(ApiResponse::success(serde_json::json!({ - "bytes_written": written - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// POST /api/v1/files - Create file -pub async fn create_file( - State(state): State, - Query(query): Query, -) -> Response { - match state.fs.create(&query.path).await { - Ok(_) => ( - StatusCode::CREATED, - Json(ApiResponse::success(serde_json::json!({ - "path": query.path - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// DELETE /api/v1/files - Delete file -pub async fn delete_file( - State(state): State, - Query(query): Query, -) -> Response { - match state.fs.remove(&query.path).await { - Ok(_) => ( - StatusCode::OK, - Json(ApiResponse::success(serde_json::json!({ - "path": query.path - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// GET /api/v1/stat - Get file metadata -pub async fn stat_file(State(state): State, Query(query): Query) -> Response { - match state.fs.stat(&query.path).await { - Ok(info) => (StatusCode::OK, Json(ApiResponse::success(info))).into_response(), - Err(e) => ( - StatusCode::NOT_FOUND, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -// ============================================================================ -// Directory Operations Handlers -// ============================================================================ - -/// GET /api/v1/directories - List directory -pub async fn list_directory( - State(state): State, - Query(query): Query, -) -> Response { - match state.fs.read_dir(&query.path).await { - Ok(entries) => (StatusCode::OK, Json(ApiResponse::success(entries))).into_response(), - Err(e) => ( - StatusCode::NOT_FOUND, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// POST /api/v1/directories - Create directory -pub async fn create_directory( - State(state): State, - Query(query): Query, -) -> Response { - match state.fs.mkdir(&query.path, 0o755).await { - Ok(_) => ( - StatusCode::CREATED, - Json(ApiResponse::success(serde_json::json!({ - "path": query.path - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// POST /api/v1/directories/ensure-parent - Ensure parent directories exist -pub async fn ensure_parent_dirs( - State(state): State, - Query(query): Query, -) -> Response { - let mode = query.mode.and_then(|m| u32::from_str_radix(&m, 8).ok()).unwrap_or(0o755); - match state.fs.ensure_parent_dirs(&query.path, mode).await { - Ok(_) => ( - StatusCode::OK, - Json(ApiResponse::success(serde_json::json!({ - "path": query.path - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -// ============================================================================ -// Mount Management Handlers -// ============================================================================ - -/// GET /api/v1/mounts - List all mounts -pub async fn list_mounts(State(state): State) -> Response { - let mounts = state.fs.list_mounts().await; - let mount_infos: Vec = mounts - .into_iter() - .map(|(path, plugin)| MountInfo { path, plugin }) - .collect(); - - (StatusCode::OK, Json(ApiResponse::success(mount_infos))).into_response() -} - -/// POST /api/v1/mount - Mount a filesystem -pub async fn mount_filesystem( - State(state): State, - Json(req): Json, -) -> Response { - // Convert JSON params to ConfigValue - let params = req - .params - .into_iter() - .map(|(k, v)| { - let config_value = match v { - serde_json::Value::String(s) => crate::core::ConfigValue::String(s), - serde_json::Value::Number(n) => { - if let Some(i) = n.as_i64() { - crate::core::ConfigValue::Int(i) - } else { - crate::core::ConfigValue::String(n.to_string()) - } - } - serde_json::Value::Bool(b) => crate::core::ConfigValue::Bool(b), - serde_json::Value::Array(arr) => { - let strings: Vec = arr - .into_iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect(); - crate::core::ConfigValue::StringList(strings) - } - _ => crate::core::ConfigValue::String(v.to_string()), - }; - (k, config_value) - }) - .collect(); - - let config = PluginConfig { - name: req.plugin.clone(), - mount_path: req.path.clone(), - params, - }; - - match state.fs.mount(config).await { - Ok(_) => ( - StatusCode::OK, - Json(ApiResponse::success(serde_json::json!({ - "plugin": req.plugin, - "path": req.path - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// POST /api/v1/unmount - Unmount a filesystem -pub async fn unmount_filesystem( - State(state): State, - Json(req): Json, -) -> Response { - match state.fs.unmount(&req.path).await { - Ok(_) => ( - StatusCode::OK, - Json(ApiResponse::success(serde_json::json!({ - "path": req.path - }))), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -// ============================================================================ -// Health Check Handler -// ============================================================================ - -/// POST /api/v1/grep - Search file content -pub async fn grep_content(State(state): State, Json(req): Json) -> Response { - if req.stream { - return ( - StatusCode::NOT_IMPLEMENTED, - Json(ApiResponse::<()>::error( - "streaming grep is not supported by the ragfs HTTP server", - )), - ) - .into_response(); - } - - match state - .fs - .grep( - &req.path, - &req.pattern, - req.recursive, - req.case_insensitive, - req.node_limit, - req.exclude_path.as_deref(), - req.level_limit, - ) - .await - { - Ok(result) => (StatusCode::OK, Json(ApiResponse::success(result))).into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiResponse::::error(e.to_string())), - ) - .into_response(), - } -} - -/// GET /api/v1/tree - Recursively list a directory tree. -/// -/// Mirrors the binding-side `tree_directory` so HTTP clients have the same -/// surface as the native binding client. -pub async fn tree_directory( - State(state): State, - Query(query): Query, -) -> Response { - match state - .fs - .tree_directory( - &query.path, - query.show_hidden, - query.node_limit, - query.level_limit, - ) - .await - { - Ok(entries) => (StatusCode::OK, Json(ApiResponse::success(entries))).into_response(), - Err(e) => ( - StatusCode::NOT_FOUND, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } -} - -/// GET /api/v1/health - Health check -pub async fn health_check() -> Response { - let response = HealthResponse { - status: "healthy".to_string(), - version: crate::VERSION.to_string(), - }; - - (StatusCode::OK, Json(ApiResponse::success(response))).into_response() -} - -// ============================================================================ -// Statistics Handlers -// ============================================================================ - -/// GET /api/v1/stats - Get filesystem statistics -pub async fn get_stats(State(state): State, Query(query): Query) -> Response { - if let Some(path) = query.path { - // Get stats for a specific mount - match state.fs.get_mount_stats(&path).await { - Ok(stats) => { - let mounts = state.fs.list_mounts().await; - let plugin_name = mounts - .into_iter() - .find(|(p, _)| p == &path) - .map(|(_, plugin)| plugin) - .unwrap_or_default(); - - let mount_stats = MountStats { - path, - plugin: plugin_name, - stats, - }; - - (StatusCode::OK, Json(ApiResponse::success(mount_stats))).into_response() - } - Err(e) => ( - StatusCode::NOT_FOUND, - Json(ApiResponse::<()>::error(e.to_string())), - ) - .into_response(), - } - } else { - // Get stats for all mounts - let all_stats = state.fs.get_all_stats().await; - let mounts: Vec = all_stats - .into_iter() - .map(|(path, (plugin, stats))| MountStats { path, plugin, stats }) - .collect(); - - let response = AllStatsResponse { mounts }; - (StatusCode::OK, Json(ApiResponse::success(response))).into_response() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use axum::extract::State; - use std::collections::HashMap; - - use crate::plugins::LocalFSPlugin; - - #[tokio::test] - async fn test_grep_content_forwards_exclude_path_and_level_limit() { - let fs = Arc::new(MountableFS::new()); - fs.register_plugin(LocalFSPlugin::new()).await; - - let temp = tempfile::TempDir::new().unwrap(); - std::fs::create_dir_all(temp.path().join("excluded")).unwrap(); - std::fs::create_dir_all(temp.path().join("ok")).unwrap(); - std::fs::write(temp.path().join("excluded/x.txt"), "hit\n").unwrap(); - std::fs::write(temp.path().join("ok/y.txt"), "hit\n").unwrap(); - - fs.mount(PluginConfig { - name: "localfs".to_string(), - mount_path: "/local".to_string(), - params: HashMap::from([( - "local_dir".to_string(), - crate::core::ConfigValue::String(temp.path().to_string_lossy().to_string()), - )]), - }) - .await - .unwrap(); - - let response = grep_content( - State(AppState { fs: fs.clone() }), - Json(GrepRequest { - path: "/local".to_string(), - pattern: "hit".to_string(), - recursive: true, - case_insensitive: false, - stream: false, - node_limit: Some(1), - exclude_path: Some("/local/excluded".to_string()), - level_limit: Some(5), - }), - ) - .await; - - let body = axum::body::to_bytes(response.into_body(), usize::MAX) - .await - .unwrap(); - let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap(); - - assert_eq!(parsed["success"], true); - assert_eq!(parsed["data"]["count"], 1); - assert_eq!(parsed["data"]["matches"][0]["file"], "ok/y.txt"); - } -} diff --git a/crates/ragfs/src/server/main.rs b/crates/ragfs/src/server/main.rs deleted file mode 100644 index 3af9824d2e..0000000000 --- a/crates/ragfs/src/server/main.rs +++ /dev/null @@ -1,89 +0,0 @@ -//! RAGFS Server -//! -//! HTTP server that exposes the RAGFS filesystem through a REST API. - -use clap::Parser; -use ragfs::core::MountableFS; -use ragfs::plugins::{KVFSPlugin, MemFSPlugin, QueueFSPlugin, SQLFSPlugin}; -#[cfg(feature = "s3")] -use ragfs::plugins::S3FSPlugin; -use ragfs::server::{create_router, AppState, Args}; -use std::sync::Arc; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Parse command-line arguments - let args = Args::parse(); - - // Load configuration - let config = args.load_config()?; - - // Initialize tracing/logging - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| config.log_level.clone().into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - tracing::info!("Starting RAGFS Server v{}", ragfs::VERSION); - tracing::info!("Configuration: {:?}", config); - - // Create MountableFS - let fs = Arc::new(MountableFS::new()); - - // Register built-in plugins - tracing::info!("Registering plugins..."); - fs.register_plugin(MemFSPlugin).await; - tracing::info!(" - memfs: In-memory file system"); - fs.register_plugin(KVFSPlugin).await; - tracing::info!(" - kvfs: Key-value file system"); - fs.register_plugin(QueueFSPlugin::new()).await; - tracing::info!(" - queuefs: Message queue file system"); - fs.register_plugin(SQLFSPlugin::new()).await; - tracing::info!(" - sqlfs: Database-backed file system (SQLite)"); - #[cfg(feature = "s3")] - { - fs.register_plugin(S3FSPlugin::new()).await; - tracing::info!(" - s3fs: S3-backed file system"); - } - - // Create application state - let state = AppState { fs: fs.clone() }; - - // Create router - let app = create_router(state, config.enable_cors); - - // Parse socket address - let addr = config.socket_addr()?; - - tracing::info!("Server listening on {}", addr); - tracing::info!("API endpoints:"); - tracing::info!(" GET /api/v1/health"); - tracing::info!(" GET /api/v1/files?path="); - tracing::info!(" PUT /api/v1/files?path="); - tracing::info!(" POST /api/v1/files?path="); - tracing::info!(" DELETE /api/v1/files?path="); - tracing::info!(" GET /api/v1/stat?path="); - tracing::info!(" GET /api/v1/directories?path="); - tracing::info!(" POST /api/v1/directories?path="); - tracing::info!(" GET /api/v1/mounts"); - tracing::info!(" POST /api/v1/mount"); - tracing::info!(" POST /api/v1/unmount"); - tracing::info!(" POST /api/v1/grep"); - tracing::info!(""); - tracing::info!("Example: Mount MemFS"); - tracing::info!(" curl -X POST http://{}/api/v1/mount \\", addr); - tracing::info!(" -H 'Content-Type: application/json' \\"); - tracing::info!(" -d '{{\"plugin\": \"memfs\", \"path\": \"/memfs\"}}'"); - - // Create TCP listener - let listener = tokio::net::TcpListener::bind(addr).await?; - - // Start server - axum::serve(listener, app).await?; - - Ok(()) -} diff --git a/crates/ragfs/src/server/mod.rs b/crates/ragfs/src/server/mod.rs deleted file mode 100644 index 832c4a5a27..0000000000 --- a/crates/ragfs/src/server/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -//! Server module for RAGFS HTTP API - -pub mod config; -pub mod handlers; -pub mod router; - -pub use config::{Args, ServerConfig}; -pub use handlers::AppState; -pub use router::create_router; diff --git a/crates/ragfs/src/server/router.rs b/crates/ragfs/src/server/router.rs deleted file mode 100644 index 3794510132..0000000000 --- a/crates/ragfs/src/server/router.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! Router configuration for RAGFS HTTP server -//! -//! This module sets up all the routes and middleware for the API. - -use axum::{ - routing::{delete, get, post, put}, - Router, -}; -use tower_http::{ - cors::CorsLayer, - trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, -}; -use tracing::Level; - -use super::handlers::{ - create_directory, create_file, delete_file, ensure_parent_dirs, get_stats, grep_content, - health_check, list_directory, list_mounts, mount_filesystem, read_file, stat_file, - tree_directory, unmount_filesystem, write_file, AppState, -}; - -/// Create the main application router -pub fn create_router(state: AppState, enable_cors: bool) -> Router { - let api_routes = Router::new() - // File operations - .route("/files", get(read_file)) - .route("/files", put(write_file)) - .route("/files", post(create_file)) - .route("/files", delete(delete_file)) - .route("/stat", get(stat_file)) - // Directory operations - .route("/directories", get(list_directory)) - .route("/directories", post(create_directory)) - .route("/directories/ensure-parent", post(ensure_parent_dirs)) - // Tree (recursive directory listing) - .route("/tree", get(tree_directory)) - // Mount management - .route("/mounts", get(list_mounts)) - .route("/mount", post(mount_filesystem)) - .route("/unmount", post(unmount_filesystem)) - // Search operations - .route("/grep", post(grep_content)) - // Statistics - .route("/stats", get(get_stats)) - // Health check - .route("/health", get(health_check)); - - let app = Router::new().nest("/api/v1", api_routes).with_state(state); - - // Add tracing middleware - let app = app.layer( - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) - .on_response(DefaultOnResponse::new().level(Level::INFO)), - ); - - // Add CORS if enabled - if enable_cors { - app.layer(CorsLayer::permissive()) - } else { - app - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::core::MountableFS; - use std::sync::Arc; - - #[test] - fn test_router_creation() { - let state = AppState { - fs: Arc::new(MountableFS::new()), - }; - - let _router = create_router(state, true); - // If this compiles and runs, the router is correctly configured - } -} diff --git a/docs/en/faq/faq.md b/docs/en/faq/faq.md index 15c6a17e30..d9df0cd889 100644 --- a/docs/en/faq/faq.md +++ b/docs/en/faq/faq.md @@ -62,18 +62,23 @@ viking:// - VLM (Vision Language Model): For multimodal content processing and semantic extraction - Rerank model: For improved retrieval precision -### What are `binding-client` and `http-client`? Which one should I choose? +### How does OpenViking access the AGFS filesystem? -- **`binding-client` (Default)**: Runs AGFS logic directly within the Python process via CGO bindings. Advantages: extremely high performance, zero network latency; Disadvantages: requires a compiled AGFS shared library locally. -- **`http-client`**: Communicates with a standalone `agfs-server` via HTTP. Advantages: decoupled deployment, no local Go compilation needed; Disadvantages: some network communication overhead. +OpenViking runs the RAGFS filesystem in-process through the Rust binding +(`ragfs_python` / `RAGFSBindingClient`). The binding executes filesystem logic +directly within the Python process, giving extremely high performance and zero +network latency. A compiled RAGFS shared library must be available locally +(shipped in the prebuilt Wheel, or built from source). -If your environment supports Go compilation or you've installed a Wheel package containing pre-compiled libraries, the default `binding-client` is recommended. +> [!WARNING] +> OpenViking no longer supports the AGFS HTTP client mode. AGFS / RAGFS filesystem access now happens only through the in-process Rust binding (`RAGFSBindingClient`). This does not affect the OpenViking server HTTP API, the `ov` CLI, or `AsyncHTTPClient` / `SyncHTTPClient` when they connect to an OpenViking server. ### What should I do if I encounter "AGFS binding library not found"? -This usually means the AGFS shared library is not pre-built in your environment. You can: -1. **Re-compile and install**: Run `pip install -e . --force-reinstall` in the project root (requires Go environment). -2. **Switch to HTTP mode**: Set `storage.agfs.mode = "http-client"` in your `ov.conf` and ensure an `agfs-server` is running. +This usually means the RAGFS shared library is not available in your +environment. Re-compile and install it by running +`pip install -e . --force-reinstall` in the project root (requires a Rust +toolchain). ### How do I install/upgrade OpenViking? diff --git a/docs/en/guides/01-configuration.md b/docs/en/guides/01-configuration.md index 4043e1cadc..14a3de9626 100644 --- a/docs/en/guides/01-configuration.md +++ b/docs/en/guides/01-configuration.md @@ -886,6 +886,9 @@ Storage configuration for context data, including file storage (RAGFS) and vecto RAGFS uses Rust binding mode by default, directly accessing the file system through the Rust implementation. +> [!WARNING] +> `storage.agfs` no longer supports the AGFS HTTP client mode, and the old HTTP client entry should not be configured anymore. AGFS / RAGFS filesystem access now happens only through the in-process Rust binding (`RAGFSBindingClient`). This does not affect the OpenViking server HTTP API, the `ov` CLI, or `AsyncHTTPClient` / `SyncHTTPClient` when they connect to an OpenViking server. + ##### QueueFS Configuration | Parameter | Type | Description | Default | diff --git a/docs/zh/faq/faq.md b/docs/zh/faq/faq.md index a1cf0d3468..c1c7f6c66b 100644 --- a/docs/zh/faq/faq.md +++ b/docs/zh/faq/faq.md @@ -62,18 +62,16 @@ viking:// - VLM(视觉语言模型):用于多模态内容处理和语义提取 - Rerank 模型:用于提升检索精度 -### 什么是 `binding-client` 和 `http-client`?我该选哪个? +### OpenViking 是如何访问 AGFS 文件系统的? -- **`binding-client`(默认值)**:通过 CGO 绑定直接在 Python 进程内运行 AGFS 逻辑。优点是性能极高,无网络延迟;缺点是需要本地有编译好的 AGFS 共享库。 -- **`http-client`**:通过 HTTP 协议与独立的 `agfs-server` 通信。优点是部署解耦,不需要本地编译 Go 代码;缺点是有一定的网络通信开销。 +OpenViking 通过 Rust 绑定(`ragfs_python` / `RAGFSBindingClient`)在 Python 进程内直接运行 RAGFS 文件系统逻辑。优点是性能极高、无网络延迟;前提是本地需要有编译好的 RAGFS 共享库(预编译 Wheel 包内置,或从源码编译)。 -如果你的环境支持编译 Go 代码,或者安装了包含预编译库的 Wheel 包,推荐使用默认的 `binding-client`。 +> [!WARNING] +> OpenViking 已不再支持 AGFS HTTP client 模式。当前 AGFS / RAGFS 文件系统访问仅通过 Rust binding(`RAGFSBindingClient`)在进程内完成。这不影响 OpenViking server 的 HTTP API、`ov` CLI,或 `AsyncHTTPClient` / `SyncHTTPClient` 访问 OpenViking 服务端的能力。 ### 遇到 "AGFS binding library not found" 错误怎么办? -这通常是因为本地没有编译好的 AGFS 共享库。你可以: -1. **重新编译安装**:在项目根目录运行 `pip install -e . --force-reinstall`(需要 Go 环境)。 -2. **切换到 HTTP 模式**:在 `ov.conf` 中设置 `storage.agfs.mode = "http-client"`,并确保有一个正在运行的 `agfs-server`。 +这通常是因为本地没有可用的 RAGFS 共享库。在项目根目录运行 `pip install -e . --force-reinstall` 重新编译安装即可(需要 Rust 工具链)。 ### 如何安装 OpenViking? diff --git a/docs/zh/guides/01-configuration.md b/docs/zh/guides/01-configuration.md index 393fe42647..2e620d7a5e 100644 --- a/docs/zh/guides/01-configuration.md +++ b/docs/zh/guides/01-configuration.md @@ -857,6 +857,9 @@ AST 提取支持:Python、JavaScript/TypeScript、Rust、Go、Java、C/C++。 RAGFS 默认使用 Rust binding 模式,通过 Rust 实现直接访问文件系统。 +> [!WARNING] +> `storage.agfs` 已不再支持 AGFS HTTP client 模式,也无需再配置旧的 HTTP client 入口。当前 AGFS / RAGFS 文件系统访问仅通过 Rust binding(`RAGFSBindingClient`)在进程内完成。这不影响 OpenViking server 的 HTTP API、`ov` CLI,或 `AsyncHTTPClient` / `SyncHTTPClient` 访问 OpenViking 服务端的能力。 + ##### QueueFS 配置 | 参数 | 类型 | 说明 | 默认值 | diff --git a/openviking/__init__.py b/openviking/__init__.py index 7805ea0a84..f906a9d306 100644 --- a/openviking/__init__.py +++ b/openviking/__init__.py @@ -19,7 +19,7 @@ __version__ = "0.0.0+unknown" try: - from openviking.pyagfs import AGFSClient + from openviking.pyagfs import get_binding_client except ImportError as exc: raise ImportError( "Bundled OpenViking AGFS client is unavailable. " diff --git a/openviking/eval/recorder/recorder.py b/openviking/eval/recorder/recorder.py index 03f60a9cf5..3aa64d5ef0 100644 --- a/openviking/eval/recorder/recorder.py +++ b/openviking/eval/recorder/recorder.py @@ -340,13 +340,14 @@ def create_recording_agfs_client(agfs_client: Any, record_file: Optional[str] = Usage: from openviking.eval.recorder import init_recorder, create_recording_agfs_client - from openviking.pyagfs import AGFSClient + from openviking.pyagfs import get_binding_client # Initialize recorder init_recorder(enabled=True) # Create recording client - base_client = AGFSClient(api_base_url="http://localhost:1833") + BindingClient, _ = get_binding_client() + base_client = BindingClient() recording_client = create_recording_agfs_client(base_client) # Use in VikingFS diff --git a/openviking/eval/recorder/recording_client.py b/openviking/eval/recorder/recording_client.py index a63ce91fa0..180ad45a5a 100644 --- a/openviking/eval/recorder/recording_client.py +++ b/openviking/eval/recorder/recording_client.py @@ -3,7 +3,7 @@ """ Recording AGFS Client wrapper. -Wraps AGFSClient to record all IO operations for later playback. +Wraps an AGFS binding client to record all IO operations for later playback. """ import time @@ -25,13 +25,13 @@ class RecordingAGFSClient: to a file for later playback and performance analysis. Usage: - from openviking.pyagfs import AGFSClient - from openviking.eval.recorder.recording_client import RecordingAGFSClient + from openviking.pyagfs import get_binding_client - base_client = AGFSClient(api_base_url="http://localhost:1833") + BindingClient, _ = get_binding_client() + base_client = BindingClient() recording_client = RecordingAGFSClient(base_client, "./records/io_recorder.jsonl") - # Use recording_client as you would use AGFSClient + # Use recording_client as you would use the binding client result = recording_client.ls("/") # Stop recording when done diff --git a/openviking/pyagfs/__init__.py b/openviking/pyagfs/__init__.py index 989153083a..1c2145ae3b 100644 --- a/openviking/pyagfs/__init__.py +++ b/openviking/pyagfs/__init__.py @@ -11,7 +11,6 @@ from pathlib import Path from .async_client import AsyncAGFSClient -from .client import AGFSClient, FileHandle from .exceptions import ( AGFSAlreadyExistsError, AGFSClientError, @@ -37,6 +36,7 @@ AGFSTimeoutError, ) from .helpers import cp, download, upload +from .protocols import AGFSSyncClientProtocol _logger = logging.getLogger(__name__) @@ -146,11 +146,10 @@ def get_binding_client(): BindingFileHandle = None __all__ = [ - "AGFSClient", "AsyncAGFSClient", + "AGFSSyncClientProtocol", "AGFSBindingClient", "RAGFSBindingClient", - "FileHandle", "BindingFileHandle", "get_binding_client", "AGFSClientError", diff --git a/openviking/pyagfs/async_client.py b/openviking/pyagfs/async_client.py index 87e243efaa..d7f4b7e39a 100644 --- a/openviking/pyagfs/async_client.py +++ b/openviking/pyagfs/async_client.py @@ -8,20 +8,23 @@ from collections.abc import Iterator from typing import Any, BinaryIO, Dict, List, Union +from .protocols import AGFSSyncClientProtocol + class AsyncAGFSClient: - """Run blocking AGFS client operations off the event loop. + """Run blocking AGFS binding client operations off the event loop. - This is intentionally a thin adapter over the existing synchronous client. - If AGFS later provides native async methods, they can be swapped in here - without changing storage and transaction call sites. + This is intentionally a thin adapter over the synchronous RAGFS binding + client (``RAGFSBindingClient``). If the binding later provides native async + methods, they can be swapped in here without changing storage and + transaction call sites. """ - def __init__(self, client: Any): + def __init__(self, client: AGFSSyncClientProtocol): self._client = client @property - def sync_client(self) -> Any: + def sync_client(self) -> AGFSSyncClientProtocol: return self._client async def run(self, method_name: str, /, *args: Any, **kwargs: Any) -> Any: diff --git a/openviking/pyagfs/client.py b/openviking/pyagfs/client.py deleted file mode 100644 index 2af379d3fc..0000000000 --- a/openviking/pyagfs/client.py +++ /dev/null @@ -1,1088 +0,0 @@ -"""AGFS Server API Client""" - -import time -from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Union - -import requests -from requests.exceptions import ConnectionError, Timeout - -from .exceptions import AGFSClientError, AGFSHTTPError, AGFSNotSupportedError - - -class AGFSClient: - """Client for interacting with AGFS (Plugin-based File System) Server API""" - - def __init__(self, api_base_url="http://localhost:8080", timeout=10): - """ - Initialize AGFS client. - - Args: - api_base_url: API base URL. Can be either full URL with "/api/v1" or just the base. - If "/api/v1" is not present, it will be automatically appended. - e.g., "http://localhost:8080" or "http://localhost:8080/api/v1" - timeout: Request timeout in seconds (default: 10) - """ - api_base_url = api_base_url.rstrip("/") - # Auto-append /api/v1 if not present - if not api_base_url.endswith("/api/v1"): - api_base_url = api_base_url + "/api/v1" - self.api_base = api_base_url - self.session = requests.Session() - self.timeout = timeout - - def _handle_request_error(self, e: Exception, operation: str = "request") -> None: - """Convert request exceptions to user-friendly error messages""" - if isinstance(e, ConnectionError): - # Extract host and port from the error message - url_parts = self.api_base.split("://") - if len(url_parts) > 1: - host_port = url_parts[1].split("/")[0] - else: - host_port = "server" - raise AGFSClientError(f"Connection refused - server not running at {host_port}") - elif isinstance(e, Timeout): - raise AGFSClientError(f"Request timeout after {self.timeout}s") - elif isinstance(e, requests.exceptions.HTTPError): - # Extract useful error information from response - if hasattr(e, "response") and e.response is not None: - status_code = e.response.status_code - - # Special handling for 501 Not Implemented - always raise typed error - if status_code == 501: - try: - error_data = e.response.json() - error_msg = error_data.get("error", "Operation not supported") - except (ValueError, KeyError, TypeError): - error_msg = "Operation not supported" - raise AGFSNotSupportedError(error_msg) - - # Try to get error message from JSON response first - error_msg = None - try: - error_data = e.response.json() - error_msg = error_data.get("error", "") - except (ValueError, KeyError, TypeError): - pass - - # Always use AGFSHTTPError to preserve status_code - if error_msg: - raise AGFSHTTPError(error_msg, status_code) - elif status_code == 404: - raise AGFSHTTPError("No such file or directory", status_code) - elif status_code == 403: - raise AGFSHTTPError("Permission denied", status_code) - elif status_code == 409: - raise AGFSHTTPError("Resource already exists", status_code) - elif status_code == 500: - raise AGFSHTTPError("Internal server error", status_code) - elif status_code == 502: - raise AGFSHTTPError("Bad Gateway - backend service unavailable", status_code) - else: - raise AGFSHTTPError(f"HTTP error {status_code}", status_code) - else: - raise AGFSHTTPError("HTTP error", None) - else: - # For other exceptions, re-raise with simplified message - raise AGFSClientError(str(e)) - - def health(self) -> Dict[str, Any]: - """Check server health""" - response = self.session.get(f"{self.api_base}/health", timeout=self.timeout) - response.raise_for_status() - return response.json() - - def get_capabilities(self) -> Dict[str, Any]: - """Get server capabilities - - Returns: - Dict containing 'version' and 'features' list. - e.g., {'version': '1.4.0', 'features': ['handlefs', 'grep', ...]} - """ - try: - response = self.session.get(f"{self.api_base}/capabilities", timeout=self.timeout) - - # If capabilities endpoint doesn't exist (older server), return empty capabilities - if response.status_code == 404: - return {"version": "unknown", "features": []} - - response.raise_for_status() - return response.json() - except Exception as e: - # If capabilities check fails, treat it as unknown/empty rather than error - # unless it's a connection error - if isinstance(e, ConnectionError): - self._handle_request_error(e) - return {"version": "unknown", "features": []} - - def ls(self, path: str = "/") -> List[Dict[str, Any]]: - """List directory contents""" - try: - response = self.session.get( - f"{self.api_base}/directories", params={"path": path}, timeout=self.timeout - ) - response.raise_for_status() - data = response.json() - files = data.get("files") - return files if files is not None else [] - except Exception as e: - self._handle_request_error(e) - - def tree_directory( - self, - path: str, - show_hidden: bool = False, - node_limit: Optional[int] = None, - level_limit: Optional[int] = None, - ) -> List[Dict[str, Any]]: - """Recursively list a directory tree. - - Mirrors the native binding client's ``tree_directory`` so the HTTP - client exposes the same surface. - - Args: - path: Directory path to traverse. - show_hidden: Include hidden entries (names starting with '.'). - node_limit: Maximum number of nodes to return (None = unlimited). - level_limit: Maximum depth relative to the query root (None = unlimited). - - Returns: - List of tree entry dicts (path, rel_path, info, extra). - """ - try: - params: Dict[str, Any] = {"path": path} - if show_hidden: - params["show_hidden"] = "true" - if node_limit is not None: - params["node_limit"] = str(node_limit) - if level_limit is not None: - params["level_limit"] = str(level_limit) - response = self.session.get( - f"{self.api_base}/tree", params=params, timeout=self.timeout - ) - response.raise_for_status() - data = response.json() - entries = data.get("data") - return entries if entries is not None else [] - except Exception as e: - self._handle_request_error(e) - - def read(self, path: str, offset: int = 0, size: int = -1, stream: bool = False): - return self.cat(path, offset, size, stream) - - def cat(self, path: str, offset: int = 0, size: int = -1, stream: bool = False): - """Read file content with optional offset and size - - Args: - path: File path - offset: Starting position (default: 0) - size: Number of bytes to read (default: -1, read all) - stream: Enable streaming mode for continuous reads (default: False) - - Returns: - If stream=False: bytes content - If stream=True: Response object for iteration - """ - try: - params = {"path": path} - - if stream: - params["stream"] = "true" - # Streaming mode - return response object for iteration - response = self.session.get( - f"{self.api_base}/files", - params=params, - stream=True, - timeout=None, # No timeout for streaming - ) - response.raise_for_status() - return response - else: - # Normal mode - return content - if offset > 0: - params["offset"] = str(offset) - if size >= 0: - params["size"] = str(size) - - response = self.session.get( - f"{self.api_base}/files", params=params, timeout=self.timeout - ) - response.raise_for_status() - return response.content - except Exception as e: - self._handle_request_error(e) - - def write( - self, path: str, data: Union[bytes, Iterator[bytes], BinaryIO], max_retries: int = 3 - ) -> str: - """Write data to file and return the response message - - Args: - path: Path to write the file - data: File content as bytes, iterator of bytes, or file-like object - max_retries: Maximum number of retry attempts (default: 3) - - Returns: - Response message from server - """ - # Calculate timeout based on file size (if known) - # For streaming data, use a larger default timeout - if isinstance(data, bytes): - data_size_mb = len(data) / (1024 * 1024) - write_timeout = max(10, min(300, int(data_size_mb * 1 + 10))) - else: - # For streaming/unknown size, use no timeout - write_timeout = None - - last_error = None - - for attempt in range(max_retries + 1): - try: - response = self.session.put( - f"{self.api_base}/files", - params={"path": path}, - data=data, # requests supports bytes, iterator, or file-like object - timeout=write_timeout, - ) - response.raise_for_status() - result = response.json() - - # If we succeeded after retrying, let user know - if attempt > 0: - print(f"✓ Upload succeeded after {attempt} retry(ies)") - - return result.get("message", "OK") - - except (ConnectionError, Timeout) as e: - # Network errors and timeouts are retryable - last_error = e - - if attempt < max_retries: - # Exponential backoff: 1s, 2s, 4s - wait_time = 2**attempt - print( - f"⚠ Upload failed (attempt {attempt + 1}/{max_retries + 1}): {type(e).__name__}" - ) - print(f" Retrying in {wait_time} seconds...") - time.sleep(wait_time) - else: - # Last attempt failed - print(f"✗ Upload failed after {max_retries + 1} attempts") - self._handle_request_error(e) - - except requests.exceptions.HTTPError as e: - # Check if it's a server error (5xx) which might be retryable - if hasattr(e, "response") and e.response is not None: - status_code = e.response.status_code - - # Only retry specific server errors that indicate temporary issues - # 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout - # Do NOT retry 500 Internal Server Error (usually indicates business logic errors) - retryable_5xx = [502, 503, 504] - - if status_code in retryable_5xx: - last_error = e - - if attempt < max_retries: - wait_time = 2**attempt - print( - f"⚠ Server error {status_code} (attempt {attempt + 1}/{max_retries + 1})" - ) - print(f" Retrying in {wait_time} seconds...") - time.sleep(wait_time) - else: - print(f"✗ Upload failed after {max_retries + 1} attempts") - self._handle_request_error(e) - else: - # 500 and other errors (including 4xx) are not retryable - # They usually indicate business logic errors or client mistakes - self._handle_request_error(e) - else: - self._handle_request_error(e) - - except Exception as e: - # Other exceptions are not retryable - self._handle_request_error(e) - - # Should not reach here, but just in case - if last_error: - self._handle_request_error(last_error) - - def create(self, path: str) -> Dict[str, Any]: - """Create a new file""" - try: - response = self.session.post( - f"{self.api_base}/files", params={"path": path}, timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def mkdir(self, path: str, mode: str = "755") -> Dict[str, Any]: - """Create a directory""" - try: - response = self.session.post( - f"{self.api_base}/directories", - params={"path": path, "mode": mode}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def ensure_parent_dirs(self, path: str, mode: str = "755") -> Dict[str, Any]: - """Ensure all parent directories exist for the given path""" - try: - response = self.session.post( - f"{self.api_base}/directories/ensure-parent", - params={"path": path, "mode": mode}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def rm(self, path: str, recursive: bool = False, force: bool = True) -> Dict[str, Any]: - """Remove a file or directory. - - Args: - path: Path to remove. - recursive: Remove directories recursively. - force: If True (default), ignore nonexistent files (like rm -f). Idempotent by default. - """ - try: - params = {"path": path} - if recursive: - params["recursive"] = "true" - response = self.session.delete( - f"{self.api_base}/files", - params=params, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except requests.exceptions.HTTPError as e: - if force and e.response is not None and e.response.status_code == 404: - return {"message": "deleted"} - self._handle_request_error(e) - except Exception as e: - self._handle_request_error(e) - - def stat(self, path: str) -> Dict[str, Any]: - """Get file/directory information""" - try: - response = self.session.get( - f"{self.api_base}/stat", params={"path": path}, timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def mv(self, old_path: str, new_path: str) -> Dict[str, Any]: - """Rename/move a file or directory""" - try: - response = self.session.post( - f"{self.api_base}/rename", - params={"path": old_path}, - json={"newPath": new_path}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def chmod(self, path: str, mode: int) -> Dict[str, Any]: - """Change file permissions""" - try: - response = self.session.post( - f"{self.api_base}/chmod", - params={"path": path}, - json={"mode": mode}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def touch(self, path: str) -> Dict[str, Any]: - """Touch a file (update timestamp by writing empty content)""" - try: - response = self.session.post( - f"{self.api_base}/touch", params={"path": path}, timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def mounts(self) -> List[Dict[str, Any]]: - """List all mounted plugins""" - try: - response = self.session.get(f"{self.api_base}/mounts", timeout=self.timeout) - response.raise_for_status() - data = response.json() - return data.get("mounts", []) - except Exception as e: - self._handle_request_error(e) - - def mount(self, fstype: str, path: str, config: Dict[str, Any]) -> Dict[str, Any]: - """Mount a plugin dynamically - - Args: - fstype: Filesystem type (e.g., 'sqlfs', 's3fs', 'memfs') - path: Mount path - config: Plugin configuration as dictionary - - Returns: - Response with message - """ - try: - response = self.session.post( - f"{self.api_base}/mount", - json={"fstype": fstype, "path": path, "config": config}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def unmount(self, path: str) -> Dict[str, Any]: - """Unmount a plugin""" - try: - response = self.session.post( - f"{self.api_base}/unmount", json={"path": path}, timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def load_plugin(self, library_path: str) -> Dict[str, Any]: - """Load an external plugin from a shared library or HTTP(S) URL - - Args: - library_path: Path to the shared library (.so/.dylib/.dll) or HTTP(S) URL - - Returns: - Response with message and plugin name - """ - try: - response = self.session.post( - f"{self.api_base}/plugins/load", - json={"library_path": library_path}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def unload_plugin(self, library_path: str) -> Dict[str, Any]: - """Unload an external plugin - - Args: - library_path: Path to the shared library - - Returns: - Response with message - """ - try: - response = self.session.post( - f"{self.api_base}/plugins/unload", - json={"library_path": library_path}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def list_plugins(self) -> List[str]: - """List all loaded external plugins - - Returns: - List of plugin library paths - """ - try: - response = self.session.get(f"{self.api_base}/plugins", timeout=self.timeout) - response.raise_for_status() - data = response.json() - - # Support both old and new API formats - if "loaded_plugins" in data: - # Old format - return data.get("loaded_plugins", []) - elif "plugins" in data: - # New format - extract library paths from external plugins only - plugins = data.get("plugins", []) - return [ - p.get("library_path", "") - for p in plugins - if p.get("is_external", False) and p.get("library_path") - ] - else: - return [] - except Exception as e: - self._handle_request_error(e) - - def get_plugins_info(self) -> List[dict]: - """Get detailed information about all loaded plugins - - Returns: - List of plugin info dictionaries with keys: - - name: Plugin name - - library_path: Path to plugin library (for external plugins) - - is_external: Whether this is an external plugin - - mounted_paths: List of mount point information - - config_params: List of configuration parameters (name, type, required, default, description) - """ - try: - response = self.session.get(f"{self.api_base}/plugins", timeout=self.timeout) - response.raise_for_status() - data = response.json() - return data.get("plugins", []) - except Exception as e: - self._handle_request_error(e) - - def grep( - self, - path: str, - pattern: str, - recursive: bool = False, - case_insensitive: bool = False, - stream: bool = False, - node_limit: Optional[int] = None, - exclude_path: Optional[str] = None, - level_limit: Optional[int] = None, - ): - """Search for a pattern in files using regular expressions - - Args: - path: Path to file or directory to search - pattern: Regular expression pattern to search for - recursive: Whether to search recursively in directories (default: False) - case_insensitive: Whether to perform case-insensitive matching (default: False) - stream: Whether to stream results as NDJSON (default: False) - node_limit: Maximum number of results to return (default: None) - exclude_path: Optional path prefix to exclude from search (default: None) - level_limit: Optional maximum depth relative to query root (default: None) - - Returns: - If stream=False: Dict with 'matches' (list of match objects) and 'count' - If stream=True: Iterator yielding match dicts and a final summary dict - - Example (non-stream): - >>> result = client.grep("/local/test-grep", "error", recursive=True) - >>> print(result['count']) - 2 - - Example (stream): - >>> for item in client.grep("/local/test-grep", "error", recursive=True, stream=True): - ... if item.get('type') == 'summary': - ... print(f"Total: {item['count']}") - ... else: - ... print(f"{item['file']}:{item['line']}: {item['content']}") - """ - try: - json_payload = { - "path": path, - "pattern": pattern, - "recursive": recursive, - "case_insensitive": case_insensitive, - "stream": stream, - } - if node_limit is not None: - json_payload["node_limit"] = node_limit - if exclude_path is not None: - json_payload["exclude_path"] = exclude_path - if level_limit is not None: - json_payload["level_limit"] = level_limit - response = self.session.post( - f"{self.api_base}/grep", - json=json_payload, - timeout=None if stream else self.timeout, - stream=stream, - ) - response.raise_for_status() - - if stream: - # Return iterator for streaming results - return self._parse_ndjson_stream(response) - else: - # Return complete result - return response.json() - except Exception as e: - self._handle_request_error(e) - - def _parse_ndjson_stream(self, response): - """Parse NDJSON streaming response line by line""" - import json - - for line in response.iter_lines(): - if line: - try: - yield json.loads(line) - except json.JSONDecodeError: - # Skip malformed lines - continue - - def digest(self, path: str, algorithm: str = "xxh3") -> Dict[str, Any]: - """Calculate the digest of a file using specified algorithm - - Args: - path: Path to the file - algorithm: Hash algorithm to use - "xxh3" or "md5" (default: "xxh3") - - Returns: - Dict with 'algorithm', 'path', and 'digest' keys - - Example: - >>> result = client.digest("/local/file.txt", "xxh3") - >>> print(result['digest']) - abc123def456... - - >>> result = client.digest("/local/file.txt", "md5") - >>> print(result['digest']) - 5d41402abc4b2a76b9719d911017c592 - """ - try: - response = self.session.post( - f"{self.api_base}/digest", - json={"algorithm": algorithm, "path": path}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - # ==================== HandleFS API ==================== - # These APIs provide POSIX-like file handle operations for - # filesystems that support stateful file access (e.g., seek, pread/pwrite) - - def open_handle( - self, path: str, flags: int = 0, mode: int = 0o644, lease: int = 60 - ) -> "FileHandle": - """Open a file handle for stateful operations - - Args: - path: Path to the file - flags: Open flags (0=O_RDONLY, 1=O_WRONLY, 2=O_RDWR, can OR with O_APPEND=8, O_CREATE=16, O_EXCL=32, O_TRUNC=64) - mode: File mode for creation (default: 0644) - lease: Lease duration in seconds (default: 60) - - Returns: - FileHandle object for performing operations - - Example: - >>> with client.open_handle("/memfs/file.txt", flags=2) as fh: - ... data = fh.read(100) - ... fh.seek(0) - ... fh.write(b"Hello") - """ - try: - response = self.session.post( - f"{self.api_base}/handles/open", - params={"path": path, "flags": str(flags), "mode": str(mode), "lease": str(lease)}, - timeout=self.timeout, - ) - response.raise_for_status() - data = response.json() - return FileHandle(self, data["handle_id"], path, data.get("flags", "")) - except Exception as e: - self._handle_request_error(e) - - def list_handles(self) -> List[Dict[str, Any]]: - """List all active file handles - - Returns: - List of handle info dicts with keys: handle_id, path, flags, lease, expires_at, created_at, last_access - """ - try: - response = self.session.get(f"{self.api_base}/handles", timeout=self.timeout) - response.raise_for_status() - data = response.json() - return data.get("handles", []) - except Exception as e: - self._handle_request_error(e) - - def get_handle_info(self, handle_id: int) -> Dict[str, Any]: - """Get information about a specific handle - - Args: - handle_id: The handle ID (int64) - - Returns: - Handle info dict - """ - try: - response = self.session.get( - f"{self.api_base}/handles/{handle_id}", timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def close_handle(self, handle_id: int) -> Dict[str, Any]: - """Close a file handle - - Args: - handle_id: The handle ID (int64) to close - - Returns: - Response with message - """ - try: - response = self.session.delete( - f"{self.api_base}/handles/{handle_id}", timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def handle_read(self, handle_id: int, size: int = -1, offset: Optional[int] = None) -> bytes: - """Read from a file handle - - Args: - handle_id: The handle ID (int64) - size: Number of bytes to read (default: -1, read all) - offset: If specified, read at this offset (pread), otherwise read at current position - - Returns: - bytes content - """ - try: - params = {"size": str(size)} - if offset is not None: - params["offset"] = str(offset) - response = self.session.get( - f"{self.api_base}/handles/{handle_id}/read", params=params, timeout=self.timeout - ) - response.raise_for_status() - return response.content - except Exception as e: - self._handle_request_error(e) - - def handle_write(self, handle_id: int, data: bytes, offset: Optional[int] = None) -> int: - """Write to a file handle - - Args: - handle_id: The handle ID (int64) - data: Data to write - offset: If specified, write at this offset (pwrite), otherwise write at current position - - Returns: - Number of bytes written - """ - try: - params = {} - if offset is not None: - params["offset"] = str(offset) - response = self.session.put( - f"{self.api_base}/handles/{handle_id}/write", - params=params, - data=data, - timeout=self.timeout, - ) - response.raise_for_status() - result = response.json() - return result.get("bytes_written", 0) - except Exception as e: - self._handle_request_error(e) - - def handle_seek(self, handle_id: int, offset: int, whence: int = 0) -> int: - """Seek within a file handle - - Args: - handle_id: The handle ID (int64) - offset: Offset to seek to - whence: 0=SEEK_SET, 1=SEEK_CUR, 2=SEEK_END - - Returns: - New position - """ - try: - response = self.session.post( - f"{self.api_base}/handles/{handle_id}/seek", - params={"offset": str(offset), "whence": str(whence)}, - timeout=self.timeout, - ) - response.raise_for_status() - result = response.json() - return result.get("position", 0) - except Exception as e: - self._handle_request_error(e) - - def handle_sync(self, handle_id: int) -> Dict[str, Any]: - """Sync a file handle (flush to storage) - - Args: - handle_id: The handle ID (int64) - - Returns: - Response with message - """ - try: - response = self.session.post( - f"{self.api_base}/handles/{handle_id}/sync", timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def handle_stat(self, handle_id: int) -> Dict[str, Any]: - """Get file info via handle - - Args: - handle_id: The handle ID (int64) - - Returns: - File info dict - """ - try: - response = self.session.get( - f"{self.api_base}/handles/{handle_id}/stat", timeout=self.timeout - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def renew_handle(self, handle_id: int, lease: int = 60) -> Dict[str, Any]: - """Renew the lease on a file handle - - Args: - handle_id: The handle ID (int64) - lease: New lease duration in seconds - - Returns: - Response with new expires_at - """ - try: - response = self.session.post( - f"{self.api_base}/handles/{handle_id}/renew", - params={"lease": str(lease)}, - timeout=self.timeout, - ) - response.raise_for_status() - return response.json() - except Exception as e: - self._handle_request_error(e) - - def get_stats(self, path: Optional[str] = None) -> Dict[str, Any]: - """Get filesystem statistics - - Args: - path: Optional mount path to get stats for. If None, get stats for all mounts. - - Returns: - Statistics data - """ - try: - params = {} - if path is not None: - params["path"] = path - response = self.session.get( - f"{self.api_base}/stats", - params=params, - timeout=self.timeout, - ) - response.raise_for_status() - data = response.json() - return data.get("data", {}) - except Exception as e: - self._handle_request_error(e) - - -class FileHandle: - """A file handle for stateful file operations - - Supports context manager protocol for automatic cleanup. - - Example: - >>> with client.open_handle("/memfs/file.txt", flags=2) as fh: - ... fh.write(b"Hello World") - ... fh.seek(0) - ... print(fh.read()) - """ - - # Open flag constants - O_RDONLY = 0 - O_WRONLY = 1 - O_RDWR = 2 - O_APPEND = 8 - O_CREATE = 16 - O_EXCL = 32 - O_TRUNC = 64 - - # Seek whence constants - SEEK_SET = 0 - SEEK_CUR = 1 - SEEK_END = 2 - - def __init__(self, client: AGFSClient, handle_id: int, path: str, flags: int): - self._client = client - self._handle_id = handle_id - self._path = path - self._flags = flags - self._closed = False - - @property - def handle_id(self) -> int: - """The handle ID (int64)""" - return self._handle_id - - @property - def path(self) -> str: - """The file path""" - return self._path - - @property - def flags(self) -> int: - """The open flags (numeric)""" - return self._flags - - @property - def closed(self) -> bool: - """Whether the handle is closed""" - return self._closed - - def read(self, size: int = -1) -> bytes: - """Read from current position - - Args: - size: Number of bytes to read (default: -1, read all) - - Returns: - bytes content - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.handle_read(self._handle_id, size) - - def read_at(self, size: int, offset: int) -> bytes: - """Read at specific offset (pread) - - Args: - size: Number of bytes to read - offset: Offset to read from - - Returns: - bytes content - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.handle_read(self._handle_id, size, offset) - - def write(self, data: bytes) -> int: - """Write at current position - - Args: - data: Data to write - - Returns: - Number of bytes written - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.handle_write(self._handle_id, data) - - def write_at(self, data: bytes, offset: int) -> int: - """Write at specific offset (pwrite) - - Args: - data: Data to write - offset: Offset to write at - - Returns: - Number of bytes written - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.handle_write(self._handle_id, data, offset) - - def seek(self, offset: int, whence: int = 0) -> int: - """Seek to position - - Args: - offset: Offset to seek to - whence: SEEK_SET(0), SEEK_CUR(1), or SEEK_END(2) - - Returns: - New position - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.handle_seek(self._handle_id, offset, whence) - - def tell(self) -> int: - """Get current position - - Returns: - Current position - """ - return self.seek(0, self.SEEK_CUR) - - def sync(self) -> None: - """Flush data to storage""" - if self._closed: - raise AGFSClientError("Handle is closed") - self._client.handle_sync(self._handle_id) - - def stat(self) -> Dict[str, Any]: - """Get file info - - Returns: - File info dict - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.handle_stat(self._handle_id) - - def info(self) -> Dict[str, Any]: - """Get handle info - - Returns: - Handle info dict - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.get_handle_info(self._handle_id) - - def renew(self, lease: int = 60) -> Dict[str, Any]: - """Renew the handle lease - - Args: - lease: New lease duration in seconds - - Returns: - Response with new expires_at - """ - if self._closed: - raise AGFSClientError("Handle is closed") - return self._client.renew_handle(self._handle_id, lease) - - def close(self) -> None: - """Close the handle""" - if not self._closed: - self._client.close_handle(self._handle_id) - self._closed = True - - def __enter__(self) -> "FileHandle": - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - self.close() - - def __repr__(self) -> str: - status = "closed" if self._closed else "open" - return f"FileHandle(id={self._handle_id}, path={self._path}, flags={self._flags}, {status})" diff --git a/openviking/pyagfs/helpers.py b/openviking/pyagfs/helpers.py index 19881f27ea..4fd21ec7a4 100644 --- a/openviking/pyagfs/helpers.py +++ b/openviking/pyagfs/helpers.py @@ -7,19 +7,21 @@ """ from pathlib import Path -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from .client import AGFSClient +from .protocols import AGFSSyncClientProtocol def cp( - client: "AGFSClient", src: str, dst: str, recursive: bool = False, stream: bool = False + client: AGFSSyncClientProtocol, + src: str, + dst: str, + recursive: bool = False, + stream: bool = False, ) -> None: """Copy a file or directory within AGFS. Args: - client: AGFSClient instance + client: AGFS binding client instance src: Source path in AGFS dst: Destination path in AGFS recursive: If True, copy directories recursively @@ -29,7 +31,6 @@ def cp( AGFSClientError: If source doesn't exist or operation fails Examples: - >>> client = AGFSClient("http://localhost:8080") >>> cp(client, "/file.txt", "/backup/file.txt") # Copy file >>> cp(client, "/dir", "/backup/dir", recursive=True) # Copy directory """ @@ -46,7 +47,7 @@ def cp( def upload( - client: "AGFSClient", + client: AGFSSyncClientProtocol, local_path: str, remote_path: str, recursive: bool = False, @@ -55,7 +56,7 @@ def upload( """Upload a file or directory from local filesystem to AGFS. Args: - client: AGFSClient instance + client: AGFS binding client instance local_path: Path to local file or directory remote_path: Destination path in AGFS recursive: If True, upload directories recursively @@ -66,7 +67,6 @@ def upload( AGFSClientError: If upload fails Examples: - >>> client = AGFSClient("http://localhost:8080") >>> upload(client, "/tmp/file.txt", "/remote/file.txt") # Upload file >>> upload(client, "/tmp/data", "/remote/data", recursive=True) # Upload directory """ @@ -84,7 +84,7 @@ def upload( def download( - client: "AGFSClient", + client: AGFSSyncClientProtocol, remote_path: str, local_path: str, recursive: bool = False, @@ -93,7 +93,7 @@ def download( """Download a file or directory from AGFS to local filesystem. Args: - client: AGFSClient instance + client: AGFS binding client instance remote_path: Path in AGFS local_path: Destination path on local filesystem recursive: If True, download directories recursively @@ -103,7 +103,6 @@ def download( AGFSClientError: If remote path doesn't exist or download fails Examples: - >>> client = AGFSClient("http://localhost:8080") >>> download(client, "/remote/file.txt", "/tmp/file.txt") # Download file >>> download(client, "/remote/data", "/tmp/data", recursive=True) # Download directory """ @@ -122,7 +121,7 @@ def download( # Internal helper functions -def _copy_file(client: "AGFSClient", src: str, dst: str, stream: bool) -> None: +def _copy_file(client: AGFSSyncClientProtocol, src: str, dst: str, stream: bool) -> None: """Copy a single file within AGFS.""" # Ensure parent directory exists _ensure_remote_parent_dir(client, dst) @@ -143,7 +142,7 @@ def _copy_file(client: "AGFSClient", src: str, dst: str, stream: bool) -> None: client.write(dst, data) -def _copy_directory(client: "AGFSClient", src: str, dst: str, stream: bool) -> None: +def _copy_directory(client: AGFSSyncClientProtocol, src: str, dst: str, stream: bool) -> None: """Recursively copy a directory within AGFS.""" # Create destination directory try: @@ -168,7 +167,12 @@ def _copy_directory(client: "AGFSClient", src: str, dst: str, stream: bool) -> N _copy_file(client, src_path, dst_path, stream) -def _upload_file(client: "AGFSClient", local_file: Path, remote_path: str, stream: bool) -> None: +def _upload_file( + client: AGFSSyncClientProtocol, + local_file: Path, + remote_path: str, + stream: bool, +) -> None: """Upload a single file to AGFS.""" # Ensure parent directory exists in AGFS _ensure_remote_parent_dir(client, remote_path) @@ -193,7 +197,10 @@ def _upload_file(client: "AGFSClient", local_file: Path, remote_path: str, strea def _upload_directory( - client: "AGFSClient", local_dir: Path, remote_path: str, stream: bool + client: AGFSSyncClientProtocol, + local_dir: Path, + remote_path: str, + stream: bool, ) -> None: """Recursively upload a directory to AGFS.""" # Create remote directory @@ -215,7 +222,12 @@ def _upload_directory( _upload_file(client, item, remote_item_path, stream) -def _download_file(client: "AGFSClient", remote_path: str, local_file: Path, stream: bool) -> None: +def _download_file( + client: AGFSSyncClientProtocol, + remote_path: str, + local_file: Path, + stream: bool, +) -> None: """Download a single file from AGFS.""" # Ensure parent directory exists locally local_file.parent.mkdir(parents=True, exist_ok=True) @@ -234,7 +246,10 @@ def _download_file(client: "AGFSClient", remote_path: str, local_file: Path, str def _download_directory( - client: "AGFSClient", remote_path: str, local_dir: Path, stream: bool + client: AGFSSyncClientProtocol, + remote_path: str, + local_dir: Path, + stream: bool, ) -> None: """Recursively download a directory from AGFS.""" # Create local directory @@ -256,7 +271,7 @@ def _download_directory( _download_file(client, remote_item_path, local_item_path, stream) -def _ensure_remote_parent_dir(client: "AGFSClient", path: str) -> None: +def _ensure_remote_parent_dir(client: AGFSSyncClientProtocol, path: str) -> None: """Ensure the parent directory exists for a remote path.""" parent = "/".join(path.rstrip("/").split("/")[:-1]) if parent and parent != "/": @@ -264,7 +279,7 @@ def _ensure_remote_parent_dir(client: "AGFSClient", path: str) -> None: _ensure_remote_dir_recursive(client, parent) -def _ensure_remote_dir_recursive(client: "AGFSClient", path: str) -> None: +def _ensure_remote_dir_recursive(client: AGFSSyncClientProtocol, path: str) -> None: """Recursively ensure a directory exists in AGFS.""" if not path or path == "/": return diff --git a/openviking/pyagfs/protocols.py b/openviking/pyagfs/protocols.py new file mode 100644 index 0000000000..f4fdf8187c --- /dev/null +++ b/openviking/pyagfs/protocols.py @@ -0,0 +1,67 @@ +"""Protocol types for synchronous AGFS-like clients.""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any, BinaryIO, Dict, Protocol, runtime_checkable + + +@runtime_checkable +class AGFSSyncClientProtocol(Protocol): + """Minimal synchronous AGFS client contract used by OpenViking.""" + + def ls(self, path: str = "/") -> list[Dict[str, Any]]: + """List directory entries under the given AGFS path.""" + + def read( + self, + path: str, + offset: int = 0, + size: int = -1, + stream: bool = False, + ) -> Any: + """Read file content from AGFS.""" + + def cat( + self, + path: str, + offset: int = 0, + size: int = -1, + stream: bool = False, + ) -> Any: + """Read file content or a streaming response from AGFS.""" + + def write( + self, + path: str, + data: bytes | Iterator[bytes] | BinaryIO, + max_retries: int = 3, + ) -> str: + """Write file content to AGFS and return the backend result.""" + + def mkdir(self, path: str, mode: str = "755") -> Dict[str, Any]: + """Create a directory in AGFS.""" + + def ensure_parent_dirs(self, path: str, mode: str = "755") -> Dict[str, Any]: + """Ensure parent directories exist for the given AGFS path.""" + + def rm(self, path: str, recursive: bool = False, force: bool = True) -> Dict[str, Any]: + """Remove a file or directory from AGFS.""" + + def stat(self, path: str) -> Dict[str, Any]: + """Return AGFS metadata for the given path.""" + + def mv(self, old_path: str, new_path: str) -> Dict[str, Any]: + """Move or rename a path inside AGFS.""" + + def grep(self, **kwargs: Any) -> Dict[str, Any]: + """Run a grep-like search through the AGFS backend.""" + + def tree_directory( + self, + path: str, + show_hidden: bool = False, + node_limit: int | None = None, + level_limit: int | None = None, + ) -> list[Dict[str, Any]]: + """Return a tree view for the given AGFS directory.""" diff --git a/openviking/storage/queuefs/named_queue.py b/openviking/storage/queuefs/named_queue.py index 350991bb27..50e543d350 100644 --- a/openviking/storage/queuefs/named_queue.py +++ b/openviking/storage/queuefs/named_queue.py @@ -5,14 +5,11 @@ import threading from dataclasses import dataclass, field from datetime import datetime -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union -from openviking.pyagfs import AsyncAGFSClient +from openviking.pyagfs import AGFSSyncClientProtocol, AsyncAGFSClient from openviking_cli.utils.logger import get_logger -if TYPE_CHECKING: - from openviking.pyagfs import AGFSClient - logger = get_logger(__name__) @@ -106,7 +103,7 @@ class NamedQueue: def __init__( self, - agfs: "AGFSClient", + agfs: AGFSSyncClientProtocol, mount_point: str, name: str, enqueue_hook: Optional[EnqueueHookBase] = None, diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index 89e0b269c0..316588989e 100644 --- a/openviking/storage/transaction/lock_manager.py +++ b/openviking/storage/transaction/lock_manager.py @@ -7,7 +7,7 @@ import time from typing import Any, Dict, List, Optional, Protocol -from openviking.pyagfs import AGFSClient, AsyncAGFSClient +from openviking.pyagfs import AGFSSyncClientProtocol, AsyncAGFSClient from openviking.storage.transaction.lock_handle import LockHandle from openviking.storage.transaction.path_lock import PathLockEngine from openviking.storage.transaction.redo_log import RedoLog @@ -37,7 +37,7 @@ class LockManager: def __init__( self, - agfs: AGFSClient, + agfs: AGFSSyncClientProtocol, lock_timeout: float = 0.0, lock_expire: float = 300.0, redo_recovery_enabled: bool = True, @@ -561,7 +561,7 @@ async def _enqueue_semantic(self, **params: Any) -> None: def init_lock_manager( - agfs: AGFSClient, + agfs: AGFSSyncClientProtocol, lock_timeout: float = 0.0, lock_expire: float = 300.0, redo_recovery_enabled: bool = True, diff --git a/openviking/storage/transaction/path_lock.py b/openviking/storage/transaction/path_lock.py index 78a5f84da4..1d5b2ad4d1 100644 --- a/openviking/storage/transaction/path_lock.py +++ b/openviking/storage/transaction/path_lock.py @@ -5,7 +5,7 @@ from dataclasses import dataclass, field from typing import Optional, Tuple -from openviking.pyagfs import AGFSClient, AsyncAGFSClient +from openviking.pyagfs import AGFSSyncClientProtocol, AsyncAGFSClient from openviking.storage.transaction.lock_handle import LockOwner from openviking_cli.utils.logger import get_logger @@ -65,7 +65,7 @@ def _log_timeout_waiting(message: str) -> None: class PathLockEngine: - def __init__(self, agfs_client: AGFSClient, lock_expire: float = 300.0): + def __init__(self, agfs_client: AGFSSyncClientProtocol, lock_expire: float = 300.0): self._agfs = agfs_client self._async_agfs = AsyncAGFSClient(agfs_client) self._lock_expire = lock_expire diff --git a/openviking/storage/transaction/redo_log.py b/openviking/storage/transaction/redo_log.py index de8f6fcbbf..d51de83604 100644 --- a/openviking/storage/transaction/redo_log.py +++ b/openviking/storage/transaction/redo_log.py @@ -5,7 +5,7 @@ import json from typing import Any, Dict, List -from openviking.pyagfs import AGFSClient, AsyncAGFSClient +from openviking.pyagfs import AGFSSyncClientProtocol, AsyncAGFSClient from openviking_cli.utils.logger import get_logger logger = get_logger(__name__) @@ -20,7 +20,7 @@ class RedoLog: On startup, scan for leftover markers and redo. """ - def __init__(self, agfs: AGFSClient): + def __init__(self, agfs: AGFSSyncClientProtocol): self._async_agfs = AsyncAGFSClient(agfs) def _task_path(self, task_id: str) -> str: diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index 0a83a42a15..f74f7e3077 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -3,7 +3,7 @@ """ VikingFS: OpenViking file system abstraction layer -Encapsulates AGFSClient, providing file operation interface based on Viking URI. +Encapsulates the AGFS binding client, providing file operation interface based on Viking URI. Responsibilities: - URI conversion (viking:// <-> /local/) - L0/L1 reading (.abstract.md, .overview.md) @@ -862,13 +862,11 @@ async def _grep_with_agfs( files_scanned_set.add(file_uri) - results.append( - { - "line": match.get("line", match.get("line_number", 0)), - "uri": file_uri, - "content": match.get("content", ""), - } - ) + results.append({ + "line": match.get("line", match.get("line_number", 0)), + "uri": file_uri, + "content": match.get("content", ""), + }) if node_limit and len(results) >= node_limit: break @@ -1036,13 +1034,11 @@ async def _grep_single_file( lines = content.split("\n") for line_num, line in enumerate(lines, 1): if compiled_pattern.search(line): - matches.append( - { - "line": line_num, - "uri": entry_uri, - "content": line, - } - ) + matches.append({ + "line": line_num, + "uri": entry_uri, + "content": line, + }) return matches, 1 except Exception as e: logger.debug(f"Failed to grep {entry_uri}: {e}") @@ -1251,17 +1247,15 @@ async def _tree_original( ): info = entry["info"] new_entry = dict(entry.get("extra", {})) - new_entry.update( - { - "name": info["name"], - "size": info["size"], - "mode": info["mode"], - "modTime": info["modTime"], - "isDir": info["isDir"], - "rel_path": entry["rel_path"], - "uri": entry_uri, - } - ) + new_entry.update({ + "name": info["name"], + "size": info["size"], + "mode": info["mode"], + "modTime": info["modTime"], + "isDir": info["isDir"], + "rel_path": entry["rel_path"], + "uri": entry_uri, + }) result.append(new_entry) return result @@ -1287,15 +1281,13 @@ async def _tree_agent( ): info = entry["info"] is_dir = info["isDir"] - result.append( - { - "uri": entry_uri, - "size": 0 if is_dir else info["size"], - "isDir": is_dir, - "modTime": format_simplified(parse_iso_datetime(info["modTime"]), now), - "rel_path": entry["rel_path"], - } - ) + result.append({ + "uri": entry_uri, + "size": 0 if is_dir else info["size"], + "isDir": is_dir, + "modTime": format_simplified(parse_iso_datetime(info["modTime"]), now), + "rel_path": entry["rel_path"], + }) await self._batch_fetch_abstracts(result, abs_limit, ctx=ctx) diff --git a/tests/server/test_api_sessions.py b/tests/server/test_api_sessions.py index 1d662d494d..abba3de168 100644 --- a/tests/server/test_api_sessions.py +++ b/tests/server/test_api_sessions.py @@ -54,24 +54,22 @@ def _message_request( def _configure_test_env(monkeypatch, tmp_path): config_path = tmp_path / "ov.conf" config_path.write_text( - json.dumps( - { - "storage": { - "workspace": str(tmp_path / "workspace"), - "agfs": {"backend": "local", "mode": "binding-client"}, - "vectordb": {"backend": "local"}, - }, - "embedding": { - "dense": { - "provider": "openai", - "model": "test-embedder", - "api_base": "http://127.0.0.1:11434/v1", - "dimension": 1024, - } - }, - "encryption": {"enabled": False}, - } - ), + json.dumps({ + "storage": { + "workspace": str(tmp_path / "workspace"), + "agfs": {"backend": "local"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "test-embedder", + "api_base": "http://127.0.0.1:11434/v1", + "dimension": 1024, + } + }, + "encryption": {"enabled": False}, + }), encoding="utf-8", ) diff --git a/tests/session/test_session_context.py b/tests/session/test_session_context.py index cc60c43924..9f62daf172 100644 --- a/tests/session/test_session_context.py +++ b/tests/session/test_session_context.py @@ -54,24 +54,22 @@ async def _fake_get_vision_completion(self, prompt, images, thinking=False): def _write_test_config(tmp_path): config_path = tmp_path / "ov.conf" config_path.write_text( - json.dumps( - { - "storage": { - "workspace": str(tmp_path / "workspace"), - "agfs": {"backend": "local", "mode": "binding-client"}, - "vectordb": {"backend": "local"}, - }, - "embedding": { - "dense": { - "provider": "openai", - "model": "test-embedder", - "api_base": "http://127.0.0.1:11434/v1", - "dimension": 1024, - } - }, - "encryption": {"enabled": False}, - } - ), + json.dumps({ + "storage": { + "workspace": str(tmp_path / "workspace"), + "agfs": {"backend": "local"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "test-embedder", + "api_base": "http://127.0.0.1:11434/v1", + "dimension": 1024, + } + }, + "encryption": {"enabled": False}, + }), encoding="utf-8", ) return config_path diff --git a/tests/test_prompt_manager.py b/tests/test_prompt_manager.py index 4a02950fc3..a21d216439 100644 --- a/tests/test_prompt_manager.py +++ b/tests/test_prompt_manager.py @@ -23,44 +23,40 @@ def _write_template(templates_dir: Path, content: str) -> None: template_path = templates_dir / "memory" / "profile.yaml" template_path.parent.mkdir(parents=True, exist_ok=True) template_path.write_text( - json.dumps( - { - "metadata": { - "id": "memory.profile", - "name": "Profile", - "description": "Test template", - "version": "1.0.0", - "language": "en", - "category": "memory", - }, - "template": content, - } - ), + json.dumps({ + "metadata": { + "id": "memory.profile", + "name": "Profile", + "description": "Test template", + "version": "1.0.0", + "language": "en", + "category": "memory", + }, + "template": content, + }), encoding="utf-8", ) def _write_config(config_path: Path, templates_dir: Path) -> None: config_path.write_text( - json.dumps( - { - "storage": { - "workspace": str(config_path.parent / "workspace"), - "agfs": {"backend": "local", "mode": "binding-client"}, - "vectordb": {"backend": "local"}, - }, - "embedding": { - "dense": { - "provider": "openai", - "model": "text-embedding-3-small", - "api_key": "test-key", - } - }, - "prompts": { - "templates_dir": str(templates_dir), - }, - } - ), + json.dumps({ + "storage": { + "workspace": str(config_path.parent / "workspace"), + "agfs": {"backend": "local"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "text-embedding-3-small", + "api_key": "test-key", + } + }, + "prompts": { + "templates_dir": str(templates_dir), + }, + }), encoding="utf-8", ) @@ -72,12 +68,10 @@ def teardown_function() -> None: def test_profile_memory_template_keeps_profile_minimal_and_migrates_preferences(): template_path = PromptManager._get_bundled_templates_dir() / "memory" / "profile.yaml" schema = yaml.safe_load(template_path.read_text(encoding="utf-8")) - text = "\n".join( - [ - schema["description"], - schema["fields"][0]["description"], - ] - ) + text = "\n".join([ + schema["description"], + schema["fields"][0]["description"], + ]) assert "identity summary" in text assert "5-8" in text @@ -94,13 +88,11 @@ def test_profile_memory_template_keeps_profile_minimal_and_migrates_preferences( def test_preferences_memory_template_limits_topics_and_splits_when_too_large(): template_path = PromptManager._get_bundled_templates_dir() / "memory" / "preferences.yaml" schema = yaml.safe_load(template_path.read_text(encoding="utf-8")) - text = "\n".join( - [ - schema["description"], - schema["fields"][1]["description"], - schema["fields"][2]["description"], - ] - ) + text = "\n".join([ + schema["description"], + schema["fields"][1]["description"], + schema["fields"][2]["description"], + ]) assert "Complete but minimal" in text assert "3-8" in text @@ -185,15 +177,13 @@ def test_memory_type_registry_loads_schemas_from_prompt_manager_resolved_templat memory_dir = resolved_templates_dir / "memory" memory_dir.mkdir(parents=True) (memory_dir / "custom.yaml").write_text( - json.dumps( - { - "memory_type": "custom_memory", - "description": "custom schema from resolved prompt root", - "directory": "viking://user/{{ user_space }}/memories/custom", - "filename_template": "custom.md", - "fields": [], - } - ), + json.dumps({ + "memory_type": "custom_memory", + "description": "custom schema from resolved prompt root", + "directory": "viking://user/{{ user_space }}/memories/custom", + "filename_template": "custom.md", + "fields": [], + }), encoding="utf-8", ) @@ -223,27 +213,23 @@ def test_memory_type_registry_prefers_custom_memory_dir_over_prompt_manager_temp resolved_memory_dir.mkdir(parents=True) custom_memory_dir.mkdir(parents=True) (resolved_memory_dir / "prompt_root.yaml").write_text( - json.dumps( - { - "memory_type": "prompt_root_memory", - "description": "schema from prompt manager root", - "directory": "viking://user/{{ user_space }}/memories/prompt-root", - "filename_template": "prompt-root.md", - "fields": [], - } - ), + json.dumps({ + "memory_type": "prompt_root_memory", + "description": "schema from prompt manager root", + "directory": "viking://user/{{ user_space }}/memories/prompt-root", + "filename_template": "prompt-root.md", + "fields": [], + }), encoding="utf-8", ) (custom_memory_dir / "custom.yaml").write_text( - json.dumps( - { - "memory_type": "custom_memory", - "description": "schema from custom memory dir", - "directory": "viking://user/{{ user_space }}/memories/custom", - "filename_template": "custom.md", - "fields": [], - } - ), + json.dumps({ + "memory_type": "custom_memory", + "description": "schema from custom memory dir", + "directory": "viking://user/{{ user_space }}/memories/custom", + "filename_template": "custom.md", + "fields": [], + }), encoding="utf-8", ) @@ -326,8 +312,10 @@ def test_context_provider_schema_directories_prefer_custom_memory_dir_over_promp ) monkeypatch.setattr( "os.path.exists", - lambda path: path == str(custom_memory_dir) - or path == str(PromptManager._get_bundled_templates_dir() / "memory"), + lambda path: ( + path == str(custom_memory_dir) + or path == str(PromptManager._get_bundled_templates_dir() / "memory") + ), ) provider = SessionExtractContextProvider(messages=[]) diff --git a/tests/utils/mock_agfs.py b/tests/utils/mock_agfs.py index 24df4962bb..b26f64ea0a 100644 --- a/tests/utils/mock_agfs.py +++ b/tests/utils/mock_agfs.py @@ -5,8 +5,9 @@ class MockLocalAGFS: """ - A mock implementation of AGFSClient that operates on a local directory. - Useful for tests where running a real AGFS server is not feasible or desired. + A mock implementation of the AGFS binding client that operates on a local + directory. Useful for tests where running the real RAGFS binding is not + feasible or desired. """ def __init__(self, config=None, root_path=None): @@ -33,16 +34,14 @@ def ls(self, path, ctx=None, **kwargs): return [] res = [] for item in p.iterdir(): - res.append( - { - "name": item.name, - "isDir": item.is_dir(), # Note: JS style camelCase for some APIs - "type": "directory" if item.is_dir() else "file", - "size": item.stat().st_size if item.is_file() else 0, - "mtime": item.stat().st_mtime, - "uri": f"viking://{path}/{item.name}".replace("//", "/"), - } - ) + res.append({ + "name": item.name, + "isDir": item.is_dir(), # Note: JS style camelCase for some APIs + "type": "directory" if item.is_dir() else "file", + "size": item.stat().st_size if item.is_file() else 0, + "mtime": item.stat().st_mtime, + "uri": f"viking://{path}/{item.name}".replace("//", "/"), + }) return res def writeto(self, path, content, ctx=None, **kwargs): From 3f6ee68eb11620576485d54814bb69db9776eb6b Mon Sep 17 00:00:00 2001 From: zhangbaojun Date: Wed, 3 Jun 2026 21:03:11 +0800 Subject: [PATCH 2/3] refactor(storage): refactor protocol to support diff return type --- openviking/pyagfs/helpers.py | 50 ++++++++++++++++++---------------- openviking/pyagfs/protocols.py | 36 ++++++++++++++++++++---- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/openviking/pyagfs/helpers.py b/openviking/pyagfs/helpers.py index 4fd21ec7a4..d846340000 100644 --- a/openviking/pyagfs/helpers.py +++ b/openviking/pyagfs/helpers.py @@ -6,9 +6,10 @@ - download: Download files/directories from AGFS to local filesystem """ +from collections.abc import Iterator from pathlib import Path -from .protocols import AGFSSyncClientProtocol +from .protocols import AGFSByteStream, AGFSSyncClientProtocol def cp( @@ -127,15 +128,9 @@ def _copy_file(client: AGFSSyncClientProtocol, src: str, dst: str, stream: bool) _ensure_remote_parent_dir(client, dst) if stream: - # Stream the file content for memory efficiency - response = client.cat(src, stream=True) - # Read and write in chunks - chunk_size = 8192 - chunks = [] - for chunk in response.iter_content(chunk_size=chunk_size): - chunks.append(chunk) - data = b"".join(chunks) - client.write(dst, data) + # The binding client returns bytes or an iterator of bytes, not an + # HTTP response object with iter_content(). + client.write(dst, _iter_file_bytes(client.cat(src, stream=True))) else: # Read entire file and write data = client.cat(src) @@ -178,17 +173,7 @@ def _upload_file( _ensure_remote_parent_dir(client, remote_path) if stream: - # Read file in chunks for memory efficiency - chunk_size = 8192 - chunks = [] - with open(local_file, "rb") as f: - while True: - chunk = f.read(chunk_size) - if not chunk: - break - chunks.append(chunk) - data = b"".join(chunks) - client.write(remote_path, data) + client.write(remote_path, _iter_local_file_bytes(local_file)) else: # Read entire file with open(local_file, "rb") as f: @@ -233,10 +218,8 @@ def _download_file( local_file.parent.mkdir(parents=True, exist_ok=True) if stream: - # Stream the file content - response = client.cat(remote_path, stream=True) with open(local_file, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): + for chunk in _iter_file_bytes(client.cat(remote_path, stream=True)): f.write(chunk) else: # Read entire file @@ -304,3 +287,22 @@ def _ensure_remote_dir_recursive(client: AGFSSyncClientProtocol, path: str) -> N except Exception: # Might already exist due to race condition, ignore pass + + +def _iter_file_bytes(data: bytes | AGFSByteStream) -> AGFSByteStream: + """Normalize AGFS read results to a byte iterator.""" + if isinstance(data, bytes): + return iter((data,)) + if isinstance(data, Iterator): + return data + return iter(data) + + +def _iter_local_file_bytes(local_file: Path, chunk_size: int = 8192) -> AGFSByteStream: + """Yield local file content in chunks for streaming uploads.""" + with open(local_file, "rb") as file_obj: + while True: + chunk = file_obj.read(chunk_size) + if not chunk: + break + yield chunk diff --git a/openviking/pyagfs/protocols.py b/openviking/pyagfs/protocols.py index f4fdf8187c..04c145e263 100644 --- a/openviking/pyagfs/protocols.py +++ b/openviking/pyagfs/protocols.py @@ -3,7 +3,9 @@ from __future__ import annotations from collections.abc import Iterator -from typing import Any, BinaryIO, Dict, Protocol, runtime_checkable +from typing import Any, BinaryIO, Dict, Literal, Protocol, overload, runtime_checkable + +AGFSByteStream = Iterator[bytes] @runtime_checkable @@ -13,23 +15,45 @@ class AGFSSyncClientProtocol(Protocol): def ls(self, path: str = "/") -> list[Dict[str, Any]]: """List directory entries under the given AGFS path.""" + @overload + def read( + self, + path: str, + offset: int = 0, + size: int = -1, + stream: Literal[False] = False, + ) -> bytes: + """Read file content from AGFS.""" + + @overload def read( self, path: str, offset: int = 0, size: int = -1, - stream: bool = False, - ) -> Any: + stream: Literal[True] = True, + ) -> AGFSByteStream: + """Read file content from AGFS.""" + + @overload + def cat( + self, + path: str, + offset: int = 0, + size: int = -1, + stream: Literal[False] = False, + ) -> bytes: """Read file content from AGFS.""" + @overload def cat( self, path: str, offset: int = 0, size: int = -1, - stream: bool = False, - ) -> Any: - """Read file content or a streaming response from AGFS.""" + stream: Literal[True] = True, + ) -> AGFSByteStream: + """Read file content or a byte stream from AGFS.""" def write( self, From 3e67c93e884c48f001aeb160a719048e409065ac Mon Sep 17 00:00:00 2001 From: zhangbaojun Date: Wed, 3 Jun 2026 21:36:18 +0800 Subject: [PATCH 3/3] refactor(storage): refactor code format from lint --- openviking/storage/viking_fs.py | 60 ++++++------ tests/server/test_api_sessions.py | 34 +++---- tests/session/test_session_context.py | 34 +++---- tests/test_prompt_manager.py | 130 ++++++++++++++------------ tests/utils/mock_agfs.py | 18 ++-- 5 files changed, 152 insertions(+), 124 deletions(-) diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index f74f7e3077..2c07a8aa65 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -862,11 +862,13 @@ async def _grep_with_agfs( files_scanned_set.add(file_uri) - results.append({ - "line": match.get("line", match.get("line_number", 0)), - "uri": file_uri, - "content": match.get("content", ""), - }) + results.append( + { + "line": match.get("line", match.get("line_number", 0)), + "uri": file_uri, + "content": match.get("content", ""), + } + ) if node_limit and len(results) >= node_limit: break @@ -1034,11 +1036,13 @@ async def _grep_single_file( lines = content.split("\n") for line_num, line in enumerate(lines, 1): if compiled_pattern.search(line): - matches.append({ - "line": line_num, - "uri": entry_uri, - "content": line, - }) + matches.append( + { + "line": line_num, + "uri": entry_uri, + "content": line, + } + ) return matches, 1 except Exception as e: logger.debug(f"Failed to grep {entry_uri}: {e}") @@ -1247,15 +1251,17 @@ async def _tree_original( ): info = entry["info"] new_entry = dict(entry.get("extra", {})) - new_entry.update({ - "name": info["name"], - "size": info["size"], - "mode": info["mode"], - "modTime": info["modTime"], - "isDir": info["isDir"], - "rel_path": entry["rel_path"], - "uri": entry_uri, - }) + new_entry.update( + { + "name": info["name"], + "size": info["size"], + "mode": info["mode"], + "modTime": info["modTime"], + "isDir": info["isDir"], + "rel_path": entry["rel_path"], + "uri": entry_uri, + } + ) result.append(new_entry) return result @@ -1281,13 +1287,15 @@ async def _tree_agent( ): info = entry["info"] is_dir = info["isDir"] - result.append({ - "uri": entry_uri, - "size": 0 if is_dir else info["size"], - "isDir": is_dir, - "modTime": format_simplified(parse_iso_datetime(info["modTime"]), now), - "rel_path": entry["rel_path"], - }) + result.append( + { + "uri": entry_uri, + "size": 0 if is_dir else info["size"], + "isDir": is_dir, + "modTime": format_simplified(parse_iso_datetime(info["modTime"]), now), + "rel_path": entry["rel_path"], + } + ) await self._batch_fetch_abstracts(result, abs_limit, ctx=ctx) diff --git a/tests/server/test_api_sessions.py b/tests/server/test_api_sessions.py index abba3de168..cb246c741a 100644 --- a/tests/server/test_api_sessions.py +++ b/tests/server/test_api_sessions.py @@ -54,22 +54,24 @@ def _message_request( def _configure_test_env(monkeypatch, tmp_path): config_path = tmp_path / "ov.conf" config_path.write_text( - json.dumps({ - "storage": { - "workspace": str(tmp_path / "workspace"), - "agfs": {"backend": "local"}, - "vectordb": {"backend": "local"}, - }, - "embedding": { - "dense": { - "provider": "openai", - "model": "test-embedder", - "api_base": "http://127.0.0.1:11434/v1", - "dimension": 1024, - } - }, - "encryption": {"enabled": False}, - }), + json.dumps( + { + "storage": { + "workspace": str(tmp_path / "workspace"), + "agfs": {"backend": "local"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "test-embedder", + "api_base": "http://127.0.0.1:11434/v1", + "dimension": 1024, + } + }, + "encryption": {"enabled": False}, + } + ), encoding="utf-8", ) diff --git a/tests/session/test_session_context.py b/tests/session/test_session_context.py index 9f62daf172..dd30f9498b 100644 --- a/tests/session/test_session_context.py +++ b/tests/session/test_session_context.py @@ -54,22 +54,24 @@ async def _fake_get_vision_completion(self, prompt, images, thinking=False): def _write_test_config(tmp_path): config_path = tmp_path / "ov.conf" config_path.write_text( - json.dumps({ - "storage": { - "workspace": str(tmp_path / "workspace"), - "agfs": {"backend": "local"}, - "vectordb": {"backend": "local"}, - }, - "embedding": { - "dense": { - "provider": "openai", - "model": "test-embedder", - "api_base": "http://127.0.0.1:11434/v1", - "dimension": 1024, - } - }, - "encryption": {"enabled": False}, - }), + json.dumps( + { + "storage": { + "workspace": str(tmp_path / "workspace"), + "agfs": {"backend": "local"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "test-embedder", + "api_base": "http://127.0.0.1:11434/v1", + "dimension": 1024, + } + }, + "encryption": {"enabled": False}, + } + ), encoding="utf-8", ) return config_path diff --git a/tests/test_prompt_manager.py b/tests/test_prompt_manager.py index a21d216439..33b169c2e3 100644 --- a/tests/test_prompt_manager.py +++ b/tests/test_prompt_manager.py @@ -23,40 +23,44 @@ def _write_template(templates_dir: Path, content: str) -> None: template_path = templates_dir / "memory" / "profile.yaml" template_path.parent.mkdir(parents=True, exist_ok=True) template_path.write_text( - json.dumps({ - "metadata": { - "id": "memory.profile", - "name": "Profile", - "description": "Test template", - "version": "1.0.0", - "language": "en", - "category": "memory", - }, - "template": content, - }), + json.dumps( + { + "metadata": { + "id": "memory.profile", + "name": "Profile", + "description": "Test template", + "version": "1.0.0", + "language": "en", + "category": "memory", + }, + "template": content, + } + ), encoding="utf-8", ) def _write_config(config_path: Path, templates_dir: Path) -> None: config_path.write_text( - json.dumps({ - "storage": { - "workspace": str(config_path.parent / "workspace"), - "agfs": {"backend": "local"}, - "vectordb": {"backend": "local"}, - }, - "embedding": { - "dense": { - "provider": "openai", - "model": "text-embedding-3-small", - "api_key": "test-key", - } - }, - "prompts": { - "templates_dir": str(templates_dir), - }, - }), + json.dumps( + { + "storage": { + "workspace": str(config_path.parent / "workspace"), + "agfs": {"backend": "local"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "text-embedding-3-small", + "api_key": "test-key", + } + }, + "prompts": { + "templates_dir": str(templates_dir), + }, + } + ), encoding="utf-8", ) @@ -68,10 +72,12 @@ def teardown_function() -> None: def test_profile_memory_template_keeps_profile_minimal_and_migrates_preferences(): template_path = PromptManager._get_bundled_templates_dir() / "memory" / "profile.yaml" schema = yaml.safe_load(template_path.read_text(encoding="utf-8")) - text = "\n".join([ - schema["description"], - schema["fields"][0]["description"], - ]) + text = "\n".join( + [ + schema["description"], + schema["fields"][0]["description"], + ] + ) assert "identity summary" in text assert "5-8" in text @@ -88,11 +94,13 @@ def test_profile_memory_template_keeps_profile_minimal_and_migrates_preferences( def test_preferences_memory_template_limits_topics_and_splits_when_too_large(): template_path = PromptManager._get_bundled_templates_dir() / "memory" / "preferences.yaml" schema = yaml.safe_load(template_path.read_text(encoding="utf-8")) - text = "\n".join([ - schema["description"], - schema["fields"][1]["description"], - schema["fields"][2]["description"], - ]) + text = "\n".join( + [ + schema["description"], + schema["fields"][1]["description"], + schema["fields"][2]["description"], + ] + ) assert "Complete but minimal" in text assert "3-8" in text @@ -177,13 +185,15 @@ def test_memory_type_registry_loads_schemas_from_prompt_manager_resolved_templat memory_dir = resolved_templates_dir / "memory" memory_dir.mkdir(parents=True) (memory_dir / "custom.yaml").write_text( - json.dumps({ - "memory_type": "custom_memory", - "description": "custom schema from resolved prompt root", - "directory": "viking://user/{{ user_space }}/memories/custom", - "filename_template": "custom.md", - "fields": [], - }), + json.dumps( + { + "memory_type": "custom_memory", + "description": "custom schema from resolved prompt root", + "directory": "viking://user/{{ user_space }}/memories/custom", + "filename_template": "custom.md", + "fields": [], + } + ), encoding="utf-8", ) @@ -213,23 +223,27 @@ def test_memory_type_registry_prefers_custom_memory_dir_over_prompt_manager_temp resolved_memory_dir.mkdir(parents=True) custom_memory_dir.mkdir(parents=True) (resolved_memory_dir / "prompt_root.yaml").write_text( - json.dumps({ - "memory_type": "prompt_root_memory", - "description": "schema from prompt manager root", - "directory": "viking://user/{{ user_space }}/memories/prompt-root", - "filename_template": "prompt-root.md", - "fields": [], - }), + json.dumps( + { + "memory_type": "prompt_root_memory", + "description": "schema from prompt manager root", + "directory": "viking://user/{{ user_space }}/memories/prompt-root", + "filename_template": "prompt-root.md", + "fields": [], + } + ), encoding="utf-8", ) (custom_memory_dir / "custom.yaml").write_text( - json.dumps({ - "memory_type": "custom_memory", - "description": "schema from custom memory dir", - "directory": "viking://user/{{ user_space }}/memories/custom", - "filename_template": "custom.md", - "fields": [], - }), + json.dumps( + { + "memory_type": "custom_memory", + "description": "schema from custom memory dir", + "directory": "viking://user/{{ user_space }}/memories/custom", + "filename_template": "custom.md", + "fields": [], + } + ), encoding="utf-8", ) diff --git a/tests/utils/mock_agfs.py b/tests/utils/mock_agfs.py index b26f64ea0a..f73ad121f9 100644 --- a/tests/utils/mock_agfs.py +++ b/tests/utils/mock_agfs.py @@ -34,14 +34,16 @@ def ls(self, path, ctx=None, **kwargs): return [] res = [] for item in p.iterdir(): - res.append({ - "name": item.name, - "isDir": item.is_dir(), # Note: JS style camelCase for some APIs - "type": "directory" if item.is_dir() else "file", - "size": item.stat().st_size if item.is_file() else 0, - "mtime": item.stat().st_mtime, - "uri": f"viking://{path}/{item.name}".replace("//", "/"), - }) + res.append( + { + "name": item.name, + "isDir": item.is_dir(), # Note: JS style camelCase for some APIs + "type": "directory" if item.is_dir() else "file", + "size": item.stat().st_size if item.is_file() else 0, + "mtime": item.stat().st_mtime, + "uri": f"viking://{path}/{item.name}".replace("//", "/"), + } + ) return res def writeto(self, path, content, ctx=None, **kwargs):