Skip to content

Commit 68353fc

Browse files
committed
fix: address PR review on list delete and lsm scan visibility
1 parent 3defc84 commit 68353fc

4 files changed

Lines changed: 148 additions & 28 deletions

File tree

adapter/redis.go

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ const (
3939
const (
4040
redisLatestCommitTimeout = 5 * time.Second
4141
redisDispatchTimeout = 10 * time.Second
42-
listDeleteBatchSize = 1024
4342
maxByteValue = 0xFF
4443
)
4544

@@ -752,6 +751,13 @@ func (t *txnContext) applyRPush(cmd redcon.Command) (redisResult, error) {
752751
if err != nil {
753752
return redisResult{}, err
754753
}
754+
if st.deleted {
755+
// DEL followed by RPUSH in the same transaction recreates the list.
756+
st.deleted = false
757+
st.metaExists = false
758+
st.meta = store.ListMeta{}
759+
st.appends = nil
760+
}
755761

756762
for _, v := range cmd.Args[2:] {
757763
st.appends = append(st.appends, bytes.Clone(v))
@@ -1126,36 +1132,24 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
11261132
start := listItemKey(key, math.MinInt64)
11271133
end := listItemKey(key, math.MaxInt64)
11281134

1129-
for {
1130-
readTS := r.readTS()
1131-
kvs, scanErr := r.store.ScanAt(ctx, start, end, listDeleteBatchSize, readTS)
1132-
if scanErr != nil {
1133-
return errors.WithStack(scanErr)
1134-
}
1135-
if len(kvs) == 0 {
1136-
break
1137-
}
1138-
1139-
ops := make([]*kv.Elem[kv.OP], 0, len(kvs))
1140-
for _, kvp := range kvs {
1141-
ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: kvp.Key})
1142-
}
1135+
startTS := r.readTS()
1136+
if startTS == ^uint64(0) && r.coordinator != nil && r.coordinator.Clock() != nil {
1137+
startTS = r.coordinator.Clock().Next()
1138+
}
11431139

1144-
group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: ops}
1145-
if _, dispatchErr := r.coordinator.Dispatch(ctx, group); dispatchErr != nil {
1146-
return errors.WithStack(dispatchErr)
1147-
}
1148-
if len(kvs) < listDeleteBatchSize {
1149-
break
1150-
}
1140+
kvs, err := r.store.ScanAt(ctx, start, end, math.MaxInt, startTS)
1141+
if err != nil {
1142+
return errors.WithStack(err)
11511143
}
11521144

1153-
group := &kv.OperationGroup[kv.OP]{
1154-
IsTxn: true,
1155-
Elems: []*kv.Elem[kv.OP]{
1156-
{Op: kv.Del, Key: listMetaKey(key)},
1157-
},
1145+
ops := make([]*kv.Elem[kv.OP], 0, len(kvs)+1)
1146+
for _, kvp := range kvs {
1147+
ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: kvp.Key})
11581148
}
1149+
// delete meta last
1150+
ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(key)})
1151+
1152+
group := &kv.OperationGroup[kv.OP]{IsTxn: true, StartTS: startTS, Elems: ops}
11591153
_, err = r.coordinator.Dispatch(ctx, group)
11601154
return errors.WithStack(err)
11611155
}

adapter/redis_multi_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package adapter
22

33
import (
44
"context"
5+
"math"
6+
"strconv"
57
"testing"
68

9+
"github.com/bootjp/elastickv/store"
710
"github.com/redis/go-redis/v9"
11+
"github.com/stretchr/testify/assert"
812
"github.com/stretchr/testify/require"
913
)
1014

