Skip to content

Commit 03387c3

Browse files
authored
feat(fracmanager): add handling for upload queue overflow and disk space exhaustion (#266)
1 parent ad03474 commit 03387c3

16 files changed

Lines changed: 436 additions & 39 deletions

cmd/seq-db/seq-db.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ func startStore(
253253
DataDir: cfg.Storage.DataDir,
254254
FracSize: uint64(cfg.Storage.FracSize),
255255
TotalSize: uint64(cfg.Storage.TotalSize),
256+
SealingQueueLen: uint64(cfg.Storage.SealingQueueLen),
256257
CacheSize: uint64(cfg.Resources.CacheSize),
257258
SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize),
258259
ReplayWorkers: cfg.Resources.ReplayWorkers,
@@ -281,8 +282,10 @@ func startStore(
281282
SkipSortDocs: !cfg.DocsSorting.Enabled,
282283
KeepMetaFile: false,
283284
},
284-
OffloadingEnabled: cfg.Offloading.Enabled,
285-
OffloadingRetention: cfg.Offloading.Retention,
285+
OffloadingEnabled: cfg.Offloading.Enabled,
286+
OffloadingRetention: cfg.Offloading.Retention,
287+
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
288+
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
286289
},
287290
API: storeapi.APIConfig{
288291
StoreMode: configMode,
@@ -342,7 +345,6 @@ func initS3Client(cfg config.Config) *s3.Client {
342345
cfg.Offloading.SecretKey,
343346
cfg.Offloading.Region,
344347
cfg.Offloading.Bucket,
345-
cfg.Offloading.RetryCount,
346348
)
347349

348350
if err != nil {

config/config.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ type Config struct {
6363
// TotalSize specifies upper bound of how much disk space can be occupied
6464
// by sealed fractions before they get deleted (or offloaded).
6565
TotalSize Bytes `config:"total_size" default:"1GiB"`
66+
// SealingQueueLen defines the maximum length of the sealing queue.
67+
// If the queue size exceeds this limit, writing to the store will be paused,
68+
// and bulk requests will start returning errors.
69+
// A value of zero disables this limit, allowing writes to proceed unconditionally.
70+
SealingQueueLen int `config:"sealing_queue_len" default:"10"`
6671
} `config:"storage"`
6772

6873
Cluster struct {
@@ -241,9 +246,12 @@ type Config struct {
241246
// SecretKey configures S3 Secret Key for S3 client.
242247
// You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
243248
SecretKey string `config:"secret_key"`
244-
// RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls.
245-
// Be aware that fraction is suicided when offloading attempts exceeds [RetryCount].
246-
RetryCount int `config:"retry_count" default:"5"`
249+
// QueueSizePercent specifies the percentage of the total local dataset size (storage.total_size) allocated to the offloading queue.
250+
// Note: When the queue overflows, the oldest fraction of data is automatically removed.
251+
// This automatic removal is disabled when set to zero.
252+
QueueSizePercent float64 `config:"queue_size_percent" default:"5"`
253+
// RetryDelay is the delay between consecutive offloading retries.
254+
RetryDelay time.Duration `config:"retry_delay" default:"2s"`
247255
} `config:"offloading"`
248256

249257
AsyncSearch struct {

config/validation.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@ func (c *Config) storeValidations() []validateFn {
6464
greaterThan("resources.search_workers", 0, c.Resources.SearchWorkers),
6565
greaterThan("resources.replay_workers", 0, c.Resources.ReplayWorkers),
6666
greaterThan("resources.cache_size", 0, c.Resources.CacheSize),
67+
greaterThan("storage.sealing_queue_len", -1, c.Storage.SealingQueueLen),
6768

6869
inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel),
6970
inRange("compression.doc_block_zstd_compression_level", -7, 22, c.Compression.DocBlockZstdCompressionLevel),
71+
inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent),
7072

7173
greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck),
7274
}

config/validation_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package config
2+
3+
import (
4+
"os"
5+
"path"
6+
"path/filepath"
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
// TestValidation is a table-driven test for Config.Validate("store").
// Every case parses the same base YAML config file and sets one environment
// variable (SEQDB_STORAGE_SEALING_QUEUE_LEN or
// SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT) before calling Parse, then asserts
// that Validate returns an error exactly when expectErr is true.
// Presumably Parse applies SEQDB_* environment overrides on top of the file —
// confirm against Parse.
func TestValidation(t *testing.T) {
13+
base := `storage:
14+
data_dir: /seq-db-data
15+
frac_size: 16MiB
16+
total_size: 10GiB
17+
18+
mapping:
19+
path: /configs/mapping.yaml
20+
21+
resources:
22+
cache_size: 2GiB
23+
24+
limits:
25+
query_rate: 1024
26+
search_requests: 1024
27+
bulk_requests: 128
28+
inflight_bulks: 128
29+
doc_size: 1MiB
30+
`
31+
32+
baseCfg := createCfgFile(t, base)
33+
34+
tests := []struct {
35+
name string
36+
cfg string
37+
env map[string]string
38+
expectErr bool
39+
}{
40+
{
41+
name: "Invalid storage.sealing_queue_len 1",
42+
cfg: baseCfg,
43+
env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "-1"},
44+
expectErr: true,
45+
},
46+
{
47+
name: "Valid storage.sealing_queue_len 2",
48+
cfg: baseCfg,
49+
env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "0"},
50+
expectErr: false,
51+
},
52+
{
53+
name: "Valid storage.sealing_queue_len 3",
54+
cfg: baseCfg,
55+
env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "100"},
56+
expectErr: false,
57+
},
58+
59+
{
60+
name: "Invalid offloading.queue_size_percent 1",
61+
cfg: baseCfg,
62+
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "-1"},
63+
expectErr: true,
64+
},
65+
{
66+
name: "Invalid offloading.queue_size_percent 2",
67+
cfg: baseCfg,
68+
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100.1"},
69+
expectErr: true,
70+
},
71+
{
72+
name: "Valid offloading.queue_size_percent 3",
73+
cfg: baseCfg,
74+
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "0"},
75+
expectErr: false,
76+
},
77+
{
78+
name: "Valid offloading.queue_size_percent 4",
79+
cfg: baseCfg,
80+
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100"},
81+
expectErr: false,
82+
},
83+
{
84+
name: "Valid offloading.queue_size_percent 5",
85+
cfg: baseCfg,
86+
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"},
87+
expectErr: false,
88+
},
89+
}
90+
91+
for _, tt := range tests {
92+
t.Run(tt.name, func(t *testing.T) {
// t.Setenv restores the previous value when the subtest ends, so the
// overrides do not leak between cases.
93+
for k, v := range tt.env {
94+
t.Setenv(k, v)
95+
}
96+
97+
// NOTE(review): assert.NoError does not stop the subtest, so c.Validate
// below still runs on whatever Parse returned if parsing fails — consider
// returning early on a Parse error.
c, err := Parse(tt.cfg)
98+
assert.NoError(t, err)
99+
100+
res := c.Validate("store")
101+
if tt.expectErr {
102+
assert.Error(t, res)
103+
} else {
104+
assert.NoError(t, res)
105+
}
106+
})
107+
}
108+
109+
}
110+
111+
// createCfgFile writes data to a file named config.yaml inside a fresh
// t.TempDir() and returns the absolute path of that file.
// Failures from os.WriteFile and filepath.Abs are reported with
// assert.NoError, which marks the test failed but does not stop it, so the
// returned path may be unusable after a failure.
func createCfgFile(t *testing.T, data string) string {
112+
// NOTE(review): path.Join is meant for slash-separated paths; filepath.Join
// is the OS-aware choice for filesystem paths.
f := path.Join(t.TempDir(), "config.yaml")
113+
err := os.WriteFile(f, []byte(data), 0o666)
114+
assert.NoError(t, err)
115+
116+
abs, err := filepath.Abs(f)
117+
assert.NoError(t, err)
118+
return abs
119+
}

