diff --git a/Cargo.lock b/Cargo.lock index d2021e8..600a612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5374,6 +5374,7 @@ dependencies = [ "lance-context-api", "lance-context-client", "lance-context-core", + "serde_json", ] [[package]] diff --git a/README.md b/README.md index 663675f..a87e3d3 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,31 @@ ctx.add_many([ }, ]) +# Deferred embeddings: raw-first capture, enrich later. +# +# Bulk ingestion often needs to persist source chunks immediately and compute +# embeddings asynchronously (large documents, rate-limited or remote embedding +# providers). Append the raw text first with a stable external_id, then have a +# worker patch in the embedding once it is ready. A record without an embedding +# is durably stored but excluded from vector search until it is enriched. +ctx.add_many([ + { + "role": "source", + "content": "Deferred chunk", + "external_id": "doc-77#chunk-1", + "metadata": {"embedding_status": "pending"}, + }, +]) + +# ...later, from your own worker/queue/batch job: +vector = [0.0] * 1536 +ctx.update( + external_id="doc-77#chunk-1", + embedding=vector, # attach the freshly computed vector + metadata={"embedding_status": "ready"}, +) +# The enriched record now shows up in vector search and hybrid retrieve. + # Time-travel to prior state first_version = ctx.version() ctx.add("assistant", "Let me fetch suggestions…") diff --git a/crates/lance-context-api/src/lib.rs b/crates/lance-context-api/src/lib.rs index bac85e7..e84eec5 100644 --- a/crates/lance-context-api/src/lib.rs +++ b/crates/lance-context-api/src/lib.rs @@ -65,6 +65,9 @@ pub trait ContextStoreApi { &self, limit: Option, offset: Option, + filters: Option, + include_expired: bool, + include_retired: bool, ) -> impl Future>> + Send; fn related( @@ -78,9 +81,7 @@ pub trait ContextStoreApi { fn search( &self, - query: &[f32], - limit: Option, - include_relationships: bool, + request: &SearchRequest, ) -> impl Future>> + Send; fn retrieve( @@ -242,6 +243,8 @@ pub struct RecordPatchDto { pub retired_at: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub retired_reason: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub embedding: Option>, } impl RecordPatchDto { @@ -257,6 +260,7 @@ impl RecordPatchDto { && self.lifecycle_status.is_none() && self.retired_at.is_none() && self.retired_reason.is_none() + && self.embedding.is_none() } } @@ -354,6 +358,12 @@ pub struct SearchRequest { pub query: Vec, #[serde(default = "default_search_limit")] pub limit: usize, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub filters: Option, + #[serde(default)] + pub include_expired: bool, + #[serde(default)] + pub include_retired: bool, #[serde(default)] pub include_relationships: bool, } @@ -517,3 +527,38 @@ where None => Ok(None), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn search_request_legacy_payload_defaults_filters_and_lifecycle() { + // Clients written against the pre-#89 shape send only query/limit. + let req: SearchRequest = + serde_json::from_str(r#"{"query": [0.1, 0.2], "limit": 5}"#).unwrap(); + assert_eq!(req.query, vec![0.1, 0.2]); + assert_eq!(req.limit, 5); + assert!(req.filters.is_none()); + assert!(!req.include_expired); + assert!(!req.include_retired); + assert!(!req.include_relationships); + } + + #[test] + fn search_request_defaults_limit_when_omitted() { + let req: SearchRequest = serde_json::from_str(r#"{"query": [1.0]}"#).unwrap(); + assert_eq!(req.limit, default_search_limit()); + } + + #[test] + fn search_request_parses_filters_and_lifecycle() { + let req: SearchRequest = serde_json::from_str( + r#"{"query": [1.0], "filters": {"tenant": "acme"}, "include_expired": true, "include_retired": true}"#, + ) + .unwrap(); + assert_eq!(req.filters, Some(serde_json::json!({"tenant": "acme"}))); + assert!(req.include_expired); + assert!(req.include_retired); + } +} diff --git a/crates/lance-context-client/src/lib.rs b/crates/lance-context-client/src/lib.rs index f88f4d5..2a75496 100644 --- a/crates/lance-context-client/src/lib.rs +++ b/crates/lance-context-client/src/lib.rs @@ -129,10 +129,27 @@ impl ContextStoreApi for RemoteContextStore { &self, limit: Option, offset: Option, + filters: Option, + include_expired: bool, + include_retired: bool, ) -> ContextResult> { + let filters = filters + .as_ref() + .map(|value| { + serde_json::to_string(value) + .map_err(|err| ContextError::InvalidRequest(err.to_string())) + }) + .transpose()?; let resp = self .client - .list_records(&self.context_name, limit, offset) + .list_records( + &self.context_name, + limit, + offset, + filters.as_deref(), + include_expired, + include_retired, + ) .await .map_err(to_ctx_err)?; Ok(resp.records) @@ -161,20 +178,10 @@ impl ContextStoreApi for RemoteContextStore { Ok(resp.records) } - async fn search( - &self, - query: &[f32], - limit: Option, - include_relationships: bool, - ) -> ContextResult> { - let req = SearchRequest { - query: query.to_vec(), - limit: limit.unwrap_or(10), - include_relationships, - }; + async fn search(&self, request: &SearchRequest) -> ContextResult> { let resp = self .client - .search(&self.context_name, &req) + .search(&self.context_name, request) .await .map_err(to_ctx_err)?; Ok(resp.results) @@ -401,20 +408,30 @@ impl ContextClient { name: &str, limit: Option, offset: Option, + filters: Option<&str>, + include_expired: bool, + include_retired: bool, ) -> Result { - let mut url = self.url(&format!("/contexts/{}/records", name)); - let mut params = Vec::new(); - if let Some(l) = limit { - params.push(format!("limit={}", l)); + let mut request = self + .http + .get(self.url(&format!("/contexts/{}/records", name))); + if let Some(limit) = limit { + request = request.query(&[("limit", limit)]); } - if let Some(o) = offset { - params.push(format!("offset={}", o)); + if let Some(offset) = offset { + request = request.query(&[("offset", offset)]); } - if !params.is_empty() { - url = format!("{}?{}", url, params.join("&")); + if let Some(filters) = filters { + request = request.query(&[("filters", filters)]); + } + if include_expired { + request = request.query(&[("include_expired", include_expired)]); + } + if include_retired { + request = request.query(&[("include_retired", include_retired)]); } - let resp = self.http.get(&url).send().await?; + let resp = request.send().await?; Self::handle_response(resp).await } diff --git a/crates/lance-context-core/src/api_impl.rs b/crates/lance-context-core/src/api_impl.rs index a05ed14..23943e7 100644 --- a/crates/lance-context-core/src/api_impl.rs +++ b/crates/lance-context-core/src/api_impl.rs @@ -1,11 +1,13 @@ use chrono::Utc; +use serde_json::Value; use uuid::Uuid; use lance_context_api::{ AddRecordRequest, AddRecordsResponse, CompactRequest, CompactResponse, CompactStatsResponse, ContextError, ContextResult, ContextStoreApi, DeleteRecordResponse, RecordDto, RecordPatchDto, - RelationshipDto, RetrieveRequest, RetrieveResultDto, SearchResultDto, StateMetadataDto, - UpdateRecordRequest, UpdateRecordResponse, UpsertRecordRequest, UpsertRecordResponse, + RelationshipDto, RetrieveRequest, RetrieveResultDto, SearchRequest, SearchResultDto, + StateMetadataDto, UpdateRecordRequest, UpdateRecordResponse, UpsertRecordRequest, + UpsertRecordResponse, }; use crate::record::{ @@ -156,10 +158,24 @@ impl ContextStoreApi for ContextStore { &self, limit: Option, offset: Option, + filters: Option, + include_expired: bool, + include_retired: bool, ) -> ContextResult> { - let records = ContextStore::list(self, limit, offset) - .await - .map_err(to_ctx_err)?; + let filters = filters + .map(RecordFilters::from_json_value) + .transpose() + .map_err(ContextError::InvalidRequest)?; + let options = LifecycleQueryOptions::new(include_expired, include_retired); + let records = ContextStore::list_filtered_with_options( + self, + limit, + offset, + filters.as_ref(), + options, + ) + .await + .map_err(to_ctx_err)?; Ok(records.into_iter().map(record_to_dto).collect()) } @@ -179,19 +195,27 @@ impl ContextStoreApi for ContextStore { Ok(records.into_iter().map(record_to_dto).collect()) } - async fn search( - &self, - query: &[f32], - limit: Option, - include_relationships: bool, - ) -> ContextResult> { - let results = ContextStore::search(self, query, limit) - .await - .map_err(to_ctx_err)?; + async fn search(&self, request: &SearchRequest) -> ContextResult> { + let filters = request + .filters + .clone() + .map(RecordFilters::from_json_value) + .transpose() + .map_err(ContextError::InvalidRequest)?; + let options = LifecycleQueryOptions::new(request.include_expired, request.include_retired); + let results = ContextStore::search_filtered_with_options( + self, + &request.query, + Some(request.limit), + filters.as_ref(), + options, + ) + .await + .map_err(to_ctx_err)?; Ok(results .into_iter() .map(|mut sr| { - if !include_relationships { + if !request.include_relationships { sr.record.relationships.clear(); } SearchResultDto { @@ -330,6 +354,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { lifecycle_status: patch.lifecycle_status.clone(), retired_at: patch.retired_at, retired_reason: patch.retired_reason.clone(), + embedding: patch.embedding.clone(), } } diff --git a/crates/lance-context-core/src/record.rs b/crates/lance-context-core/src/record.rs index abfa3c2..71306d6 100644 --- a/crates/lance-context-core/src/record.rs +++ b/crates/lance-context-core/src/record.rs @@ -165,6 +165,9 @@ pub struct RecordPatch { pub lifecycle_status: Option, pub retired_at: Option>, pub retired_reason: Option, + /// Vector embedding to attach to the record. Enables deferred embedding + /// workflows: append raw text first, then enrich with an embedding later. + pub embedding: Option>, } impl RecordPatch { @@ -180,6 +183,7 @@ impl RecordPatch { && self.lifecycle_status.is_none() && self.retired_at.is_none() && self.retired_reason.is_none() + && self.embedding.is_none() } } diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 8ddfd37..693bf98 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -619,6 +619,9 @@ impl ContextStore { if let Some(retired_reason) = patch.retired_reason { record.retired_reason = Some(retired_reason); } + if let Some(embedding) = patch.embedding { + record.embedding = Some(embedding); + } self.validate_new_record_id(&record).await?; let version = self.write_entries(std::slice::from_ref(&record)).await?; @@ -3059,6 +3062,67 @@ mod tests { }); } + #[test] + fn deferred_embedding_patch_makes_raw_record_searchable() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + + // Raw-first capture: append source chunks without embeddings. + let mut by_ext = text_record("raw-ext", 0.0); + by_ext.embedding = None; + by_ext.external_id = Some("doc-1#chunk-1".to_string()); + let mut by_id = text_record("raw-id", 0.0); + by_id.embedding = None; + by_id.external_id = None; + store.add(&[by_ext.clone(), by_id.clone()]).await.unwrap(); + + // Records without an embedding are invisible to vector search. + let query = make_embedding(1.0); + assert!(store.search(&query, Some(10)).await.unwrap().is_empty()); + + // Enrich-later: patch the embedding by external_id... + let enriched_ext = store + .update_by_external_id( + "doc-1#chunk-1", + RecordPatch { + embedding: Some(make_embedding(1.0)), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(enriched_ext.record.embedding, Some(make_embedding(1.0))); + // Raw payload is carried forward onto the superseding record. + assert_eq!(enriched_ext.record.text_payload, by_ext.text_payload); + + // ...and by internal id. + let enriched_id = store + .update_by_id( + &by_id.id, + RecordPatch { + embedding: Some(make_embedding(0.0)), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(enriched_id.record.embedding, Some(make_embedding(0.0))); + + // Both records now participate in vector search. + let results = store.search(&query, Some(10)).await.unwrap(); + let ids: Vec<&str> = results.iter().map(|r| r.record.id.as_str()).collect(); + assert!(ids.contains(&enriched_ext.record.id.as_str())); + assert!(ids.contains(&enriched_id.record.id.as_str())); + // The query matches the external_id record exactly (distance 0). + assert_eq!(results[0].record.id, enriched_ext.record.id); + }); + } + #[test] fn relationships_roundtrip_and_support_related_lookup() { let dir = TempDir::new().unwrap(); diff --git a/crates/lance-context-server/src/routes/records.rs b/crates/lance-context-server/src/routes/records.rs index fcd8344..8fcc616 100644 --- a/crates/lance-context-server/src/routes/records.rs +++ b/crates/lance-context-server/src/routes/records.rs @@ -9,7 +9,7 @@ use lance_context_api::{ UpdateRecordRequest, UpdateRecordResponse, UpsertRecordRequest, UpsertRecordResponse, }; use lance_context_core::{ - ContextRecord, LifecycleQueryOptions, RecordPatch, Relationship, StateMetadata, + ContextRecord, LifecycleQueryOptions, RecordFilters, RecordPatch, Relationship, StateMetadata, LIFECYCLE_ACTIVE, }; use uuid::Uuid; @@ -255,10 +255,16 @@ pub async fn delete_record_by_external_id( Ok(Json(DeleteRecordResponse { deleted, version })) } -#[derive(serde::Deserialize)] +#[derive(Debug, Default, serde::Deserialize)] pub struct ListParams { pub limit: Option, pub offset: Option, + /// JSON object encoding `RecordFilters`, URL-encoded into the query string. + pub filters: Option, + #[serde(default)] + pub include_expired: bool, + #[serde(default)] + pub include_retired: bool, } pub async fn list_records( @@ -266,6 +272,18 @@ pub async fn list_records( Path(name): Path, Query(params): Query, ) -> Result, AppError> { + let filters = params + .filters + .as_deref() + .map(|raw| { + serde_json::from_str(raw) + .map_err(|err| AppError::InvalidRequest(format!("invalid filters JSON: {err}"))) + .and_then(|value| { + RecordFilters::from_json_value(value).map_err(AppError::InvalidRequest) + }) + }) + .transpose()?; + let stores = state.stores.read().await; let store_lock = stores .get(&name) @@ -275,7 +293,12 @@ pub async fn list_records( let store = store_lock.read().await; let records = store - .list(params.limit, params.offset) + .list_filtered_with_options( + params.limit, + params.offset, + filters.as_ref(), + LifecycleQueryOptions::new(params.include_expired, params.include_retired), + ) .await .map_err(AppError::from_lance)?; @@ -397,6 +420,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { lifecycle_status: patch.lifecycle_status.clone(), retired_at: patch.retired_at, retired_reason: patch.retired_reason.clone(), + embedding: patch.embedding.clone(), } } @@ -447,6 +471,7 @@ mod tests { use axum::extract::{Path, Query, State}; use axum::Json; + use chrono::{Duration, Utc}; use lance_context_api::{ AddRecordRequest, AddRecordsRequest, RecordPatchDto, UpdateRecordRequest, UpsertRecordRequest, @@ -622,6 +647,7 @@ mod tests { Query(ListParams { limit: None, offset: None, + ..Default::default() }), ) .await @@ -688,6 +714,7 @@ mod tests { Query(ListParams { limit: None, offset: None, + ..Default::default() }), ) .await @@ -738,4 +765,179 @@ mod tests { ); assert_eq!(response.records[0].relationships.len(), 1); } + + async fn list_with( + state: &Arc, + context_name: &str, + params: ListParams, + ) -> Vec { + let Json(response) = list_records( + State(state.clone()), + Path(context_name.to_string()), + Query(params), + ) + .await + .unwrap(); + response.records + } + + #[tokio::test] + async fn list_filters_by_metadata_and_builtin_fields() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + + let mut alpha = text_record("alpha"); + alpha.metadata = Some(serde_json::json!({"tenant": "acme"})); + let mut bravo = text_record("bravo"); + bravo.role = "assistant".to_string(); + bravo.metadata = Some(serde_json::json!({"tenant": "globex"})); + let mut charlie = text_record("charlie"); + charlie.metadata = Some(serde_json::json!({"tenant": "acme"})); + let _ = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![alpha, bravo, charlie], + }), + ) + .await + .unwrap(); + + // Metadata filter restricts to tenant=acme (alpha + charlie). + let records = list_with( + &state, + context_name, + ListParams { + filters: Some(r#"{"tenant": "acme"}"#.to_string()), + ..Default::default() + }, + ) + .await; + let texts: Vec<&str> = records + .iter() + .filter_map(|r| r.text_payload.as_deref()) + .collect(); + assert_eq!(records.len(), 2); + assert!(texts.contains(&"alpha")); + assert!(texts.contains(&"charlie")); + + // Built-in field filter restricts to role=assistant (bravo). + let records = list_with( + &state, + context_name, + ListParams { + filters: Some(r#"{"role": "assistant"}"#.to_string()), + ..Default::default() + }, + ) + .await; + assert_eq!(records.len(), 1); + assert_eq!(records[0].text_payload.as_deref(), Some("bravo")); + } + + #[tokio::test] + async fn list_respects_expired_visibility() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + + let fresh = text_record("fresh"); + let mut stale = text_record("stale"); + stale.expires_at = Some(Utc::now() - Duration::hours(1)); + let _ = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![fresh, stale], + }), + ) + .await + .unwrap(); + + // Default listing hides the expired record. + let records = list_with(&state, context_name, ListParams::default()).await; + assert_eq!(records.len(), 1); + assert_eq!(records[0].text_payload.as_deref(), Some("fresh")); + + // include_expired surfaces it. + let records = list_with( + &state, + context_name, + ListParams { + include_expired: true, + ..Default::default() + }, + ) + .await; + assert_eq!(records.len(), 2); + } + + #[tokio::test] + async fn list_respects_retired_visibility() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + + let mut original = text_record("v1"); + original.external_id = Some("doc-1".to_string()); + let (_, Json(add_response)) = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![original], + }), + ) + .await + .unwrap(); + let old_id = add_response.ids[0].clone(); + + let Json(updated) = update_record( + State(state.clone()), + Path(context_name.to_string()), + Json(UpdateRecordRequest { + id: None, + external_id: Some("doc-1".to_string()), + patch: RecordPatchDto { + metadata: Some(serde_json::json!({"revision": 2})), + ..Default::default() + }, + }), + ) + .await + .unwrap(); + assert!(updated.updated); + + // Default listing returns only the visible successor. + let records = list_with(&state, context_name, ListParams::default()).await; + assert_eq!(records.len(), 1); + assert_ne!(records[0].id, old_id); + + // include_retired surfaces the superseded original too. + let records = list_with( + &state, + context_name, + ListParams { + include_retired: true, + ..Default::default() + }, + ) + .await; + assert_eq!(records.len(), 2); + assert!(records.iter().any(|r| r.id == old_id)); + } + + #[tokio::test] + async fn list_rejects_invalid_filters_json() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + + let result = list_records( + State(state), + Path(context_name.to_string()), + Query(ListParams { + filters: Some("not json".to_string()), + ..Default::default() + }), + ) + .await; + assert!(matches!(result, Err(AppError::InvalidRequest(_)))); + } } diff --git a/crates/lance-context-server/src/routes/search.rs b/crates/lance-context-server/src/routes/search.rs index 6cae04f..a4f2d5d 100644 --- a/crates/lance-context-server/src/routes/search.rs +++ b/crates/lance-context-server/src/routes/search.rs @@ -17,6 +17,13 @@ pub async fn search( Path(name): Path, Json(req): Json, ) -> Result, AppError> { + let filters = req + .filters + .clone() + .map(RecordFilters::from_json_value) + .transpose() + .map_err(AppError::InvalidRequest)?; + let stores = state.stores.read().await; let store_lock = stores .get(&name) @@ -26,7 +33,12 @@ pub async fn search( let store = store_lock.read().await; let results = store - .search(&req.query, Some(req.limit)) + .search_filtered_with_options( + &req.query, + Some(req.limit), + filters.as_ref(), + LifecycleQueryOptions::new(req.include_expired, req.include_retired), + ) .await .map_err(AppError::from_lance)?; @@ -101,3 +113,206 @@ pub async fn retrieve( Ok(Json(RetrieveResponse { results: dtos })) } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use axum::extract::{Path, State}; + use axum::Json; + use chrono::{Duration, Utc}; + use lance_context_api::{ + AddRecordRequest, AddRecordsRequest, RecordPatchDto, RelationshipDto, UpdateRecordRequest, + }; + use lance_context_core::{ContextStore, ContextStoreOptions}; + use tempfile::TempDir; + use tokio::sync::RwLock; + + use super::*; + use crate::routes::records::{add_records, update_record}; + use crate::state::AppState; + + const CTX: &str = "ctx"; + + async fn test_state() -> (Arc, TempDir) { + let dir = TempDir::new().unwrap(); + let uri = dir + .path() + .join(format!("{CTX}.lance")) + .to_string_lossy() + .to_string(); + let store = ContextStore::open_with_options( + &uri, + ContextStoreOptions { + embedding_dim: Some(3), + ..Default::default() + }, + ) + .await + .unwrap(); + let mut stores = HashMap::new(); + stores.insert(CTX.to_string(), Arc::new(RwLock::new(store))); + let state = Arc::new(AppState { + stores: RwLock::new(stores), + base_path: dir.path().to_path_buf(), + }); + (state, dir) + } + + fn embedded_record(text: &str, embedding: [f32; 3]) -> AddRecordRequest { + AddRecordRequest { + role: "user".to_string(), + content_type: "text/plain".to_string(), + text_payload: Some(text.to_string()), + embedding: Some(embedding.to_vec()), + ..Default::default() + } + } + + async fn add(state: &Arc, records: Vec) -> Vec { + let (_, Json(resp)) = add_records( + State(state.clone()), + Path(CTX.to_string()), + Json(AddRecordsRequest { records }), + ) + .await + .unwrap(); + resp.ids + } + + async fn run_search(state: &Arc, req: SearchRequest) -> Vec { + let Json(resp) = search(State(state.clone()), Path(CTX.to_string()), Json(req)) + .await + .unwrap(); + resp.results + } + + fn search_for(query: [f32; 3]) -> SearchRequest { + SearchRequest { + query: query.to_vec(), + limit: 10, + filters: None, + include_expired: false, + include_retired: false, + include_relationships: false, + } + } + + #[tokio::test] + async fn search_filters_by_metadata_and_builtin_fields() { + let (state, _dir) = test_state().await; + + let mut alpha = embedded_record("alpha", [1.0, 0.0, 0.0]); + alpha.metadata = Some(serde_json::json!({"tenant": "acme"})); + let mut bravo = embedded_record("bravo", [0.0, 1.0, 0.0]); + bravo.role = "assistant".to_string(); + bravo.metadata = Some(serde_json::json!({"tenant": "globex"})); + let mut charlie = embedded_record("charlie", [0.0, 0.0, 1.0]); + charlie.metadata = Some(serde_json::json!({"tenant": "acme"})); + add(&state, vec![alpha, bravo, charlie]).await; + + // Metadata filter restricts to tenant=acme (alpha + charlie). + let mut req = search_for([1.0, 0.0, 0.0]); + req.filters = Some(serde_json::json!({"tenant": "acme"})); + let results = run_search(&state, req).await; + let texts: Vec<&str> = results + .iter() + .filter_map(|r| r.record.text_payload.as_deref()) + .collect(); + assert_eq!(results.len(), 2); + assert!(texts.contains(&"alpha")); + assert!(texts.contains(&"charlie")); + + // Built-in field filter restricts to role=assistant (bravo). + let mut req = search_for([0.0, 1.0, 0.0]); + req.filters = Some(serde_json::json!({"role": "assistant"})); + let results = run_search(&state, req).await; + assert_eq!(results.len(), 1); + assert_eq!(results[0].record.text_payload.as_deref(), Some("bravo")); + } + + #[tokio::test] + async fn search_respects_expired_visibility() { + let (state, _dir) = test_state().await; + + let fresh = embedded_record("fresh", [1.0, 0.0, 0.0]); + let mut stale = embedded_record("stale", [1.0, 0.0, 0.0]); + stale.expires_at = Some(Utc::now() - Duration::hours(1)); + add(&state, vec![fresh, stale]).await; + + // Default search hides the expired record. + let results = run_search(&state, search_for([1.0, 0.0, 0.0])).await; + assert_eq!(results.len(), 1); + assert_eq!(results[0].record.text_payload.as_deref(), Some("fresh")); + + // include_expired surfaces it. + let mut req = search_for([1.0, 0.0, 0.0]); + req.include_expired = true; + let results = run_search(&state, req).await; + assert_eq!(results.len(), 2); + } + + #[tokio::test] + async fn search_respects_retired_visibility() { + let (state, _dir) = test_state().await; + + let mut original = embedded_record("v1", [1.0, 0.0, 0.0]); + original.external_id = Some("doc-1".to_string()); + let ids = add(&state, vec![original]).await; + let old_id = ids[0].clone(); + + // Updating supersedes the original; the successor keeps the embedding. + let Json(updated) = update_record( + State(state.clone()), + Path(CTX.to_string()), + Json(UpdateRecordRequest { + id: None, + external_id: Some("doc-1".to_string()), + patch: RecordPatchDto { + metadata: Some(serde_json::json!({"revision": 2})), + ..Default::default() + }, + }), + ) + .await + .unwrap(); + assert!(updated.updated); + + // Default search returns only the visible successor. + let results = run_search(&state, search_for([1.0, 0.0, 0.0])).await; + assert_eq!(results.len(), 1); + assert_ne!(results[0].record.id, old_id); + + // include_retired surfaces the superseded original too. + let mut req = search_for([1.0, 0.0, 0.0]); + req.include_retired = true; + let results = run_search(&state, req).await; + assert_eq!(results.len(), 2); + assert!(results.iter().any(|r| r.record.id == old_id)); + } + + #[tokio::test] + async fn search_include_relationships_toggles_relationship_payload() { + let (state, _dir) = test_state().await; + + let mut record = embedded_record("cites runbook", [1.0, 0.0, 0.0]); + record.relationships = vec![RelationshipDto { + target_id: "doc://runbook".to_string(), + relation: "cites".to_string(), + weight: Some(0.5), + }]; + add(&state, vec![record]).await; + + // Default omits relationships. + let results = run_search(&state, search_for([1.0, 0.0, 0.0])).await; + assert_eq!(results.len(), 1); + assert!(results[0].record.relationships.is_empty()); + + // include_relationships returns them. + let mut req = search_for([1.0, 0.0, 0.0]); + req.include_relationships = true; + let results = run_search(&state, req).await; + assert_eq!(results[0].record.relationships.len(), 1); + } +} diff --git a/crates/lance-context/Cargo.toml b/crates/lance-context/Cargo.toml index 65ef6a9..446d3a1 100644 --- a/crates/lance-context/Cargo.toml +++ b/crates/lance-context/Cargo.toml @@ -16,3 +16,4 @@ remote = ["lance-context-client"] lance-context-core = { version = "0.4.0", path = "../lance-context-core" } lance-context-api = { version = "0.4.0", path = "../lance-context-api" } lance-context-client = { version = "0.4.0", path = "../lance-context-client", optional = true } +serde_json = "1" diff --git a/crates/lance-context/src/unified.rs b/crates/lance-context/src/unified.rs index 454faff..43af986 100644 --- a/crates/lance-context/src/unified.rs +++ b/crates/lance-context/src/unified.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use lance_context_api::{ AddRecordRequest, AddRecordsResponse, CompactRequest, CompactResponse, CompactStatsResponse, ContextError, ContextResult, ContextStoreApi, DeleteRecordResponse, RecordDto, RetrieveRequest, - RetrieveResultDto, SearchResultDto, UpdateRecordRequest, UpdateRecordResponse, + RetrieveResultDto, SearchRequest, SearchResultDto, UpdateRecordRequest, UpdateRecordResponse, UpsertRecordRequest, UpsertRecordResponse, }; use lance_context_core::{ @@ -159,8 +159,19 @@ impl ContextStoreApi for ContextStore { &self, limit: Option, offset: Option, + filters: Option, + include_expired: bool, + include_retired: bool, ) -> ContextResult> { - dispatch_ref!(self, list, limit, offset) + dispatch_ref!( + self, + list, + limit, + offset, + filters, + include_expired, + include_retired + ) } async fn related( @@ -182,13 +193,8 @@ impl ContextStoreApi for ContextStore { ) } - async fn search( - &self, - query: &[f32], - limit: Option, - include_relationships: bool, - ) -> ContextResult> { - dispatch_ref!(self, search, query, limit, include_relationships) + async fn search(&self, request: &SearchRequest) -> ContextResult> { + dispatch_ref!(self, search, request) } async fn retrieve(&self, request: &RetrieveRequest) -> ContextResult> { diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index 7f0e1c0..3c1dae1 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -533,8 +533,14 @@ def update( lifecycle_status: str | None = None, retired_at: datetime | str | None = None, retired_reason: str | None = None, + embedding: list[float] | None = None, ) -> dict[str, Any]: - """Patch mutable fields on a visible record by id or external_id.""" + """Patch mutable fields on a visible record by id or external_id. + + Pass ``embedding`` to attach or replace a record's vector after it was + appended without one (deferred / enrich-later ingestion). The updated + record participates in vector search once the embedding is set. + """ if (id is None) == (external_id is None): raise ValueError("Specify exactly one of id or external_id") if ( @@ -547,6 +553,7 @@ def update( and lifecycle_status is None and retired_at is None and retired_reason is None + and embedding is None ): raise ValueError("update requires at least one patch field") @@ -562,6 +569,7 @@ def update( lifecycle_status, _coerce_timestamp(retired_at, field_name="retired_at"), retired_reason, + embedding, ) record = result.get("record") return { diff --git a/python/src/lib.rs b/python/src/lib.rs index d0b4c26..fd68108 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -375,7 +375,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None))] + #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))] fn update( &mut self, py: Python<'_>, @@ -390,6 +390,7 @@ impl Context { lifecycle_status: Option, retired_at: Option, retired_reason: Option, + embedding: Option>, ) -> PyResult { let patch = RecordPatch { bot_id, @@ -402,6 +403,7 @@ impl Context { lifecycle_status, retired_at: parse_optional_datetime(retired_at, "retired_at")?, retired_reason, + embedding, }; if patch.is_empty() { return Err(PyRuntimeError::new_err( diff --git a/python/tests/test_deferred_embedding.py b/python/tests/test_deferred_embedding.py new file mode 100644 index 0000000..e9709a1 --- /dev/null +++ b/python/tests/test_deferred_embedding.py @@ -0,0 +1,106 @@ +"""End-to-end tests for deferred embedding workflows (issue #88). + +Raw-first ingestion: append source chunks without embeddings, then enrich each +record with an embedding later via ``update()``. The enriched record must then +participate in vector search. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + +from lance_context.api import Context + + +def _embedding(value: float) -> list[float]: + vector = [0.0] * 1536 + vector[0] = value + return vector + + +def test_update_attaches_embedding_by_external_id(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + # Raw-first capture: persist the source chunk immediately, no embedding yet. + ctx.add( + "user", + "raw source chunk", + external_id="doc-1#chunk-1", + metadata={"embedding_status": "pending"}, + ) + + raw = ctx.get(external_id="doc-1#chunk-1") + assert raw is not None + assert raw["embedding"] is None + + # Without an embedding the record is invisible to vector search. + assert ctx.search(_embedding(1.0), limit=10) == [] + + # Enrich-later: a worker computes the embedding and patches it in. + result = ctx.update( + external_id="doc-1#chunk-1", + embedding=_embedding(1.0), + metadata={"embedding_status": "ready"}, + ) + assert result["updated"] is True + assert result["record"]["embedding"] == _embedding(1.0) + # Raw payload is preserved across the enrich update. + assert result["record"]["text"] == "raw source chunk" + + # The enriched record now participates in vector search. + hits = ctx.search(_embedding(1.0), limit=10) + assert [hit["external_id"] for hit in hits] == ["doc-1#chunk-1"] + + +def test_update_attaches_embedding_by_id(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + ctx.add("user", "raw source chunk") + raw = ctx.list()[0] + assert raw["embedding"] is None + + result = ctx.update(id=raw["id"], embedding=_embedding(0.0)) + assert result["updated"] is True + assert result["record"]["embedding"] == _embedding(0.0) + + hits = ctx.search(_embedding(0.0), limit=10) + assert len(hits) == 1 + assert hits[0]["id"] == result["record"]["id"] + + +def test_embedding_only_is_a_valid_patch(tmp_path: Path) -> None: + """An embedding-only patch must be accepted (no other field required).""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + ctx.add("user", "raw source chunk", external_id="doc-2#chunk-1") + result = ctx.update(external_id="doc-2#chunk-1", embedding=_embedding(1.0)) + assert result["updated"] is True + assert result["record"]["embedding"] == _embedding(1.0) + + +def test_bulk_raw_first_then_enrich(tmp_path: Path) -> None: + """add_many() raw chunks, then enrich each by external_id.""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + ctx.add_many( + [ + {"role": "user", "content": "chunk a", "external_id": "doc-3#chunk-1"}, + {"role": "user", "content": "chunk b", "external_id": "doc-3#chunk-2"}, + ] + ) + assert ctx.search(_embedding(1.0), limit=10) == [] + + for ext_id, pivot in (("doc-3#chunk-1", 0.0), ("doc-3#chunk-2", 1.0)): + ctx.update(external_id=ext_id, embedding=_embedding(pivot)) + + hits = ctx.search(_embedding(1.0), limit=10) + assert {hit["external_id"] for hit in hits} == {"doc-3#chunk-1", "doc-3#chunk-2"} + # The exact match ranks first. + assert hits[0]["external_id"] == "doc-3#chunk-2" diff --git a/python/tests/test_search.py b/python/tests/test_search.py index 9ce7c90..a02ab87 100644 --- a/python/tests/test_search.py +++ b/python/tests/test_search.py @@ -177,6 +177,7 @@ def update( lifecycle_status: str | None, retired_at: str | None, retired_reason: str | None, + embedding: list[float] | None = None, ): self.update_calls.append( { @@ -191,6 +192,7 @@ def update( "lifecycle_status": lifecycle_status, "retired_at": retired_at, "retired_reason": retired_reason, + "embedding": embedding, } ) if id == "missing" or external_id == "missing": @@ -1159,6 +1161,7 @@ def test_context_update_returns_operation_metadata_and_record(): "lifecycle_status": "active", "retired_at": None, "retired_reason": None, + "embedding": None, } ] assert result["updated"] is True @@ -1173,6 +1176,28 @@ def test_context_update_returns_operation_metadata_and_record(): assert result["record"]["supersedes_id"] == "old-id" +def test_context_update_forwards_embedding(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.update(external_id="source-1", embedding=[0.1, 0.2, 0.3]) + + assert dummy.update_calls[0]["embedding"] == [0.1, 0.2, 0.3] + + +def test_context_update_accepts_embedding_only_patch(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + # An embedding is sufficient on its own; no "at least one patch field" error. + result = ctx.update(id="rec-1", embedding=[0.1, 0.2]) + + assert dummy.update_calls[0]["embedding"] == [0.1, 0.2] + assert result["updated"] is True + + def test_context_update_missing_record_returns_not_updated(): ctx = Context.__new__(Context) dummy = DummyInner()