|
1 | 1 | package cache |
2 | 2 |
|
3 | 3 | import ( |
4 | | - "bufio" |
5 | | - "encoding/gob" |
6 | | - "errors" |
| 4 | + "context" |
| 5 | + "encoding/binary" |
7 | 6 | "fmt" |
8 | | - "os" |
9 | | - "path/filepath" |
10 | 7 | "sync" |
11 | 8 |
|
12 | 9 | "sync/atomic" |
13 | 10 |
|
14 | 11 | lru "github.com/hashicorp/golang-lru/v2" |
15 | | - "golang.org/x/sync/errgroup" |
| 12 | + |
| 13 | + "github.com/evstack/ev-node/pkg/store" |
16 | 14 | ) |
17 | 15 |
|
18 | 16 | const ( |
@@ -47,62 +45,33 @@ type Cache[T any] struct { |
47 | 45 |
|
48 | 46 | // maxDAHeight tracks the maximum DA height seen |
49 | 47 | maxDAHeight *atomic.Uint64 |
50 | | -} |
51 | | - |
52 | | -// CacheConfig holds configuration for cache sizes. |
53 | | -type CacheConfig struct { |
54 | | - ItemsCacheSize int |
55 | | - HashesCacheSize int |
56 | | - DAIncludedCacheSize int |
57 | | -} |
58 | 48 |
|
59 | | -// DefaultCacheConfig returns the default cache configuration. |
60 | | -func DefaultCacheConfig() CacheConfig { |
61 | | - return CacheConfig{ |
62 | | - ItemsCacheSize: DefaultItemsCacheSize, |
63 | | - HashesCacheSize: DefaultHashesCacheSize, |
64 | | - DAIncludedCacheSize: DefaultDAIncludedCacheSize, |
65 | | - } |
66 | | -} |
| 49 | + // store is used for persisting DA inclusion data (optional, can be nil for ephemeral caches) |
| 50 | + store store.Store |
67 | 51 |
|
68 | | -// NewCache returns a new Cache struct with default sizes |
69 | | -func NewCache[T any]() *Cache[T] { |
70 | | - cache, _ := NewCacheWithConfig[T](DefaultCacheConfig()) |
71 | | - return cache |
| 52 | + // storeKeyPrefix is the prefix used for store keys (allows different caches to use different namespaces) |
| 53 | + storeKeyPrefix string |
72 | 54 | } |
73 | 55 |
|
74 | | -// NewCacheWithConfig returns a new Cache struct with custom sizes |
75 | | -func NewCacheWithConfig[T any](config CacheConfig) (*Cache[T], error) { |
76 | | - itemsCache, err := lru.New[uint64, *T](config.ItemsCacheSize) |
77 | | - if err != nil { |
78 | | - return nil, fmt.Errorf("failed to create items cache: %w", err) |
79 | | - } |
80 | | - |
81 | | - hashesCache, err := lru.New[string, bool](config.HashesCacheSize) |
82 | | - if err != nil { |
83 | | - return nil, fmt.Errorf("failed to create hashes cache: %w", err) |
84 | | - } |
85 | | - |
86 | | - daIncludedCache, err := lru.New[string, uint64](config.DAIncludedCacheSize) |
87 | | - if err != nil { |
88 | | - return nil, fmt.Errorf("failed to create daIncluded cache: %w", err) |
89 | | - } |
90 | | - |
| 56 | +// NewCache returns a new Cache struct with default sizes. |
| 57 | +// If a non-nil store is provided, DA inclusion data will be persisted under keyPrefix so the cache can be repopulated on restarts; pass a nil store for a purely ephemeral cache. |
| 58 | +func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { |
| 59 | + // LRU cache creation only fails if size <= 0, which won't happen with our defaults |
| 60 | + itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize) |
| 61 | + hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize) |
| 62 | + daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize) |
91 | 63 | // hashByHeight must be at least as large as hashes cache to ensure proper pruning. |
92 | | - // If an entry is evicted from hashByHeight before hashes, the corresponding hash |
93 | | - // entry can no longer be pruned by height, causing a slow memory leak. |
94 | | - hashByHeightCache, err := lru.New[uint64, string](config.HashesCacheSize) |
95 | | - if err != nil { |
96 | | - return nil, fmt.Errorf("failed to create hashByHeight cache: %w", err) |
97 | | - } |
| 64 | + hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize) |
98 | 65 |
|
99 | 66 | return &Cache[T]{ |
100 | | - itemsByHeight: itemsCache, |
101 | | - hashes: hashesCache, |
102 | | - daIncluded: daIncludedCache, |
103 | | - hashByHeight: hashByHeightCache, |
104 | | - maxDAHeight: &atomic.Uint64{}, |
105 | | - }, nil |
| 67 | + itemsByHeight: itemsCache, |
| 68 | + hashes: hashesCache, |
| 69 | + daIncluded: daIncludedCache, |
| 70 | + hashByHeight: hashByHeightCache, |
| 71 | + maxDAHeight: &atomic.Uint64{}, |
| 72 | + store: s, |
| 73 | + storeKeyPrefix: keyPrefix, |
| 74 | + } |
106 | 75 | } |
107 | 76 |
|
108 | 77 | // getItem returns an item from the cache by height. |
@@ -158,11 +127,20 @@ func (c *Cache[T]) getDAIncluded(hash string) (uint64, bool) { |
158 | 127 | return daHeight, true |
159 | 128 | } |
160 | 129 |
|
161 | | -// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning |
| 130 | +// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning. |
162 | 131 | func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { |
163 | 132 | c.daIncluded.Add(hash, daHeight) |
164 | 133 | c.hashByHeight.Add(blockHeight, hash) |
165 | 134 |
|
| 135 | + // Persist to store if configured |
| 136 | + if c.store != nil { |
| 137 | + key := c.storeKeyPrefix + hash |
| 138 | + value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight |
| 139 | + binary.LittleEndian.PutUint64(value[0:8], daHeight) |
| 140 | + binary.LittleEndian.PutUint64(value[8:16], blockHeight) |
| 141 | + _ = c.store.SetMetadata(context.Background(), key, value) |
| 142 | + } |
| 143 | + |
166 | 144 | // Update max DA height if necessary |
167 | 145 | for range 1_000 { |
168 | 146 | current := c.maxDAHeight.Load() |
@@ -208,156 +186,91 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) { |
208 | 186 | } |
209 | 187 | } |
210 | 188 |
|
211 | | -const ( |
212 | | - itemsByHeightFilename = "items_by_height.gob" |
213 | | - hashesFilename = "hashes.gob" |
214 | | - daIncludedFilename = "da_included.gob" |
215 | | -) |
216 | | - |
217 | | -// saveMapGob saves a map to a file using gob encoding. |
218 | | -func saveMapGob[K comparable, V any](filePath string, data map[K]V) (err error) { |
219 | | - file, err := os.Create(filePath) |
220 | | - if err != nil { |
221 | | - return fmt.Errorf("failed to create file %s: %w", filePath, err) |
| 189 | +// RestoreFromStore loads DA inclusion data from the store into the in-memory cache. |
| 190 | +// This should be called during initialization to restore persisted state. |
| 191 | +// It looks up the metadata key (prefix + hash) for each provided hash and populates the LRU caches with any entries found. |
| 192 | +func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error { |
| 193 | + if c.store == nil { |
| 194 | + return nil // No store configured, nothing to restore |
222 | 195 | } |
223 | | - writer := bufio.NewWriter(file) |
224 | 196 |
|
225 | | - defer func() { |
226 | | - err = errors.Join(err, writer.Flush(), file.Sync(), file.Close()) |
227 | | - }() |
228 | | - if err := gob.NewEncoder(writer).Encode(data); err != nil { |
229 | | - return fmt.Errorf("failed to encode to file %s: %w", filePath, err) |
230 | | - } |
231 | | - return nil |
232 | | -} |
| 197 | + for _, hash := range hashes { |
| 198 | + key := c.storeKeyPrefix + hash |
| 199 | + value, err := c.store.GetMetadata(ctx, key) |
| 200 | + if err != nil { |
| 201 | +			// Any lookup error (including key-not-found) is treated as "no persisted entry" and skipped — the hash may not have been DA included yet |
| 202 | + continue |
| 203 | + } |
| 204 | + if len(value) != 16 { |
| 205 | + continue // Invalid data, skip |
| 206 | + } |
| 207 | + |
| 208 | + daHeight := binary.LittleEndian.Uint64(value[0:8]) |
| 209 | + blockHeight := binary.LittleEndian.Uint64(value[8:16]) |
233 | 210 |
|
234 | | -// loadMapGob loads a map from a file using gob encoding. |
235 | | -// if the file does not exist, it returns an empty map and no error. |
236 | | -func loadMapGob[K comparable, V any](filePath string) (map[K]V, error) { |
237 | | - m := make(map[K]V) |
238 | | - file, err := os.Open(filePath) |
239 | | - if err != nil { |
240 | | - if os.IsNotExist(err) { |
241 | | - return m, nil // return empty map if file not found |
| 211 | + c.daIncluded.Add(hash, daHeight) |
| 212 | + c.hashByHeight.Add(blockHeight, hash) |
| 213 | + |
| 214 | + // Update max DA height |
| 215 | + current := c.maxDAHeight.Load() |
| 216 | + if daHeight > current { |
| 217 | + c.maxDAHeight.Store(daHeight) |
242 | 218 | } |
243 | | - return nil, fmt.Errorf("failed to open file %s: %w", filePath, err) |
244 | 219 | } |
245 | | - defer file.Close() |
246 | 220 |
|
247 | | - decoder := gob.NewDecoder(file) |
248 | | - if err := decoder.Decode(&m); err != nil { |
249 | | - return nil, fmt.Errorf("failed to decode file %s: %w", filePath, err) |
250 | | - } |
251 | | - return m, nil |
| 221 | + return nil |
252 | 222 | } |
253 | 223 |
|
254 | | -// SaveToDisk saves the cache contents to disk in the specified folder. |
255 | | -// It's the caller's responsibility to ensure that type T (and any types it contains) |
256 | | -// are registered with the gob package if necessary (e.g., using gob.Register). |
257 | | -func (c *Cache[T]) SaveToDisk(folderPath string) error { |
258 | | - if err := os.MkdirAll(folderPath, 0o755); err != nil { |
259 | | - return fmt.Errorf("failed to create directory %s: %w", folderPath, err) |
| 224 | +// SaveToStore persists all current DA inclusion entries to the store. |
| 225 | +// This can be called before shutdown to ensure all data is persisted. |
| 226 | +func (c *Cache[T]) SaveToStore(ctx context.Context) error { |
| 227 | + if c.store == nil { |
| 228 | + return nil // No store configured |
260 | 229 | } |
261 | | - var wg errgroup.Group |
262 | | - |
263 | | - // save items by height |
264 | | - wg.Go(func() error { |
265 | | - itemsByHeightMap := make(map[uint64]*T) |
266 | | - keys := c.itemsByHeight.Keys() |
267 | | - for _, k := range keys { |
268 | | - if v, ok := c.itemsByHeight.Peek(k); ok { |
269 | | - itemsByHeightMap[k] = v |
270 | | - } |
271 | | - } |
272 | 230 |
|
273 | | - if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil { |
274 | | - return fmt.Errorf("save %s: %w", itemsByHeightFilename, err) |
275 | | - } |
276 | | - return nil |
277 | | - }) |
278 | | - |
279 | | - // save hashes |
280 | | - wg.Go(func() error { |
281 | | - hashesToSave := make(map[string]bool) |
282 | | - keys := c.hashes.Keys() |
283 | | - for _, k := range keys { |
284 | | - if v, ok := c.hashes.Peek(k); ok { |
285 | | - hashesToSave[k] = v |
286 | | - } |
| 231 | + keys := c.daIncluded.Keys() |
| 232 | + for _, hash := range keys { |
| 233 | + daHeight, ok := c.daIncluded.Peek(hash) |
| 234 | + if !ok { |
| 235 | + continue |
287 | 236 | } |
288 | 237 |
|
289 | | - if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil { |
290 | | - return fmt.Errorf("save %s: %w", hashesFilename, err) |
291 | | - } |
292 | | - return nil |
293 | | - }) |
294 | | - |
295 | | - // save daIncluded |
296 | | - wg.Go(func() error { |
297 | | - daIncludedToSave := make(map[string]uint64) |
298 | | - keys := c.daIncluded.Keys() |
299 | | - for _, k := range keys { |
300 | | - if v, ok := c.daIncluded.Peek(k); ok { |
301 | | - daIncludedToSave[k] = v |
| 238 | + // We need to find the block height for this hash |
| 239 | + // Since we track hash by height, we need to iterate |
| 240 | + var blockHeight uint64 |
| 241 | + heightKeys := c.hashByHeight.Keys() |
| 242 | + for _, h := range heightKeys { |
| 243 | + if storedHash, ok := c.hashByHeight.Peek(h); ok && storedHash == hash { |
| 244 | + blockHeight = h |
| 245 | + break |
302 | 246 | } |
303 | 247 | } |
304 | 248 |
|
305 | | - if err := saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave); err != nil { |
306 | | - return fmt.Errorf("save %s: %w", daIncludedFilename, err) |
| 249 | + key := c.storeKeyPrefix + hash |
| 250 | + value := make([]byte, 16) |
| 251 | + binary.LittleEndian.PutUint64(value[0:8], daHeight) |
| 252 | + binary.LittleEndian.PutUint64(value[8:16], blockHeight) |
| 253 | + |
| 254 | + if err := c.store.SetMetadata(ctx, key, value); err != nil { |
| 255 | + return fmt.Errorf("failed to save DA inclusion for hash %s: %w", hash, err) |
307 | 256 | } |
308 | | - return nil |
309 | | - }) |
| 257 | + } |
310 | 258 |
|
311 | | - return wg.Wait() |
| 259 | + return nil |
312 | 260 | } |
313 | 261 |
|
314 | | -// LoadFromDisk loads the cache contents from disk from the specified folder. |
315 | | -// It populates the current cache instance. If files are missing, corresponding parts of the cache will be empty. |
316 | | -// It's the caller's responsibility to ensure that type T (and any types it contains) |
317 | | -// are registered with the gob package if necessary (e.g., using gob.Register). |
318 | | -func (c *Cache[T]) LoadFromDisk(folderPath string) error { |
319 | | - var wg errgroup.Group |
320 | | - |
321 | | - // load items by height |
322 | | - wg.Go(func() error { |
323 | | - itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename)) |
324 | | - if err != nil { |
325 | | - return fmt.Errorf("failed to load %s : %w", itemsByHeightFilename, err) |
326 | | - } |
327 | | - for k, v := range itemsByHeightMap { |
328 | | - c.itemsByHeight.Add(k, v) |
329 | | - } |
330 | | - return nil |
331 | | - }) |
332 | | - |
333 | | - // load hashes |
334 | | - wg.Go(func() error { |
335 | | - hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename)) |
336 | | - if err != nil { |
337 | | - return fmt.Errorf("failed to load %s : %w", hashesFilename, err) |
338 | | - } |
339 | | - for k, v := range hashesMap { |
340 | | - c.hashes.Add(k, v) |
341 | | - } |
| 262 | +// ClearFromStore removes the DA inclusion entries for the given hashes from the store. |
| 263 | +func (c *Cache[T]) ClearFromStore(ctx context.Context, hashes []string) error { |
| 264 | + if c.store == nil { |
342 | 265 | return nil |
343 | | - }) |
| 266 | + } |
344 | 267 |
|
345 | | - // load daIncluded |
346 | | - wg.Go(func() error { |
347 | | - daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename)) |
348 | | - if err != nil { |
349 | | - return fmt.Errorf("failed to load %s : %w", daIncludedFilename, err) |
350 | | - } |
351 | | - for k, v := range daIncludedMap { |
352 | | - c.daIncluded.Add(k, v) |
353 | | - // Update max DA height during load |
354 | | - current := c.maxDAHeight.Load() |
355 | | - if v > current { |
356 | | - c.maxDAHeight.Store(v) |
357 | | - } |
| 268 | + for _, hash := range hashes { |
| 269 | + key := c.storeKeyPrefix + hash |
| 270 | + if err := c.store.DeleteMetadata(ctx, key); err != nil { |
| 271 | + return fmt.Errorf("failed to delete DA inclusion for hash %s: %w", hash, err) |
358 | 272 | } |
359 | | - return nil |
360 | | - }) |
| 273 | + } |
361 | 274 |
|
362 | | - return wg.Wait() |
| 275 | + return nil |
363 | 276 | } |
0 commit comments