Skip to content

Commit c9a8505

Browse files
committed
chore(docsfilter): cache tombstones files' headers
1 parent a9e7e64 commit c9a8505

7 files changed

Lines changed: 55 additions & 30 deletions

File tree

docsfilter/docs_filter.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"go.uber.org/zap"
1515

16+
"github.com/ozontech/seq-db/cache"
1617
"github.com/ozontech/seq-db/frac"
1718
"github.com/ozontech/seq-db/frac/processor"
1819
"github.com/ozontech/seq-db/fracmanager"
@@ -55,6 +56,8 @@ type DocsFilter struct {
5556
createDirOnce *sync.Once
5657

5758
maintenanceInterval time.Duration
59+
60+
headersCache *cache.Cache[[]lidsBlockHeader]
5861
}
5962

6063
func New(
@@ -85,6 +88,8 @@ func New(
8588
rateLimit: make(chan struct{}, workers),
8689
createDirOnce: &sync.Once{},
8790
maintenanceInterval: defaultMaintenanceInterval,
91+
// TODO: create cache properly (cleaner, metrics) (use cacheMaintainer ???)
92+
headersCache: cache.NewCache[[]lidsBlockHeader](nil, nil),
8893
}
8994
}
9095

@@ -127,7 +132,7 @@ func (df *DocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLI
127132

128133
iterators := make([]node.Node, 0, len(fracFiles))
129134
for _, f := range fracFiles {
130-
loader, err := newLoader(f)
135+
loader, err := newLoader(f, df.headersCache)
131136
if err != nil {
132137
logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err))
133138
return nil, err

docsfilter/iterator_asc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ func (it *IteratorAsc) String() string {
1414

1515
func (it *IteratorAsc) Next() (uint32, bool) {
1616
if it.loader.headers == nil {
17-
err := it.loader.loadHeaders()
17+
headers, err := it.loader.getHeaders()
1818
if err != nil {
1919
logger.Panic("can't load tombstones headers", zap.Error(err))
2020
}
21+
it.loader.headers = headers
2122
it.blockIndex = len(it.loader.headers) - 1
2223
}
2324

docsfilter/iterator_asc_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/stretchr/testify/require"
1111

12+
"github.com/ozontech/seq-db/cache"
1213
"github.com/ozontech/seq-db/seq"
1314
)
1415

@@ -57,7 +58,7 @@ func TestIteratorAsc(t *testing.T) {
5758
err := os.WriteFile(filePath, rawDocsFilter, 0o644)
5859
require.NoError(t, err)
5960

60-
loader, err := newLoader(filePath)
61+
loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
6162
require.NoError(t, err)
6263

6364
iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID))

docsfilter/iterator_desc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ func (it *IteratorDesc) String() string {
1414

1515
func (it *IteratorDesc) Next() (uint32, bool) {
1616
if it.loader.headers == nil {
17-
err := it.loader.loadHeaders()
17+
headers, err := it.loader.getHeaders()
1818
if err != nil {
1919
logger.Panic("can't load tombstones headers", zap.Error(err))
2020
}
21+
it.loader.headers = headers
2122
}
2223

2324
for len(it.lids) == 0 {

docsfilter/iterator_desc_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/stretchr/testify/require"
1010

11+
"github.com/ozontech/seq-db/cache"
1112
"github.com/ozontech/seq-db/seq"
1213
)
1314

@@ -52,7 +53,7 @@ func TestIteratorDesc(t *testing.T) {
5253
err := os.WriteFile(filePath, rawDocsFilter, 0o644)
5354
require.NoError(t, err)
5455

55-
loader, err := newLoader(filePath)
56+
loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
5657
require.NoError(t, err)
5758

5859
iterator := (*IteratorDesc)(NewIterator(loader, tc.minLID, tc.maxLID))

docsfilter/loader.go

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,63 @@ package docsfilter
33
import (
44
"encoding/binary"
55
"fmt"
6+
"hash/fnv"
67
"io"
78
"os"
89

910
"go.uber.org/zap"
1011

12+
"github.com/ozontech/seq-db/cache"
1113
"github.com/ozontech/seq-db/logger"
1214
)
1315

1416
type loader struct {
15-
headers []lidsBlockHeader
16-
file *os.File
17-
// TODO: seems like cache needs to be populated somewhere outside of this struct and passed here
18-
// cache *cache.Cache[[]lidsBlockHeader]
17+
headers []lidsBlockHeader
18+
file *os.File
19+
headersCache *cache.Cache[[]lidsBlockHeader]
20+
cashKey uint32
1921
}
2022

21-
func newLoader(filePath string) (*loader, error) {
23+
func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) (*loader, error) {
2224
f, err := os.Open(filePath)
2325
if err != nil {
2426
return nil, err
2527
}
2628

29+
hash := fnv.New32a()
30+
hash.Write([]byte(filterNameFromTombstonesPath(filePath) + fracNameFromFilePath(filePath)))
31+
2732
return &loader{
28-
file: f,
33+
file: f,
34+
headersCache: headersCache,
35+
cashKey: hash.Sum32(),
2936
}, nil
3037
}
3138

32-
func (l *loader) loadHeaders() error {
39+
func (l *loader) getHeaders() ([]lidsBlockHeader, error) {
40+
return l.headersCache.GetWithError(l.cashKey, func() ([]lidsBlockHeader, int, error) {
41+
headers, err := l.loadHeaders()
42+
if err != nil {
43+
return headers, 0, err
44+
}
45+
size := len(headers) * int(lidsBlockHeaderSizeBytes)
46+
return headers, size, nil
47+
})
48+
}
49+
50+
func (l *loader) loadHeaders() ([]lidsBlockHeader, error) {
3351
numBuf := make([]byte, 1+4) // block version 1 byte + number of blocks 4 bytes
3452
n, err := l.file.ReadAt(numBuf, 0)
3553
if err != nil {
36-
return fmt.Errorf("can't read headers from disk: %s", err.Error())
54+
return nil, fmt.Errorf("can't read headers from disk: %s", err.Error())
3755
}
3856
if n == 0 {
39-
return fmt.Errorf("can't read headers from disk: n=0")
57+
return nil, fmt.Errorf("can't read headers from disk: n=0")
4058
}
4159

4260
version := docsFilterBinVersion(numBuf[0])
4361
if _, ok := availableVersions[version]; !ok {
44-
return fmt.Errorf("invalid LIDs binary version: %d", version)
62+
return nil, fmt.Errorf("invalid LIDs binary version: %d", version)
4563
}
4664

4765
headersPos := n
@@ -50,38 +68,39 @@ func (l *loader) loadHeaders() error {
5068

5169
n, err = l.file.ReadAt(headersBuf, int64(headersPos))
5270
if err != nil && err != io.EOF {
53-
return fmt.Errorf("can't read headers, %s", err.Error())
71+
return nil, fmt.Errorf("can't read headers, %s", err.Error())
5472
}
5573
if n != len(headersBuf) {
56-
return fmt.Errorf("can't read headers, read=%d, requested=%d", n, len(headersBuf))
74+
return nil, fmt.Errorf("can't read headers, read=%d, requested=%d", n, len(headersBuf))
5775
}
5876
if len(headersBuf)%int(lidsBlockHeaderSizeBytes) != 0 {
59-
return fmt.Errorf("wrong headers format")
77+
return nil, fmt.Errorf("wrong headers format")
6078
}
6179

62-
l.headers = make([]lidsBlockHeader, 0, numberOfBlocks)
80+
headers := make([]lidsBlockHeader, 0, numberOfBlocks)
6381
for range numberOfBlocks {
6482
header := lidsBlockHeader{}
6583
headersBuf, err = header.unmarshal(headersBuf)
6684
if err != nil {
67-
return fmt.Errorf("can't unmarshal lids header: %s", err)
85+
return nil, fmt.Errorf("can't unmarshal lids header: %s", err)
6886
}
69-
l.headers = append(l.headers, header)
87+
headers = append(headers, header)
7088
}
7189

7290
if len(headersBuf) > 0 {
73-
return fmt.Errorf("unexpected tail when unmarshaling LIDs headers")
91+
return nil, fmt.Errorf("unexpected tail when unmarshaling LIDs headers")
7492
}
7593

76-
return nil
94+
return headers, nil
7795
}
7896

7997
func (l *loader) loadBlock(index int) ([]uint32, error) {
8098
if l.headers == nil {
81-
err := l.loadHeaders()
99+
headers, err := l.getHeaders()
82100
if err != nil {
83101
return nil, err
84102
}
103+
l.headers = headers
85104
}
86105

87106
if len(l.headers) < index+1 {
@@ -90,7 +109,7 @@ func (l *loader) loadBlock(index int) ([]uint32, error) {
90109

91110
header := l.headers[index]
92111

93-
blockBuf := make([]byte, header.Size) // TODO: buffer pool (???)
112+
blockBuf := make([]byte, header.Size)
94113
n, err := l.file.ReadAt(blockBuf, int64(header.Offset))
95114
if err != nil {
96115
return nil, err

docsfilter/loader_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/stretchr/testify/require"
99

10+
"github.com/ozontech/seq-db/cache"
1011
"github.com/ozontech/seq-db/seq"
1112
)
1213

@@ -22,13 +23,9 @@ func TestLoader(t *testing.T) {
2223
err := os.WriteFile(filePath, rawDocsFilter, 0o644)
2324
require.NoError(t, err)
2425

25-
loader, err := newLoader(filePath)
26+
loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
2627
require.NoError(t, err)
2728

28-
err = loader.loadHeaders()
29-
require.NoError(t, err)
30-
require.Len(t, loader.headers, 4)
31-
3229
resLIDs := make([]uint32, 0, len(multipleBlocksLIDs))
3330
const numberOfBlocks = 4
3431
for i := range numberOfBlocks {

0 commit comments

Comments
 (0)