From 9b00e9ec5bcd3c21b56287ba0c5ab4a2185357f2 Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 19 Jan 2026 11:38:44 +0800 Subject: [PATCH 1/4] feat: use flock for WAL --- diskcache/diskcache.go | 4 +- diskcache/flock.go | 24 +++++ diskcache/{lock_test.go => flock_test.go} | 30 ++---- diskcache/flock_unix.go | 44 +++++++++ diskcache/flock_windows.go | 61 ++++++++++++ diskcache/lock.go | 110 ---------------------- diskcache/open.go | 4 +- 7 files changed, 141 insertions(+), 136 deletions(-) create mode 100644 diskcache/flock.go rename diskcache/{lock_test.go => flock_test.go} (70%) create mode 100644 diskcache/flock_unix.go create mode 100644 diskcache/flock_windows.go delete mode 100644 diskcache/lock.go diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 38e93a5c..3fb4c9da 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -90,8 +90,8 @@ type DiskCache struct { rlock *InstrumentedMutex // read-lock: used to exclude concurrent Get on the tail file. rwlock *InstrumentedMutex // used to exclude switch/rotate/drop/Close on current disk cache instance. - flock *flock // disabled multi-Open on same path - pos *pos // current read fd position info + flock *walLock // disabled multi-Open on same path + pos *pos // current read fd position info // specs of current diskcache size atomic.Int64 // current byte size diff --git a/diskcache/flock.go b/diskcache/flock.go new file mode 100644 index 00000000..02c74d63 --- /dev/null +++ b/diskcache/flock.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package diskcache + +import ( + "os" + "path/filepath" +) + +type walLock struct { + file string + f *os.File +} + +func newFlock(path string) *walLock { + file := filepath.Clean(filepath.Join(path, ".lock")) + + return &walLock{ + file: file, + } +} diff --git a/diskcache/lock_test.go b/diskcache/flock_test.go similarity index 70% rename from diskcache/lock_test.go rename to diskcache/flock_test.go index 22f93ec1..7e08de5b 100644 --- a/diskcache/lock_test.go +++ b/diskcache/flock_test.go @@ -6,8 +6,6 @@ package diskcache import ( - "os" - "runtime" "sync" T "testing" "time" @@ -15,22 +13,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPidAlive(t *T.T) { - t.Run("pid-1", func(t *T.T) { - if runtime.GOOS != "windows" { - assert.True(t, pidAlive(1)) - } - }) - - t.Run("pid-not-exist", func(t *T.T) { - assert.False(t, pidAlive(-1)) - }) - - t.Run("cur-pid", func(t *T.T) { - assert.True(t, pidAlive(os.Getpid())) - }) -} - func TestLockUnlock(t *T.T) { t.Run("lock", func(t *T.T) { p := t.TempDir() @@ -42,8 +24,11 @@ func TestLockUnlock(t *T.T) { defer wg.Done() fl := newFlock(p) - assert.NoError(t, fl.lock()) - defer fl.unlock() + ok, err := fl.TryLock() + + assert.True(t, ok) + assert.NoError(t, err) + defer fl.Unlock() time.Sleep(time.Second * 5) }() @@ -54,7 +39,8 @@ func TestLockUnlock(t *T.T) { defer wg.Done() fl := newFlock(p) - err := fl.lock() + ok, err := fl.TryLock() + assert.False(t, ok) assert.Error(t, err) t.Logf("[expect] err: %s", err.Error()) @@ -68,7 +54,7 @@ func TestLockUnlock(t *T.T) { // try lock until ok for { - if err := fl.lock(); err != nil { + if ok, err := fl.TryLock(); !ok { t.Logf("[expect] err: %s", err.Error()) time.Sleep(time.Second) } else { diff --git a/diskcache/flock_unix.go b/diskcache/flock_unix.go new file mode 100644 index 00000000..65c9e7fb --- /dev/null +++ b/diskcache/flock_unix.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +//go:build !windows +// +build !windows + +package diskcache + +import ( + "fmt" + "os" + "syscall" +) + +func (l *walLock) TryLock() (bool, error) { + f, err := os.OpenFile(l.file, os.O_CREATE|os.O_RDWR, 0o666) + if err != nil { + return false, err + } + l.f = f + + // LOCK_EX = Exclusive, LOCK_NB = Non-blocking + err = syscall.Flock(int(l.f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + // If the error is EWOULDBLOCK, it means someone else has the lock + if err == syscall.EWOULDBLOCK { + l.f.Close() + return false, fmt.Errorf("locked") + } + l.f.Close() + return false, err + } + return true, nil +} + +func (l *walLock) Unlock() { + if l.f != nil { + syscall.Flock(int(l.f.Fd()), syscall.LOCK_UN) + l.f.Close() + os.Remove(l.file) // Optional on Unix + } +} diff --git a/diskcache/flock_windows.go b/diskcache/flock_windows.go new file mode 100644 index 00000000..338f88da --- /dev/null +++ b/diskcache/flock_windows.go @@ -0,0 +1,61 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +//go:build windows +// +build windows + +package diskcache + +import ( + "os" + "syscall" + "unsafe" +) + +var ( + modkernel32 = syscall.NewLazyDLL("kernel32.dll") + procLockFileEx = modkernel32.NewProc("LockFileEx") +) + +func (l *walLock) TryLock() (bool, error) { + f, err := os.OpenFile(l.file, os.O_CREATE|os.O_RDWR, 0o666) + if err != nil { + return false, err + } + l.f = f + + // LOCKFILE_EXCLUSIVE_LOCK = 2, LOCKFILE_FAIL_IMMEDIATELY = 1 + flags := uint32(2 | 1) + var overlapped syscall.Overlapped + + // Call Win32 LockFileEx + ret, _, err := procLockFileEx.Call( + uintptr(l.f.Fd()), + uintptr(flags), + 0, // reserved + 0, // length low + 1, // length high (lock 1 byte) + uintptr(unsafe.Pointer(&overlapped)), + ) + + if ret == 0 { + // ERROR_LOCK_VIOLATION = 33 + if errno, ok := err.(syscall.Errno); ok && errno == 33 { + l.f.Close() + return false, nil + } + l.f.Close() + return false, err + } + + return true, nil +} + +func (l *walLock) Unlock() { + if l.f != nil { + l.f.Close() // Closing the file handle automatically releases the lock in Windows + os.Remove(l.file) + } +} diff --git a/diskcache/lock.go b/diskcache/lock.go deleted file mode 100644 index 162920d8..00000000 --- a/diskcache/lock.go +++ /dev/null @@ -1,110 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the MIT License. -// This product includes software developed at Guance Cloud (https://www.guance.com/). -// Copyright 2021-present Guance, Inc. - -package diskcache - -import ( - "fmt" - "os" - "path/filepath" - "runtime" - "strconv" - "sync" - "syscall" -) - -type flock struct { - file string - mtx *sync.Mutex -} - -func newFlock(path string) *flock { - return &flock{ - file: filepath.Clean(filepath.Join(path, ".lock")), - mtx: &sync.Mutex{}, - } -} - -func (l *flock) lock() error { - l.mtx.Lock() - defer l.mtx.Unlock() - - curPid := os.Getpid() - - if _, err := os.Stat(l.file); err != nil { - goto write // file not exist - } else { - x, err := os.ReadFile(l.file) - if err != nil { - return WrapFileOperationError(OpRead, err, "", l.file). - WithDetails("failed_to_read_lock_file") - } - - if len(x) == 0 { - goto write - } - - pidInFile, err := strconv.Atoi(string(x)) - if err != nil { - return NewCacheError(OpLock, err, - fmt.Sprintf("failed_to_parse_pid_from_lock_file: content=%q", string(x))). - WithFile(l.file) - } else { - switch pidInFile { - case -1: // unlocked - goto write - case curPid: - return NewCacheError(OpLock, fmt.Errorf("already_locked_by_current_process"), ""). - WithFile(l.file).WithDetails(fmt.Sprintf("current_pid=%d", curPid)) - default: // other pid, may terminated - if pidAlive(pidInFile) { - return WrapLockError(fmt.Errorf("process_already_has_lock"), "", pidInFile). - WithFile(l.file) - } - } - } - } - -write: - if err := os.WriteFile(l.file, []byte(strconv.Itoa(curPid)), 0o600); err != nil { - return WrapFileOperationError(OpWrite, err, "", l.file). - WithDetails(fmt.Sprintf("failed_to_write_pid_to_lock_file: pid=%d", curPid)) - } - return nil -} - -func (l *flock) unlock() error { - l.mtx.Lock() - defer l.mtx.Unlock() - - if err := os.WriteFile(l.file, []byte(strconv.Itoa(-1)), 0o600); err != nil { - return WrapFileOperationError(OpWrite, err, "", l.file). - WithDetails("failed_to_write_unlock_marker") - } - return nil -} - -func pidAlive(pid int) bool { - p, err := os.FindProcess(pid) - if err != nil { - return false - } - - // Signal not available on windows. - if runtime.GOOS == "windows" { - return true - } - - if err := p.Signal(syscall.Signal(0)); err != nil { - switch err.Error() { - case "operation not permitted": - return true - default: - return false - } - } else { - return true - } -} diff --git a/diskcache/open.go b/diskcache/open.go index 38c22ad7..47a47181 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -107,7 +107,7 @@ func (c *DiskCache) doOpen() error { // disable open multiple times if !c.noLock { fl := newFlock(c.path) - if err := fl.lock(); err != nil { + if ok, err := fl.TryLock(); !ok { return WrapLockError(err, c.path, 0).WithDetails("failed_to_acquire_directory_lock") } else { c.flock = fl @@ -205,7 +205,7 @@ func (c *DiskCache) Close() error { if !c.noLock { if c.flock != nil { - if err := c.flock.unlock(); err != nil { + if ok, err := c.flock.TryLock(); !ok { return WrapLockError(err, c.path, 0).WithDetails("failed_to_release_directory_lock") } } From 47c4af1d515304ce67c9f8a95588874296d3e232 Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 19 Jan 2026 14:02:22 +0800 Subject: [PATCH 2/4] fix --- diskcache/flock_test.go | 23 +++++++++++++++++++---- diskcache/flock_unix.go | 4 ++-- diskcache/open.go | 6 ++---- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/diskcache/flock_test.go b/diskcache/flock_test.go index 7e08de5b..8598ec41 100644 --- a/diskcache/flock_test.go +++ b/diskcache/flock_test.go @@ -6,6 +6,8 @@ package diskcache import ( + "os" + "path/filepath" "sync" T "testing" "time" @@ -14,6 +16,19 @@ import ( ) func TestLockUnlock(t *T.T) { + t.Run("unlock-remove", func(t *T.T) { + p := t.TempDir() + fl := newFlock(p) + + ok, err := fl.tryLock() + assert.True(t, ok) + assert.NoError(t, err) + fl.unlock() + + _, err = os.Stat(filepath.Join(p, ".lock")) + assert.Error(t, err) + }) + t.Run("lock", func(t *T.T) { p := t.TempDir() @@ -24,11 +39,11 @@ func TestLockUnlock(t *T.T) { defer wg.Done() fl := newFlock(p) - ok, err := fl.TryLock() + ok, err := fl.tryLock() assert.True(t, ok) assert.NoError(t, err) - defer fl.Unlock() + defer fl.unlock() time.Sleep(time.Second * 5) }() @@ -39,7 +54,7 @@ func TestLockUnlock(t *T.T) { defer wg.Done() fl := newFlock(p) - ok, err := fl.TryLock() + ok, err := fl.tryLock() assert.False(t, ok) assert.Error(t, err) @@ -54,7 +69,7 @@ func TestLockUnlock(t *T.T) { // try lock until ok for { - if ok, err := fl.TryLock(); !ok { + if ok, err := fl.tryLock(); !ok { t.Logf("[expect] err: %s", err.Error()) time.Sleep(time.Second) } else { diff --git a/diskcache/flock_unix.go b/diskcache/flock_unix.go index 65c9e7fb..bef2952c 100644 --- a/diskcache/flock_unix.go +++ b/diskcache/flock_unix.go @@ -14,7 +14,7 @@ import ( "syscall" ) -func (l *walLock) TryLock() (bool, error) { +func (l *walLock) tryLock() (bool, error) { f, err := os.OpenFile(l.file, os.O_CREATE|os.O_RDWR, 0o666) if err != nil { return false, err @@ -35,7 +35,7 @@ func (l *walLock) TryLock() (bool, error) { return true, nil } -func (l *walLock) Unlock() { +func (l *walLock) unlock() { if l.f != nil { syscall.Flock(int(l.f.Fd()), syscall.LOCK_UN) l.f.Close() diff --git a/diskcache/open.go b/diskcache/open.go index 47a47181..47ff415f 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -107,7 +107,7 @@ func (c *DiskCache) doOpen() error { // disable open multiple times if !c.noLock { fl := newFlock(c.path) - if ok, err := fl.TryLock(); !ok { + if ok, err := fl.tryLock(); !ok { return WrapLockError(err, c.path, 0).WithDetails("failed_to_acquire_directory_lock") } else { c.flock = fl @@ -205,9 +205,7 @@ func (c *DiskCache) Close() error { if !c.noLock { if c.flock != nil { - if ok, err := c.flock.TryLock(); !ok { - return WrapLockError(err, c.path, 0).WithDetails("failed_to_release_directory_lock") - } + c.flock.unlock() } } From b57bdd2c1127a4635ef22dab4c61331fe0ace88d Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 19 Jan 2026 14:23:25 +0800 Subject: [PATCH 3/4] save --- diskcache/flock_test.go | 5 +++++ diskcache/flock_windows.go | 4 ++-- diskcache/open.go | 1 + diskcache/open_test.go | 4 ++++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/diskcache/flock_test.go b/diskcache/flock_test.go index 8598ec41..99525e7e 100644 --- a/diskcache/flock_test.go +++ b/diskcache/flock_test.go @@ -23,6 +23,11 @@ func TestLockUnlock(t *T.T) { ok, err := fl.tryLock() assert.True(t, ok) assert.NoError(t, err) + + fi, err := os.Stat(filepath.Join(p, ".lock")) + assert.NoError(t, err) + t.Logf("fi: %+#v", fi) + fl.unlock() _, err = os.Stat(filepath.Join(p, ".lock")) diff --git a/diskcache/flock_windows.go b/diskcache/flock_windows.go index 338f88da..4638bb70 100644 --- a/diskcache/flock_windows.go +++ b/diskcache/flock_windows.go @@ -19,7 +19,7 @@ var ( procLockFileEx = modkernel32.NewProc("LockFileEx") ) -func (l *walLock) TryLock() (bool, error) { +func (l *walLock) tryLock() (bool, error) { f, err := os.OpenFile(l.file, os.O_CREATE|os.O_RDWR, 0o666) if err != nil { return false, err @@ -53,7 +53,7 @@ func (l *walLock) TryLock() (bool, error) { return true, nil } -func (l *walLock) Unlock() { +func (l *walLock) unlock() { if l.f != nil { l.f.Close() // Closing the file handle automatically releases the lock in Windows os.Remove(l.file) diff --git a/diskcache/open.go b/diskcache/open.go index 47ff415f..1d2ddc4b 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -110,6 +110,7 @@ func (c *DiskCache) doOpen() error { if ok, err := fl.tryLock(); !ok { return WrapLockError(err, c.path, 0).WithDetails("failed_to_acquire_directory_lock") } else { + l.Infof("locked file %s ok", fl.file) c.flock = fl } } diff --git a/diskcache/open_test.go b/diskcache/open_test.go index 2f87f56a..befd5ec3 100644 --- a/diskcache/open_test.go +++ b/diskcache/open_test.go @@ -22,7 +22,11 @@ func TestOpen(t *T.T) { // lock then no-lock c, err := Open(WithPath(p)) assert.NoError(t, err) + + assert.FileExists(t, filepath.Join(p, ".lock")) + assert.NoError(t, c.Close()) + assert.NoFileExists(t, filepath.Join(p, ".lock")) c2, err := Open(WithPath(p), WithNoPos(true)) assert.NoError(t, err) From 66166b4d1db0ecaf2f1b603e9eda841c53675797 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 30 Jan 2026 14:23:08 +0800 Subject: [PATCH 4/4] save --- diskcache/flock_unix.go | 39 +++++++++++++++++++++++++------------- diskcache/flock_windows.go | 20 +++++++++---------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/diskcache/flock_unix.go b/diskcache/flock_unix.go index bef2952c..eacd04f0 100644 --- a/diskcache/flock_unix.go +++ b/diskcache/flock_unix.go @@ -4,41 +4,54 @@ // Copyright 2021-present Guance, Inc. //go:build !windows -// +build !windows package diskcache import ( + "errors" "fmt" "os" "syscall" ) -func (l *walLock) tryLock() (bool, error) { - f, err := os.OpenFile(l.file, os.O_CREATE|os.O_RDWR, 0o666) +func (wl *walLock) tryLock() (bool, error) { + f, err := os.OpenFile(wl.file, os.O_CREATE|os.O_RDWR, 0o666) // nolint: gosec if err != nil { return false, err } - l.f = f + wl.f = f // LOCK_EX = Exclusive, LOCK_NB = Non-blocking - err = syscall.Flock(int(l.f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + err = syscall.Flock(int(wl.f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) if err != nil { // If the error is EWOULDBLOCK, it means someone else has the lock - if err == syscall.EWOULDBLOCK { - l.f.Close() + if errors.Is(err, syscall.EWOULDBLOCK) { + if err := wl.f.Close(); err != nil { + l.Errorf("Close: %s", err.Error()) + } return false, fmt.Errorf("locked") } - l.f.Close() + + if err := wl.f.Close(); err != nil { + l.Errorf("Close: %s", err.Error()) + } return false, err } return true, nil } -func (l *walLock) unlock() { - if l.f != nil { - syscall.Flock(int(l.f.Fd()), syscall.LOCK_UN) - l.f.Close() - os.Remove(l.file) // Optional on Unix +func (wl *walLock) unlock() { + if wl.f != nil { + if err := syscall.Flock(int(wl.f.Fd()), syscall.LOCK_UN); err != nil { + l.Errorf("Flock: %s", err.Error()) + } + + if err := wl.f.Close(); err != nil { + l.Errorf("CLose: %s", err.Error()) + } + + if err := os.Remove(wl.file); err != nil { // Optional on Unix + l.Errorf("Remove: %s", err.Error()) + } } } diff --git a/diskcache/flock_windows.go b/diskcache/flock_windows.go index 4638bb70..ce7ffd9f 100644 --- a/diskcache/flock_windows.go +++ b/diskcache/flock_windows.go @@ -19,12 +19,12 @@ var ( procLockFileEx = modkernel32.NewProc("LockFileEx") ) -func (l *walLock) tryLock() (bool, error) { - f, err := os.OpenFile(l.file, os.O_CREATE|os.O_RDWR, 0o666) +func (wl *walLock) tryLock() (bool, error) { + f, err := os.OpenFile(wl.file, os.O_CREATE|os.O_RDWR, 0o666) if err != nil { return false, err } - l.f = f + wl.f = f // LOCKFILE_EXCLUSIVE_LOCK = 2, LOCKFILE_FAIL_IMMEDIATELY = 1 flags := uint32(2 | 1) @@ -32,7 +32,7 @@ func (l *walLock) tryLock() (bool, error) { // Call Win32 LockFileEx ret, _, err := procLockFileEx.Call( - uintptr(l.f.Fd()), + uintptr(wl.f.Fd()), uintptr(flags), 0, // reserved 0, // length low @@ -43,19 +43,19 @@ func (l *walLock) tryLock() (bool, error) { if ret == 0 { // ERROR_LOCK_VIOLATION = 33 if errno, ok := err.(syscall.Errno); ok && errno == 33 { - l.f.Close() + wl.f.Close() return false, nil } - l.f.Close() + wl.f.Close() return false, err } return true, nil } -func (l *walLock) unlock() { - if l.f != nil { - l.f.Close() // Closing the file handle automatically releases the lock in Windows - os.Remove(l.file) +func (wl *walLock) unlock() { + if wl.f != nil { + wl.f.Close() // Closing the file handle automatically releases the lock in Windows + os.Remove(wl.file) } }