Skip to content

Commit 1644d37

Browse files
authored
chore: parallel cache de/serialization (#2868)
## Overview Speed up cache write/loads via parallel execution. Pulled from #2836
1 parent c19136e commit 1644d37

1 file changed

Lines changed: 89 additions & 63 deletions

File tree

block/internal/cache/generic_cache.go

Lines changed: 89 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package cache
22

33
import (
4+
"bufio"
45
"encoding/gob"
6+
"errors"
57
"fmt"
68
"os"
79
"path/filepath"
810
"sync"
11+
912
"sync/atomic"
13+
14+
"golang.org/x/sync/errgroup"
1015
)
1116

1217
// Cache is a generic cache that maintains items that are seen and hard confirmed
@@ -97,7 +102,7 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6
97102
c.hashByHeight.Store(blockHeight, hash)
98103

99104
// Update max DA height if necessary
100-
for {
105+
for range 1_000 {
101106
current := c.maxDAHeight.Load()
102107
if daHeight <= current {
103108
return
@@ -137,15 +142,17 @@ const (
137142
)
138143

139144
// saveMapGob saves a map to a file using gob encoding.
140-
func saveMapGob[K comparable, V any](filePath string, data map[K]V) error {
145+
func saveMapGob[K comparable, V any](filePath string, data map[K]V) (err error) {
141146
file, err := os.Create(filePath)
142147
if err != nil {
143148
return fmt.Errorf("failed to create file %s: %w", filePath, err)
144149
}
145-
defer file.Close()
150+
writer := bufio.NewWriter(file)
146151

147-
encoder := gob.NewEncoder(file)
148-
if err := encoder.Encode(data); err != nil {
152+
defer func() {
153+
err = errors.Join(err, writer.Flush(), file.Sync(), file.Close())
154+
}()
155+
if err := gob.NewEncoder(writer).Encode(data); err != nil {
149156
return fmt.Errorf("failed to encode to file %s: %w", filePath, err)
150157
}
151158
return nil
@@ -178,86 +185,105 @@ func (c *Cache[T]) SaveToDisk(folderPath string) error {
178185
if err := os.MkdirAll(folderPath, 0o755); err != nil {
179186
return fmt.Errorf("failed to create directory %s: %w", folderPath, err)
180187
}
181-
188+
var wg errgroup.Group
182189
// prepare items maps
183-
itemsByHeightMap := make(map[uint64]*T)
190+
wg.Go(func() error {
191+
itemsByHeightMap := make(map[uint64]*T)
184192

185-
c.itemsByHeight.Range(func(k, v any) bool {
186-
if hk, ok := k.(uint64); ok {
187-
if it, ok := v.(*T); ok {
188-
itemsByHeightMap[hk] = it
193+
c.itemsByHeight.Range(func(k, v any) bool {
194+
if hk, ok := k.(uint64); ok {
195+
if it, ok := v.(*T); ok {
196+
itemsByHeightMap[hk] = it
197+
}
189198
}
199+
return true
200+
})
201+
202+
if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil {
203+
return fmt.Errorf("save %s: %w", itemsByHeightFilename, err)
190204
}
191-
return true
205+
return nil
192206
})
193207

194-
if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil {
195-
return err
196-
}
197-
198208
// prepare hashes map
199-
hashesToSave := make(map[string]bool)
200-
c.hashes.Range(func(k, v any) bool {
201-
keyStr, okKey := k.(string)
202-
valBool, okVal := v.(bool)
203-
if okKey && okVal {
204-
hashesToSave[keyStr] = valBool
209+
wg.Go(func() error {
210+
hashesToSave := make(map[string]bool)
211+
c.hashes.Range(func(k, v any) bool {
212+
keyStr, okKey := k.(string)
213+
valBool, okVal := v.(bool)
214+
if okKey && okVal {
215+
hashesToSave[keyStr] = valBool
216+
}
217+
return true
218+
})
219+
if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil {
220+
return fmt.Errorf("save %s: %w", hashesFilename, err)
205221
}
206-
return true
222+
return nil
207223
})
208-
if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil {
209-
return err
210-
}
211224

212225
// prepare daIncluded map
213-
daIncludedToSave := make(map[string]uint64)
214-
c.daIncluded.Range(func(k, v any) bool {
215-
keyStr, okKey := k.(string)
216-
valUint64, okVal := v.(uint64)
217-
if okKey && okVal {
218-
daIncludedToSave[keyStr] = valUint64
226+
wg.Go(func() error {
227+
daIncludedToSave := make(map[string]uint64)
228+
c.daIncluded.Range(func(k, v any) bool {
229+
keyStr, okKey := k.(string)
230+
valUint64, okVal := v.(uint64)
231+
if okKey && okVal {
232+
daIncludedToSave[keyStr] = valUint64
233+
}
234+
return true
235+
})
236+
if err := saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave); err != nil {
237+
return fmt.Errorf("save %s: %w", daIncludedFilename, err)
219238
}
220-
return true
239+
return nil
221240
})
222-
return saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave)
241+
return wg.Wait()
223242
}
224243

225244
// LoadFromDisk loads the cache contents from disk from the specified folder.
226245
// It populates the current cache instance. If files are missing, corresponding parts of the cache will be empty.
227246
// It's the caller's responsibility to ensure that type T (and any types it contains)
228247
// are registered with the gob package if necessary (e.g., using gob.Register).
229248
func (c *Cache[T]) LoadFromDisk(folderPath string) error {
249+
var wg errgroup.Group
230250
// load items by height
231-
itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename))
232-
if err != nil {
233-
return fmt.Errorf("failed to load items by height: %w", err)
234-
}
235-
for k, v := range itemsByHeightMap {
236-
c.itemsByHeight.Store(k, v)
237-
}
238-
251+
wg.Go(func() error {
252+
itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename))
253+
if err != nil {
254+
return fmt.Errorf("failed to load %s : %w", itemsByHeightFilename, err)
255+
}
256+
for k, v := range itemsByHeightMap {
257+
c.itemsByHeight.Store(k, v)
258+
}
259+
return nil
260+
})
239261
// load hashes
240-
hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename))
241-
if err != nil {
242-
return fmt.Errorf("failed to load hashes: %w", err)
243-
}
244-
for k, v := range hashesMap {
245-
c.hashes.Store(k, v)
246-
}
247-
262+
wg.Go(func() error {
263+
hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename))
264+
if err != nil {
265+
return fmt.Errorf("failed to load %s : %w", hashesFilename, err)
266+
}
267+
for k, v := range hashesMap {
268+
c.hashes.Store(k, v)
269+
}
270+
return nil
271+
})
248272
// load daIncluded
249-
daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename))
250-
if err != nil {
251-
return fmt.Errorf("failed to load daIncluded: %w", err)
252-
}
253-
for k, v := range daIncludedMap {
254-
c.daIncluded.Store(k, v)
255-
// Update max DA height during load
256-
current := c.maxDAHeight.Load()
257-
if v > current {
258-
c.maxDAHeight.Store(v)
273+
wg.Go(func() error {
274+
daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename))
275+
if err != nil {
276+
return fmt.Errorf("failed to load %s : %w", daIncludedFilename, err)
259277
}
260-
}
261-
262-
return nil
278+
for k, v := range daIncludedMap {
279+
c.daIncluded.Store(k, v)
280+
// Update max DA height during load
281+
current := c.maxDAHeight.Load()
282+
if v > current {
283+
c.maxDAHeight.Store(v)
284+
}
285+
}
286+
return nil
287+
})
288+
return wg.Wait()
263289
}

0 commit comments

Comments
 (0)