Skip to content

Commit 4846a3e

Browse files
committed
chore(docsfilter): handle errors
1 parent 277508f commit 4846a3e

5 files changed

Lines changed: 37 additions & 14 deletions

File tree

docsfilter/docs_filter.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,11 @@ func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) {
167167
util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt)
168168

169169
filter.processWg.Add(1)
170-
go df.processFrac(fraction, filter, true) // nolint:errcheck // TODO: in progress
170+
go func() {
171+
if err := df.processFrac(fraction, filter, false); err != nil {
172+
panic(fmt.Errorf("docs filter refresh frac err: %s", err))
173+
}
174+
}()
171175
}
172176
}
173177

@@ -281,7 +285,11 @@ func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) {
281285
processFracInQueue := func(name string) error {
282286
f := fracsByName[fracNameFromFilePath(name)]
283287
filter.processWg.Add(1)
284-
go df.processFrac(f, filter, false) // nolint:errcheck // TODO: in progress
288+
go func() {
289+
if err := df.processFrac(f, filter, false); err != nil {
290+
panic(fmt.Errorf("docs filter process frac err: %s", err))
291+
}
292+
}()
285293

286294
return nil
287295
}
@@ -321,7 +329,12 @@ func (df *DocsFilter) processFrac(f frac.Fraction, filter *Filter, refresh bool)
321329
util.RemoveFile(tmpFilePath)
322330
return nil
323331
}
324-
// TODO: don't read lids twice: f.Search() and f.FindLIDs()
332+
333+
// TODO: here we doing part of the work twice:
334+
// first time we find LIDs inside f.Search() and then find IDs by these LIDs.
335+
// Then we again find LIDs by earlier found IDs in f.FindLIDs().
336+
// We did it like this because otherwise we had to do serious f.Search() rewrite.
337+
// For now we're ok with some performance penalty.
325338
lids, err := f.FindLIDs(df.ctx, qpr.IDs.IDs())
326339
if err != nil {
327340
return err

frac/active_index.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 {
263263
return di.blocksOffsets[num]
264264
}
265265

266-
func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos {
266+
func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
267267
// TODO: don't read tombstones twice for search
268268
// TODO: optimize all these maps creation and slices traversal
269269

@@ -272,9 +272,13 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos {
272272
docsPos[i] = di.docsPositions.GetSync(id)
273273
}
274274

275-
tombstoneLIDs, _ := di.docsFilter.GetFilteredLIDsByFrac(di.fracName) // TODO: handle the error
275+
tombstoneLIDs, err := di.docsFilter.GetFilteredLIDsByFrac(di.fracName)
276+
if err != nil {
277+
return nil, err
278+
}
279+
276280
if len(tombstoneLIDs) == 0 {
277-
return docsPos
281+
return docsPos, nil
278282
}
279283

280284
allLids := make([]seq.LID, len(ids))
@@ -295,7 +299,7 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos {
295299
}
296300
}
297301

298-
return docsPos
302+
return docsPos, nil
299303
}
300304

301305
func (di *activeFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) {

frac/fraction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type Fraction interface {
2121
Contains(mid seq.MID) bool
2222
Fetch(context.Context, []seq.ID) ([][]byte, error)
2323
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
24-
FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) // TODO: ???
24+
FindLIDs(context.Context, []seq.ID) ([]seq.LID, error)
2525
}
2626

2727
var (

frac/processor/fetch.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ import (
77

88
type fetchIndex interface {
99
GetBlocksOffsets(uint32) uint64
10-
GetDocPos([]seq.ID) []seq.DocPos
10+
GetDocPos([]seq.ID) ([]seq.DocPos, error)
1111
ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error)
1212
}
1313

1414
func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error {
1515
m := sw.Start("get_docs_pos")
16-
docsPos := fetchIndex.GetDocPos(ids)
16+
docsPos, err := fetchIndex.GetDocPos(ids)
17+
if err != nil {
18+
return err
19+
}
1720
blocks, offsets, index := seq.GroupDocsOffsets(docsPos)
1821
m.Stop()
1922

frac/sealed_index.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,15 +279,18 @@ func (fi *sealedFetchIndex) GetBlocksOffsets(num uint32) uint64 {
279279
return fi.blocksOffsets[num]
280280
}
281281

282-
func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos {
282+
func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
283283
// TODO: don't read tombstones twice for search
284284
// TODO: optimize all these maps creation and slices traversal
285285

286286
allLids := fi.findLIDs(ids)
287-
tombstoneLIDs, _ := fi.docsFilter.GetFilteredLIDsByFrac(fi.fracName) // TODO: handle the error
287+
tombstoneLIDs, err := fi.docsFilter.GetFilteredLIDsByFrac(fi.fracName)
288+
if err != nil {
289+
return nil, err
290+
}
288291

289292
if len(tombstoneLIDs) == 0 {
290-
return fi.getDocPosByLIDs(allLids)
293+
return fi.getDocPosByLIDs(allLids), nil
291294
}
292295

293296
filtersLIDsMap := make(map[seq.LID]struct{}, len(tombstoneLIDs))
@@ -301,7 +304,7 @@ func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos {
301304
}
302305
}
303306

304-
return fi.getDocPosByLIDs(allLids)
307+
return fi.getDocPosByLIDs(allLids), nil
305308
}
306309

307310
func (fi *sealedFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) {

0 commit comments

Comments
 (0)