diff --git a/README.md b/README.md index 02e9895..9cc9770 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/klev-dev/klevdb.svg)](https://pkg.go.dev/github.com/klev-dev/klevdb) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) -klevdb is a fast message store, written in Go. Think single partition on kafka, but stored locally. +klevdb is a fast message store, written in Go. Think single partition on Kafka, but stored locally. -In addition to basic consuming by offset, you can also configure klevdb to index times and keys. Times index allow you to quickly find a message by its time (or the first message after a certain time). Keys index allow you to quickly find the last message with a given key. +In addition to basic consuming by offset, you can also configure klevdb to index times and keys. Time indexes allow you to quickly find a message by its time (or the first message after a certain time). Key indexes allow you to quickly find the last message with a given key. ## Usage @@ -22,6 +22,8 @@ To use klevdb: package main import ( + "fmt" + "github.com/klev-dev/klevdb" ) @@ -93,7 +95,7 @@ ok github.com/klev-dev/klevdb 12.433s With default rollover of 1MB, for messages with keys 10B and values 128B: * ~300,000 writes/sec, no indexes * ~250,000 writes/sec, with all indexes enabled - * scales lineary with the batch size + * scales linearly with the batch size ### Consume ``` diff --git a/api.go b/api.go index bb7996e..7cef87b 100644 --- a/api.go +++ b/api.go @@ -13,7 +13,7 @@ const ( // OffsetOldest represents the smallest offset still available // Use it to consume all messages, starting at the first available OffsetOldest = message.OffsetOldest - // OffsetNewest represents the offset that will be used for the next produce + // OffsetNewest represents the offset that will be used for the next publish // Use it to consume only new messages OffsetNewest = message.OffsetNewest // OffsetInvalid is the offset returned when error is detected @@ -22,7 +22,7 @@ const ( type Message = message.Message -// InvalidMessage returned when an error have occurred +// InvalidMessage returned when an error has occurred var InvalidMessage = message.Invalid // ErrInvalidOffset error is returned when the offset attribute is invalid or out of bounds @@ -50,7 +50,7 @@ type Options struct { TimeIndex bool // Force filesystem sync after each Publish AutoSync bool - // At what segment size it will rollover to a new segment. Defaults to 1mb. + // At what segment size it will rollover to a new segment. Defaults to 1MB. Rollover int64 // Check the head segment for integrity, before opening it for reading/writing. Check bool @@ -60,7 +60,7 @@ type Log interface { // Publish appends messages to the log. // It returns the offset of the next message to be appended. // The offset of the message is ignored, set to the actual offset. - // If the time of the message is 0, it set to the current UTC time. + // If the time of the message is 0, it is set to the current UTC time. Publish(messages []Message) (nextOffset int64, err error) // NextOffset returns the offset of the next message to be published. @@ -80,7 +80,7 @@ type Log interface { // from the next available offset. // Consume is allowed to return no messages, but with increasing nextOffset // in case messages between offset and nextOffset have been deleted. - // NextOffset is always bigger then offset, unless we are caught up + // NextOffset is always bigger than offset, unless we are caught up // to the head of the log in which case they are equal. Consume(offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) @@ -93,7 +93,7 @@ type Log interface { // If offset is before the first available on the log, or is after // NextOffset, it returns ErrInvalidOffset // If log is empty, it returns ErrInvalidOffset - // If the exact offset have been deleted, it returns ErrNotFound + // If the exact offset has been deleted, it returns ErrNotFound Get(offset int64) (message Message, err error) // GetByKey retrieves the last message in the log for this key diff --git a/blocking.go b/blocking.go index 51701c6..bf6ed8a 100644 --- a/blocking.go +++ b/blocking.go @@ -10,7 +10,7 @@ import ( type BlockingLog interface { Log - // ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is produced + // ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is published ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) // ConsumeByKeyBlocking is similar to [ConsumeBlocking], but only returns messages matching the key @@ -52,14 +52,14 @@ func (l *blockingLog) Publish(messages []Message) (int64, error) { func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) { if err := l.notify.Wait(ctx, offset); err != nil { - return 0, nil, err + return OffsetInvalid, nil, err } return l.Consume(offset, maxCount) } func (l *blockingLog) ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (int64, []Message, error) { if err := l.notify.Wait(ctx, offset); err != nil { - return 0, nil, err + return OffsetInvalid, nil, err } return l.ConsumeByKey(key, offset, maxCount) } diff --git a/compact/deletes.go b/compact/deletes.go index 45daa18..7d7e734 100644 --- a/compact/deletes.go +++ b/compact/deletes.go @@ -41,7 +41,7 @@ SEARCH: continue } - // not seen it (first instance) whithout value (e.g. delete) + // not seen it (first instance) without value (e.g. delete) if msg.Value == nil { offsets[msg.Offset] = struct{}{} } @@ -66,7 +66,7 @@ SEARCH: // It will not remove messages for keys it sees before that offset. // // This is similar to removing keys, which were deleted (e.g. value set to nil) -// and are therfore no longer relevant/active. +// and are therefore no longer relevant/active. // // returns the offsets it deleted and the amount of storage freed func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { diff --git a/delete.go b/delete.go index 6147a55..d2e821f 100644 --- a/delete.go +++ b/delete.go @@ -6,7 +6,7 @@ import ( "time" ) -// DeleteMultiBackoff is call on each iteration of [DeleteMulti] to give applications +// DeleteMultiBackoff is called on each iteration of [DeleteMulti] to give applications // opportunity to not overload the target log with deletes type DeleteMultiBackoff func(context.Context) error @@ -31,13 +31,14 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff { // and size, together with the error // // [DeleteMultiBackoff] is called on each iteration to give -// others a chanse to work with the log, while being deleted +// others a chance to work with the log, while being deleted func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + var remainingOffsets = maps.Clone(offsets) var deletedOffsets = map[int64]struct{}{} var deletedSize int64 - for len(offsets) > 0 { - deleted, size, err := l.Delete(offsets) + for len(remainingOffsets) > 0 { + deleted, size, err := l.Delete(remainingOffsets) switch { case err != nil: return deletedOffsets, deletedSize, err @@ -47,7 +48,7 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff maps.Copy(deletedOffsets, deleted) deletedSize += size - maps.DeleteFunc(offsets, func(k int64, v struct{}) bool { + maps.DeleteFunc(remainingOffsets, func(k int64, v struct{}) bool { _, ok := deleted[k] return ok }) diff --git a/log.go b/log.go index b0c00c5..6a62580 100644 --- a/log.go +++ b/log.go @@ -3,10 +3,8 @@ package klevdb import ( "errors" "fmt" - "maps" "os" "path/filepath" - "slices" "sync" "time" @@ -218,7 +216,7 @@ func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []m l.readersMu.RLock() defer l.readersMu.RUnlock() - rdr, index := segment.Consume(l.readers, offset) + rdr, segmentIndex := segment.Consume(l.readers, offset) for { nextOffset, msgs, err := rdr.ConsumeByKey(key, hash, offset, maxCount) if err != nil { @@ -227,12 +225,12 @@ func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []m if len(msgs) > 0 { return nextOffset, msgs, err } - if index >= len(l.readers)-1 { + if segmentIndex >= len(l.readers)-1 { return nextOffset, msgs, err } - index += 1 - rdr = l.readers[index] + segmentIndex += 1 + rdr = l.readers[segmentIndex] offset = message.OffsetOldest } } @@ -423,10 +421,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err } func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) { - orderedOffsets := slices.Collect(maps.Keys(offsets)) - slices.Sort(orderedOffsets) - lowestOffset := orderedOffsets[0] - + lowestOffset := message.MinOffset(offsets) if lowestOffset < 0 { return nil, errDeleteRelative } @@ -502,7 +497,7 @@ func (l *log) Sync() (int64, error) { defer l.writerMu.Unlock() if err := l.writer.Sync(); err != nil { - return OffsetInvalid, nil + return OffsetInvalid, err } return l.writer.GetNextOffset() } diff --git a/message/format.go b/message/format.go index 028677c..4717a09 100644 --- a/message/format.go +++ b/message/format.go @@ -45,7 +45,7 @@ func OpenWriter(path string) (*Writer, error) { return &Writer{Path: path, f: f, pos: stat.Size()}, nil } -func (w *Writer) Write(m Message) (int64, error) { +func (w *Writer) Write(m Message) (int64, error) { // TODO update message format var fullSize = 8 + // offset 8 + // unix micro 4 + // key size @@ -220,11 +220,11 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err func (r *Reader) Close() error { if r.ra != nil { if err := r.ra.Close(); err != nil { - return fmt.Errorf("write mem log close: %w", err) + return fmt.Errorf("read mem log close: %w", err) } } else { if err := r.r.Close(); err != nil { - return fmt.Errorf("write log close: %w", err) + return fmt.Errorf("read log close: %w", err) } } return nil diff --git a/message/message.go b/message/message.go index 2d7c3c2..f6f16ed 100644 --- a/message/message.go +++ b/message/message.go @@ -30,6 +30,18 @@ type Message struct { var Invalid = Message{Offset: OffsetInvalid} +func MinOffset(offsets map[int64]struct{}) int64 { + min := OffsetInvalid + first := true + for offset := range offsets { + if first || offset < min { + min = offset + first = false + } + } + return min +} + func Gen(count int) []Message { var msgs = make([]Message, count) for i := range msgs { diff --git a/reader.go b/reader.go index b6e0325..df35b52 100644 --- a/reader.go +++ b/reader.go @@ -2,6 +2,7 @@ package klevdb import ( "bytes" + "fmt" "sync" "sync/atomic" "time" @@ -402,6 +403,9 @@ func (r *reader) Close() error { if r.messages == nil { return nil } + if r.messagesInuse.Load() > 0 { + return fmt.Errorf("close failed: consume in progress") + } if err := r.messages.Close(); err != nil { return err diff --git a/segment/segment.go b/segment/segment.go index ce80d7b..afd401d 100644 --- a/segment/segment.go +++ b/segment/segment.go @@ -4,10 +4,8 @@ import ( "errors" "fmt" "io" - "maps" "os" "path/filepath" - "slices" "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" @@ -327,9 +325,7 @@ type RewriteSegment struct { } func (r *RewriteSegment) GetNewSegment() Segment { - orderedOffsets := slices.Collect(maps.Keys(r.SurviveOffsets)) - slices.Sort(orderedOffsets) - lowestOffset := orderedOffsets[0] + lowestOffset := message.MinOffset(r.SurviveOffsets) return New(r.Segment.Dir, lowestOffset) } diff --git a/trim/count.go b/trim/count.go index 72b6581..b37d8dc 100644 --- a/trim/count.go +++ b/trim/count.go @@ -7,7 +7,7 @@ import ( ) // FindByCount returns a set of offsets for messages that when -// removed will keep number of the messages in the log less then max +// removed will keep the number of messages in the log under max func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{}, error) { stats, err := l.Stat() switch { diff --git a/trim/offset.go b/trim/offset.go index 8107a6d..7cb90a2 100644 --- a/trim/offset.go +++ b/trim/offset.go @@ -7,7 +7,7 @@ import ( "github.com/klev-dev/klevdb/message" ) -// FindByOffset returns a set of offsets for messages that +// FindByOffset returns a set of offsets for messages whose // offset is before a given offset func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]struct{}, error) { if before == message.OffsetOldest { diff --git a/trim/size.go b/trim/size.go index 15db6d8..8b70f88 100644 --- a/trim/size.go +++ b/trim/size.go @@ -53,7 +53,7 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} return offsets, nil } -// BySize tries to remove messages until log size is less then sz +// BySize tries to remove messages until log size is less than sz // // returns the offsets it deleted and the amount of storage freed func BySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}, int64, error) { diff --git a/typed.go b/typed.go index 26d248d..35a6729 100644 --- a/typed.go +++ b/typed.go @@ -32,13 +32,13 @@ type TLog[K any, V any] interface { // GetByKey see [Log.GetByKey] GetByKey(key K, empty bool) (message TMessage[K, V], err error) - // OffsetByKey see [Lot.OffsetByKey] + // OffsetByKey see [Log.OffsetByKey] OffsetByKey(key K, empty bool) (int64, error) // GetByTime see [Log.GetByTime] GetByTime(start time.Time) (message TMessage[K, V], err error) - // OffsetByTime see [Lot.OffsetByTime] + // OffsetByTime see [Log.OffsetByTime] OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error) // Delete see [Log.Delete] @@ -62,7 +62,7 @@ type TLog[K any, V any] interface { // Close see [Log.Close] Close() error - // Raw returns the wrapped in log + // Raw returns the underlying log Raw() Log } diff --git a/typed_blocking.go b/typed_blocking.go index 9f3d7f7..758429e 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -17,7 +17,7 @@ type TBlockingLog[K any, V any] interface { ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) } -// OpenTBlocking opens tlog and wraps it with support for blocking consume +// OpenTBlocking opens a [TLog] and wraps it with support for blocking consume func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) { l, err := OpenT(dir, opts, keyCodec, valueCodec) if err != nil { @@ -26,7 +26,7 @@ func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], va return WrapTBlocking(l) } -// WrapTBlocking wraps tlog with support for blocking consume +// WrapTBlocking wraps a [TLog] with support for blocking consume func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) { next, err := l.NextOffset() if err != nil { diff --git a/typed_codec.go b/typed_codec.go index cfa4e5d..529a09a 100644 --- a/typed_codec.go +++ b/typed_codec.go @@ -12,7 +12,7 @@ type Codec[T any] interface { Decode(b []byte) (t T, empty bool, err error) } -// JsonCodec supports coding values as a JSON +// JsonCodec supports coding values as JSON type JsonCodec[T any] struct{} func (c JsonCodec[T]) Encode(t T, empty bool) ([]byte, error) { @@ -49,7 +49,7 @@ func (c stringOptCodec) Decode(b []byte) (string, bool, error) { return s, false, err } -// StringOptCodec supports coding an optional string, e.g. differantiates between "" and nil strings +// StringOptCodec supports coding an optional string, e.g. differentiates between "" and nil strings var StringOptCodec = stringOptCodec{} type stringCodec struct{}