Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/klev-dev/klevdb.svg)](https://pkg.go.dev/github.com/klev-dev/klevdb)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

klevdb is a fast message store, written in Go. Think single partition on kafka, but stored locally.
klevdb is a fast message store, written in Go. Think single partition on Kafka, but stored locally.

In addition to basic consuming by offset, you can also configure klevdb to index times and keys. Times index allow you to quickly find a message by its time (or the first message after a certain time). Keys index allow you to quickly find the last message with a given key.
In addition to basic consuming by offset, you can also configure klevdb to index times and keys. Time indexes allow you to quickly find a message by its time (or the first message after a certain time). Key indexes allow you to quickly find the last message with a given key.

## Usage

Expand All @@ -22,6 +22,8 @@ To use klevdb:
package main

import (
"fmt"

"github.com/klev-dev/klevdb"
)

Expand Down Expand Up @@ -93,7 +95,7 @@ ok github.com/klev-dev/klevdb 12.433s
With default rollover of 1MB, for messages with keys 10B and values 128B:
* ~300,000 writes/sec, no indexes
* ~250,000 writes/sec, with all indexes enabled
* scales lineary with the batch size
* scales linearly with the batch size

### Consume
```
Expand Down
12 changes: 6 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
// OffsetOldest represents the smallest offset still available
// Use it to consume all messages, starting at the first available
OffsetOldest = message.OffsetOldest
// OffsetNewest represents the offset that will be used for the next produce
// OffsetNewest represents the offset that will be used for the next publish
// Use it to consume only new messages
OffsetNewest = message.OffsetNewest
// OffsetInvalid is the offset returned when error is detected
Expand All @@ -22,7 +22,7 @@ const (

type Message = message.Message

// InvalidMessage returned when an error have occurred
// InvalidMessage is returned when an error has occurred
var InvalidMessage = message.Invalid

// ErrInvalidOffset error is returned when the offset attribute is invalid or out of bounds
Expand Down Expand Up @@ -50,7 +50,7 @@ type Options struct {
TimeIndex bool
// Force filesystem sync after each Publish
AutoSync bool
// At what segment size it will rollover to a new segment. Defaults to 1mb.
// At what segment size it will rollover to a new segment. Defaults to 1MB.
Rollover int64
// Check the head segment for integrity, before opening it for reading/writing.
Check bool
Expand All @@ -60,7 +60,7 @@ type Log interface {
// Publish appends messages to the log.
// It returns the offset of the next message to be appended.
// The offset of the message is ignored, set to the actual offset.
// If the time of the message is 0, it set to the current UTC time.
// If the time of the message is 0, it is set to the current UTC time.
Publish(messages []Message) (nextOffset int64, err error)

// NextOffset returns the offset of the next message to be published.
Expand All @@ -80,7 +80,7 @@ type Log interface {
// from the next available offset.
// Consume is allowed to return no messages, but with increasing nextOffset
// in case messages between offset and nextOffset have been deleted.
// NextOffset is always bigger then offset, unless we are caught up
// NextOffset is always bigger than offset, unless we are caught up
// to the head of the log in which case they are equal.
Consume(offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

Expand All @@ -93,7 +93,7 @@ type Log interface {
// If offset is before the first available on the log, or is after
// NextOffset, it returns ErrInvalidOffset
// If log is empty, it returns ErrInvalidOffset
// If the exact offset have been deleted, it returns ErrNotFound
// If the exact offset has been deleted, it returns ErrNotFound
Get(offset int64) (message Message, err error)

// GetByKey retrieves the last message in the log for this key
Expand Down
6 changes: 3 additions & 3 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type BlockingLog interface {
Log

// ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is produced
// ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is published
ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

// ConsumeByKeyBlocking is similar to [ConsumeBlocking], but only returns messages matching the key
Expand Down Expand Up @@ -52,14 +52,14 @@ func (l *blockingLog) Publish(messages []Message) (int64, error) {

func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) {
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
return OffsetInvalid, nil, err
}
return l.Consume(offset, maxCount)
}

func (l *blockingLog) ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (int64, []Message, error) {
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
return OffsetInvalid, nil, err
}
return l.ConsumeByKey(key, offset, maxCount)
}
Expand Down
4 changes: 2 additions & 2 deletions compact/deletes.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ SEARCH:
continue
}

// not seen it (first instance) whithout value (e.g. delete)
// not seen it (first instance) without value (e.g. delete)
if msg.Value == nil {
offsets[msg.Offset] = struct{}{}
}
Expand All @@ -66,7 +66,7 @@ SEARCH:
// It will not remove messages for keys it sees before that offset.
//
// This is similar to removing keys, which were deleted (e.g. value set to nil)
// and are therfore no longer relevant/active.
// and are therefore no longer relevant/active.
//
// returns the offsets it deleted and the amount of storage freed
func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) {
Expand Down
11 changes: 6 additions & 5 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

// DeleteMultiBackoff is call on each iteration of [DeleteMulti] to give applications
// DeleteMultiBackoff is called on each iteration of [DeleteMulti] to give applications
// opportunity to not overload the target log with deletes
type DeleteMultiBackoff func(context.Context) error

Expand All @@ -31,13 +31,14 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff {
// and size, together with the error
//
// [DeleteMultiBackoff] is called on each iteration to give
// others a chanse to work with the log, while being deleted
// others a chance to work with the log, while being deleted
func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
var remainingOffsets = maps.Clone(offsets)
var deletedOffsets = map[int64]struct{}{}
var deletedSize int64

for len(offsets) > 0 {
deleted, size, err := l.Delete(offsets)
for len(remainingOffsets) > 0 {
deleted, size, err := l.Delete(remainingOffsets)
switch {
case err != nil:
return deletedOffsets, deletedSize, err
Expand All @@ -47,7 +48,7 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff

maps.Copy(deletedOffsets, deleted)
deletedSize += size
maps.DeleteFunc(offsets, func(k int64, v struct{}) bool {
maps.DeleteFunc(remainingOffsets, func(k int64, v struct{}) bool {
_, ok := deleted[k]
return ok
})
Expand Down
17 changes: 6 additions & 11 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package klevdb
import (
"errors"
"fmt"
"maps"
"os"
"path/filepath"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -218,7 +216,7 @@ func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []m
l.readersMu.RLock()
defer l.readersMu.RUnlock()

rdr, index := segment.Consume(l.readers, offset)
rdr, segmentIndex := segment.Consume(l.readers, offset)
for {
nextOffset, msgs, err := rdr.ConsumeByKey(key, hash, offset, maxCount)
if err != nil {
Expand All @@ -227,12 +225,12 @@ func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []m
if len(msgs) > 0 {
return nextOffset, msgs, err
}
if index >= len(l.readers)-1 {
if segmentIndex >= len(l.readers)-1 {
return nextOffset, msgs, err
}

index += 1
rdr = l.readers[index]
segmentIndex += 1
rdr = l.readers[segmentIndex]
offset = message.OffsetOldest
}
}
Expand Down Expand Up @@ -423,10 +421,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
}

func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) {
orderedOffsets := slices.Collect(maps.Keys(offsets))
slices.Sort(orderedOffsets)
lowestOffset := orderedOffsets[0]

lowestOffset := message.MinOffset(offsets)
if lowestOffset < 0 {
return nil, errDeleteRelative
}
Expand Down Expand Up @@ -502,7 +497,7 @@ func (l *log) Sync() (int64, error) {
defer l.writerMu.Unlock()

if err := l.writer.Sync(); err != nil {
return OffsetInvalid, nil
return OffsetInvalid, err
}
return l.writer.GetNextOffset()
}
Expand Down
6 changes: 3 additions & 3 deletions message/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func OpenWriter(path string) (*Writer, error) {
return &Writer{Path: path, f: f, pos: stat.Size()}, nil
}

func (w *Writer) Write(m Message) (int64, error) {
func (w *Writer) Write(m Message) (int64, error) { // TODO update message format
var fullSize = 8 + // offset
8 + // unix micro
4 + // key size
Expand Down Expand Up @@ -220,11 +220,11 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err
func (r *Reader) Close() error {
if r.ra != nil {
if err := r.ra.Close(); err != nil {
return fmt.Errorf("write mem log close: %w", err)
return fmt.Errorf("read mem log close: %w", err)
}
} else {
if err := r.r.Close(); err != nil {
return fmt.Errorf("write log close: %w", err)
return fmt.Errorf("read log close: %w", err)
}
}
return nil
Expand Down
12 changes: 12 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ type Message struct {

var Invalid = Message{Offset: OffsetInvalid}

// MinOffset returns the smallest offset present in the offsets set.
// If the set is empty, it returns OffsetInvalid.
func MinOffset(offsets map[int64]struct{}) int64 {
	// track "seen any element" explicitly: offsets may legitimately
	// contain values smaller than OffsetInvalid, so we cannot use the
	// sentinel alone to detect the first iteration
	lowest := OffsetInvalid
	found := false
	for offset := range offsets {
		if !found || offset < lowest {
			lowest = offset
			found = true
		}
	}
	return lowest
}

func Gen(count int) []Message {
var msgs = make([]Message, count)
for i := range msgs {
Expand Down
4 changes: 4 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package klevdb

import (
"bytes"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -402,6 +403,9 @@ func (r *reader) Close() error {
if r.messages == nil {
return nil
}
if r.messagesInuse.Load() > 0 {
return fmt.Errorf("close failed: consume in progress")
}

if err := r.messages.Close(); err != nil {
return err
Expand Down
6 changes: 1 addition & 5 deletions segment/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"errors"
"fmt"
"io"
"maps"
"os"
"path/filepath"
"slices"

"github.com/klev-dev/klevdb/index"
"github.com/klev-dev/klevdb/message"
Expand Down Expand Up @@ -327,9 +325,7 @@ type RewriteSegment struct {
}

func (r *RewriteSegment) GetNewSegment() Segment {
orderedOffsets := slices.Collect(maps.Keys(r.SurviveOffsets))
slices.Sort(orderedOffsets)
lowestOffset := orderedOffsets[0]
lowestOffset := message.MinOffset(r.SurviveOffsets)
return New(r.Segment.Dir, lowestOffset)
}

Expand Down
2 changes: 1 addition & 1 deletion trim/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// FindByCount returns a set of offsets for messages that when
// removed will keep number of the messages in the log less then max
// removed will keep the number of messages in the log under max
func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{}, error) {
stats, err := l.Stat()
switch {
Expand Down
2 changes: 1 addition & 1 deletion trim/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/klev-dev/klevdb/message"
)

// FindByOffset returns a set of offsets for messages that
// FindByOffset returns a set of offsets for messages whose
// offset is before a given offset
func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]struct{}, error) {
if before == message.OffsetOldest {
Expand Down
2 changes: 1 addition & 1 deletion trim/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}
return offsets, nil
}

// BySize tries to remove messages until log size is less then sz
// BySize tries to remove messages until log size is less than sz
//
// returns the offsets it deleted and the amount of storage freed
func BySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}, int64, error) {
Expand Down
6 changes: 3 additions & 3 deletions typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ type TLog[K any, V any] interface {
// GetByKey see [Log.GetByKey]
GetByKey(key K, empty bool) (message TMessage[K, V], err error)

// OffsetByKey see [Lot.OffsetByKey]
// OffsetByKey see [Log.OffsetByKey]
OffsetByKey(key K, empty bool) (int64, error)

// GetByTime see [Log.GetByTime]
GetByTime(start time.Time) (message TMessage[K, V], err error)

// OffsetByTime see [Lot.OffsetByTime]
// OffsetByTime see [Log.OffsetByTime]
OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error)

// Delete see [Log.Delete]
Expand All @@ -62,7 +62,7 @@ type TLog[K any, V any] interface {
// Close see [Log.Close]
Close() error

// Raw returns the wrapped in log
// Raw returns the underlying log
Raw() Log
}

Expand Down
4 changes: 2 additions & 2 deletions typed_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type TBlockingLog[K any, V any] interface {
ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)
}

// OpenTBlocking opens tlog and wraps it with support for blocking consume
// OpenTBlocking opens a [TLog] and wraps it with support for blocking consume
func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) {
l, err := OpenT(dir, opts, keyCodec, valueCodec)
if err != nil {
Expand All @@ -26,7 +26,7 @@ func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], va
return WrapTBlocking(l)
}

// WrapTBlocking wraps tlog with support for blocking consume
// WrapTBlocking wraps a [TLog] with support for blocking consume
func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) {
next, err := l.NextOffset()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions typed_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Codec[T any] interface {
Decode(b []byte) (t T, empty bool, err error)
}

// JsonCodec supports coding values as a JSON
// JsonCodec supports coding values as JSON
type JsonCodec[T any] struct{}

func (c JsonCodec[T]) Encode(t T, empty bool) ([]byte, error) {
Expand Down Expand Up @@ -49,7 +49,7 @@ func (c stringOptCodec) Decode(b []byte) (string, bool, error) {
return s, false, err
}

// StringOptCodec supports coding an optional string, e.g. differantiates between "" and nil strings
// StringOptCodec supports coding an optional string, e.g. differentiates between "" and nil strings
var StringOptCodec = stringOptCodec{}

type stringCodec struct{}
Expand Down