Skip to content

Commit d8ce7a9

Browse files
authored
Merge pull request #315 from bootjp/feature/refactor
fix: harden leader verification and optimize scans; add MVCC/route co…
2 parents 8d7753a + 16d3b50 commit d8ce7a9

20 files changed

Lines changed: 1423 additions & 65 deletions
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/bootjp/elastickv/distribution"
8+
pb "github.com/bootjp/elastickv/proto"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestDistributionServerGetRoute_HitAndMiss(t *testing.T) {
13+
t.Parallel()
14+
15+
engine := distribution.NewEngine()
16+
engine.UpdateRoute([]byte("a"), []byte("m"), 1)
17+
engine.UpdateRoute([]byte("m"), nil, 2)
18+
19+
s := NewDistributionServer(engine)
20+
ctx := context.Background()
21+
22+
hit, err := s.GetRoute(ctx, &pb.GetRouteRequest{Key: []byte("b")})
23+
require.NoError(t, err)
24+
require.Equal(t, []byte("a"), hit.Start)
25+
require.Equal(t, []byte("m"), hit.End)
26+
require.Equal(t, uint64(1), hit.RaftGroupId)
27+
28+
miss, err := s.GetRoute(ctx, &pb.GetRouteRequest{Key: []byte("0")})
29+
require.NoError(t, err)
30+
require.Equal(t, uint64(0), miss.RaftGroupId)
31+
require.Nil(t, miss.Start)
32+
require.Nil(t, miss.End)
33+
}
34+
35+
func TestDistributionServerGetTimestamp_IsMonotonic(t *testing.T) {
36+
t.Parallel()
37+
38+
s := NewDistributionServer(distribution.NewEngine())
39+
ctx := context.Background()
40+
41+
first, err := s.GetTimestamp(ctx, &pb.GetTimestampRequest{})
42+
require.NoError(t, err)
43+
44+
second, err := s.GetTimestamp(ctx, &pb.GetTimestampRequest{})
45+
require.NoError(t, err)
46+
47+
require.Greater(t, second.Timestamp, first.Timestamp)
48+
}

adapter/redis.go

Lines changed: 192 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ const (
3939
const (
4040
redisLatestCommitTimeout = 5 * time.Second
4141
redisDispatchTimeout = 10 * time.Second
42+
maxByteValue = 0xFF
4243
)
4344

4445
//nolint:mnd
@@ -383,19 +384,50 @@ func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) {
383384
if res {
384385
return [][]byte{bytes.Clone(pattern)}, nil
385386
}
387+
388+
isList, err := r.isListKeyAt(context.Background(), pattern, readTS)
389+
if err != nil {
390+
return nil, err
391+
}
392+
if isList {
393+
return [][]byte{bytes.Clone(pattern)}, nil
394+
}
386395
return [][]byte{}, nil
387396
}
388397

