From 892b567eb3a470d7380295c62857ba3b758a9402 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sun, 22 Feb 2026 10:29:23 -0500 Subject: [PATCH] fix for dropping messages when rollover while deleting --- log.go | 14 ++++++++ log_test.go | 79 ++++++++++++++++++++++++++++++++++++++++------ message/message.go | 2 +- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/log.go b/log.go index 284bd20..db8bc67 100644 --- a/log.go +++ b/log.go @@ -344,13 +344,19 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err l.deleteMu.Lock() defer l.deleteMu.Unlock() + return l.delete(offsets) +} + +func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) { rdr, err := l.findDeleteReader(offsets) if err != nil { return nil, 0, err } + wasWriter := false l.writerMu.Lock() if l.writer.reader == rdr { + wasWriter = true if err := l.writer.Sync(); err != nil { l.writerMu.Unlock() return nil, 0, err @@ -396,6 +402,14 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err } l.writerMu.Unlock() + if wasWriter { + // A writing segment transformed into a reader, retry deleting + if err := rs.Segment.Remove(); err != nil { + return nil, 0, err + } + return l.delete(offsets) + } + // we are deleting in a reader segment l.readersMu.Lock() defer l.readersMu.Unlock() diff --git a/log_test.go b/log_test.go index 3409cca..7db4bc7 100644 --- a/log_test.go +++ b/log_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "testing" "time" @@ -20,10 +21,7 @@ import ( func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) { for begin := 0; begin < len(msgs); begin += batchLen { - end := begin + batchLen - if end > len(msgs) { - end = len(msgs) - } + end := min(begin+batchLen, len(msgs)) startOffset, err := l.NextOffset() require.NoError(t, err) @@ -1518,6 +1516,7 @@ func TestConcurrent(t *testing.T) { t.Run("PubsubRecent", testConcurrentPubsubRecent) t.Run("Consume", testConcurrentConsume) t.Run("Delete", testConcurrentDelete) + t.Run("DeleteRollover", testConcurrentDeleteRollover) t.Run("GC", testConcurrentGC) } @@ -1540,7 +1539,7 @@ func testConcurrentPubsubRecent(t *testing.T) { g.Go(func() error { for i := 0; ctx.Err() == nil; i++ { msgs := []Message{{ - Key: []byte(fmt.Sprintf("%010d", i)), + Key: fmt.Appendf(nil, "%010d", i), }} _, err := s.Publish(msgs) if err != nil { @@ -1565,7 +1564,7 @@ func testConcurrentPubsubRecent(t *testing.T) { offset = next for _, msg := range msgs { - require.Equal(t, []byte(fmt.Sprintf("%010d", msg.Offset)), msg.Key) + require.Equal(t, fmt.Appendf(nil, "%010d", msg.Offset), msg.Key) } } return nil @@ -1601,14 +1600,14 @@ func testConcurrentConsume(t *testing.T) { defer s.Close() var wg sync.WaitGroup - for i := 0; i < 3; i++ { + for range 3 { wg.Add(1) go func() { defer wg.Done() - for i := 0; i < 10000; i++ { + for i := range 10000 { msgs := []Message{{ - Key: []byte(fmt.Sprintf("%02d", i)), + Key: fmt.Appendf(nil, "%02d", i), }} _, err := s.Publish(msgs) require.NoError(t, err) @@ -1633,7 +1632,7 @@ func testConcurrentConsume(t *testing.T) { time.Sleep(time.Millisecond) for i := 0; i < 10000; i++ { - k := []byte(fmt.Sprintf("%02d", i)) + k := fmt.Appendf(nil, "%02d", i) _, err := s.GetByKey(k) if errors.Is(err, ErrNotFound) { i-- @@ -1696,6 +1695,66 @@ func testConcurrentDelete(t *testing.T) { wg.Wait() } +func testConcurrentDeleteRollover(t *testing.T) { + dir := t.TempDir() + + s, err := Open(dir, Options{KeyIndex: true, TimeIndex: true, Rollover: 1024}) + require.NoError(t, err) + defer s.Close() + + msgs := message.Gen(1000) + msgSize := s.Size(msgs[0]) + + var wg sync.WaitGroup + wg.Add(1) + wg.Add(1) + + var lastPublishOffset atomic.Int64 + lastPublishOffset.Store(-1) + go func() { + defer wg.Done() + + for i := range len(msgs) { + _, err := s.Publish(msgs[i : i+1]) + require.NoError(t, err) + lastPublishOffset.Store(msgs[i].Offset) + } + }() + + allDeleted := map[int64]struct{}{} + go func() { + defer wg.Done() + + for !t.Failed() { + currentOffset := lastPublishOffset.Load() + if currentOffset < 0 { + continue + } + + if currentOffset < int64(len(msgs)-1) { + deleted, sz, err := s.Delete(map[int64]struct{}{currentOffset: {}}) + require.NoError(t, err) + require.Len(t, deleted, 1) + require.Equal(t, msgSize, sz) + allDeleted[currentOffset] = struct{}{} + } else { + break + } + } + }() + + wg.Wait() + + for _, msg := range msgs { + _, err := s.Get(msg.Offset) + if _, ok := allDeleted[msg.Offset]; ok { + require.Error(t, err) + } else { + require.NoError(t, err, "missing offset: %d", msg.Offset) + } + } +} + func testConcurrentGC(t *testing.T) { dir := t.TempDir() diff --git a/message/message.go b/message/message.go index f6f16ed..690ceac 100644 --- a/message/message.go +++ b/message/message.go @@ -47,7 +47,7 @@ func Gen(count int) []Message { for i := range msgs { msgs[i] = Message{ Time: time.Date(2023, 1, 1, 0, 0, i, 0, time.UTC), - Key: []byte(fmt.Sprintf("%10d", i)), + Key: fmt.Appendf(nil, "%10d", i), Value: []byte(strings.Repeat(" ", 128)), } }