@@ -7,9 +7,12 @@ use crate::traits::KnowledgeStore;
77use async_trait:: async_trait;
88use chrono:: { DateTime , Utc } ;
99use opencode_mem_core:: {
10- GlobalKnowledge , KNOWLEDGE_TRIGRAM_CANDIDATE_LIMIT , KNOWLEDGE_TRIGRAM_LOG_THRESHOLD ,
10+ EMBEDDING_DIMENSION , GlobalKnowledge , KNOWLEDGE_SEMANTIC_DEDUP_THRESHOLD ,
11+ KNOWLEDGE_TRIGRAM_CANDIDATE_LIMIT , KNOWLEDGE_TRIGRAM_LOG_THRESHOLD ,
1112 KNOWLEDGE_TRIGRAM_MERGE_THRESHOLD , KnowledgeInput , KnowledgeSearchResult , KnowledgeType ,
13+ contains_non_finite, is_zero_vector,
1214} ;
15+ use pgvector:: Vector ;
1316use sqlx:: Row ;
1417
1518type ExistingKnowledgeRow = (
@@ -120,6 +123,7 @@ impl PgStorage {
120123 & self ,
121124 id : Option < & str > ,
122125 input : & KnowledgeInput ,
126+ embedding : Option < & [ f32 ] > ,
123127 ) -> Result < GlobalKnowledge , StorageError > {
124128 let mut tx = self . pool . begin ( ) . await ?;
125129
@@ -129,7 +133,7 @@ impl PgStorage {
129133
130134 let now = Utc :: now ( ) ;
131135
132- let trimmed_title = input. title . trim ( ) ;
136+ let trimmed_title = opencode_mem_core :: strip_uuid_from_title ( input. title . trim ( ) ) ;
133137
134138 let existing: Option < ExistingKnowledgeRow > = sqlx:: query_as (
135139 "SELECT id, created_at, triggers, source_projects, source_observations,
@@ -138,7 +142,7 @@ impl PgStorage {
138142 WHERE LOWER(title) = LOWER($1)
139143 FOR UPDATE" ,
140144 )
141- . bind ( trimmed_title)
145+ . bind ( & trimmed_title)
142146 . fetch_optional ( & mut * tx)
143147 . await ?;
144148
@@ -164,7 +168,7 @@ impl PgStorage {
164168 confidence,
165169 usage_count,
166170 last_used_at,
167- trimmed_title,
171+ & trimmed_title,
168172 input,
169173 now,
170174 )
@@ -174,7 +178,7 @@ impl PgStorage {
174178 }
175179
176180 if let Some ( similar) = self
177- . find_trigram_similar_in_tx ( & mut tx, trimmed_title)
181+ . find_trigram_similar_in_tx ( & mut tx, & trimmed_title)
178182 . await ?
179183 {
180184 let result = self
@@ -186,14 +190,45 @@ impl PgStorage {
186190 tx. commit ( ) . await ?;
187191
188192 tracing:: info!(
189- new_title = trimmed_title,
193+ new_title = % trimmed_title,
190194 merged_into = %result. id,
191195 existing_title = %result. title,
192196 "knowledge trigram dedup: merged similar entry"
193197 ) ;
194198 return Ok ( result) ;
195199 }
196200
201+ // Step 3: Semantic similarity via embedding cosine distance
202+ if let Some ( emb) = embedding
203+ && let Some ( semantic) = self . find_semantic_similar_in_tx ( & mut tx, emb) . await ?
204+ {
205+ let result = self
206+ . merge_into_existing (
207+ & mut tx,
208+ & semantic. 0 ,
209+ semantic. 1 ,
210+ semantic. 2 ,
211+ semantic. 3 ,
212+ semantic. 4 ,
213+ semantic. 5 ,
214+ semantic. 6 ,
215+ semantic. 7 ,
216+ & semantic. 8 ,
217+ input,
218+ now,
219+ )
220+ . await ?;
221+ tx. commit ( ) . await ?;
222+
223+ tracing:: info!(
224+ new_title = %trimmed_title,
225+ merged_into = %result. id,
226+ existing_title = %result. title,
227+ "knowledge semantic dedup: merged via embedding similarity"
228+ ) ;
229+ return Ok ( result) ;
230+ }
231+
197232 let id = id. map_or_else ( || uuid:: Uuid :: new_v4 ( ) . to_string ( ) , ToOwned :: to_owned) ;
198233 let source_projects: Vec < String > = input
199234 . source_project
@@ -212,7 +247,7 @@ impl PgStorage {
212247 ) )
213248 . bind ( & id)
214249 . bind ( input. knowledge_type . as_str ( ) )
215- . bind ( trimmed_title)
250+ . bind ( & trimmed_title)
216251 . bind ( & input. description )
217252 . bind ( & input. instructions )
218253 . bind ( serde_json:: to_value ( & input. triggers ) ?)
@@ -227,12 +262,21 @@ impl PgStorage {
227262 . execute ( & mut * tx)
228263 . await ?;
229264
265+ if let Some ( emb) = embedding {
266+ let vector = Vector :: from ( emb. to_vec ( ) ) ;
267+ sqlx:: query ( "UPDATE global_knowledge SET embedding = $1 WHERE id = $2" )
268+ . bind ( vector)
269+ . bind ( & id)
270+ . execute ( & mut * tx)
271+ . await ?;
272+ }
273+
230274 tx. commit ( ) . await ?;
231275
232276 Ok ( GlobalKnowledge :: new (
233277 id,
234278 input. knowledge_type ,
235- trimmed_title. to_owned ( ) ,
279+ trimmed_title,
236280 input. description . clone ( ) ,
237281 input. instructions . clone ( ) ,
238282 input. triggers . clone ( ) ,
@@ -355,6 +399,113 @@ impl PgStorage {
355399
356400 Ok ( best_merge)
357401 }
402+
403+ #[ expect(
404+ clippy:: type_complexity,
405+ reason = "tuple matches ExistingKnowledgeRow + title for merge"
406+ ) ]
407+ async fn find_semantic_similar_in_tx (
408+ & self ,
409+ tx : & mut sqlx:: Transaction < ' _ , sqlx:: Postgres > ,
410+ embedding : & [ f32 ] ,
411+ ) -> Result <
412+ Option < (
413+ String ,
414+ DateTime < Utc > ,
415+ serde_json:: Value ,
416+ serde_json:: Value ,
417+ serde_json:: Value ,
418+ f64 ,
419+ i64 ,
420+ Option < DateTime < Utc > > ,
421+ String ,
422+ ) > ,
423+ StorageError ,
424+ > {
425+ if embedding. is_empty ( ) || is_zero_vector ( embedding) || contains_non_finite ( embedding) {
426+ return Ok ( None ) ;
427+ }
428+
429+ let vector = Vector :: from ( embedding. to_vec ( ) ) ;
430+
431+ let row: Option < (
432+ String ,
433+ DateTime < Utc > ,
434+ serde_json:: Value ,
435+ serde_json:: Value ,
436+ serde_json:: Value ,
437+ f64 ,
438+ i64 ,
439+ Option < DateTime < Utc > > ,
440+ String ,
441+ f64 ,
442+ ) > = sqlx:: query_as (
443+ "SELECT id, created_at, triggers, source_projects, source_observations,
444+ confidence, usage_count, last_used_at, title,
445+ 1.0 - (embedding <=> $1) AS similarity
446+ FROM global_knowledge
447+ WHERE archived_at IS NULL
448+ AND embedding IS NOT NULL
449+ ORDER BY embedding <=> $1
450+ LIMIT 1
451+ FOR UPDATE" ,
452+ )
453+ . bind ( vector)
454+ . fetch_optional ( & mut * * tx)
455+ . await ?;
456+
457+ match row {
458+ Some ( (
459+ id,
460+ created_at,
461+ triggers,
462+ src_proj,
463+ src_obs,
464+ confidence,
465+ usage_count,
466+ last_used_at,
467+ title,
468+ similarity,
469+ ) ) => {
470+ #[ expect(
471+ clippy:: cast_possible_truncation,
472+ reason = "similarity f64→f32 is acceptable lossy narrowing"
473+ ) ]
474+ let sim_f32 = similarity as f32 ;
475+ if sim_f32 >= KNOWLEDGE_SEMANTIC_DEDUP_THRESHOLD {
476+ tracing:: debug!(
477+ existing_title = %title,
478+ existing_id = %id,
479+ similarity = %sim_f32,
480+ "knowledge semantic match above threshold"
481+ ) ;
482+ Ok ( Some ( (
483+ id,
484+ created_at,
485+ triggers,
486+ src_proj,
487+ src_obs,
488+ confidence,
489+ usage_count,
490+ last_used_at,
491+ title,
492+ ) ) )
493+ } else {
494+ if sim_f32 >= 0.7 {
495+ tracing:: info!(
496+ existing_title = %title,
497+ existing_id = %id,
498+ similarity = %sim_f32,
499+ threshold = %KNOWLEDGE_SEMANTIC_DEDUP_THRESHOLD ,
500+ "knowledge semantic near-miss (below threshold)"
501+ ) ;
502+ }
503+ Ok ( None )
504+ }
505+ }
506+ None => Ok ( None ) ,
507+ }
508+ }
358509}
359510
360511#[ async_trait]
@@ -370,7 +521,7 @@ impl KnowledgeStore for PgStorage {
370521 input : KnowledgeInput ,
371522 ) -> Result < GlobalKnowledge , StorageError > {
372523 for attempt in 0u8 ..3u8 {
373- match self . save_knowledge_inner ( Some ( id) , & input) . await {
524+ match self . save_knowledge_inner ( Some ( id) , & input, None ) . await {
374525 Ok ( knowledge) => return Ok ( knowledge) ,
375526 Err ( ref e) if e. is_duplicate ( ) && attempt < 2 => {
376527 tracing:: debug!(
@@ -386,6 +537,68 @@ impl KnowledgeStore for PgStorage {
386537 unreachable ! ( )
387538 }
388539
540+ async fn save_knowledge_with_embedding (
541+ & self ,
542+ id : & str ,
543+ input : KnowledgeInput ,
544+ embedding : Vec < f32 > ,
545+ ) -> Result < GlobalKnowledge , StorageError > {
546+ for attempt in 0u8 ..3u8 {
547+ match self
548+ . save_knowledge_inner ( Some ( id) , & input, Some ( & embedding) )
549+ . await
550+ {
551+ Ok ( knowledge) => return Ok ( knowledge) ,
552+ Err ( ref e) if e. is_duplicate ( ) && attempt < 2 => {
553+ tracing:: debug!(
554+ title = %input. title,
555+ attempt,
556+ "knowledge save with embedding hit unique constraint, retrying"
557+ ) ;
558+ continue ;
559+ }
560+ Err ( e) => return Err ( e) ,
561+ }
562+ }
563+ unreachable ! ( )
564+ }
565+
566+ async fn store_knowledge_embedding (
567+ & self ,
568+ knowledge_id : & str ,
569+ embedding : & [ f32 ] ,
570+ ) -> Result < ( ) , StorageError > {
571+ if embedding. len ( ) != EMBEDDING_DIMENSION {
572+ return Err ( StorageError :: DataCorruption {
573+ context : format ! (
574+ "knowledge embedding dimension mismatch: expected {EMBEDDING_DIMENSION}, got {}" ,
575+ embedding. len( )
576+ ) ,
577+ source : "dimension check" . into ( ) ,
578+ } ) ;
579+ }
580+ if is_zero_vector ( embedding) {
581+ return Err ( StorageError :: DataCorruption {
582+ context : format ! ( "rejecting zero vector embedding for knowledge {knowledge_id}" ) ,
583+ source : "zero vector check" . into ( ) ,
584+ } ) ;
585+ }
586+ if contains_non_finite ( embedding) {
587+ return Err ( StorageError :: DataCorruption {
588+ context : "knowledge embedding contains NaN or Infinity values" . to_owned ( ) ,
589+ source : Box :: from ( "non-finite check" ) ,
590+ } ) ;
591+ }
592+
593+ let vector = Vector :: from ( embedding. to_vec ( ) ) ;
594+ sqlx:: query ( "UPDATE global_knowledge SET embedding = $1 WHERE id = $2" )
595+ . bind ( vector)
596+ . bind ( knowledge_id)
597+ . execute ( & self . pool )
598+ . await ?;
599+ Ok ( ( ) )
600+ }
601+
389602 async fn get_knowledge ( & self , id : & str ) -> Result < Option < GlobalKnowledge > , StorageError > {
390603 let row = sqlx:: query ( & format ! (
391604 "SELECT {KNOWLEDGE_COLUMNS} FROM global_knowledge WHERE id = $1 AND archived_at IS NULL"
@@ -555,4 +768,24 @@ impl KnowledgeStore for PgStorage {
555768 . await ?;
556769 Ok ( result. rows_affected ( ) )
557770 }
771+
772+ async fn link_source_observation (
773+ & self ,
774+ knowledge_id : & str ,
775+ observation_id : & str ,
776+ ) -> Result < bool , StorageError > {
777+ let result = sqlx:: query (
778+ "UPDATE global_knowledge
779+ SET source_observations = source_observations || to_jsonb($2::text),
780+ updated_at = NOW()
781+ WHERE id = $1
782+ AND archived_at IS NULL
783+ AND NOT source_observations @> to_jsonb($2::text)" ,
784+ )
785+ . bind ( knowledge_id)
786+ . bind ( observation_id)
787+ . execute ( & self . pool )
788+ . await ?;
789+ Ok ( result. rows_affected ( ) > 0 )
790+ }
558791}
0 commit comments