Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,19 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
l.deleteMu.Lock()
defer l.deleteMu.Unlock()

return l.delete(offsets)
}

func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) {
rdr, err := l.findDeleteReader(offsets)
if err != nil {
return nil, 0, err
}

wasWriter := false
l.writerMu.Lock()
if l.writer.reader == rdr {
wasWriter = true
if err := l.writer.Sync(); err != nil {
l.writerMu.Unlock()
return nil, 0, err
Expand Down Expand Up @@ -396,6 +402,14 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
}
l.writerMu.Unlock()

if wasWriter {
// A writing segment transformed into a reader, retry deleting
if err := rs.Segment.Remove(); err != nil {
return nil, 0, err
}
return l.delete(offsets)
}

// we are deleting in a reader segment
l.readersMu.Lock()
defer l.readersMu.Unlock()
Expand Down
79 changes: 69 additions & 10 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -20,10 +21,7 @@ import (

func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) {
for begin := 0; begin < len(msgs); begin += batchLen {
end := begin + batchLen
if end > len(msgs) {
end = len(msgs)
}
end := min(begin+batchLen, len(msgs))
startOffset, err := l.NextOffset()
require.NoError(t, err)

Expand Down Expand Up @@ -1518,6 +1516,7 @@ func TestConcurrent(t *testing.T) {
t.Run("PubsubRecent", testConcurrentPubsubRecent)
t.Run("Consume", testConcurrentConsume)
t.Run("Delete", testConcurrentDelete)
t.Run("DeleteRollover", testConcurrentDeleteRollover)
t.Run("GC", testConcurrentGC)
}

Expand All @@ -1540,7 +1539,7 @@ func testConcurrentPubsubRecent(t *testing.T) {
g.Go(func() error {
for i := 0; ctx.Err() == nil; i++ {
msgs := []Message{{
Key: []byte(fmt.Sprintf("%010d", i)),
Key: fmt.Appendf(nil, "%010d", i),
}}
_, err := s.Publish(msgs)
if err != nil {
Expand All @@ -1565,7 +1564,7 @@ func testConcurrentPubsubRecent(t *testing.T) {

offset = next
for _, msg := range msgs {
require.Equal(t, []byte(fmt.Sprintf("%010d", msg.Offset)), msg.Key)
require.Equal(t, fmt.Appendf(nil, "%010d", msg.Offset), msg.Key)
}
}
return nil
Expand Down Expand Up @@ -1601,14 +1600,14 @@ func testConcurrentConsume(t *testing.T) {
defer s.Close()

var wg sync.WaitGroup
for i := 0; i < 3; i++ {
for range 3 {
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 10000; i++ {
for i := range 10000 {
msgs := []Message{{
Key: []byte(fmt.Sprintf("%02d", i)),
Key: fmt.Appendf(nil, "%02d", i),
}}
_, err := s.Publish(msgs)
require.NoError(t, err)
Expand All @@ -1633,7 +1632,7 @@ func testConcurrentConsume(t *testing.T) {

time.Sleep(time.Millisecond)
for i := 0; i < 10000; i++ {
k := []byte(fmt.Sprintf("%02d", i))
k := fmt.Appendf(nil, "%02d", i)
_, err := s.GetByKey(k)
if errors.Is(err, ErrNotFound) {
i--
Expand Down Expand Up @@ -1696,6 +1695,66 @@ func testConcurrentDelete(t *testing.T) {
wg.Wait()
}

func testConcurrentDeleteRollover(t *testing.T) {
dir := t.TempDir()

s, err := Open(dir, Options{KeyIndex: true, TimeIndex: true, Rollover: 1024})
require.NoError(t, err)
defer s.Close()

msgs := message.Gen(1000)
msgSize := s.Size(msgs[0])

var wg sync.WaitGroup
wg.Add(1)
wg.Add(1)

var lastPublishOffset atomic.Int64
lastPublishOffset.Store(-1)
go func() {
defer wg.Done()

for i := range len(msgs) {
_, err := s.Publish(msgs[i : i+1])
require.NoError(t, err)
lastPublishOffset.Store(msgs[i].Offset)
}
}()

allDeleted := map[int64]struct{}{}
go func() {
defer wg.Done()

for !t.Failed() {
currentOffset := lastPublishOffset.Load()
if currentOffset < 0 {
continue
}

if currentOffset < int64(len(msgs)-1) {
deleted, sz, err := s.Delete(map[int64]struct{}{currentOffset: {}})
require.NoError(t, err)
require.Len(t, deleted, 1)
require.Equal(t, msgSize, sz)
allDeleted[currentOffset] = struct{}{}
} else {
break
}
}
}()

wg.Wait()

for _, msg := range msgs {
_, err := s.Get(msg.Offset)
if _, ok := allDeleted[msg.Offset]; ok {
require.Error(t, err)
} else {
require.NoError(t, err, "missing offset: %d", msg.Offset)
}
}
}

func testConcurrentGC(t *testing.T) {
dir := t.TempDir()

Expand Down
2 changes: 1 addition & 1 deletion message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Gen(count int) []Message {
for i := range msgs {
msgs[i] = Message{
Time: time.Date(2023, 1, 1, 0, 0, i, 0, time.UTC),
Key: []byte(fmt.Sprintf("%10d", i)),
Key: fmt.Appendf(nil, "%10d", i),
Value: []byte(strings.Repeat(" ", 128)),
}
}
Expand Down