Skip to content

Commit bfb8672

Browse files
authored
Transaction (#40)
* Fix: Correct buffer manager policy call (123) Corrected the way the buffer manager policy is called in `logmgr.go`. The change ensures the correct method is used to allocate buffers for log blocks. * feat: Improve transaction manager (transaction) Refactor transaction manager to improve code style and add functionality. Added error handling and improved concurrency control. Added FindCell and InsertCell functions. * Refactor: Improve transaction manager and buffer list (1) Improve the transaction manager and buffer list by refactoring the Pin, UnPin, and Buffer functions. Add error handling and improve the overall design for better efficiency and clarity. Also, simplify the FindCell and InsertCell functions in transactionMgr.go. * feat: Enhance transaction management with error handling and new log record types * feat: Add locking mechanism and log record interface for transaction management * #5 feat: Implement locking mechanism and log record creation for transaction management * #5 feat: Enhance transaction manager with error handling and new log record creation * #5 feat: Add error handling to Pin, UnPin, and InsertCell methods in transaction manager * Refactor: Rename GetFileName to FileName Renamed `GetFileName` method to `FileName` for better readability and consistency. Updated all references to the renamed method. (transaction) * feat(transaction): Add Buffer() method to LogMgr Add a method to access the internal buffer of the LogMgr struct. This allows external access to the buffer for inspection or manipulation. * feat: Implement concurrency manager (transaction) Refactor concurrency management to use a more robust and thread-safe implementation. Move the `ConcurrencyMgr` to the `concurrency` package. Add shared and exclusive locking mechanisms with error handling and a release method. Implement two-phase locking protocol. * #5 feat: Improve lock table concurrency and error handling (1) Refactor lock table to use time.Second for timeout and improve error messages. Add unlock functionality and GetLockInfo helper method. Change package name from transaction to concurrency. * feat: Implement unified update log record (main) This commit implements a new log record type, `UnifiedUpdateRecord`, for handling unified updates in the logging system. It includes serialization and deserialization methods, as well as an `Undo` method for transaction rollback. The `CreateLogRecord` function is updated to handle this new record type. The old `log_record.go` file has been removed and replaced with a new, improved version. * Fix: Use correct method to get block filename (#123 (transaction)) Corrected the method used to retrieve the block filename in LogIterator. The `GetFileName` method was replaced with `FileName` for accuracy. * #5 feat: Implement recovery manager (transaction) This commit implements a recovery manager that handles logging, commit, rollback, and recovery operations for transactions. The `recoveryMgr.go` file in the `recovery` package now contains the implementation. The previous `recoveryMgr.go` file in the `transaction` package has been removed. The recovery manager uses unified log records to efficiently handle undo and redo operations. * feat: Refactor transaction management and logging (#5) This commit refactors the transaction management and logging system. The `LogRecord` interface is enhanced to include new operation types. The `TransactionMgr` is updated to use the new `recovery` and `concurrency` packages, improving concurrency control and recovery mechanisms. The `ITransaction` interface is moved to a new `transaction_interface` package. Error handling is improved throughout the codebase.
1 parent 48b267a commit bfb8672

15 files changed

Lines changed: 728 additions & 79 deletions

File tree

concurrency/concurrencyMgr.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package concurrency
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"ultraSQL/kfile"
7+
)
8+
9+
type ConcurrencyMgr struct {
10+
lTble *LockTable
11+
locks map[kfile.BlockId]string
12+
mu sync.RWMutex // Protect shared map access
13+
}
14+
15+
func NewConcurrencyMgr() *ConcurrencyMgr {
16+
return &ConcurrencyMgr{
17+
locks: make(map[kfile.BlockId]string),
18+
}
19+
}
20+
21+
func (cM *ConcurrencyMgr) SLock(blk kfile.BlockId) error {
22+
cM.mu.Lock()
23+
defer cM.mu.Unlock()
24+
25+
// If we already have any lock (S or X), no need to acquire again
26+
if _, exists := cM.locks[blk]; exists {
27+
return nil
28+
}
29+
30+
err := cM.lTble.sLock(blk)
31+
if err != nil {
32+
return fmt.Errorf("failed to acquire shared lock: %w", err)
33+
}
34+
35+
cM.locks[blk] = "S"
36+
return nil
37+
}
38+
39+
func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error {
40+
cM.mu.Lock()
41+
defer cM.mu.Unlock()
42+
43+
// If we already have an X lock, no need to acquire again
44+
if cM.hasXLock(blk) {
45+
return nil
46+
}
47+
48+
// Following the two-phase locking protocol:
49+
// 1. First acquire S lock if we don't have any lock
50+
if _, exists := cM.locks[blk]; !exists {
51+
err := cM.lTble.sLock(blk)
52+
if err != nil {
53+
return fmt.Errorf("failed to acquire initial shared lock: %w", err)
54+
}
55+
cM.locks[blk] = "S"
56+
}
57+
58+
// 2. Then upgrade to X lock
59+
err := cM.lTble.xLock(blk)
60+
if err != nil {
61+
return fmt.Errorf("failed to upgrade to exclusive lock: %w", err)
62+
}
63+
64+
cM.locks[blk] = "X"
65+
return nil
66+
}
67+
68+
func (cM *ConcurrencyMgr) Release() error {
69+
cM.mu.Lock()
70+
defer cM.mu.Unlock()
71+
72+
var errs []error
73+
for blk := range cM.locks {
74+
if err := cM.lTble.unlock(blk); err != nil {
75+
errs = append(errs, fmt.Errorf("failed to release lock for block %v: %w", blk, err))
76+
}
77+
}
78+
79+
// Clear the locks map regardless of errors
80+
cM.locks = make(map[kfile.BlockId]string)
81+
82+
if len(errs) > 0 {
83+
return fmt.Errorf("errors during release: %v", errs)
84+
}
85+
return nil
86+
}
87+
88+
func (cM *ConcurrencyMgr) hasXLock(blk kfile.BlockId) bool {
89+
// Note: Caller must hold mutex
90+
lockType, ok := cM.locks[blk]
91+
return ok && lockType == "X"
92+
}
93+
94+
// Helper method to check current lock status
95+
func (cM *ConcurrencyMgr) GetLockType(blk kfile.BlockId) (string, bool) {
96+
cM.mu.RLock()
97+
defer cM.mu.RUnlock()
98+
99+
lockType, exists := cM.locks[blk]
100+
return lockType, exists
101+
}