389398
func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
390-
start := r.patternStart(pattern)
391-
399+
start, end := patternScanBounds(pattern)
400+
keyset := map[string][]byte{}
392401
readTS := r.readTS()
393-
keys, err := r.store.ScanAt(context.Background(), start, nil, math.MaxInt, readTS)
394-
if err != nil {
395-
return nil, errors.WithStack(err)
402+
403+
mergeScannedKeys := func(scanStart, scanEnd []byte) error {
404+
keys, err := r.store.ScanAt(context.Background(), scanStart, scanEnd, math.MaxInt, readTS)
405+
if err != nil {
406+
return errors.WithStack(err)
407+
}
408+
for k, v := range r.collectUserKeys(keys, pattern) {
409+
keyset[k] = v
410+
}
411+
return nil
396412
}
397413

398-
keyset := r.collectUserKeys(keys)
414+
if err := mergeScannedKeys(start, end); err != nil {
415+
return nil, err
416+
}
417+
418+
// User-key bounded scans like "foo*" do not naturally include internal list
419+
// keys ("!lst|..."), so scan list namespaces separately with mapped bounds.
420+
if start != nil || end != nil {
421+
metaStart, metaEnd := listPatternScanBounds(store.ListMetaPrefix, pattern)
422+
if err := mergeScannedKeys(metaStart, metaEnd); err != nil {
423+
return nil, err
424+
}
425+
426+
itemStart, itemEnd := listPatternScanBounds(store.ListItemPrefix, pattern)
427+
if err := mergeScannedKeys(itemStart, itemEnd); err != nil {
428+
return nil, err
429+
}
430+
}
399431

400432
out := make([][]byte, 0, len(keyset))
401433
for _, v := range keyset {
@@ -404,22 +436,102 @@ func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
404436
return out, nil
405437
}
406438

407-
func (r *RedisServer) patternStart(pattern []byte) []byte {
439+
func patternScanBounds(pattern []byte) ([]byte, []byte) {
408440
if bytes.Equal(pattern, []byte("*")) {
441+
return nil, nil
442+
}
443+
444+
i := bytes.IndexByte(pattern, '*')
445+
if i <= 0 {
446+
return nil, nil
447+
}
448+
449+
start := bytes.Clone(pattern[:i])
450+
return start, prefixScanEnd(start)
451+
}
452+
453+
func listPatternScanBounds(prefix string, pattern []byte) ([]byte, []byte) {
454+
userStart, userEnd := patternScanBounds(pattern)
455+
prefixBytes := []byte(prefix)
456+
457+
if userStart == nil && userEnd == nil {
458+
return prefixBytes, prefixScanEnd(prefixBytes)
459+
}
460+
461+
start := append(bytes.Clone(prefixBytes), userStart...)
462+
if userEnd == nil {
463+
return start, prefixScanEnd(prefixBytes)
464+
}
465+
end := append(bytes.Clone(prefixBytes), userEnd...)
466+
return start, end
467+
}
468+
469+
func prefixScanEnd(prefix []byte) []byte {
470+
if len(prefix) == 0 {
409471
return nil
410472
}
411-
return bytes.ReplaceAll(pattern, []byte("*"), nil)
473+
474+
end := bytes.Clone(prefix)
475+
for i := len(end) - 1; i >= 0; i-- {
476+
if end[i] == maxByteValue {
477+
continue
478+
}
479+
end[i]++
480+
return end[:i+1]
481+
}
482+
483+
return nil
484+
}
485+
486+
func matchesAsteriskPattern(pattern, key []byte) bool {
487+
parts := bytes.Split(pattern, []byte("*"))
488+
if len(parts) == 1 {
489+
return bytes.Equal(pattern, key)
490+
}
491+
492+
pos := 0
493+
if len(parts[0]) > 0 {
494+
if !bytes.HasPrefix(key, parts[0]) {
495+
return false
496+
}
497+
pos = len(parts[0])
498+
}
499+
500+
for i := 1; i < len(parts)-1; i++ {
501+
part := parts[i]
502+
if len(part) == 0 {
503+
continue
504+
}
505+
idx := bytes.Index(key[pos:], part)
506+
if idx < 0 {
507+
return false
508+
}
509+
pos += idx + len(part)
510+
}
511+
512+
last := parts[len(parts)-1]
513+
if len(last) > 0 && !bytes.HasSuffix(key, last) {
514+
return false
515+
}
516+
517+
return true
412518
}
413519

414-
func (r *RedisServer) collectUserKeys(kvs []*store.KVPair) map[string][]byte {
520+
func (r *RedisServer) collectUserKeys(kvs []*store.KVPair, pattern []byte) map[string][]byte {
415521
keyset := map[string][]byte{}
416522
for _, kvPair := range kvs {
417523
if store.IsListMetaKey(kvPair.Key) || store.IsListItemKey(kvPair.Key) {
418524
if userKey := store.ExtractListUserKey(kvPair.Key); userKey != nil {
525+
if !matchesAsteriskPattern(pattern, userKey) {
526+
continue
527+
}
419528
keyset[string(userKey)] = userKey
420529
}
421530
continue
422531
}
532+
if !matchesAsteriskPattern(pattern, kvPair.Key) {
533+
continue
534+
}
423535
keyset[string(kvPair.Key)] = kvPair.Key
424536
}
425537
return keyset
@@ -507,6 +619,20 @@ type listTxnState struct {
507619
metaExists bool
508620
appends [][]byte
509621
deleted bool
622+
purge bool
623+
purgeMeta store.ListMeta
624+
}
625+
626+
func stageListDelete(st *listTxnState) {
627+
if st == nil {
628+
return
629+
}
630+
if st.metaExists {
631+
st.purge = true
632+
st.purgeMeta = st.meta
633+
}
634+
st.deleted = true
635+
st.appends = nil
510636
}
511637

512638
func (t *txnContext) load(key []byte) (*txnValue, error) {
@@ -586,16 +712,22 @@ func (t *txnContext) applySet(cmd redcon.Command) (redisResult, error) {
586712
}
587713

588714
func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) {
589-
// handle list delete separately
590-
if isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS); err != nil {
715+
// Handle list delete through txn-local list state so subsequent commands in
716+
// the same MULTI observe the staged delete consistently.
717+
if st, ok := t.listStates[string(cmd.Args[1])]; ok {
718+
stageListDelete(st)
719+
return redisResult{typ: resultInt, integer: 1}, nil
720+
}
721+
isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS)
722+
if err != nil {
591723
return redisResult{}, err
592-
} else if isList {
724+
}
725+
if isList {
593726
st, err := t.loadListState(cmd.Args[1])
594727
if err != nil {
595728
return redisResult{}, err
596729
}
597-
st.deleted = true
598-
st.appends = nil
730+
stageListDelete(st)
599731
return redisResult{typ: resultInt, integer: 1}, nil
600732
}
601733

@@ -647,6 +779,17 @@ func (t *txnContext) applyRPush(cmd redcon.Command) (redisResult, error) {
647779
if err != nil {
648780
return redisResult{}, err
649781
}
782+
if st.deleted {
783+
if st.metaExists {
784+
st.purge = true
785+
st.purgeMeta = st.meta
786+
}
787+
// DEL followed by RPUSH in the same transaction recreates the list.
788+
st.deleted = false
789+
st.metaExists = false
790+
st.meta = store.ListMeta{}
791+
st.appends = nil
792+
}
650793

651794
for _, v := range cmd.Args[2:] {
652795
st.appends = append(st.appends, bytes.Clone(v))
@@ -761,6 +904,24 @@ func (t *txnContext) buildKeyElems() []*kv.Elem[kv.OP] {
761904
return elems
762905
}
763906

907+
func listDeleteMeta(st *listTxnState) (store.ListMeta, bool) {
908+
switch {
909+
case st.metaExists:
910+
return st.meta, true
911+
case st.purge:
912+
return st.purgeMeta, true
913+
default:
914+
return store.ListMeta{}, false
915+
}
916+
}
917+
918+
func appendListDeleteOps(elems []*kv.Elem[kv.OP], userKey []byte, meta store.ListMeta) []*kv.Elem[kv.OP] {
919+
for seq := meta.Head; seq < meta.Tail; seq++ {
920+
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listItemKey(userKey, seq)})
921+
}
922+
return append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(userKey)})
923+
}
924+
764925
func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {
765926
listKeys := make([]string, 0, len(t.listStates))
766927
for k := range t.listStates {
@@ -774,16 +935,17 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {
774935
userKey := []byte(k)
775936

776937
if st.deleted {
777-
// delete all persisted list items
778-
for seq := st.meta.Head; seq < st.meta.Tail; seq++ {
779-
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listItemKey(userKey, seq)})
938+
if meta, ok := listDeleteMeta(st); ok {
939+
elems = appendListDeleteOps(elems, userKey, meta)
780940
}
781-
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(userKey)})
782941
continue
783942
}
784943
if len(st.appends) == 0 {
785944
continue
786945
}
946+
if st.purge {
947+
elems = appendListDeleteOps(elems, userKey, st.purgeMeta)
948+
}
787949

788950
startSeq := st.meta.Head + st.meta.Len
789951
for i, v := range st.appends {
@@ -1010,7 +1172,7 @@ func (r *RedisServer) listRPush(ctx context.Context, key []byte, values [][]byte
10101172
}
10111173

10121174
func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
1013-
meta, exists, err := r.loadListMeta(ctx, key)
1175+
_, exists, err := r.loadListMeta(ctx, key)
10141176
if err != nil {
10151177
return err
10161178
}
@@ -1021,8 +1183,16 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
10211183
start := listItemKey(key, math.MinInt64)
10221184
end := listItemKey(key, math.MaxInt64)
10231185

1024-
readTS := r.readTS()
1025-
kvs, err := r.store.ScanAt(ctx, start, end, math.MaxInt, readTS)
1186+
startTS := r.readTS()
1187+
if startTS == ^uint64(0) && r.coordinator != nil && r.coordinator.Clock() != nil {
1188+
startTS = r.coordinator.Clock().Next()
1189+
}
1190+
1191+
// Keep DEL atomic by deleting all persisted list entries and metadata in one
1192+
// transaction at a single snapshot timestamp. This can allocate large slices
1193+
// for very large lists; if the storage layer grows range-delete support, this
1194+
// path should move to a streaming/range tombstone strategy.
1195+
kvs, err := r.store.ScanAt(ctx, start, end, math.MaxInt, startTS)
10261196
if err != nil {
10271197
return errors.WithStack(err)
10281198
}
@@ -1034,10 +1204,7 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
10341204
// delete meta last
10351205
ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(key)})
10361206

1037-
// ensure meta bounds consistent even if scan missed (in case of empty list)
1038-
_ = meta
1039-
1040-
group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: ops}
1207+
group := &kv.OperationGroup[kv.OP]{IsTxn: true, StartTS: startTS, Elems: ops}
10411208
_, err = r.coordinator.Dispatch(ctx, group)
10421209
return errors.WithStack(err)
10431210
}

0 commit comments

Comments
 (0)