-
Notifications
You must be signed in to change notification settings - Fork 2
Refactor transaction timestamp management for cross shard data #313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
dc280eb
fda8142
35f7783
b7e219a
daa1385
bf234fd
e2a20b0
ff9b016
e25b944
e480042
965985a
11c13ca
b2bb811
d44348e
8eb0f7a
9b2ef8b
65ede5c
5099174
3ff3e3b
a3477b6
32dc5ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package adapter | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
|
|
||
| "github.com/bootjp/elastickv/kv" | ||
|
|
@@ -56,30 +57,116 @@ func (i *Internal) stampTimestamps(req *pb.ForwardRequest) { | |
| return | ||
| } | ||
| if req.IsTxn { | ||
| var startTs uint64 | ||
| // All requests in a transaction must have the same timestamp. | ||
| // Find a timestamp from the requests, or generate a new one if none exist. | ||
| for _, r := range req.Requests { | ||
| if r.Ts != 0 { | ||
| startTs = r.Ts | ||
| break | ||
| } | ||
| i.stampTxnTimestamps(req.Requests) | ||
| return | ||
| } | ||
|
|
||
| i.stampRawTimestamps(req.Requests) | ||
| } | ||
|
|
||
| func (i *Internal) stampRawTimestamps(reqs []*pb.Request) { | ||
| for _, r := range reqs { | ||
| if r == nil { | ||
| continue | ||
| } | ||
| if r.Ts != 0 { | ||
| continue | ||
| } | ||
| if i.clock == nil { | ||
| r.Ts = 1 | ||
| continue | ||
| } | ||
| r.Ts = i.clock.Next() | ||
| } | ||
| } | ||
|
|
||
| if startTs == 0 && len(req.Requests) > 0 { | ||
| startTs = i.clock.Next() | ||
| func (i *Internal) stampTxnTimestamps(reqs []*pb.Request) { | ||
| startTS := forwardedTxnStartTS(reqs) | ||
| if startTS == 0 { | ||
| if i.clock == nil { | ||
| startTS = 1 | ||
| } else { | ||
| startTS = i.clock.Next() | ||
| } | ||
| } | ||
|
|
||
| // Assign the unified timestamp to all requests in the transaction. | ||
| for _, r := range req.Requests { | ||
| r.Ts = startTs | ||
| // Assign the unified timestamp to all requests in the transaction. | ||
| for _, r := range reqs { | ||
| if r != nil { | ||
| r.Ts = startTS | ||
| } | ||
| return | ||
| } | ||
|
|
||
| for _, r := range req.Requests { | ||
| if r.Ts == 0 { | ||
| r.Ts = i.clock.Next() | ||
| i.fillForwardedTxnCommitTS(reqs, startTS) | ||
| } | ||
|
|
||
| func forwardedTxnStartTS(reqs []*pb.Request) uint64 { | ||
| for _, r := range reqs { | ||
| if r != nil && r.Ts != 0 { | ||
| return r.Ts | ||
| } | ||
| } | ||
| return 0 | ||
| } | ||
|
|
||
| func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) { | ||
| if r == nil { | ||
| return nil, false | ||
| } | ||
| if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT { | ||
| return nil, false | ||
| } | ||
| if len(r.Mutations) == 0 || r.Mutations[0] == nil { | ||
| return nil, false | ||
| } | ||
| if !bytes.HasPrefix(r.Mutations[0].Key, metaPrefix) { | ||
| return nil, false | ||
| } | ||
| return r.Mutations[0], true | ||
| } | ||
|
|
||
| func (i *Internal) fillForwardedTxnCommitTS(reqs []*pb.Request, startTS uint64) { | ||
| type metaToUpdate struct { | ||
| m *pb.Mutation | ||
| meta kv.TxnMeta | ||
| } | ||
|
|
||
| metaMutations := make([]metaToUpdate, 0, len(reqs)) | ||
| prefix := []byte(kv.TxnMetaPrefix) | ||
| for _, r := range reqs { | ||
| m, ok := forwardedTxnMetaMutation(r, prefix) | ||
| if !ok { | ||
| continue | ||
| } | ||
| meta, err := kv.DecodeTxnMeta(m.Value) | ||
| if err != nil { | ||
| continue | ||
| } | ||
| if meta.CommitTS != 0 { | ||
| continue | ||
| } | ||
| metaMutations = append(metaMutations, metaToUpdate{m: m, meta: meta}) | ||
| } | ||
| if len(metaMutations) == 0 { | ||
| return | ||
| } | ||
|
|
||
| commitTS := startTS + 1 | ||
| if commitTS == 0 { | ||
| // Overflow: can't choose a commit timestamp strictly greater than startTS. | ||
| return | ||
| } | ||
| if i.clock != nil { | ||
| i.clock.Observe(startTS) | ||
| commitTS = i.clock.Next() | ||
| } | ||
| if commitTS <= startTS { | ||
| // Defensive: avoid writing an invalid CommitTS. | ||
| return | ||
| } | ||
|
Comment on lines
+163
to
+175
|
||
|
|
||
| for _, item := range metaMutations { | ||
| item.meta.CommitTS = commitTS | ||
| item.m.Value = kv.EncodeTxnMeta(item.meta) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -150,7 +150,9 @@ func (r *RedisServer) Run() error { | |||||||
|
|
||||||||
| name := strings.ToUpper(string(cmd.Args[0])) | ||||||||
| if state.inTxn && name != cmdExec && name != cmdDiscard && name != cmdMulti { | ||||||||
| state.queue = append(state.queue, cmd) | ||||||||
| // redcon reuses the underlying argument buffers; copy queued commands | ||||||||
| // so MULTI/EXEC works reliably under concurrency and with -race. | ||||||||
| state.queue = append(state.queue, cloneCommand(cmd)) | ||||||||
| conn.WriteString("QUEUED") | ||||||||
| return | ||||||||
| } | ||||||||
|
|
@@ -170,6 +172,17 @@ func (r *RedisServer) Run() error { | |||||||
| return errors.WithStack(err) | ||||||||
| } | ||||||||
|
|
||||||||
| func cloneCommand(cmd redcon.Command) redcon.Command { | ||||||||
| out := redcon.Command{ | ||||||||
| Raw: bytes.Clone(cmd.Raw), | ||||||||
| Args: make([][]byte, len(cmd.Args)), | ||||||||
| } | ||||||||
| for i := range cmd.Args { | ||||||||
| out.Args[i] = bytes.Clone(cmd.Args[i]) | ||||||||
| } | ||||||||
| return out | ||||||||
| } | ||||||||
|
|
||||||||
| func (r *RedisServer) Stop() { | ||||||||
| _ = r.listen.Close() | ||||||||
| } | ||||||||
|
|
@@ -233,8 +246,14 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { | |||||||
| return | ||||||||
| } | ||||||||
|
|
||||||||
| key := cmd.Args[1] | ||||||||
| readTS := r.readTS() | ||||||||
| v, err := r.readValueAt(cmd.Args[1], readTS) | ||||||||
| // When proxying reads to the leader, let the leader choose a safe snapshot. | ||||||||
| // Our local store watermark may lag behind a just-committed transaction. | ||||||||
| if !r.coordinator.IsLeaderForKey(key) { | ||||||||
| readTS = 0 | ||||||||
| } | ||||||||
| v, err := r.readValueAt(key, readTS) | ||||||||
| if err != nil { | ||||||||
| switch { | ||||||||
| case errors.Is(err, store.ErrKeyNotFound): | ||||||||
|
|
@@ -1156,7 +1175,9 @@ func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) { | |||||||
| if err != nil { | ||||||||
| return nil, errors.WithStack(err) | ||||||||
| } | ||||||||
|
|
||||||||
| if resp.Value == nil { | ||||||||
| return nil, errors.WithStack(store.ErrKeyNotFound) | ||||||||
| } | ||||||||
|
||||||||
| if resp.Value == nil { | |
| return nil, errors.WithStack(store.ErrKeyNotFound) | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||||||||||||||||||||||||
| package kv | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||
| "bytes" | ||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| pb "github.com/bootjp/elastickv/proto" | ||||||||||||||||||||||||||||
|
|
@@ -101,7 +102,18 @@ func (c *Coordinate) nextStartTS() uint64 { | |||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateResponse, error) { | ||||||||||||||||||||||||||||
| logs := txnRequests(startTS, reqs) | ||||||||||||||||||||||||||||
| primary := primaryKeyForElems(reqs) | ||||||||||||||||||||||||||||
| if len(primary) == 0 { | ||||||||||||||||||||||||||||
| return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| commitTS := c.clock.Next() | ||||||||||||||||||||||||||||
| if commitTS <= startTS { | ||||||||||||||||||||||||||||
| c.clock.Observe(startTS) | ||||||||||||||||||||||||||||
| commitTS = c.clock.Next() | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
+110
to
+114
Contributor
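There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic to determine the [inline code lost in extraction] … The sharded coordinator ([inline code lost in extraction; remainder of the review comment is truncated]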
Suggested change
References
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| r, err := c.transactionManager.Commit(logs) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
|
|
@@ -185,7 +197,11 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| var requests []*pb.Request | ||||||||||||||||||||||||||||
| if reqs.IsTxn { | ||||||||||||||||||||||||||||
| requests = txnRequests(reqs.StartTS, reqs.Elems) | ||||||||||||||||||||||||||||
| primary := primaryKeyForElems(reqs.Elems) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| primary := primaryKeyForElems(reqs.Elems) | |
| primary := primaryKeyForElems(reqs.Elems) | |
| if len(primary) == 0 { | |
| return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) | |
| } |
Uh oh!
There was an error while loading. Please reload this page.