frac/fraction_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2134,7 +2134,6 @@ func (s *RemoteFractionTestSuite) SetupTest() {
21342134
"SECRET_KEY",
21352135
"eu-west-3",
21362136
bucketName,
2137-
3,
21382137
)
21392138
s.Require().NoError(err, "s3 client setup failed")
21402139

fracmanager/config.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type Config struct {
1919
TotalSize uint64
2020
CacheSize uint64
2121

22+
suspendThreshold uint64
23+
SealingQueueLen uint64
24+
2225
ReplayWorkers int
2326
MaintenanceDelay time.Duration
2427
CacheCleanupDelay time.Duration
@@ -28,8 +31,10 @@ type Config struct {
2831
Fraction frac.Config
2932
MinSealFracSize uint64
3033

31-
OffloadingEnabled bool
32-
OffloadingRetention time.Duration
34+
OffloadingEnabled bool
35+
OffloadingQueueSize uint64
36+
OffloadingRetention time.Duration
37+
OffloadingRetryDelay time.Duration
3338
}
3439

3540
func FillConfigWithDefault(config *Config) *Config {
@@ -82,3 +87,14 @@ func FillConfigWithDefault(config *Config) *Config {
8287

8388
return config
8489
}
90+
91+
// SuspendThreshold returns the value cached in cfg.suspendThreshold,
// computing it first when the cached value is zero. The computed value is
// TotalSize plus 1% of TotalSize, plus OffloadingQueueSize when
// OffloadingEnabled is set.
//
// Zero doubles as the "not computed yet" marker, so a result of zero
// (TotalSize == 0 and no offloading queue contribution) is recomputed on
// every call, and a non-zero suspendThreshold set beforehand is returned
// as-is.
//
// NOTE(review): the method writes to cfg with no synchronization visible in
// this block — confirm it is not called concurrently.
// Presumably this is the size above which writes are suspended — confirm at
// the call sites.
func (cfg *Config) SuspendThreshold() uint64 {
92+
if cfg.suspendThreshold == 0 {
93+
cfg.suspendThreshold = cfg.TotalSize
94+
cfg.suspendThreshold += cfg.TotalSize / 100 // small buffer: 1% of TotalSize (integer division)
95+
if cfg.OffloadingEnabled {
96+
cfg.suspendThreshold += cfg.OffloadingQueueSize // offloading queue size
97+
}
98+
}
99+
return cfg.suspendThreshold
100+
}

fracmanager/fracmanager.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock)
128128
}
129129
}
130130

