Skip to content

Commit 6b1e6c5

Browse files
committed
feat(docsfilter): filter ids on search
1 parent ea7b447 commit 6b1e6c5

17 files changed

Lines changed: 144 additions & 27 deletions

docsfilter/docs_filter.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,11 @@ type DocsFilter struct {
5353
createDirOnce *sync.Once
5454
}
5555

56-
func Start(
56+
func New(
5757
ctx context.Context,
5858
cfg Config,
5959
params []Params,
6060
mp MappingProvider,
61-
fracs fracmanager.List,
6261
) *DocsFilter {
6362
workers := cfg.Workers
6463
if workers <= 0 {
@@ -72,7 +71,7 @@ func Start(
7271
filtersMap[string(f.Hash())] = f
7372
}
7473

75-
df := &DocsFilter{
74+
return &DocsFilter{
7675
ctx: ctx,
7776
config: cfg,
7877
filters: filtersMap,
@@ -82,7 +81,9 @@ func Start(
8281
rateLimit: make(chan struct{}, workers),
8382
createDirOnce: &sync.Once{},
8483
}
84+
}
8585

86+
func (df *DocsFilter) Start(fracs fracmanager.List) {
8687
df.createDataDir()
8788

8889
err := df.loadFilters()
@@ -106,8 +107,6 @@ func Start(
106107

107108
df.processFilter(f, fracs.FilterInRange(seq.MID(f.params.From), seq.MID(f.params.To)))
108109
}
109-
110-
return df
111110
}
112111

113112
func (df *DocsFilter) GetFilteredLIDsByFrac(fracName string) ([]seq.LID, error) {
@@ -122,9 +121,15 @@ func (df *DocsFilter) GetFilteredLIDsByFrac(fracName string) ([]seq.LID, error)
122121
var lids []seq.LID
123122

124123
for _, f := range fracFiles {
125-
rawLIDs, err := os.ReadFile(f)
124+
compressedLIDs, err := os.ReadFile(f)
125+
if err != nil {
126+
logger.Error("can't read filtered lids from file", zap.String("path", f), zap.Error(err))
127+
return nil, err
128+
}
129+
130+
rawLIDs, err := zstd.Decompress(compressedLIDs, nil)
126131
if err != nil {
127-
logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err))
132+
logger.Error("can't decompress filtered lids from file", zap.String("path", f), zap.Error(err))
128133
return nil, err
129134
}
130135

@@ -145,7 +150,7 @@ func (df *DocsFilter) GetFilteredLIDsByFrac(fracName string) ([]seq.LID, error)
145150
return lids, nil
146151
}
147152

148-
func (df *DocsFilter) addDoneFrac(fracName string, fracPath string) {
153+
func (df *DocsFilter) addDoneFrac(fracName, fracPath string) {
149154
df.fracsMu.Lock()
150155
defer df.fracsMu.Unlock()
151156

frac/active.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ type Active struct {
6262

6363
writer *ActiveWriter
6464
indexer *ActiveIndexer
65+
66+
docsFilter DocsFilter
6567
}
6668

6769
const (
@@ -81,6 +83,7 @@ func NewActive(
8183
docsCache *cache.Cache[[]byte],
8284
sortCache *cache.Cache[[]byte],
8385
cfg *Config,
86+
docsFilter DocsFilter,
8487
) *Active {
8588
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
8689
metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync)
@@ -107,6 +110,8 @@ func NewActive(
107110
BaseFileName: baseFileName,
108111
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
109112
Config: cfg,
113+
114+
docsFilter: docsFilter,
110115
}
111116

112117
// use of 0 as keys in maps is prohibited – it's system key, so add first element
@@ -327,6 +332,8 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
327332
blocksOffsets: f.DocBlocks.GetVals(),
328333
docsPositions: f.DocsPositions,
329334
docsReader: &f.docsReader,
335+
336+
docsFilter: f.docsFilter,
330337
}
331338
}
332339

frac/active_index.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type activeDataProvider struct {
2828
docsReader *storage.DocsReader
2929

3030
idsIndex *activeIDsIndex
31+
32+
docsFilter DocsFilter
3133
}
3234

3335
func (dp *activeDataProvider) release() {
@@ -116,6 +118,8 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e
116118
indexes := []activeSearchIndex{{
117119
activeIDsIndex: dp.getIDsIndex(),
118120
activeTokenIndex: dp.getTokenIndex(),
121+
docsFilter: dp.docsFilter,
122+
fracName: dp.info.Name(),
119123
}}
120124
m.Stop()
121125

@@ -182,6 +186,23 @@ func (p *activeIDsIndex) LessOrEqual(lid seq.LID, id seq.ID) bool {
182186
type activeSearchIndex struct {
183187
*activeIDsIndex
184188
*activeTokenIndex
189+
docsFilter DocsFilter
190+
fracName string
191+
}
192+
193+
func (si *activeSearchIndex) GetTombstones() ([]uint32, error) {
194+
filteredLids, err := si.docsFilter.GetFilteredLIDsByFrac(si.fracName)
195+
if err != nil {
196+
return nil, err
197+
}
198+
199+
// TODO: return []uint32 from docsFilter (???)
200+
res := make([]uint32, 0, len(filteredLids))
201+
for _, lid := range filteredLids {
202+
res = append(res, uint32(lid))
203+
}
204+
205+
return res, nil
185206
}
186207

187208
type activeTokenIndex struct {

frac/active_indexer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func BenchmarkIndexer(b *testing.B) {
9191
cache.NewCache[[]byte](nil, nil),
9292
cache.NewCache[[]byte](nil, nil),
9393
&Config{},
94+
nil,
9495
)
9596

9697
processor := getTestProcessor()

frac/fraction_concurrency_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) {
5353
cache.NewCache[[]byte](nil, nil),
5454
cache.NewCache[[]byte](nil, nil),
5555
&Config{},
56+
nil,
5657
)
5758

5859
mapping := seq.Mapping{
@@ -325,6 +326,7 @@ func seal(active *Active) (*Sealed, error) {
325326
indexCache,
326327
cache.NewCache[[]byte](nil, nil),
327328
&Config{},
329+
nil,
328330
)
329331
active.Release()
330332
return sealed, nil

frac/fraction_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,7 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active {
12851285
cache.NewCache[[]byte](nil, nil),
12861286
cache.NewCache[[]byte](nil, nil),
12871287
s.config,
1288+
nil,
12881289
)
12891290

12901291
var wg sync.WaitGroup
@@ -1347,6 +1348,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed {
13471348
indexCache,
13481349
cache.NewCache[[]byte](nil, nil),
13491350
s.config,
1351+
nil,
13501352
)
13511353
active.Release()
13521354
return sealed
@@ -1427,7 +1429,9 @@ func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction {
14271429
storage.NewReadLimiter(1, nil),
14281430
cache.NewCache[[]byte](nil, nil),
14291431
cache.NewCache[[]byte](nil, nil),
1430-
&Config{})
1432+
&Config{},
1433+
nil,
1434+
)
14311435
err := replayedFrac.Replay(context.Background())
14321436
s.Require().NoError(err, "replay failed")
14331437
return replayedFrac
@@ -1541,7 +1545,9 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal
15411545
indexCache,
15421546
cache.NewCache[[]byte](nil, nil),
15431547
nil,
1544-
s.config)
1548+
s.config,
1549+
nil,
1550+
)
15451551
s.fraction = sealed
15461552
return sealed
15471553
}
@@ -1610,7 +1616,9 @@ func (s *RemoteFractionTestSuite) SetupTest() {
16101616
cache.NewCache[[]byte](nil, nil),
16111617
sealed.info,
16121618
s.config,
1613-
s3cli)
1619+
s3cli,
1620+
nil,
1621+
)
16141622
s.fraction = remoteFrac
16151623
}
16161624
}