concurrency/lockTable.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package concurrency
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
"ultraSQL/kfile"
8+
)
9+
10+
const MaxWaitTime = 10 * time.Second
11+
12+
type LockTable struct {
13+
locks map[kfile.BlockId]int // positive: number of shared locks, negative: exclusive lock
14+
mu sync.RWMutex
15+
cond *sync.Cond
16+
}
17+
18+
func NewLockTable() *LockTable {
19+
lt := &LockTable{
20+
locks: make(map[kfile.BlockId]int),
21+
}
22+
lt.cond = sync.NewCond(&lt.mu)
23+
return lt
24+
}
25+
26+
func (lT *LockTable) sLock(blk kfile.BlockId) error {
27+
lT.mu.Lock()
28+
defer lT.mu.Unlock()
29+
30+
deadline := time.Now().Add(MaxWaitTime)
31+
32+
// Wait while there's an exclusive lock on the block
33+
for lT.hasXLock(blk) {
34+
if time.Now().After(deadline) {
35+
return fmt.Errorf("shared lock acquisition timed out for block %v", blk)
36+
}
37+
lT.cond.Wait()
38+
}
39+
40+
// Increment the number of shared locks (or initialize to 1)
41+
val := lT.getLockVal(blk)
42+
lT.locks[blk] = val + 1
43+
return nil
44+
}
45+
46+
func (lT *LockTable) xLock(blk kfile.BlockId) error {
47+
lT.mu.Lock()
48+
defer lT.mu.Unlock()
49+
50+
deadline := time.Now().Add(MaxWaitTime)
51+
52+
// Wait while there are other locks (shared or exclusive)
53+
for lT.hasOtherLocks(blk) {
54+
if time.Now().After(deadline) {
55+
return fmt.Errorf("exclusive lock acquisition timed out for block %v", blk)
56+
}
57+
lT.cond.Wait()
58+
}
59+
60+
// Set to -1 to indicate exclusive lock
61+
lT.locks[blk] = -1
62+
return nil
63+
}
64+
65+
func (lT *LockTable) hasXLock(blk kfile.BlockId) bool {
66+
return lT.getLockVal(blk) < 0
67+
}
68+
69+
func (lT *LockTable) getLockVal(blk kfile.BlockId) int {
70+
val, exists := lT.locks[blk]
71+
if !exists {
72+
return 0
73+
}
74+
return val
75+
}
76+
77+
func (lT *LockTable) hasOtherLocks(blk kfile.BlockId) bool {
78+
val := lT.getLockVal(blk)
79+
return val != 0 && val != 1 // Allow upgrade from single shared lock
80+
}
81+
82+
func (lT *LockTable) unlock(blk kfile.BlockId) error {
83+
lT.mu.Lock()
84+
defer lT.mu.Unlock()
85+
86+
val := lT.getLockVal(blk)
87+
if val == 0 {
88+
return fmt.Errorf("attempting to unlock block %v which is not locked", blk)
89+
}
90+
91+
if val > 1 {
92+
// Decrement shared lock count
93+
lT.locks[blk] = val - 1
94+
} else {
95+
// Remove last shared lock or exclusive lock
96+
delete(lT.locks, blk)
97+
lT.cond.Broadcast() // Wake up waiting goroutines
98+
}
99+
return nil
100+
}
101+
102+
// Helper method to get lock information
103+
func (lT *LockTable) GetLockInfo(blk kfile.BlockId) (lockType string, count int) {
104+
lT.mu.RLock()
105+
defer lT.mu.RUnlock()
106+
107+
val := lT.getLockVal(blk)
108+
if val < 0 {
109+
return "exclusive", 1
110+
} else if val > 0 {
111+
return "shared", val
112+
}
113+
return "none", 0
114+
}

