-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathdiskcache.go
More file actions
173 lines (136 loc) · 4.93 KB
/
diskcache.go
File metadata and controls
173 lines (136 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.
// Package diskcache is a simple local-disk cache implementation.
//
// The diskcache package is a local-disk cache, it implements following functions:
//
// 1. Concurrent Put()/Get().
// 2. Recoverable last-read-position on restart.
// 3. Exclusive Open() on same path.
// 4. Errors during Get() are retriable.
// 5. Auto-rotate on batch size.
// 6. Drop in FIFO policy when max capacity reached.
// 7. Various specifics can be configured through environment variables, without modifying the options in source code.
package diskcache
import (
"errors"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/GuanceCloud/cliutils/logger"
)
const (
	// dataHeaderLen is the byte length of the per-record header
	// (presumably the record's encoded size field — confirm against Put()/Get()).
	dataHeaderLen = 4

	// EOFHint labels a file's end.
	EOFHint = uint32(0xdeadbeef)
)
// Generic diskcache errors.
var (
	// ErrUnexpectedReadSize: invalid read size.
	ErrUnexpectedReadSize = errors.New("unexpected read size")

	// ErrTooSmallReadBuf: the caller-supplied read buffer cannot hold the record.
	ErrTooSmallReadBuf = errors.New("too small read buffer")

	// ErrTooLargeData: data sent to Put() exceeds maxDataSize.
	ErrTooLargeData = errors.New("too large data")

	// ErrNoData: Get() called on a cache holding no data.
	ErrNoData = errors.New("no data")

	// ErrCacheFull: disk cache full, no data can be written now.
	ErrCacheFull = errors.New("cache full")

	// ErrInvalidStreamSize: stream size is invalid.
	ErrInvalidStreamSize = errors.New("invalid stream size")

	// Invalid cache filename.
	ErrInvalidDataFileName       = errors.New("invalid datafile name")
	ErrInvalidDataFileNameSuffix = errors.New("invalid datafile name suffix")

	// ErrBadHeader: invalid file header.
	ErrBadHeader = errors.New("bad header")

	// l is the package-level logger.
	l = logger.DefaultSLogger("diskcache")

	// once guards some one-time package initialization
	// (NOTE(review): its use site is not visible in this chunk — confirm).
	once sync.Once
)
// DiskCache is the representation of a disk cache.
// A DiskCache is safe for concurrent use by multiple goroutines.
// Do not Open the same-path diskcache among goroutines.
type DiskCache struct {
	// path is the on-disk directory of this cache.
	path string

	// dataFiles lists the cache's data files under path.
	dataFiles []string

	// current writing/reading file.
	curWriteFile,
	curReadfile string

	// current write/read fd.
	wfd, rfd *os.File

	// If current write file got nothing put for a
	// long time (wakeup), we rotate it manually.
	wfdLastWrite time.Time

	// wakeup: how long to wait before rotating a sleeping write-file.
	wakeup time.Duration

	wlock  *InstrumentedMutex // write-lock: used to exclude concurrent Put to the header file.
	rlock  *InstrumentedMutex // read-lock: used to exclude concurrent Get on the tail file.
	rwlock *InstrumentedMutex // used to exclude switch/rotate/drop/Close on current disk cache instance.

	flock *walLock // disables multi-Open on the same path
	pos   *pos     // current read fd position info

	// specs of current diskcache
	size atomic.Int64 // current byte size

	curBatchSize, // current writing file's size
	curReadSize, // current reading file's size
	batchSize, // current batch size (static)
	capacity int64 // capacity of the diskcache

	maxDataSize int32 // max data size of a single Put()

	// batchHeader is a reusable header buffer
	// (NOTE(review): its use is not visible in this chunk — confirm).
	batchHeader []byte

	// File permissions, default 0750 (dir) / 0640 (file).
	dirPerms,
	filePerms os.FileMode

	// various flags
	noSync, // NoSync if enabled, may cause data missing, default false
	noFallbackOnError, // ignore Fn() error
	noPos, // no position
	filoDrop, // first-in-last-out drop: when full, drop the newly-arriving data first
	noDrop, // disable drop on cache full
	noLock bool // no file lock

	// labels used to export prometheus flags.
	labels []string

	// LastErr records the most recent error
	// (NOTE(review): where it is set is not visible in this chunk — confirm).
	LastErr error
}
// String implements fmt.Stringer. It summarizes the cache's path, current
// size, flags and limits on a single line. When there are more than 10 data
// files, only the file count is printed instead of the full file list.
//
// Fix: the long-file-list branch previously passed c.noLock and c.noPos in
// swapped order relative to the [nopos]/[nolock] labels in the format string,
// printing each flag under the other's label.
func (c *DiskCache) String() string {
	if c.rwlock != nil {
		c.rwlock.Lock()
		defer c.rwlock.Unlock()
	}

	// If there are too many files (>10), only print the file count.
	if n := len(c.dataFiles); n > 10 {
		// nolint: lll
		return fmt.Sprintf("%s/[size: %d][fallback: %v][nosync: %v][nopos: %v][nolock: %v][files: %d][maxDataSize: %d][batchSize: %d][capacity: %d][dataFiles: %d]",
			c.path, c.size.Load(), c.noFallbackOnError, c.noSync, c.noPos, c.noLock, n, c.maxDataSize, c.batchSize, c.capacity, n,
		)
	}

	// nolint: lll
	return fmt.Sprintf("%s/[size: %d][fallback: %v][nosync: %v][nopos: %v][nolock: %v][files: %d][maxDataSize: %d][batchSize: %d][capacity: %d][dataFiles: %v]",
		c.path, c.size.Load(), c.noFallbackOnError, c.noSync, c.noPos, c.noLock, len(c.dataFiles), c.maxDataSize, c.batchSize, c.capacity, c.dataFiles,
	)
}
// Pretty returns a human-readable, multi-line description of the cache:
// path, current size, limits, the data-file list (truncated after 11 entries)
// and the current read file (or "no-Get()" if none is open).
//
// Fix: the original loop never broke out after emitting the
// "omitted %d files..." marker, so for a large file list it kept appending
// every remaining file AND one "omitted" line per remaining file; the list
// was never actually truncated.
func (c *DiskCache) Pretty() string {
	if c.rwlock != nil {
		c.rwlock.Lock()
		defer c.rwlock.Unlock()
	}

	arr := []string{
		"path: " + c.path,
		fmt.Sprintf("size: %d", c.size.Load()),
		fmt.Sprintf("max-data-size: %d", c.maxDataSize),
		fmt.Sprintf("capacity: %d", c.capacity),
		fmt.Sprintf("data-files(%d):", len(c.dataFiles)),
	}

	for i, df := range c.dataFiles {
		if i > 10 {
			// Keep the output short: stop after 11 files and say how many were skipped.
			arr = append(arr, fmt.Sprintf("omitted %d files...", len(c.dataFiles)-i))
			break
		}
		arr = append(arr, "\t"+df)
	}

	if c.rfd != nil {
		arr = append(arr, "cur-read: "+c.rfd.Name())
	} else {
		arr = append(arr, "no-Get()")
	}

	return strings.Join(arr, "\n")
}