Skip to content

Commit 3e082e3

Browse files
committed
refactor(fracmanager): using fifo queues of fractions
1 parent 11c8b48 commit 3e082e3

28 files changed

Lines changed: 1380 additions & 1480 deletions

cmd/seq-db/seq-db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ func startStore(
259259
MaintenanceDelay: 0,
260260
CacheGCDelay: 0,
261261
CacheCleanupDelay: 0,
262+
MinSealFracSize: uint64(cfg.Storage.TotalSize) * consts.DefaultMinSealPercent / 100,
262263
SealParams: common.SealParams{
263264
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
264265
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,

frac/active.go

Lines changed: 17 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ type Active struct {
3535

3636
BaseFileName string
3737

38-
useMu sync.RWMutex
39-
suicided bool
40-
released bool
41-
4238
infoMu sync.RWMutex
4339
info *common.Info
4440

@@ -269,40 +265,18 @@ func (f *Active) String() string {
269265
}
270266

271267
func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
272-
dp, release := f.DataProvider(ctx)
273-
defer release()
274-
if dp == nil {
275-
return EmptyFraction.Fetch(ctx, ids)
268+
if f.Info().DocsTotal == 0 { // the active fraction is empty
269+
return nil, nil
276270
}
277-
return dp.Fetch(ids)
271+
return f.createDataProvider(ctx).Fetch(ids)
278272
}
279273

280274
func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
281-
dp, release := f.DataProvider(ctx)
282-
defer release()
283-
if dp == nil {
284-
return EmptyFraction.Search(ctx, params)
285-
}
286-
return dp.Search(params)
287-
}
288-
289-
func (f *Active) DataProvider(ctx context.Context) (*activeDataProvider, func()) {
290-
f.useMu.RLock()
291-
292-
if f.suicided || f.released || f.Info().DocsTotal == 0 { // it is empty active fraction state
293-
if f.suicided {
294-
metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
295-
}
296-
f.useMu.RUnlock()
297-
return nil, func() {}
298-
}
299-
300-
// it is ordinary active fraction state
301-
dp := f.createDataProvider(ctx)
302-
return dp, func() {
303-
dp.release()
304-
f.useMu.RUnlock()
275+
if f.Info().DocsTotal == 0 { // the active fraction is empty
276+
metric.CountersTotal.WithLabelValues("empty_data_provider").Inc()
277+
return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil
305278
}
279+
return f.createDataProvider(ctx).Search(params)
306280
}
307281

308282
func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
@@ -338,49 +312,24 @@ func (f *Active) IsIntersecting(from, to seq.MID) bool {
338312
}
339313

340314
func (f *Active) Release() {
341-
f.useMu.Lock()
342-
f.released = true
343-
f.useMu.Unlock()
344-
345315
f.releaseMem()
346316

347317
if !f.Config.KeepMetaFile {
348-
f.removeMetaFile()
318+
util.RemoveFile(f.metaFile.Name())
349319
}
350320

351321
if !f.Config.SkipSortDocs {
352322
// the sealed fraction uses sorted docs, so the active fraction's original docs can be removed
353-
f.removeDocsFiles()
323+
util.RemoveFile(f.docsFile.Name())
354324
}
355325
}
356326

357-
// Offload for an [Active] fraction is a no-op.
358-
//
359-
// Since search within [Active] fraction is too costly (we have to replay the whole index in memory),
360-
// we decided to support offloading only for [Sealed] fractions.
361-
func (f *Active) Offload(context.Context, storage.Uploader) (bool, error) {
362-
return false, nil
363-
}
364-
365327
func (f *Active) Suicide() {
366-
f.useMu.Lock()
367-
released := f.released
368-
f.suicided = true
369-
f.released = true
370-
f.useMu.Unlock()
371-
372-
if released { // fraction can be suicided after release
373-
if f.Config.KeepMetaFile {
374-
f.removeMetaFile() // meta was not removed during release
375-
}
376-
if f.Config.SkipSortDocs {
377-
f.removeDocsFiles() // docs were not removed during release
378-
}
379-
} else { // was not released
380-
f.releaseMem()
381-
f.removeMetaFile()
382-
f.removeDocsFiles()
383-
}
328+
f.releaseMem()
329+
330+
util.RemoveFile(f.metaFile.Name())
331+
util.RemoveFile(f.docsFile.Name())
332+
util.RemoveFile(f.BaseFileName + consts.SdocsFileSuffix)
384333
}
385334

386335
func (f *Active) releaseMem() {
@@ -393,24 +342,12 @@ func (f *Active) releaseMem() {
393342
if err := f.metaFile.Close(); err != nil {
394343
logger.Error("can't close meta file", zap.String("frac", f.BaseFileName), zap.Error(err))
395344
}
345+
if err := f.docsFile.Close(); err != nil {
346+
logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
347+
}
396348

397349
f.RIDs = nil
398350
f.MIDs = nil
399351
f.TokenList = nil
400352
f.DocsPositions = nil
401353
}
402-
403-
func (f *Active) removeDocsFiles() {
404-
if err := f.docsFile.Close(); err != nil {
405-
logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
406-
}
407-
if err := os.Remove(f.docsFile.Name()); err != nil {
408-
logger.Error("can't delete docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
409-
}
410-
}
411-
412-
func (f *Active) removeMetaFile() {
413-
if err := os.Remove(f.metaFile.Name()); err != nil {
414-
logger.Error("can't delete metas file", zap.String("frac", f.BaseFileName), zap.Error(err))
415-
}
416-
}

frac/active_indexer.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ type ActiveIndexer struct {
1818
ch chan *indexTask
1919
chMerge chan *mergeTask
2020
workerCount int
21-
22-
stopFn func()
2321
}
2422

2523
type indexTask struct {
@@ -34,12 +32,14 @@ type mergeTask struct {
3432
tokenLIDs *TokenLIDs
3533
}
3634

37-
func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer {
38-
return &ActiveIndexer{
35+
func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) {
36+
idx := ActiveIndexer{
3937
ch: make(chan *indexTask, chLen),
4038
chMerge: make(chan *mergeTask, chLen),
4139
workerCount: workerCount,
4240
}
41+
stopIdx := idx.start()
42+
return &idx, stopIdx
4343
}
4444

4545
func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
@@ -53,7 +53,7 @@ func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, s
5353
m.Stop()
5454
}
5555

56-
func (ai *ActiveIndexer) Start() {
56+
func (ai *ActiveIndexer) start() func() {
5757
wg := sync.WaitGroup{}
5858
wg.Add(ai.workerCount)
5959

@@ -72,13 +72,10 @@ func (ai *ActiveIndexer) Start() {
7272
}()
7373
}
7474

75-
ai.stopFn = func() {
75+
return func() {
7676
close(ai.ch)
7777
close(ai.chMerge)
78-
7978
wg.Wait()
80-
81-
ai.stopFn = nil
8279
}
8380
}
8481

@@ -88,12 +85,6 @@ func (ai *ActiveIndexer) mergeWorker() {
8885
}
8986
}
9087

91-
func (ai *ActiveIndexer) Stop() {
92-
if ai.stopFn != nil {
93-
ai.stopFn()
94-
}
95-
}
96-
9788
var metaDataPool = sync.Pool{
9889
New: func() any {
9990
return new(indexer.MetaData)

frac/active_indexer_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,8 @@ func getTestProcessor() *indexer.Processor {
7676

7777
func BenchmarkIndexer(b *testing.B) {
7878
logger.SetLevel(zapcore.FatalLevel)
79-
idx := NewActiveIndexer(8, 8)
80-
idx.Start()
81-
defer idx.Stop()
79+
idx, stop := NewActiveIndexer(8, 8)
80+
defer stop()
8281

8382
allLogs, err := readFileAllAtOnce(filepath.Join(common.TestDataDir, "k8s.logs"))
8483
readers := splitLogsToBulks(allLogs, 1000)

frac/empty.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

frac/fraction.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/ozontech/seq-db/frac/processor"
1414
"github.com/ozontech/seq-db/metric"
1515
"github.com/ozontech/seq-db/seq"
16-
"github.com/ozontech/seq-db/storage"
1716
)
1817

1918
type Fraction interface {
@@ -22,8 +21,6 @@ type Fraction interface {
2221
Contains(mid seq.MID) bool
2322
Fetch(context.Context, []seq.ID) ([][]byte, error)
2423
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
25-
Offload(ctx context.Context, u storage.Uploader) (bool, error)
26-
Suicide()
2724
}
2825

2926
var (

0 commit comments

Comments
 (0)