Skip to content

Commit 4abfc8c

Browse files
committed
feat(fracmanager): add handling for upload queue overflow and disk space exhaustion
1 parent a88cd5b commit 4abfc8c

10 files changed

Lines changed: 270 additions & 66 deletions

File tree

cmd/seq-db/seq-db.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,10 @@ func startStore(
280280
SkipSortDocs: !cfg.DocsSorting.Enabled,
281281
KeepMetaFile: false,
282282
},
283-
OffloadingEnabled: cfg.Offloading.Enabled,
284-
OffloadingRetention: cfg.Offloading.Retention,
283+
OffloadingEnabled: cfg.Offloading.Enabled,
284+
OffloadingRetention: cfg.Offloading.Retention,
285+
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
286+
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
285287
},
286288
API: storeapi.APIConfig{
287289
StoreMode: configMode,

config/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,13 @@ type Config struct {
234234
// You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
235235
SecretKey string `config:"secret_key"`
236236
// RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls.
237-
// Be aware that fraction is suicided when offloading attempts exceeds [RetryCount].
238237
RetryCount int `config:"retry_count" default:"5"`
238+
// QueueSizePercent specifies the percentage of the total local dataset size allocated to the offloading queue.
239+
// Note: When the queue overflows, the oldest fraction of data is automatically removed.
240+
// This automatic removal is disabled when set to zero.
241+
QueueSizePercent float64 `config:"queue_size_percent" default:"5"`
242+
// RetryDelay sets the delay between consecutive offloading retry attempts.
243+
RetryDelay time.Duration `config:"retry_delay" default:"2s"`
239244
} `config:"offloading"`
240245

241246
AsyncSearch struct {

fracmanager/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ type Config struct {
2828
Fraction frac.Config
2929
MinSealFracSize uint64
3030

31-
OffloadingEnabled bool
32-
OffloadingRetention time.Duration
31+
OffloadingEnabled bool
32+
OffloadingQueueSize uint64
33+
OffloadingRetention time.Duration
34+
OffloadingRetryDelay time.Duration
3335
}
3436

3537
func FillConfigWithDefault(config *Config) *Config {

fracmanager/fracmanager.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, Stop
8181

8282
// Freeze active fraction to prevent new writes
8383
active := lc.registry.Active()
84-
if err := active.Freeze(); err != nil {
84+
if err := active.Finalize(); err != nil {
8585
logger.Fatal("shutdown fraction freezing error", zap.Error(err))
8686
}
8787

@@ -100,11 +100,6 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, Stop
100100
return &fm, stop, nil
101101
}
102102

103-
// Writer returns the active fraction for write operations
104-
func (fm *FracManager) Writer() *activeProxy {
105-
return fm.lc.registry.Active()
106-
}
107-
108103
func (fm *FracManager) Fractions() List {
109104
return fm.lc.registry.AllFractions()
110105
}
@@ -119,24 +114,27 @@ func (fm *FracManager) Flags() *StateManager {
119114

120115
// Active returns the currently active fraction
121116
func (fm *FracManager) Active() frac.Fraction {
122-
return fm.Writer().proxy
117+
return fm.lc.registry.Active().proxy
123118
}
124119

125120
// Append writes documents and metadata to the active fraction
126121
// Implements retry logic in case of fraction sealing during write
127122
func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock) error {
128-
var err error
129123
for {
130124
select {
131125
case <-ctx.Done():
132126
return ctx.Err()
133127
default:
134-
// Attempt to append to active fraction
135-
if err = fm.Writer().Append(docs, metas); err == nil {
136-
return nil
128+
// Try to append data to the currently active fraction
129+
err := fm.lc.registry.Active().Append(docs, metas)
130+
if err != nil {
131+
logger.Info("append fail", zap.Error(err))
132+
if err == ErrFractionNotWritable {
133+
// Fraction is currently being sealed, retry the operation
134+
continue
135+
}
137136
}
138-
// Log error (typically occurs when fraction is sealed during write)
139-
logger.Info("append fail", zap.Error(err))
137+
return err
140138
}
141139
}
142140
}

fracmanager/fracmanager_for_tests.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package fracmanager
33
import "sync"
44

55
func (fm *FracManager) WaitIdleForTests() {
6-
fm.Writer().WaitWriteIdle()
6+
fm.lc.registry.Active().WaitWriteIdle()
77
}
88

99
func (fm *FracManager) SealForcedForTests() {
1010
wg := sync.WaitGroup{}
1111
fm.mu.Lock() // todo: get rid of mutex after removing SealForcedForTests method
12-
fm.lc.Rotate(0, &wg)
12+
fm.lc.ManageRotation(0, 0, &wg)
1313
fm.mu.Unlock()
1414

1515
wg.Wait()

fracmanager/fraction_registry.go

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,29 +110,47 @@ func (r *fractionRegistry) OldestLocal() uint64 {
110110
return r.oldestLocal
111111
}
112112

113-
// Rotate completes the current active fraction and starts a new one.
113+
// RotateIfNeeded completes the current active fraction and starts a new one.
114114
// Moves previous active fraction to sealing queue.
115115
// Updates statistics and maintains chronological order.
116116
// Should be called when creating a new fraction.
117-
func (r *fractionRegistry) Rotate(newActive *activeProxy) (*activeProxy, error) {
117+
func (r *fractionRegistry) RotateIfNeeded(activeLimit uint64, activeProvider func() *activeProxy) (*activeProxy, error) {
118118
r.mu.Lock()
119119
defer r.mu.Unlock()
120120

121-
prevActive := r.active
122-
r.sealing = append(r.sealing, prevActive)
123-
r.addNewActive(newActive)
121+
if r.active.instance.Info().DocsOnDisk < activeLimit {
122+
return nil, nil
123+
}
124+
125+
oldActive := r.active
126+
r.sealing = append(r.sealing, oldActive)
127+
r.addNewActive(activeProvider())
124128

125129
// Freezing rotated fraction
126130
// todo can be too long!
127-
if err := prevActive.Freeze(); err != nil {
128-
return prevActive, err
131+
if err := oldActive.Finalize(); err != nil {
132+
return oldActive, err
129133
}
130134

131135
// Update statistics
132-
finalInfo := prevActive.instance.Info()
136+
finalInfo := oldActive.instance.Info()
133137
r.stats.sealing.Add(finalInfo)
134138

135-
return prevActive, nil
139+
return oldActive, nil
140+
}
141+
142+
func (r *fractionRegistry) SuspendIfOverCapacity(maxTotalSize uint64) bool {
143+
r.mu.Lock()
144+
defer r.mu.Unlock()
145+
146+
currentTotalUsage := r.active.instance.Info().FullSize() +
147+
r.stats.locals.totalSizeOnDisk +
148+
r.stats.sealing.totalSizeOnDisk +
149+
r.stats.offloading.totalSizeOnDisk
150+
151+
val := currentTotalUsage > maxTotalSize
152+
r.active.Suspend(val)
153+
return val
136154
}
137155

138156
// addNewActive sets a new active fraction and updates the complete fractions list.
@@ -232,6 +250,38 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
232250
return evicted
233251
}
234252

253+
// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit.
254+
// Fractions that already have a remote copy are kept; only fractions whose offload has not completed are evicted (their local-only data is dropped).
255+
// Used when offloading queue grows too large due to slow remote storage performance.
256+
func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy {
257+
r.mu.Lock()
258+
defer r.mu.Unlock()
259+
260+
// Fast path: skip processing if within size limits
261+
if r.stats.offloading.totalSizeOnDisk <= sizeLimit {
262+
return nil
263+
}
264+
265+
i := 0
266+
evicted := []*sealedProxy{}
267+
// filter fractions
268+
for _, item := range r.offloading {
269+
// keep items that are within limits or already have remote references
270+
if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil {
271+
r.offloading[i] = item
272+
i++
273+
continue
274+
}
275+
evicted = append(evicted, item)
276+
r.stats.offloading.Sub(item.instance.Info())
277+
}
278+
279+
r.offloading = r.offloading[:i]
280+
r.rebuildAllFractions()
281+
282+
return evicted
283+
}
284+
235285
// PromoteToLocal moves fractions from sealing to local queue when sealing completes.
236286
// Maintains strict ordering - younger fractions wait for older ones to seal first.
237287
func (r *fractionRegistry) PromoteToLocal(active *activeProxy, sealed *frac.Sealed) {
@@ -306,6 +356,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) {
306356
count++
307357
}
308358
}
359+
360+
if count == len(r.offloading) { // not found to remove
361+
return
362+
}
363+
309364
r.offloading = r.offloading[:count]
310365
r.stats.offloading.Sub(sealed.instance.Info())
311366

fracmanager/lifecycle_manager.go

Lines changed: 84 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type lifecycleManager struct {
2121
provider *fractionProvider // Provider for fraction operations
2222
flags *StateManager // Storage state flags
2323
registry *fractionRegistry // Fraction state registry
24+
tasks *TaskManager // Background offloading tasks
2425

2526
sealingWg sync.WaitGroup
2627
}
@@ -36,16 +37,21 @@ func newLifecycleManager(
3637
provider: provider,
3738
flags: flags,
3839
registry: registry,
40+
tasks: NewTaskManager(),
3941
}
4042
}
4143

4244
// Maintain performs periodic lifecycle management tasks.
4345
// It is a CORE method of lifecycleManager
4446
// Coordinates rotation, offloading, cleanup based on configuration.
4547
func (lc *lifecycleManager) Maintain(ctx context.Context, config *Config, wg *sync.WaitGroup) {
46-
lc.Rotate(config.FracSize, wg)
48+
maxTotalSize := config.TotalSize + config.OffloadingQueueSize
49+
lc.ManageRotation(config.FracSize, maxTotalSize, wg)
4750
if config.OffloadingEnabled {
48-
lc.OffloadLocal(ctx, config.TotalSize, wg)
51+
lc.OffloadLocal(ctx, config.TotalSize, config.OffloadingRetryDelay, wg)
52+
if config.OffloadingQueueSize > 0 {
53+
lc.RemoveOverflowed(config.OffloadingQueueSize, wg)
54+
}
4955
lc.CleanRemote(config.OffloadingRetention, wg)
5056
} else {
5157
lc.CleanLocal(config.TotalSize, wg)
@@ -85,57 +91,95 @@ func (lc *lifecycleManager) Seal(active *activeProxy) error {
8591
return nil
8692
}
8793

88-
// RotateIfNeeded checks if active fraction needs rotation based on size limit
89-
// Creates new active fraction and starts sealing the previous one.
90-
func (lc *lifecycleManager) Rotate(sizeLimit uint64, wg *sync.WaitGroup) {
91-
if lc.registry.Active().instance.Info().DocsOnDisk > sizeLimit {
92-
active := lc.rotate()
93-
94-
wg.Add(1)
95-
lc.sealingWg.Add(1)
96-
go func() {
97-
defer wg.Done()
98-
defer lc.sealingWg.Done()
99-
if err := lc.Seal(active); err != nil {
100-
logger.Fatal("sealing error", zap.Error(err))
101-
}
102-
}()
94+
// ManageRotation checks if rotation is needed and manages the entire process
95+
// including suspension checks, fraction rotation, and asynchronous sealing.
96+
func (lc *lifecycleManager) ManageRotation(maxActiveSize, maxTotalSize uint64, wg *sync.WaitGroup) {
97+
if maxTotalSize > 0 && lc.registry.SuspendIfOverCapacity(maxTotalSize) {
98+
return
10399
}
104-
}
105100

106-
func (lc *lifecycleManager) rotate() *activeProxy {
107-
active, err := lc.registry.Rotate(newActiveProxy(lc.provider.CreateActive()))
101+
activeToSeal, err := lc.registry.RotateIfNeeded(maxActiveSize, func() *activeProxy {
102+
return newActiveProxy(lc.provider.CreateActive())
103+
})
108104
if err != nil {
109105
logger.Fatal("active fraction rotation error", zap.Error(err))
110106
}
111-
return active
107+
if activeToSeal == nil {
108+
return
109+
}
110+
111+
wg.Add(1)
112+
lc.sealingWg.Add(1)
113+
go func() {
114+
defer wg.Done()
115+
defer lc.sealingWg.Done()
116+
if err := lc.Seal(activeToSeal); err != nil {
117+
logger.Fatal("sealing error", zap.Error(err))
118+
}
119+
}()
112120
}
113121

114122
// OffloadLocal starts offloading of local fractions to remote storage
115123
// Selects fractions based on disk space usage and retention policy.
116-
func (lc *lifecycleManager) OffloadLocal(ctx context.Context, sizeLimit uint64, wg *sync.WaitGroup) {
124+
func (lc *lifecycleManager) OffloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) {
117125
toOffload, err := lc.registry.EvictLocal(true, sizeLimit)
118126
if err != nil {
119127
logger.Fatal("error releasing old fractions:", zap.Error(err))
120128
}
121129
for _, sealed := range toOffload {
122130
wg.Add(1)
123-
go func() {
131+
lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) {
124132
defer wg.Done()
125133

126-
remote, _ := lc.TryOffload(ctx, sealed.instance)
134+
remote := lc.OffloadWithRetry(ctx, sealed.instance, retryDelay)
135+
127136
lc.registry.PromoteToRemote(sealed, remote)
128137

129138
if remote == nil {
130139
sealed.proxy.Redirect(emptyFraction{})
140+
lc.infoCache.Remove(sealed.instance.Info().Name())
131141
} else {
132142
sealed.proxy.Redirect(remote)
133143
}
134144

135145
// Free up local resources
136146
sealed.instance.Suicide()
137147
maintenanceTruncateTotal.Add(1)
138-
}()
148+
})
149+
}
150+
}
151+
152+
// OffloadWithRetry attempts to offload a fraction with retries until success or cancellation.
153+
// Returns the remote fraction instance, or nil if the context was canceled before offloading succeeded.
154+
func (lc *lifecycleManager) OffloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration) *frac.Remote {
155+
start := time.Now()
156+
for i := 0; ; i++ {
157+
remote, err := lc.TryOffload(ctx, sealed)
158+
if err == nil {
159+
return remote
160+
}
161+
162+
logger.Warn(
163+
"fail to offload fraction",
164+
zap.String("name", sealed.BaseFileName),
165+
zap.Duration("offloading_time", time.Since(start)),
166+
zap.Int("attempts", i),
167+
zap.Error(err),
168+
)
169+
170+
select {
171+
case <-ctx.Done():
172+
logger.Info(
173+
"fraction offloading was stopped",
174+
zap.String("name", sealed.BaseFileName),
175+
zap.Duration("offloading_time", time.Since(start)),
176+
zap.Int("attempts", i),
177+
zap.Error(ctx.Err()),
178+
)
179+
return nil
180+
case <-time.After(retryDelay):
181+
// Wait before next retry attempt
182+
}
139183
}
140184
}
141185

@@ -201,6 +245,21 @@ func (lc *lifecycleManager) CleanLocal(sizeLimit uint64, wg *sync.WaitGroup) {
201245
}()
202246
}
203247

248+
// RemoveOverflowed removes fractions from offloading queue that exceed size limit
249+
// Stops ongoing offloading tasks and cleans up both local and remote resources.
250+
func (lc *lifecycleManager) RemoveOverflowed(sizeLimit uint64, wg *sync.WaitGroup) {
251+
evicted := lc.registry.EvictOverflowed(sizeLimit)
252+
for _, item := range evicted {
253+
wg.Add(1)
254+
go func() {
255+
defer wg.Done()
256+
// Cancel the offloading task - this operation may take significant time
257+
// hence executed in a separate goroutine to avoid blocking
258+
lc.tasks.Cancel(item.instance.BaseFileName)
259+
}()
260+
}
261+
}
262+
204263
// UpdateOldestMetric updates the prometheus metric with oldest fraction timestamp
205264
func (lc *lifecycleManager) UpdateOldestMetric() {
206265
oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds())

0 commit comments

Comments
 (0)