Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func startStore(
DataDir: cfg.Storage.DataDir,
FracSize: uint64(cfg.Storage.FracSize),
TotalSize: uint64(cfg.Storage.TotalSize),
SealingQueueLen: uint64(cfg.Storage.SealingQueueLen),
CacheSize: uint64(cfg.Resources.CacheSize),
SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize),
ReplayWorkers: cfg.Resources.ReplayWorkers,
Expand Down Expand Up @@ -281,8 +282,10 @@ func startStore(
SkipSortDocs: !cfg.DocsSorting.Enabled,
KeepMetaFile: false,
},
OffloadingEnabled: cfg.Offloading.Enabled,
OffloadingRetention: cfg.Offloading.Retention,
OffloadingEnabled: cfg.Offloading.Enabled,
OffloadingRetention: cfg.Offloading.Retention,
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
},
API: storeapi.APIConfig{
StoreMode: configMode,
Expand Down Expand Up @@ -342,7 +345,6 @@ func initS3Client(cfg config.Config) *s3.Client {
cfg.Offloading.SecretKey,
cfg.Offloading.Region,
cfg.Offloading.Bucket,
cfg.Offloading.RetryCount,
)

if err != nil {
Expand Down
14 changes: 11 additions & 3 deletions config/config.go
Comment thread
dkharms marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ type Config struct {
// TotalSize specifies upper bound of how much disk space can be occupied
// by sealed fractions before they get deleted (or offloaded).
TotalSize Bytes `config:"total_size" default:"1GiB"`
// SealingQueueLen defines the maximum length of the sealing queue.
// If the queue size exceeds this limit, writing to the store will be paused,
// and bulk requests will start returning errors.
// A value of zero disables this limit, allowing writes to proceed unconditionally.
SealingQueueLen int `config:"sealing_queue_len" default:"10"`
} `config:"storage"`

Cluster struct {
Expand Down Expand Up @@ -241,9 +246,12 @@ type Config struct {
// SecretKey configures S3 Secret Key for S3 client.
// You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
SecretKey string `config:"secret_key"`
// RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls.
// Be aware that fraction is suicided when offloading attempts exceeds [RetryCount].
RetryCount int `config:"retry_count" default:"5"`
// QueueSizePercent specifies the percentage of the total local dataset size
// allocated to the offloading queue.
// Note: when the queue overflows, the oldest fractions are automatically
// removed; a value of zero disables this automatic removal.
QueueSizePercent float64 `config:"queue_size_percent" default:"5"`
// RetryDelay specifies the delay between consecutive offloading retries.
RetryDelay time.Duration `config:"retry_delay" default:"2s"`
} `config:"offloading"`

AsyncSearch struct {
Expand Down
2 changes: 2 additions & 0 deletions config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ func (c *Config) storeValidations() []validateFn {
greaterThan("resources.search_workers", 0, c.Resources.SearchWorkers),
greaterThan("resources.replay_workers", 0, c.Resources.ReplayWorkers),
greaterThan("resources.cache_size", 0, c.Resources.CacheSize),
greaterThan("storage.sealing_queue_len", -1, c.Storage.SealingQueueLen),

inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel),
inRange("compression.doc_block_zstd_compression_level", -7, 22, c.Compression.DocBlockZstdCompressionLevel),
inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent),

greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck),
}
Expand Down
119 changes: 119 additions & 0 deletions config/validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package config

import (
"os"
"path"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
)

// TestValidation exercises store-mode configuration validation for the
// storage.sealing_queue_len and offloading.queue_size_percent settings.
// Each case starts from a minimal valid config file and overrides a single
// value via a SEQDB_* environment variable, then checks whether
// Validate("store") reports an error.
func TestValidation(t *testing.T) {
// Minimal configuration that passes store validation on its own.
// NOTE(review): this YAML literal is whitespace-sensitive; the leading
// indentation appears stripped in this view — confirm against the original.
base := `storage:
data_dir: /seq-db-data
frac_size: 16MiB
total_size: 10GiB

mapping:
path: /configs/mapping.yaml

resources:
cache_size: 2GiB

limits:
query_rate: 1024
search_requests: 1024
bulk_requests: 128
inflight_bulks: 128
doc_size: 1MiB
`

baseCfg := createCfgFile(t, base)

// Table-driven cases: env holds the override to apply; expectErr says
// whether validation must fail under that override.
tests := []struct {
name string
cfg string
env map[string]string
expectErr bool
}{
{
name: "Invalid storage.sealing_queue_len 1",
cfg: baseCfg,
env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "-1"},
expectErr: true,
},
{
name: "Valid storage.sealing_queue_len 2",
cfg: baseCfg,
env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "0"},
expectErr: false,
},
{
name: "Valid storage.sealing_queue_len 3",
cfg: baseCfg,
env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "100"},
expectErr: false,
},