kfile/file.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func NewBlockId(filename string, blknum int) *BlockId {
2323
}
2424
}
2525

26-
func (b *BlockId) GetFileName() string {
26+
func (b *BlockId) FileName() string {
2727
return b.Filename
2828
}
2929

kfile/fileMgr.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (fm *FileMgr) PreallocateFile(blk *BlockId, size int64) error {
112112
return err
113113
}
114114

115-
filename := blk.GetFileName()
115+
filename := blk.FileName()
116116
if err := fm.validatePermissions(); err != nil {
117117
return err
118118
}
@@ -125,7 +125,7 @@ func (fm *FileMgr) validatePreallocationParams(blk *BlockId, size int64) error {
125125
if size%int64(fm.blocksize) != 0 {
126126
return fmt.Errorf("size must be a multiple of blocksize %d", fm.blocksize)
127127
}
128-
if blk.GetFileName() == "" {
128+
if blk.FileName() == "" {
129129
return fmt.Errorf("invalid filename")
130130
}
131131
return nil
@@ -193,14 +193,14 @@ func (fm *FileMgr) Read(blk *BlockId, p *SlottedPage) error {
193193
fm.mutex.RLock()
194194
defer fm.mutex.RUnlock()
195195

196-
f, err := fm.getFile(blk.GetFileName())
196+
f, err := fm.getFile(blk.FileName())
197197
if err != nil {
198198
return fmt.Errorf("failed to get file for block %v: %w", blk, err)
199199
}
200200

201201
offset := int64(blk.Number() * fm.blocksize)
202202
if _, err = f.Seek(offset, io.SeekStart); err != nil {
203-
return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err)
203+
return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err)
204204
}
205205
bytesRead, err := f.Read(p.Contents())
206206
if err != nil {
@@ -224,14 +224,14 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error {
224224
fm.mutex.Lock()
225225
defer fm.mutex.Unlock()
226226

227-
f, err := fm.getFile(blk.GetFileName())
227+
f, err := fm.getFile(blk.FileName())
228228
if err != nil {
229229
return fmt.Errorf("failed to get file for block %v: %w", blk, err)
230230
}
231231

232232
offset := int64(blk.Number() * fm.blocksize)
233233
if _, err = f.Seek(offset, io.SeekStart); err != nil {
234-
return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err)
234+
return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err)
235235
}
236236
bytesWritten, err := f.Write(p.Contents())
237237
if err != nil {
@@ -241,7 +241,7 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error {
241241
return fmt.Errorf("incomplete write: expected %d bytes, wrote %d", fm.blocksize, bytesWritten)
242242
}
243243
if err = f.Sync(); err != nil {
244-
return fmt.Errorf("failed to sync file %s: %w", blk.GetFileName(), err)
244+
return fmt.Errorf("failed to sync file %s: %w", blk.FileName(), err)
245245
}
246246

247247
fm.blocksWritten++
@@ -379,7 +379,7 @@ func (fm *FileMgr) WriteLog() []ReadWriteLogEntry {
379379

380380
// ensureFileSize ensures the file has at least the required number of blocks.
381381
func (fm *FileMgr) ensureFileSize(blk *BlockId, requiredBlocks int) error {
382-
currentBlocks, err := fm.Length(blk.GetFileName())
382+
currentBlocks, err := fm.Length(blk.FileName())
383383
if err != nil {
384384
return err
385385
}
@@ -399,7 +399,7 @@ func (fm *FileMgr) RenameFile(blk *BlockId, newFileName string) error {
399399
return fmt.Errorf("invalid new filename: %s", newFileName)
400400
}
401401

402-
oldFileName := blk.GetFileName()
402+
oldFileName := blk.FileName()
403403

404404
// Close the old file if it is open.
405405
fm.openFilesLock.Lock()

kfile/file__dir_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func TestBlock(t *testing.T) {
4646
Blknum := 5
4747
blk := NewBlockId(Filename, Blknum)
4848

49-
if blk.GetFileName() != Filename {
50-
t.Errorf("Expected Filename %s, got %s", Filename, blk.GetFileName())
49+
if blk.FileName() != Filename {
50+
t.Errorf("Expected Filename %s, got %s", Filename, blk.FileName())
5151
}
5252

5353
if blk.Number() != Blknum {
@@ -126,13 +126,13 @@ func TestBlock(t *testing.T) {
126126

127127
// Test NextBlock
128128
next := blk.NextBlock()
129-
if next.Number() != 6 || next.GetFileName() != "test.db" {
129+
if next.Number() != 6 || next.FileName() != "test.db" {
130130
t.Error("NextBlock returned incorrect block")
131131
}
132132

133133
// Test PrevBlock
134134
prev := blk.PrevBlock()
135-
if prev.Number() != 4 || prev.GetFileName() != "test.db" {
135+
if prev.Number() != 4 || prev.FileName() != "test.db" {
136136
t.Error("PrevBlock returned incorrect block")
137137
}
138138

@@ -372,8 +372,8 @@ func TestBlockId(t *testing.T) {
372372
blknum := 5
373373
blk := NewBlockId(filename, blknum)
374374

375-
if blk.GetFileName() != filename {
376-
t.Errorf("Expected Filename %s, got %s", filename, blk.GetFileName())
375+
if blk.FileName() != filename {
376+
t.Errorf("Expected Filename %s, got %s", filename, blk.FileName())
377377
}
378378

379379
if blk.Number() != blknum {
@@ -448,12 +448,12 @@ func TestBlockId(t *testing.T) {
448448
blk := NewBlockId("test.db", 5)
449449

450450
next := blk.NextBlock()
451-
if next.Number() != 6 || next.GetFileName() != "test.db" {
451+
if next.Number() != 6 || next.FileName() != "test.db" {
452452
t.Error("NextBlock returned incorrect block")
453453
}
454454

455455
prev := blk.PrevBlock()
456-
if prev.Number() != 4 || prev.GetFileName() != "test.db" {
456+
if prev.Number() != 4 || prev.FileName() != "test.db" {
457457
t.Error("PrevBlock returned incorrect block")
458458
}
459459

@@ -779,7 +779,7 @@ func TestFileRename(t *testing.T) {
779779
t.Errorf("Could not rename file %s", err)
780780
}
781781
want := new_file
782-
got := blk.GetFileName()
782+
got := blk.FileName()
783783
if want != got {
784784
t.Errorf("want %s but got %s", want, got)
785785
}

log/IRecord.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package log
2+
3+
const (
4+
CHECKPOINT = iota
5+
START
6+
COMMIT
7+
ROLLBACK
8+
SETINT
9+
SETSTRING
10+
)
11+
12+
type LogRecord interface {
13+
Op() int
14+
TxNumber() int
15+
Undo(txNum int)
16+
// Optionally: a method to serialize or convert to a Cell
17+
}

0 commit comments

Comments
 (0)