Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ ctx.add_many([
},
])

# Deferred embeddings: raw-first capture, enrich later.
#
# Bulk ingestion often needs to persist source chunks immediately and compute
# embeddings asynchronously (large documents, rate-limited or remote embedding
# providers). Append the raw text first with a stable external_id, then have a
# worker patch in the embedding once it is ready. A record without an embedding
# is durably stored but excluded from vector search until it is enriched.
ctx.add_many([
{
"role": "source",
"content": "Deferred chunk",
"external_id": "doc-77#chunk-1",
"metadata": {"embedding_status": "pending"},
},
])

# ...later, from your own worker/queue/batch job (the zero vector below is
# only a stand-in for the embedding your model actually produces):
vector = [0.0] * 1536
ctx.update(
external_id="doc-77#chunk-1",
embedding=vector, # attach the freshly computed vector
metadata={"embedding_status": "ready"},
)
# The enriched record is now returned by vector search and hybrid retrieval.

# Time-travel to prior state
first_version = ctx.version()
ctx.add("assistant", "Let me fetch suggestions…")
Expand Down
51 changes: 48 additions & 3 deletions crates/lance-context-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub trait ContextStoreApi {
&self,
limit: Option<usize>,
offset: Option<usize>,
filters: Option<Value>,
include_expired: bool,
include_retired: bool,
) -> impl Future<Output = ContextResult<Vec<RecordDto>>> + Send;

