Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
dc280eb
Refactor transaction timestamp management
bootjp Feb 15, 2026
fda8142
Fix concurrency issues with MULTI/EXEC commands
bootjp Feb 15, 2026
35f7783
Refactor transaction handling and error checks
bootjp Feb 15, 2026
b7e219a
kv/adapter: address review feedback
bootjp Feb 15, 2026
daa1385
kv/store: address latest PR review comments
bootjp Feb 16, 2026
bf234fd
kv/fsm: harden txn key dedupe and lock validation
bootjp Feb 16, 2026
e2a20b0
kv: reduce scan lock lookup overhead
bootjp Feb 16, 2026
ff9b016
kv: batch scan lock resolution by transaction
bootjp Feb 16, 2026
e25b944
kv/adapter: harden txn ttl and timestamp overflow
bootjp Feb 16, 2026
e480042
kv/adapter: handle latest PR review issues
bootjp Feb 16, 2026
965985a
kv/adapter: keep RawGet compatibility across versions
bootjp Feb 16, 2026
11c13ca
kv: harden lock resolution and txn commit retries
bootjp Feb 16, 2026
b2bb811
test: add txn primary-key and timestamp stamping coverage
bootjp Feb 16, 2026
d44348e
Initial plan
Copilot Feb 16, 2026
8eb0f7a
Fix timestamp overflow in lock cleanup operations
Copilot Feb 16, 2026
9b2ef8b
Use commitTS parameter in abortTSFrom calls where available
Copilot Feb 16, 2026
65ede5c
Add comment clarifying tryAbortExpiredPrimary timestamp handling
Copilot Feb 16, 2026
5099174
Pass 0 for commitTS in tryAbortExpiredPrimary for clarity
Copilot Feb 16, 2026
3ff3e3b
Fix gci linter error - remove extra blank lines
Copilot Feb 16, 2026
a3477b6
Add defensive programming comments for overflow checks
Copilot Feb 16, 2026
32dc5ef
Merge pull request #314 from bootjp/copilot/sub-pr-313
bootjp Feb 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 104 additions & 17 deletions adapter/internal.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapter

import (
"bytes"
"context"

"github.com/bootjp/elastickv/kv"
Expand Down Expand Up @@ -56,30 +57,116 @@ func (i *Internal) stampTimestamps(req *pb.ForwardRequest) {
return
}
if req.IsTxn {
var startTs uint64
// All requests in a transaction must have the same timestamp.
// Find a timestamp from the requests, or generate a new one if none exist.
for _, r := range req.Requests {
if r.Ts != 0 {
startTs = r.Ts
break
}
i.stampTxnTimestamps(req.Requests)
return
}

i.stampRawTimestamps(req.Requests)
}

// stampRawTimestamps assigns a timestamp to every non-nil request that does
// not carry one yet (Ts == 0). Requests with a non-zero Ts are left untouched.
// Each stamped request receives its own value from i.clock.Next(); when no
// clock is configured the fixed value 1 is used instead.
func (i *Internal) stampRawTimestamps(reqs []*pb.Request) {
	for idx := range reqs {
		req := reqs[idx]
		if req == nil || req.Ts != 0 {
			continue
		}
		if i.clock != nil {
			req.Ts = i.clock.Next()
		} else {
			req.Ts = 1
		}
	}
}

if startTs == 0 && len(req.Requests) > 0 {
startTs = i.clock.Next()
// stampTxnTimestamps gives every non-nil request of a forwarded transaction
// the same start timestamp. It reuses the value returned by
// forwardedTxnStartTS; when that is 0 it takes i.clock.Next(), or 1 if no
// clock is configured. Afterwards it calls fillForwardedTxnCommitTS with the
// chosen start timestamp.
//
// NOTE(review): this listing is a diff view; the lines below that reference
// req.Requests / startTs appear to belong to the removed implementation, not
// to this function — confirm against the merged file.
func (i *Internal) stampTxnTimestamps(reqs []*pb.Request) {
startTS := forwardedTxnStartTS(reqs)
if startTS == 0 {
if i.clock == nil {
startTS = 1
} else {
startTS = i.clock.Next()
}
}

// Assign the unified timestamp to all requests in the transaction.
for _, r := range req.Requests {
r.Ts = startTs
// Assign the unified timestamp to all requests in the transaction.
for _, r := range reqs {
if r != nil {
r.Ts = startTS
}
return
}

for _, r := range req.Requests {
if r.Ts == 0 {
r.Ts = i.clock.Next()
i.fillForwardedTxnCommitTS(reqs, startTS)
}

// forwardedTxnStartTS scans reqs in order and returns the Ts of the first
// non-nil request whose Ts is non-zero. It returns 0 when there is no such
// request.
func forwardedTxnStartTS(reqs []*pb.Request) uint64 {
	var startTS uint64
	for idx := 0; idx < len(reqs) && startTS == 0; idx++ {
		if req := reqs[idx]; req != nil {
			startTS = req.Ts
		}
	}
	return startTS
}

// forwardedTxnMetaMutation returns the first mutation of r together with true
// when r is a COMMIT or ABORT phase request whose first mutation is non-nil
// and has a key starting with metaPrefix. In every other case (nil request,
// any other phase, no mutations, nil first mutation, different key prefix) it
// returns (nil, false).
func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) {
	if r == nil {
		return nil, false
	}
	switch r.Phase {
	case pb.Phase_COMMIT, pb.Phase_ABORT:
		// Only these phases are inspected further.
	default:
		return nil, false
	}
	if len(r.Mutations) == 0 {
		return nil, false
	}
	first := r.Mutations[0]
	if first == nil || !bytes.HasPrefix(first.Key, metaPrefix) {
		return nil, false
	}
	return first, true
}

// fillForwardedTxnCommitTS writes a commit timestamp into forwarded txn meta
// mutations that do not have one yet.
//
// It collects, from reqs, every mutation accepted by forwardedTxnMetaMutation
// (using kv.TxnMetaPrefix) whose value decodes with kv.DecodeTxnMeta and whose
// CommitTS is 0. If none are found it returns without changes. Otherwise one
// commit timestamp is chosen and applied to all of them: startTS+1 when no
// clock is configured, or i.clock.Next() after i.clock.Observe(startTS) when a
// clock is present. Each selected mutation's Value is replaced with the
// re-encoded meta (kv.EncodeTxnMeta).
//
// Nothing is modified, and no error is reported, when startTS+1 wraps to 0 or
// when the chosen timestamp is not strictly greater than startTS.
func (i *Internal) fillForwardedTxnCommitTS(reqs []*pb.Request, startTS uint64) {
type metaToUpdate struct {
m *pb.Mutation
meta kv.TxnMeta
}

metaMutations := make([]metaToUpdate, 0, len(reqs))
prefix := []byte(kv.TxnMetaPrefix)
for _, r := range reqs {
m, ok := forwardedTxnMetaMutation(r, prefix)
if !ok {
continue
}
meta, err := kv.DecodeTxnMeta(m.Value)
// A value that fails to decode is skipped rather than reported.
if err != nil {
continue
}
// A meta that already carries a CommitTS is left as is.
if meta.CommitTS != 0 {
continue
}
metaMutations = append(metaMutations, metaToUpdate{m: m, meta: meta})
}
if len(metaMutations) == 0 {
return
}

commitTS := startTS + 1
if commitTS == 0 {
// Overflow: can't choose a commit timestamp strictly greater than startTS.
return
}
if i.clock != nil {
i.clock.Observe(startTS)
commitTS = i.clock.Next()
Comment thread
bootjp marked this conversation as resolved.
}
if commitTS <= startTS {
// Defensive: avoid writing an invalid CommitTS.
return
}
Comment on lines +163 to +175
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commitTS = 0 forwarding scenario at line 154, if startTS = ^uint64(0), then commitTS = startTS + 1 overflows to 0. The code returns early at line 157, silently leaving CommitTS = 0 in the metadata. This causes the FSM to reject the commit request with ErrTxnCommitTSRequired, but the caller (Forward) doesn't distinguish this case and may return success=false without a clear error. Consider logging a warning or returning an explicit error when overflow is detected to aid debugging.

Copilot uses AI. Check for mistakes.

// item is a copy of the slice element; the update reaches the request
// through the item.m pointer.
for _, item := range metaMutations {
item.meta.CommitTS = commitTS
item.m.Value = kv.EncodeTxnMeta(item.meta)
}
}
27 changes: 24 additions & 3 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func (r *RedisServer) Run() error {

name := strings.ToUpper(string(cmd.Args[0]))
if state.inTxn && name != cmdExec && name != cmdDiscard && name != cmdMulti {
state.queue = append(state.queue, cmd)
// redcon reuses the underlying argument buffers; copy queued commands
// so MULTI/EXEC works reliably under concurrency and with -race.
state.queue = append(state.queue, cloneCommand(cmd))
conn.WriteString("QUEUED")
return
}
Expand All @@ -170,6 +172,17 @@ func (r *RedisServer) Run() error {
return errors.WithStack(err)
}

// cloneCommand returns a copy of cmd in which Raw and every element of Args
// are duplicated with bytes.Clone, so the result does not share byte storage
// with cmd. Only the Raw and Args fields are populated on the returned value.
func cloneCommand(cmd redcon.Command) redcon.Command {
	args := make([][]byte, len(cmd.Args))
	for idx, arg := range cmd.Args {
		args[idx] = bytes.Clone(arg)
	}
	return redcon.Command{
		Raw:  bytes.Clone(cmd.Raw),
		Args: args,
	}
}

// Stop closes the server's listener (r.listen). The value returned by Close
// is discarded.
// NOTE(review): presumably shutdown is best-effort here — confirm that
// dropping the Close error is intended.
func (r *RedisServer) Stop() {
_ = r.listen.Close()
}
Expand Down Expand Up @@ -233,8 +246,14 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
return
}

key := cmd.Args[1]
readTS := r.readTS()
v, err := r.readValueAt(cmd.Args[1], readTS)
// When proxying reads to the leader, let the leader choose a safe snapshot.
// Our local store watermark may lag behind a just-committed transaction.
if !r.coordinator.IsLeaderForKey(key) {
readTS = 0
}
v, err := r.readValueAt(key, readTS)
if err != nil {
switch {
case errors.Is(err, store.ErrKeyNotFound):
Expand Down Expand Up @@ -1156,7 +1175,9 @@ func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) {
if err != nil {
return nil, errors.WithStack(err)
}

if resp.Value == nil {
return nil, errors.WithStack(store.ErrKeyNotFound)
}
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RawGet/tryLeaderGetAt currently interprets a nil resp.Value as "key not found". In proto3, an explicitly-set empty byte slice and an unset bytes field are indistinguishable on the wire, so this can make it impossible to reliably store/read empty values across gRPC (an empty value may round-trip as nil and be treated as not found). If empty values should be supported, consider adding an explicit "exists" (or "found") flag to RawGetResponse / GetResponse or returning a NotFound gRPC status instead of encoding non-existence as a nil bytes field.

Suggested change
if resp.Value == nil {
return nil, errors.WithStack(store.ErrKeyNotFound)
}

Copilot uses AI. Check for mistakes.
return resp.Value, nil
}

Expand Down
2 changes: 1 addition & 1 deletion adapter/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestRedis_follower_redirect_node_set_get_deleted(t *testing.T) {
assert.Equal(t, int64(1), res3.Val())

res4 := rdb.Get(ctx, string(key))
assert.NoError(t, res4.Err())
assert.Equal(t, redis.Nil, res4.Err())
assert.Equal(t, "", res4.Val())
}

Expand Down
66 changes: 58 additions & 8 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"bytes"
"context"

pb "github.com/bootjp/elastickv/proto"
Expand Down Expand Up @@ -101,7 +102,18 @@ func (c *Coordinate) nextStartTS() uint64 {
}

func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateResponse, error) {
logs := txnRequests(startTS, reqs)
primary := primaryKeyForElems(reqs)
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}

commitTS := c.clock.Next()
if commitTS <= startTS {
c.clock.Observe(startTS)
commitTS = c.clock.Next()
}
Comment on lines +110 to +114
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to determine the commitTS is mostly correct, ensuring it's greater than startTS. However, there's a rare edge case where c.clock.Next() after c.clock.Observe(startTS) might still not produce a timestamp greater than startTS (e.g., if the logical component of the HLC overflows).

The sharded coordinator (kv/sharded_coordinator.go) includes a final defensive check for this. For consistency and improved robustness, I suggest adding a similar check here.

Suggested change
commitTS := c.clock.Next()
if commitTS <= startTS {
c.clock.Observe(startTS)
commitTS = c.clock.Next()
}
commitTS := c.clock.Next()
if commitTS <= startTS {
c.clock.Observe(startTS)
commitTS = c.clock.Next()
}
if commitTS <= startTS {
return nil, errors.WithStack(ErrTxnCommitTSRequired)
}
References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. The proposed check for commitTS > startTS is crucial for maintaining the integrity and correct ordering of timestamps within an MVCC transaction, ensuring atomicity and a consistent snapshot view.


logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs)

r, err := c.transactionManager.Commit(logs)
if err != nil {
Expand Down Expand Up @@ -185,7 +197,11 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C

var requests []*pb.Request
if reqs.IsTxn {
requests = txnRequests(reqs.StartTS, reqs.Elems)
primary := primaryKeyForElems(reqs.Elems)
Copy link

Copilot AI Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the follower redirect path, a txn with all-empty keys will generate a TxnMeta with an empty PrimaryKey and forward it to the leader. The leader will later fail with ErrTxnPrimaryKeyRequired, but the leader path returns that error earlier. To keep behavior consistent (and avoid forwarding obviously-invalid txns), validate that primaryKeyForElems(reqs.Elems) is non-empty here and return ErrTxnPrimaryKeyRequired/ErrInvalidRequest before calling txnRequests.

Suggested change
primary := primaryKeyForElems(reqs.Elems)
primary := primaryKeyForElems(reqs.Elems)
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}

Copilot uses AI. Check for mistakes.
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}
requests = txnRequests(reqs.StartTS, 0, defaultTxnLockTTLms, primary, reqs.Elems)
} else {
for _, req := range reqs.Elems {
requests = append(requests, c.toRawRequest(req))
Expand Down Expand Up @@ -237,17 +253,51 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation {
panic("unreachable")
}

func txnRequests(startTS uint64, reqs []*Elem[OP]) []*pb.Request {
muts := make([]*pb.Mutation, 0, len(reqs))
// txnRequests builds the request pair for one transaction and returns it as
// [PREPARE, COMMIT]. Both requests are marked IsTxn and carry Ts = startTS.
//
// The PREPARE request's mutations start with a PUT on txnMetaPrefix whose
// value encodes TxnMeta{PrimaryKey: primaryKey, LockTTLms: lockTTLms,
// CommitTS: 0}, followed by elemToMutation(req) for every element of reqs.
//
// The COMMIT request's mutations start with a PUT on txnMetaPrefix whose
// value encodes TxnMeta{PrimaryKey: primaryKey, LockTTLms: 0, CommitTS:
// commitTS}, followed by one key-only PUT mutation per element of reqs.
//
// NOTE(review): this listing is a diff view; the lines below that reference
// the undeclared `muts` slice appear to belong to the removed implementation
// — confirm against the merged file.
func txnRequests(startTS, commitTS, lockTTLms uint64, primaryKey []byte, reqs []*Elem[OP]) []*pb.Request {
meta := &pb.Mutation{
Op: pb.Op_PUT,
Key: []byte(txnMetaPrefix),
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: lockTTLms, CommitTS: 0}),
}

prepareMuts := make([]*pb.Mutation, 0, len(reqs)+1)
prepareMuts = append(prepareMuts, meta)
for _, req := range reqs {
muts = append(muts, elemToMutation(req))
prepareMuts = append(prepareMuts, elemToMutation(req))
}

commitMeta := &pb.Mutation{
Op: pb.Op_PUT,
Key: []byte(txnMetaPrefix),
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: 0, CommitTS: commitTS}),
}
commitMuts := make([]*pb.Mutation, 0, len(reqs)+1)
commitMuts = append(commitMuts, commitMeta)
for _, req := range reqs {
commitMuts = append(commitMuts, &pb.Mutation{Op: pb.Op_PUT, Key: req.Key})
}

// Use separate slices for PREPARE and COMMIT to avoid sharing slice header/state.
prepareMuts := append([]*pb.Mutation(nil), muts...)
commitMuts := append([]*pb.Mutation(nil), muts...)
return []*pb.Request{
{IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: prepareMuts},
{IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: commitMuts},
}
}

// primaryKeyForElems returns the lexicographically smallest non-empty key
// among reqs, ordered by bytes.Compare. Nil elements and elements with an
// empty key are ignored. It returns nil when no element has a usable key.
//
// The returned slice aliases the Key of the chosen element (the first
// occurrence of the minimum); it is not copied.
func primaryKeyForElems(reqs []*Elem[OP]) []byte {
	var primary []byte
	for _, e := range reqs {
		if e == nil || len(e.Key) == 0 {
			continue
		}
		// Duplicate keys need no bookkeeping: an equal key never compares as
		// strictly smaller, so the first occurrence of the minimum is kept.
		if primary == nil || bytes.Compare(e.Key, primary) < 0 {
			primary = e.Key
		}
	}
	return primary
}
Comment thread
bootjp marked this conversation as resolved.
Loading
Loading