frac/processor/eval_tree.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ func evalLeaf(
7777
return node.BuildORTree(lidsTids, order.IsReverse()), nil
7878
}
7979

80+
func evalTombstones(root node.Node, tombstones []uint32, reverse bool, stats *searchStats) node.Node {
81+
if len(tombstones) == 0 {
82+
return root
83+
}
84+
stats.NodesTotal++
85+
return node.NewNAnd(node.NewStatic(tombstones, reverse), root, reverse)
86+
}
87+
8088
type Aggregator interface {
8189
// Next iterates to count the next lid.
8290
Next(lid uint32) error

frac/processor/search.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ type tokenIndex interface {
3333
GetValByTID(tid uint32) []byte
3434
GetTIDsByTokenExpr(token parser.Token) ([]uint32, error)
3535
GetLIDsFromTIDs(tids []uint32, stats lids.Counter, minLID, maxLID uint32, order seq.DocsOrder) []node.Node
36-
// GetTombstones() []seq.LID
3736
}
3837

3938
type searchIndex interface {
4039
tokenIndex
4140
idsIndex
41+
GetTombstones() ([]uint32, error)
4242
}
4343

4444
func IndexSearch(
@@ -88,6 +88,17 @@ func IndexSearch(
8888
m.Stop()
8989
}
9090

91+
m = sw.Start("get_tombstones")
92+
tombstones, err := index.GetTombstones()
93+
m.Stop()
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
m = sw.Start("eval_tombstones")
99+
evalTree = evalTombstones(evalTree, tombstones, params.Order.IsReverse(), stats)
100+
m.Stop()
101+
91102
m = sw.Start("iterate_eval_tree")
92103
total, ids, histogram, err := iterateEvalTree(ctx, params, index, evalTree, aggs, sw)
93104
m.Stop()
@@ -187,7 +198,7 @@ func iterateEvalTree(
187198
}
188199

189200
timerEval.Start()
190-
lid, has := evalTree.Next()
201+
lid, has := evalTree.Next() // TODO: here ???
191202
timerEval.Stop()
192203

193204
if !has {

frac/remote.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type Remote struct {
5959

6060
s3cli *s3.Client
6161
readLimiter *storage.ReadLimiter
62+
63+
docsFilter DocsFilter
6264
}
6365

6466
func NewRemote(
@@ -70,6 +72,7 @@ func NewRemote(
7072
info *common.Info,
7173
config *Config,
7274
s3cli *s3.Client,
75+
docsFilter DocsFilter,
7376
) *Remote {
7477
f := &Remote{
7578
ctx: ctx,
@@ -85,6 +88,8 @@ func NewRemote(
8588
Config: config,
8689

8790
s3cli: s3cli,
91+
92+
docsFilter: docsFilter,
8893
}
8994

9095
// Fast path if fraction-info cache exists AND it has valid index size.
@@ -240,6 +245,8 @@ func (f *Remote) createDataProvider(ctx context.Context) *sealedDataProvider {
240245
&f.blocksData.IDsTable,
241246
f.info.BinaryDataVer,
242247
),
248+
249+
docsFilter: f.docsFilter,
243250
}
244251
}
245252

frac/sealed.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type Sealed struct {
5555

5656
// shit for testing
5757
PartialSuicideMode PSD
58+
59+
docsFilter DocsFilter
5860
}
5961

6062
type PSD int // emulates hard shutdown on different stages of fraction deletion, used for tests
@@ -72,6 +74,7 @@ func NewSealed(
7274
docsCache *cache.Cache[[]byte],
7375
info *common.Info,
7476
config *Config,
77+
docsFilter DocsFilter,
7578
) *Sealed {
7679
f := &Sealed{
7780
loadMu: &sync.RWMutex{},
@@ -85,6 +88,8 @@ func NewSealed(
8588
Config: config,
8689

8790
PartialSuicideMode: Off,
91+
92+
docsFilter: docsFilter,
8893
}
8994

9095
// fast path if fraction-info cache exists AND it has valid index size
@@ -134,6 +139,7 @@ func NewSealedPreloaded(
134139
indexCache *IndexCache,
135140
docsCache *cache.Cache[[]byte],
136141
config *Config,
142+
docsFilter DocsFilter,
137143
) *Sealed {
138144
f := &Sealed{
139145
blocksData: preloaded.BlocksData,
@@ -148,6 +154,8 @@ func NewSealedPreloaded(
148154
info: preloaded.Info,
149155
BaseFileName: baseFile,
150156
Config: config,
157+
158+
docsFilter: docsFilter,
151159
}
152160

153161
// put the token table built during sealing into the cache of the sealed fraction
@@ -401,6 +409,8 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
401409
&f.blocksData.IDsTable,
402410
f.info.BinaryDataVer,
403411
),
412+
413+
docsFilter: f.docsFilter,
404414
}
405415
}
406416

0 commit comments

Comments
 (0)