fn related(
Expand All @@ -78,9 +81,7 @@ pub trait ContextStoreApi {

fn search(
&self,
query: &[f32],
limit: Option<usize>,
include_relationships: bool,
request: &SearchRequest,
) -> impl Future<Output = ContextResult<Vec<SearchResultDto>>> + Send;

fn retrieve(
Expand Down Expand Up @@ -242,6 +243,8 @@ pub struct RecordPatchDto {
pub retired_at: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retired_reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub embedding: Option<Vec<f32>>,
}

impl RecordPatchDto {
Expand All @@ -257,6 +260,7 @@ impl RecordPatchDto {
&& self.lifecycle_status.is_none()
&& self.retired_at.is_none()
&& self.retired_reason.is_none()
&& self.embedding.is_none()
}
}

Expand Down Expand Up @@ -354,6 +358,12 @@ pub struct SearchRequest {
pub query: Vec<f32>,
#[serde(default = "default_search_limit")]
pub limit: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filters: Option<Value>,
#[serde(default)]
pub include_expired: bool,
#[serde(default)]
pub include_retired: bool,
#[serde(default)]
pub include_relationships: bool,
}
Expand Down Expand Up @@ -517,3 +527,38 @@ where
None => Ok(None),
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `payload` into a `SearchRequest`, panicking if the JSON
    /// does not match the expected wire shape.
    fn parse_request(payload: &str) -> SearchRequest {
        serde_json::from_str(payload).unwrap()
    }

    /// A payload that carries only `query` and `limit` must still
    /// deserialize, with every other field left at its default.
    #[test]
    fn search_request_legacy_payload_defaults_filters_and_lifecycle() {
        // Clients written against the pre-#89 shape send only query/limit.
        let parsed = parse_request(r#"{"query": [0.1, 0.2], "limit": 5}"#);

        assert_eq!(parsed.query, vec![0.1, 0.2]);
        assert_eq!(parsed.limit, 5);
        assert!(parsed.filters.is_none());
        assert_eq!(
            (
                parsed.include_expired,
                parsed.include_retired,
                parsed.include_relationships,
            ),
            (false, false, false),
        );
    }

    /// Leaving `limit` out of the payload yields `default_search_limit()`.
    #[test]
    fn search_request_defaults_limit_when_omitted() {
        let parsed = parse_request(r#"{"query": [1.0]}"#);

        assert_eq!(parsed.limit, default_search_limit());
    }

    /// Explicit `filters` and lifecycle flags in the payload are carried
    /// through to the deserialized request.
    #[test]
    fn search_request_parses_filters_and_lifecycle() {
        let parsed = parse_request(
            r#"{"query": [1.0], "filters": {"tenant": "acme"}, "include_expired": true, "include_retired": true}"#,
        );

        let expected_filters = serde_json::json!({"tenant": "acme"});
        assert_eq!(parsed.filters, Some(expected_filters));
        assert!(parsed.include_expired);
        assert!(parsed.include_retired);
    }
}
61 changes: 39 additions & 22 deletions crates/lance-context-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,27 @@ impl ContextStoreApi for RemoteContextStore {
&self,
limit: Option<usize>,
offset: Option<usize>,
filters: Option<serde_json::Value>,
include_expired: bool,
include_retired: bool,
) -> ContextResult<Vec<RecordDto>> {
let filters = filters
.as_ref()
.map(|value| {
serde_json::to_string(value)
.map_err(|err| ContextError::InvalidRequest(err.to_string()))
})
.transpose()?;
let resp = self
.client
.list_records(&self.context_name, limit, offset)
.list_records(
&self.context_name,
limit,
offset,
filters.as_deref(),
include_expired,
include_retired,
)
.await
.map_err(to_ctx_err)?;
Ok(resp.records)
Expand Down Expand Up @@ -161,20 +178,10 @@ impl ContextStoreApi for RemoteContextStore {
Ok(resp.records)
}

async fn search(
&self,
query: &[f32],
limit: Option<usize>,
include_relationships: bool,
) -> ContextResult<Vec<SearchResultDto>> {
let req = SearchRequest {
query: query.to_vec(),
limit: limit.unwrap_or(10),
include_relationships,
};
async fn search(&self, request: &SearchRequest) -> ContextResult<Vec<SearchResultDto>> {
let resp = self
.client
.search(&self.context_name, &req)
.search(&self.context_name, request)
.await
.map_err(to_ctx_err)?;
Ok(resp.results)
Expand Down Expand Up @@ -401,20 +408,30 @@ impl ContextClient {
name: &str,
limit: Option<usize>,
offset: Option<usize>,
filters: Option<&str>,
include_expired: bool,
include_retired: bool,
) -> Result<ListRecordsResponse, ClientError> {
let mut url = self.url(&format!("/contexts/{}/records", name));
let mut params = Vec::new();
if let Some(l) = limit {
params.push(format!("limit={}", l));
let mut request = self
.http
.get(self.url(&format!("/contexts/{}/records", name)));
if let Some(limit) = limit {
request = request.query(&[("limit", limit)]);
}
if let Some(o) = offset {
params.push(format!("offset={}", o));
if let Some(offset) = offset {
request = request.query(&[("offset", offset)]);
}
if !params.is_empty() {
url = format!("{}?{}", url, params.join("&"));
if let Some(filters) = filters {
request = request.query(&[("filters", filters)]);
}
if include_expired {
request = request.query(&[("include_expired", include_expired)]);
}
if include_retired {
request = request.query(&[("include_retired", include_retired)]);
}

let resp = self.http.get(&url).send().await?;
let resp = request.send().await?;
Self::handle_response(resp).await
}

Expand Down
55 changes: 40 additions & 15 deletions crates/lance-context-core/src/api_impl.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use chrono::Utc;
use serde_json::Value;
use uuid::Uuid;

use lance_context_api::{
AddRecordRequest, AddRecordsResponse, CompactRequest, CompactResponse, CompactStatsResponse,
ContextError, ContextResult, ContextStoreApi, DeleteRecordResponse, RecordDto, RecordPatchDto,
RelationshipDto, RetrieveRequest, RetrieveResultDto, SearchResultDto, StateMetadataDto,
UpdateRecordRequest, UpdateRecordResponse, UpsertRecordRequest, UpsertRecordResponse,
RelationshipDto, RetrieveRequest, RetrieveResultDto, SearchRequest, SearchResultDto,
StateMetadataDto, UpdateRecordRequest, UpdateRecordResponse, UpsertRecordRequest,
UpsertRecordResponse,
};

use crate::record::{
Expand Down Expand Up @@ -156,10 +158,24 @@ impl ContextStoreApi for ContextStore {
&self,
limit: Option<usize>,
offset: Option<usize>,
filters: Option<Value>,
include_expired: bool,
include_retired: bool,
) -> ContextResult<Vec<RecordDto>> {
let records = ContextStore::list(self, limit, offset)
.await
.map_err(to_ctx_err)?;
let filters = filters
.map(RecordFilters::from_json_value)
.transpose()
.map_err(ContextError::InvalidRequest)?;
let options = LifecycleQueryOptions::new(include_expired, include_retired);
let records = ContextStore::list_filtered_with_options(
self,
limit,
offset,
filters.as_ref(),
options,
)
.await
.map_err(to_ctx_err)?;
Ok(records.into_iter().map(record_to_dto).collect())
}

Expand All @@ -179,19 +195,27 @@ impl ContextStoreApi for ContextStore {
Ok(records.into_iter().map(record_to_dto).collect())
}

async fn search(
&self,
query: &[f32],
limit: Option<usize>,
include_relationships: bool,
) -> ContextResult<Vec<SearchResultDto>> {
let results = ContextStore::search(self, query, limit)
.await
.map_err(to_ctx_err)?;
async fn search(&self, request: &SearchRequest) -> ContextResult<Vec<SearchResultDto>> {
let filters = request
.filters
.clone()
.map(RecordFilters::from_json_value)
.transpose()
.map_err(ContextError::InvalidRequest)?;
let options = LifecycleQueryOptions::new(request.include_expired, request.include_retired);
let results = ContextStore::search_filtered_with_options(
self,
&request.query,
Some(request.limit),
filters.as_ref(),
options,
)
.await
.map_err(to_ctx_err)?;
Ok(results
.into_iter()
.map(|mut sr| {
if !include_relationships {
if !request.include_relationships {
sr.record.relationships.clear();
}
SearchResultDto {
Expand Down Expand Up @@ -330,6 +354,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch {
lifecycle_status: patch.lifecycle_status.clone(),
retired_at: patch.retired_at,
retired_reason: patch.retired_reason.clone(),
embedding: patch.embedding.clone(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/lance-context-core/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ pub struct RecordPatch {
pub lifecycle_status: Option<String>,
pub retired_at: Option<DateTime<Utc>>,
pub retired_reason: Option<String>,
/// Vector embedding to attach to the record. Enables deferred embedding
/// workflows: append raw text first, then enrich with an embedding later.
pub embedding: Option<Vec<f32>>,
}

impl RecordPatch {
Expand All @@ -180,6 +183,7 @@ impl RecordPatch {
&& self.lifecycle_status.is_none()
&& self.retired_at.is_none()
&& self.retired_reason.is_none()
&& self.embedding.is_none()
}
}

Expand Down
Loading
Loading