{
name: "Invalid offloading.queue_size_percent 1",
cfg: baseCfg,
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "-1"},
expectErr: true,
},
{
name: "Invalid offloading.queue_size_percent 2",
cfg: baseCfg,
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100.1"},
expectErr: true,
},
{
name: "Valid offloading.queue_size_percent 3",
cfg: baseCfg,
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "0"},
expectErr: false,
},
{
name: "Valid offloading.queue_size_percent 4",
cfg: baseCfg,
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100"},
expectErr: false,
},
{
name: "Valid offloading.queue_size_percent 5",
cfg: baseCfg,
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"},
expectErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// t.Setenv scopes each override to its subtest, so cases cannot
// leak environment state into one another.
for k, v := range tt.env {
t.Setenv(k, v)
}

// Parsing itself must succeed; only Validate is expected to object.
c, err := Parse(tt.cfg)
assert.NoError(t, err)

res := c.Validate("store")
if tt.expectErr {
assert.Error(t, res)
} else {
assert.NoError(t, res)
}
})
}

}

func createCfgFile(t *testing.T, data string) string {
f := path.Join(t.TempDir(), "config.yaml")
err := os.WriteFile(f, []byte(data), 0666)

Check failure on line 113 in config/validation_test.go

View workflow job for this annotation

GitHub Actions / lint

octalLiteral: use new octal literal style, 0o666 (gocritic)
assert.NoError(t, err)

abs, err := filepath.Abs(f)
assert.NoError(t, err)
return abs
}
1 change: 0 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,6 @@ func (s *RemoteFractionTestSuite) SetupTest() {
"SECRET_KEY",
"eu-west-3",
bucketName,
3,
)
s.Require().NoError(err, "s3 client setup failed")

Expand Down
9 changes: 7 additions & 2 deletions fracmanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Config struct {
TotalSize uint64
CacheSize uint64

SuspendThreshold uint64
SealingQueueLen uint64

ReplayWorkers int
MaintenanceDelay time.Duration
CacheCleanupDelay time.Duration
Expand All @@ -28,8 +31,10 @@ type Config struct {
Fraction frac.Config
MinSealFracSize uint64

OffloadingEnabled bool
OffloadingRetention time.Duration
OffloadingEnabled bool
OffloadingQueueSize uint64
OffloadingRetention time.Duration
OffloadingRetryDelay time.Duration
}

func FillConfigWithDefault(config *Config) *Config {
Expand Down
25 changes: 17 additions & 8 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock)
}
}

// Maintain runs one fraction-maintenance pass (rotation, truncating,
// offloading, etc.) under the manager lock and logs how long it took.
// Background tasks spawned by the pass are registered on wg.
func (fm *FracManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
	started := time.Now()
	logger.Debug("maintenance iteration started")

	fm.mu.Lock()
	defer fm.mu.Unlock()

	fm.lc.Maintain(ctx, cfg, wg)

	tookMs := time.Since(started).Milliseconds()
	logger.Debug("maintenance iteration finished", zap.Int64("took_ms", tookMs))
}

// startCacheWorker starts background cache garbage collection
func startCacheWorker(ctx context.Context, cfg *Config, cache *CacheMaintainer, wg *sync.WaitGroup) {
wg.Add(1)
Expand Down Expand Up @@ -161,21 +173,18 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG
// startMaintWorker starts periodic fraction maintenance operations
func startMaintWorker(ctx context.Context, cfg *Config, fm *FracManager, wg *sync.WaitGroup) {
wg.Add(1)
maintWg := sync.WaitGroup{}

go func() {
defer wg.Done()

logger.Info("maintenance loop is started")
// Run maintenance at configured interval
util.RunEvery(ctx.Done(), cfg.MaintenanceDelay, func() {
n := time.Now()
logger.Debug("maintenance iteration started")
fm.mu.Lock()
// Perform fraction maintenance (rotation, truncating, offloading, etc.)
fm.lc.Maintain(ctx, cfg, wg)
fm.mu.Unlock()
logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds()))
fm.Maintain(ctx, cfg, &maintWg)
})
logger.Info("waiting maintenance complete background tasks")
logger.Info("waiting maintenance complete background tasks...")
maintWg.Wait()
logger.Info("maintenance loop is stopped")
}()
}
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/fraction_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func setupS3Client(t testing.TB) (*s3.Client, func()) {
err := s3Backend.CreateBucket(bucketName)
require.NoError(t, err, "create bucket failed")

s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName, 3)
s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName)
require.NoError(t, err, "s3 client setup failed")