131+
// Maintain performs one fraction maintenance iteration (rotation, truncating,
// offloading, etc.) by delegating to fm.lc.Maintain while holding fm.mu.
// It logs at debug level when the iteration starts and, with the elapsed
// time in milliseconds, when it finishes.
// ctx, cfg and wg are passed through unchanged to fm.lc.Maintain; wg is
// presumably used there to track background tasks — confirm against
// fm.lc.Maintain.
132+
func (fm *FracManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
133+
n := time.Now()
134+
logger.Debug("maintenance iteration started")
135+
136+
fm.mu.Lock()
137+
// The deferred unlock runs on return, so the "finished" log below is
// emitted while fm.mu is still held.
defer fm.mu.Unlock()
138+
139+
fm.lc.Maintain(ctx, cfg, wg)
140+
logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds()))
141+
}
142+
131143
// startCacheWorker starts background cache garbage collection
132144
func startCacheWorker(ctx context.Context, cfg *Config, cache *CacheMaintainer, wg *sync.WaitGroup) {
133145
wg.Add(1)
@@ -161,21 +173,18 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG
161173
// startMaintWorker starts periodic fraction maintenance operations.
// It registers one unit on wg and launches a goroutine that hands a
// maintenance callback to util.RunEvery together with ctx.Done() and
// cfg.MaintenanceDelay; wg.Done is deferred inside that goroutine.
// In the post-change version (the "+" lines of this hunk) the callback is
// fm.Maintain with a WaitGroup local to this worker, and the goroutine waits
// on that group after util.RunEvery returns, before logging that the loop has
// stopped.
// Presumably util.RunEvery invokes the callback once per interval until the
// done channel is closed — confirm against util.RunEvery.
162174
func startMaintWorker(ctx context.Context, cfg *Config, fm *FracManager, wg *sync.WaitGroup) {
163175
wg.Add(1)
176+
maintWg := sync.WaitGroup{}
177+
164178
go func() {
165179
defer wg.Done()
166180

167181
logger.Info("maintenance loop is started")
168182
// Run maintenance at configured interval
169183
util.RunEvery(ctx.Done(), cfg.MaintenanceDelay, func() {
170-
n := time.Now()
171-
logger.Debug("maintenance iteration started")
172-
fm.mu.Lock()
173-
// Perform fraction maintenance (rotation, truncating, offloading, etc.)
174-
fm.lc.Maintain(ctx, cfg, wg)
175-
fm.mu.Unlock()
176-
logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds()))
184+
fm.Maintain(ctx, cfg, &maintWg)
177185
})
178-
logger.Info("waiting maintenance complete background tasks")
186+
logger.Info("waiting maintenance complete background tasks...")
187+
maintWg.Wait()
179188
logger.Info("maintenance loop is stopped")
180189
}()
181190
}

fracmanager/fraction_provider_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func setupS3Client(t testing.TB) (*s3.Client, func()) {
2626
err := s3Backend.CreateBucket(bucketName)
2727
require.NoError(t, err, "create bucket failed")
2828

29-
s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName, 3)
29+
s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName)
3030
require.NoError(t, err, "s3 client setup failed")
3131

3232
return s3cli, s3server.Close

0 commit comments

Comments
 (0)