Skip to content

Commit 892b567

Browse files
committed
Fix messages being dropped when a rollover happens during a delete
1 parent 04fd8f4 commit 892b567

3 files changed

Lines changed: 84 additions & 11 deletions

File tree

log.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,13 +344,19 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
344344
l.deleteMu.Lock()
345345
defer l.deleteMu.Unlock()
346346

347+
return l.delete(offsets)
348+
}
349+
350+
func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) {
347351
rdr, err := l.findDeleteReader(offsets)
348352
if err != nil {
349353
return nil, 0, err
350354
}
351355

356+
wasWriter := false
352357
l.writerMu.Lock()
353358
if l.writer.reader == rdr {
359+
wasWriter = true
354360
if err := l.writer.Sync(); err != nil {
355361
l.writerMu.Unlock()
356362
return nil, 0, err
@@ -396,6 +402,14 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
396402
}
397403
l.writerMu.Unlock()
398404

405+
if wasWriter {
406+
// A writing segment transformed into a reader, retry deleting
407+
if err := rs.Segment.Remove(); err != nil {
408+
return nil, 0, err
409+
}
410+
return l.delete(offsets)
411+
}
412+
399413
// we are deleting in a reader segment
400414
l.readersMu.Lock()
401415
defer l.readersMu.Unlock()

log_test.go

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"path/filepath"
99
"strings"
1010
"sync"
11+
"sync/atomic"
1112
"testing"
1213
"time"
1314

@@ -20,10 +21,7 @@ import (
2021

2122
func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) {
2223
for begin := 0; begin < len(msgs); begin += batchLen {
23-
end := begin + batchLen
24-
if end > len(msgs) {
25-
end = len(msgs)
26-
}
24+
end := min(begin+batchLen, len(msgs))
2725
startOffset, err := l.NextOffset()
2826
require.NoError(t, err)
2927

@@ -1518,6 +1516,7 @@ func TestConcurrent(t *testing.T) {
15181516
t.Run("PubsubRecent", testConcurrentPubsubRecent)
15191517
t.Run("Consume", testConcurrentConsume)
15201518
t.Run("Delete", testConcurrentDelete)
1519+
t.Run("DeleteRollover", testConcurrentDeleteRollover)
15211520
t.Run("GC", testConcurrentGC)
15221521
}
15231522

@@ -1540,7 +1539,7 @@ func testConcurrentPubsubRecent(t *testing.T) {
15401539
g.Go(func() error {
15411540
for i := 0; ctx.Err() == nil; i++ {
15421541
msgs := []Message{{
1543-
Key: []byte(fmt.Sprintf("%010d", i)),
1542+
Key: fmt.Appendf(nil, "%010d", i),
15441543
}}
15451544
_, err := s.Publish(msgs)
15461545
if err != nil {
@@ -1565,7 +1564,7 @@ func testConcurrentPubsubRecent(t *testing.T) {
15651564

15661565
offset = next
15671566
for _, msg := range msgs {
1568-
require.Equal(t, []byte(fmt.Sprintf("%010d", msg.Offset)), msg.Key)
1567+
require.Equal(t, fmt.Appendf(nil, "%010d", msg.Offset), msg.Key)
15691568
}
15701569
}
15711570
return nil
@@ -1601,14 +1600,14 @@ func testConcurrentConsume(t *testing.T) {
16011600
defer s.Close()
16021601

16031602
var wg sync.WaitGroup
1604-
for i := 0; i < 3; i++ {
1603+
for range 3 {
16051604
wg.Add(1)
16061605
go func() {
16071606
defer wg.Done()
16081607

1609-
for i := 0; i < 10000; i++ {
1608+
for i := range 10000 {
16101609
msgs := []Message{{
1611-
Key: []byte(fmt.Sprintf("%02d", i)),
1610+
Key: fmt.Appendf(nil, "%02d", i),
16121611
}}
16131612
_, err := s.Publish(msgs)
16141613
require.NoError(t, err)
@@ -1633,7 +1632,7 @@ func testConcurrentConsume(t *testing.T) {
16331632

16341633
time.Sleep(time.Millisecond)
16351634
for i := 0; i < 10000; i++ {
1636-
k := []byte(fmt.Sprintf("%02d", i))
1635+
k := fmt.Appendf(nil, "%02d", i)
16371636
_, err := s.GetByKey(k)
16381637
if errors.Is(err, ErrNotFound) {
16391638
i--
@@ -1696,6 +1695,66 @@ func testConcurrentDelete(t *testing.T) {
16961695
wg.Wait()
16971696
}
16981697

1698+
func testConcurrentDeleteRollover(t *testing.T) {
1699+
dir := t.TempDir()
1700+
1701+
s, err := Open(dir, Options{KeyIndex: true, TimeIndex: true, Rollover: 1024})
1702+
require.NoError(t, err)
1703+
defer s.Close()
1704+
1705+
msgs := message.Gen(1000)
1706+
msgSize := s.Size(msgs[0])
1707+
1708+
var wg sync.WaitGroup
1709+
wg.Add(1)
1710+
wg.Add(1)
1711+
1712+
var lastPublishOffset atomic.Int64
1713+
lastPublishOffset.Store(-1)
1714+
go func() {
1715+
defer wg.Done()
1716+
1717+
for i := range len(msgs) {
1718+
_, err := s.Publish(msgs[i : i+1])
1719+
require.NoError(t, err)
1720+
lastPublishOffset.Store(msgs[i].Offset)
1721+
}
1722+
}()
1723+
1724+
allDeleted := map[int64]struct{}{}
1725+
go func() {
1726+
defer wg.Done()
1727+
1728+
for !t.Failed() {
1729+
currentOffset := lastPublishOffset.Load()
1730+
if currentOffset < 0 {
1731+
continue
1732+
}
1733+
1734+
if currentOffset < int64(len(msgs)-1) {
1735+
deleted, sz, err := s.Delete(map[int64]struct{}{currentOffset: {}})
1736+
require.NoError(t, err)
1737+
require.Len(t, deleted, 1)
1738+
require.Equal(t, msgSize, sz)
1739+
allDeleted[currentOffset] = struct{}{}
1740+
} else {
1741+
break
1742+
}
1743+
}
1744+
}()
1745+
1746+
wg.Wait()
1747+
1748+
for _, msg := range msgs {
1749+
_, err := s.Get(msg.Offset)
1750+
if _, ok := allDeleted[msg.Offset]; ok {
1751+
require.Error(t, err)
1752+
} else {
1753+
require.NoError(t, err, "missing offset: %d", msg.Offset)
1754+
}
1755+
}
1756+
}
1757+
16991758
func testConcurrentGC(t *testing.T) {
17001759
dir := t.TempDir()
17011760

message/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func Gen(count int) []Message {
4747
for i := range msgs {
4848
msgs[i] = Message{
4949
Time: time.Date(2023, 1, 1, 0, 0, i, 0, time.UTC),
50-
Key: []byte(fmt.Sprintf("%10d", i)),
50+
Key: fmt.Appendf(nil, "%10d", i),
5151
Value: []byte(strings.Repeat(" ", 128)),
5252
}
5353
}

0 commit comments

Comments
 (0)