@@ -90,3 +94,103 @@ func TestRedis_DiscardClearsTxn(t *testing.T) {
9094
require.NoError(t, err)
9195
require.Equal(t, "before", got)
9296
}
97+
98+
func TestRedis_DelList_RemovesLargeListAndInternalKeys(t *testing.T) {
99+
t.Parallel()
100+
nodes, _, _ := createNode(t, 3)
101+
defer shutdown(nodes)
102+
103+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
104+
ctx := context.Background()
105+
106+
args := make([]interface{}, 0, 1302)
107+
args = append(args, "RPUSH", "list-big-del")
108+
for i := 0; i < 1300; i++ {
109+
args = append(args, "v"+strconv.Itoa(i))
110+
}
111+
_, err := rdb.Do(ctx, args...).Result()
112+
require.NoError(t, err)
113+
114+
delCount, err := rdb.Del(ctx, "list-big-del").Result()
115+
require.NoError(t, err)
116+
require.Equal(t, int64(1), delCount)
117+
118+
rangeRes, err := rdb.Do(ctx, "LRANGE", "list-big-del", 0, -1).Result()
119+
require.NoError(t, err)
120+
require.Equal(t, []interface{}{}, rangeRes)
121+
122+
readTS := nodes[0].redisServer.readTS()
123+
_, err = nodes[0].redisServer.store.GetAt(ctx, store.ListMetaKey([]byte("list-big-del")), readTS)
124+
require.ErrorIs(t, err, store.ErrKeyNotFound)
125+
126+
kvs, err := nodes[0].redisServer.store.ScanAt(
127+
ctx,
128+
store.ListItemKey([]byte("list-big-del"), math.MinInt64),
129+
store.ListItemKey([]byte("list-big-del"), math.MaxInt64),
130+
1,
131+
readTS,
132+
)
133+
require.NoError(t, err)
134+
assert.Len(t, kvs, 0)
135+
}
136+
137+
func TestRedis_DelList_EmptyAfterDeleteHasNoResidualInternalKeys(t *testing.T) {
138+
t.Parallel()
139+
nodes, _, _ := createNode(t, 3)
140+
defer shutdown(nodes)
141+
142+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
143+
ctx := context.Background()
144+
145+
_, err := rdb.Do(ctx, "RPUSH", "list-empty-del", "a").Result()
146+
require.NoError(t, err)
147+
148+
_, err = rdb.Del(ctx, "list-empty-del").Result()
149+
require.NoError(t, err)
150+
151+
// Second DEL should be a no-op for list internals.
152+
_, err = rdb.Del(ctx, "list-empty-del").Result()
153+
require.NoError(t, err)
154+
155+
readTS := nodes[0].redisServer.readTS()
156+
_, err = nodes[0].redisServer.store.GetAt(ctx, store.ListMetaKey([]byte("list-empty-del")), readTS)
157+
require.ErrorIs(t, err, store.ErrKeyNotFound)
158+
159+
kvs, err := nodes[0].redisServer.store.ScanAt(
160+
ctx,
161+
store.ListItemKey([]byte("list-empty-del"), math.MinInt64),
162+
store.ListItemKey([]byte("list-empty-del"), math.MaxInt64),
163+
1,
164+
readTS,
165+
)
166+
require.NoError(t, err)
167+
assert.Len(t, kvs, 0)
168+
}
169+
170+
func TestRedis_MultiExec_DelThenRPushRecreatesList(t *testing.T) {
171+
t.Parallel()
172+
nodes, _, _ := createNode(t, 3)
173+
defer shutdown(nodes)
174+
175+
rdb := redis.NewClient(&redis.Options{Addr: nodes[1].redisAddress})
176+
ctx := context.Background()
177+
178+
_, err := rdb.Do(ctx, "RPUSH", "list-del-rpush", "old1", "old2").Result()
179+
require.NoError(t, err)
180+
181+
require.Equal(t, "OK", rdb.Do(ctx, "MULTI").Val())
182+
require.Equal(t, "QUEUED", rdb.Do(ctx, "DEL", "list-del-rpush").Val())
183+
require.Equal(t, "QUEUED", rdb.Do(ctx, "RPUSH", "list-del-rpush", "new1", "new2").Val())
184+
185+
execRes, err := rdb.Do(ctx, "EXEC").Result()
186+
require.NoError(t, err)
187+
vals, ok := execRes.([]interface{})
188+
require.True(t, ok)
189+
require.Len(t, vals, 2)
190+
require.Equal(t, int64(1), vals[0])
191+
require.Equal(t, int64(2), vals[1])
192+
193+
rangeRes, err := rdb.Do(ctx, "LRANGE", "list-del-rpush", 0, -1).Result()
194+
require.NoError(t, err)
195+
require.Equal(t, []interface{}{"new1", "new2"}, rangeRes)
196+
}

store/lsm_store.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ func decodeIterKey(iter *pebble.Iterator) ([]byte, uint64, bool) {
293293

294294
func (s *pebbleStore) scanCurrentUserKey(iter *pebble.Iterator, userKey []byte, ts uint64) (*KVPair, error) {
295295
var out *KVPair
296+
evaluatedVisible := false
296297

297298
for iter.Valid() {
298299
curKey, version, ok := decodeIterKey(iter)
@@ -306,12 +307,15 @@ func (s *pebbleStore) scanCurrentUserKey(iter *pebble.Iterator, userKey []byte,
306307
return out, nil
307308
}
308309

309-
if out == nil && version <= ts {
310+
// Only the newest version visible at ts should determine scan visibility.
311+
// If that version is tombstoned/expired, older versions must stay masked.
312+
if !evaluatedVisible && version <= ts {
310313
kv, err := s.processFoundValue(iter, userKey, ts)
311314
if err != nil {
312315
return nil, err
313316
}
314317
out = kv
318+
evaluatedVisible = true
315319
}
316320

317321
if !iter.Next() {

store/lsm_store_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,24 @@ func TestPebbleStore_Scan(t *testing.T) {
106106
assert.Len(t, pairs, 0)
107107
}
108108

109+
func TestPebbleStore_Scan_TombstoneMasksOlderVersions(t *testing.T) {
110+
dir, err := os.MkdirTemp("", "pebble-scan-tombstone-test")
111+
require.NoError(t, err)
112+
defer os.RemoveAll(dir)
113+
114+
s, err := NewPebbleStore(dir)
115+
require.NoError(t, err)
116+
defer s.Close()
117+
118+
ctx := context.Background()
119+
require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 10, 0))
120+
require.NoError(t, s.DeleteAt(ctx, []byte("k1"), 20))
121+
122+
pairs, err := s.ScanAt(ctx, []byte("k"), nil, 10, 25)
123+
require.NoError(t, err)
124+
assert.Len(t, pairs, 0)
125+
}
126+
109127
func TestPebbleStore_SnapshotRestore(t *testing.T) {
110128
dir, err := os.MkdirTemp("", "pebble-snap-test")
111129
require.NoError(t, err)

0 commit comments

Comments
 (0)