@@ -25,6 +25,7 @@ import (
2525
2626 "github.com/RoaringBitmap/roaring/v2"
2727 faiss "github.com/blevesearch/go-faiss"
28+ segment "github.com/blevesearch/scorch_segment_api/v2"
2829)
2930
3031func newVectorIndexCache () * vectorIndexCache {
@@ -56,17 +57,17 @@ func (vc *vectorIndexCache) Clear() {
5657// present. It also returns the batch executor for the field if it's present in the
5758// cache.
5859func (vc * vectorIndexCache ) loadOrCreate (fieldID uint16 , mem []byte ,
59- loadDocVecIDMap bool , except * roaring.Bitmap ) (
60+ loadDocVecIDMap bool , except * roaring.Bitmap , options segment. InterpretVectorIndexOptions ) (
6061 index * faiss.IndexImpl , vecDocIDMap map [int64 ]uint32 , docVecIDMap map [uint32 ][]int64 ,
61- vecIDsToExclude []int64 , err error ) {
62+ vecIDsToExclude []int64 , batchExec * batchExecutor , err error ) {
6263 vc .m .RLock ()
6364 entry , ok := vc .cache [fieldID ]
6465 if ok {
65- index , vecDocIDMap , docVecIDMap = entry .load ()
66+ index , vecDocIDMap , docVecIDMap , batchExec = entry .load ()
6667 vecIDsToExclude = getVecIDsToExclude (vecDocIDMap , except )
6768 if ! loadDocVecIDMap || len (entry .docVecIDMap ) > 0 {
6869 vc .m .RUnlock ()
69- return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , nil
70+ return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , batchExec , nil
7071 }
7172
7273 vc .m .RUnlock ()
@@ -76,14 +77,14 @@ func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
7677 // typically seen for the first filtered query.
7778 docVecIDMap = vc .addDocVecIDMapToCacheLOCKED (entry )
7879 vc .m .Unlock ()
79- return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , nil
80+ return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , batchExec , nil
8081 }
8182
8283 vc .m .RUnlock ()
8384 // acquiring a lock since this is modifying the cache.
8485 vc .m .Lock ()
8586 defer vc .m .Unlock ()
86- return vc .createAndCacheLOCKED (fieldID , mem , loadDocVecIDMap , except )
87+ return vc .createAndCacheLOCKED (fieldID , mem , loadDocVecIDMap , except , options )
8788}
8889
8990func (vc * vectorIndexCache ) addDocVecIDMapToCacheLOCKED (ce * cacheEntry ) map [uint32 ][]int64 {
@@ -104,21 +105,22 @@ func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint
104105
105106// Rebuilding the cache on a miss.
106107func (vc * vectorIndexCache ) createAndCacheLOCKED (fieldID uint16 , mem []byte ,
107- loadDocVecIDMap bool , except * roaring.Bitmap ) (
108+ loadDocVecIDMap bool , except * roaring.Bitmap , options segment. InterpretVectorIndexOptions ) (
108109 index * faiss.IndexImpl , vecDocIDMap map [int64 ]uint32 ,
109- docVecIDMap map [uint32 ][]int64 , vecIDsToExclude []int64 , err error ) {
110+ docVecIDMap map [uint32 ][]int64 , vecIDsToExclude []int64 ,
111+ batchExec * batchExecutor , err error ) {
110112
111113 // Handle concurrent accesses (to avoid unnecessary work) by adding a
112114 // check within the write lock here.
113115 entry := vc .cache [fieldID ]
114116 if entry != nil {
115- index , vecDocIDMap , docVecIDMap = entry .load ()
117+ index , vecDocIDMap , docVecIDMap , batchExec = entry .load ()
116118 vecIDsToExclude = getVecIDsToExclude (vecDocIDMap , except )
117119 if ! loadDocVecIDMap || len (entry .docVecIDMap ) > 0 {
118- return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , nil
120+ return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , batchExec , nil
119121 }
120122 docVecIDMap = vc .addDocVecIDMapToCacheLOCKED (entry )
121- return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , nil
123+ return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , batchExec , nil
122124 }
123125
124126 // if the cache doesn't have the entry, construct the vector to doc id map and
@@ -154,16 +156,17 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
154156
155157 index , err = faiss .ReadIndexFromBuffer (mem [pos :pos + int (indexSize )], faissIOFlags )
156158 if err != nil {
157- return nil , nil , nil , nil , err
159+ return nil , nil , nil , nil , nil , err
158160 }
159161
160- vc .insertLOCKED (fieldID , index , vecDocIDMap , loadDocVecIDMap , docVecIDMap )
161- return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , nil
162+ batchExec = newBatchExecutor (options )
163+ vc .insertLOCKED (fieldID , index , vecDocIDMap , loadDocVecIDMap , docVecIDMap , batchExec )
164+ return index , vecDocIDMap , docVecIDMap , vecIDsToExclude , batchExec , nil
162165}
163166
164167func (vc * vectorIndexCache ) insertLOCKED (fieldIDPlus1 uint16 ,
165168 index * faiss.IndexImpl , vecDocIDMap map [int64 ]uint32 , loadDocVecIDMap bool ,
166- docVecIDMap map [uint32 ][]int64 ) {
169+ docVecIDMap map [uint32 ][]int64 , batchExec * batchExecutor ) {
167170 // the first time we've hit the cache, try to spawn a monitoring routine
168171 // which will reconcile the moving averages for all the fields being hit
169172 if len (vc .cache ) == 0 {
@@ -178,7 +181,7 @@ func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
178181 // longer time and thereby the index to be resident in the cache
179182 // for longer time.
180183 vc .cache [fieldIDPlus1 ] = createCacheEntry (index , vecDocIDMap ,
181- loadDocVecIDMap , docVecIDMap , 0.4 )
184+ loadDocVecIDMap , docVecIDMap , 0.4 , batchExec )
182185 }
183186}
184187
@@ -272,15 +275,17 @@ func (e *ewma) add(val uint64) {
272275// -----------------------------------------------------------------------------
273276
274277func createCacheEntry (index * faiss.IndexImpl , vecDocIDMap map [int64 ]uint32 ,
275- loadDocVecIDMap bool , docVecIDMap map [uint32 ][]int64 , alpha float64 ) * cacheEntry {
278+ loadDocVecIDMap bool , docVecIDMap map [uint32 ][]int64 , alpha float64 ,
279+ batchExec * batchExecutor ) * cacheEntry {
276280 ce := & cacheEntry {
277281 index : index ,
278282 vecDocIDMap : vecDocIDMap ,
279283 tracker : & ewma {
280284 alpha : alpha ,
281285 sample : 1 ,
282286 },
283- refs : 1 ,
287+ refs : 1 ,
288+ batchExec : batchExec ,
284289 }
285290 if loadDocVecIDMap {
286291 ce .docVecIDMap = docVecIDMap
@@ -299,6 +304,8 @@ type cacheEntry struct {
299304 index * faiss.IndexImpl
300305 vecDocIDMap map [int64 ]uint32
301306 docVecIDMap map [uint32 ][]int64
307+
308+ batchExec * batchExecutor
302309}
303310
304311func (ce * cacheEntry ) incHit () {
@@ -313,10 +320,14 @@ func (ce *cacheEntry) decRef() {
313320 atomic .AddInt64 (& ce .refs , - 1 )
314321}
315322
// load hands out the contents of this cache entry to a caller.
//
// It first calls incHit and addRef on the entry, then returns the entry's
// faiss index, its vecDocIDMap, its docVecIDMap and (in the new revision of
// this diff) its batchExecutor. The values are returned as stored on the
// entry; no copies are made here.
//
// NOTE(review): addRef presumably bumps the same refs counter that decRef
// decrements, so each load should be balanced by a decRef — confirm against
// the callers. The returned docVecIDMap is whatever the entry currently
// holds; the callers in loadOrCreate check len(entry.docVecIDMap) separately.
316- func (ce * cacheEntry ) load () (* faiss.IndexImpl , map [int64 ]uint32 , map [uint32 ][]int64 ) {
323+ func (ce * cacheEntry ) load () (
324+ * faiss.IndexImpl ,
325+ map [int64 ]uint32 ,
326+ map [uint32 ][]int64 ,
327+ * batchExecutor ) {
317328 ce .incHit ()
318329 ce .addRef ()
319- return ce .index , ce .vecDocIDMap , ce .docVecIDMap
330+ return ce .index , ce .vecDocIDMap , ce .docVecIDMap , ce . batchExec
320331}
321332
322333func (ce * cacheEntry ) close () {
@@ -325,6 +336,7 @@ func (ce *cacheEntry) close() {
325336 ce .index = nil
326337 ce .vecDocIDMap = nil
327338 ce .docVecIDMap = nil
339+ ce .batchExec .close ()
328340 }()
329341}
330342
0 commit comments