return s3cli, s3server.Close
Expand Down
71 changes: 71 additions & 0 deletions fracmanager/fraction_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
curInfo := old.instance.Info()
r.stats.sealing.Add(curInfo)

r.active.Suspend(old.Suspended())

wg := sync.WaitGroup{}
wg.Add(1)
// since old.WaitWriteIdle() can take some time, we don't want to do it under the lock
Expand All @@ -153,6 +155,30 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
return old, wg.Wait, nil
}

// SuspendIfOverCapacity pauses writes on the active fraction when either the
// sealing-queue length or the total on-disk usage exceeds its configured
// limit, and resumes them otherwise. A zero limit disables the corresponding
// check.
func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()

	suspend := maxQueue > 0 && r.stats.sealing.count >= int(maxQueue)
	if !suspend {
		// Skip the disk-usage computation when the queue limit has
		// already decided the outcome.
		suspend = maxSize > 0 && r.diskUsage() > maxSize
	}

	r.active.Suspend(suspend)
}

// diskUsage reports the total bytes currently occupied on disk: the active
// fraction plus the sealed, sealing, and offloading queues.
func (r *fractionRegistry) diskUsage() uint64 {
	total := r.active.instance.Info().FullSize()
	total += r.stats.sealed.totalSizeOnDisk
	total += r.stats.sealing.totalSizeOnDisk
	total += r.stats.offloading.totalSizeOnDisk
	return total
}

// addActive sets a new active fraction and updates the complete fractions list.
func (r *fractionRegistry) addActive(a *activeProxy) {
r.muAll.Lock()
Expand Down Expand Up @@ -229,6 +255,10 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]*
// Fractions older than retention period are permanently deleted.
// Returns removed fractions or empty slice if nothing to remove.
func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
if retention == 0 {
return nil
}

r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -250,6 +280,42 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
return evicted
}

// EvictOverflowed removes the oldest fractions from the offloading queue when
// its on-disk size exceeds sizeLimit; a sizeLimit of zero disables eviction.
// Only fractions that have not yet been offloaded (item.remote == nil) are
// eviction candidates; fractions that already have a remote copy are kept.
// Used when the offloading queue grows too large due to slow remote storage.
// NOTE(review): evicting a not-yet-offloaded fraction discards its data before
// it reaches remote storage — confirm this is the intended trade-off.
// Returns the evicted fractions (empty slice when nothing was removed).
func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy {
if sizeLimit == 0 {
return nil
}

r.mu.Lock()
defer r.mu.Unlock()

// Fast path: skip processing if within size limits
if r.stats.offloading.totalSizeOnDisk <= sizeLimit {
return nil
}

count := 0
evicted := []*sealedProxy{}
// In-place filter: kept items are compacted to the front of the same
// backing array.
// filter fractions
for _, item := range r.offloading {
// keep items that are within limits or already offloaded
// Note: Sub below shrinks stats.offloading.totalSizeOnDisk, so this
// threshold is re-evaluated each iteration — eviction stops as soon
// as the queue fits under sizeLimit again.
if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil {
r.offloading[count] = item
count++
continue
}
evicted = append(evicted, item)
r.stats.offloading.Sub(item.instance.Info())
}

// NOTE(review): elements of the backing array at indexes >= count still
// reference evicted proxies until overwritten; consider nil-ing the tail
// so they can be collected sooner.
r.offloading = r.offloading[:count]
r.rebuildAllFractions()

return evicted
}

// PromoteToSealed moves fractions from sealing to local queue when sealing completes.
// Maintains strict ordering - younger fractions wait for older ones to seal first.
func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) {
Expand Down Expand Up @@ -324,6 +390,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) {
count++
}
}

if count == len(r.offloading) { // not found to remove (can be removed earlier in EvictOverflowed)
return
}

r.offloading = r.offloading[:count]
r.stats.offloading.Sub(sealed.instance.Info())

Expand Down
Loading
Loading