11package adapter
22
33import (
4+ "bytes"
45 "context"
56
67 "github.com/bootjp/elastickv/kv"
@@ -29,13 +30,19 @@ var _ pb.InternalServer = (*Internal)(nil)
2930
3031var ErrNotLeader = errors .New ("not leader" )
3132var ErrLeaderNotFound = errors .New ("leader not found" )
33+ var ErrTxnTimestampOverflow = errors .New ("txn timestamp overflow" )
3234
3335func (i * Internal ) Forward (_ context.Context , req * pb.ForwardRequest ) (* pb.ForwardResponse , error ) {
3436 if i .raft .State () != raft .Leader {
3537 return nil , errors .WithStack (ErrNotLeader )
3638 }
3739
38- i .stampTimestamps (req )
40+ if err := i .stampTimestamps (req ); err != nil {
41+ return & pb.ForwardResponse {
42+ Success : false ,
43+ CommitIndex : 0 ,
44+ }, errors .WithStack (err )
45+ }
3946
4047 r , err := i .transactionManager .Commit (req .Requests )
4148 if err != nil {
@@ -51,35 +58,125 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
5158 }, nil
5259}
5360
54- func (i * Internal ) stampTimestamps (req * pb.ForwardRequest ) {
61+ func (i * Internal ) stampTimestamps (req * pb.ForwardRequest ) error {
5562 if req == nil {
56- return
63+ return nil
5764 }
5865 if req .IsTxn {
59- var startTs uint64
60- // All requests in a transaction must have the same timestamp.
61- // Find a timestamp from the requests, or generate a new one if none exist.
62- for _ , r := range req .Requests {
63- if r .Ts != 0 {
64- startTs = r .Ts
65- break
66- }
66+ return i .stampTxnTimestamps (req .Requests )
67+ }
68+
69+ i .stampRawTimestamps (req .Requests )
70+ return nil
71+ }
72+
73+ func (i * Internal ) stampRawTimestamps (reqs []* pb.Request ) {
74+ for _ , r := range reqs {
75+ if r == nil {
76+ continue
77+ }
78+ if r .Ts != 0 {
79+ continue
6780 }
81+ if i .clock == nil {
82+ r .Ts = 1
83+ continue
84+ }
85+ r .Ts = i .clock .Next ()
86+ }
87+ }
6888
69- if startTs == 0 && len (req .Requests ) > 0 {
70- startTs = i .clock .Next ()
89+ func (i * Internal ) stampTxnTimestamps (reqs []* pb.Request ) error {
90+ startTS := forwardedTxnStartTS (reqs )
91+ if startTS == 0 {
92+ if i .clock == nil {
93+ startTS = 1
94+ } else {
95+ startTS = i .clock .Next ()
7196 }
97+ }
98+ if startTS == ^ uint64 (0 ) {
99+ return errors .WithStack (ErrTxnTimestampOverflow )
100+ }
101+
102+ // Assign the unified timestamp to all requests in the transaction.
103+ for _ , r := range reqs {
104+ if r != nil {
105+ r .Ts = startTS
106+ }
107+ }
108+
109+ return i .fillForwardedTxnCommitTS (reqs , startTS )
110+ }
72111
73- // Assign the unified timestamp to all requests in the transaction.
74- for _ , r := range req .Requests {
75- r .Ts = startTs
112+ func forwardedTxnStartTS (reqs []* pb.Request ) uint64 {
113+ for _ , r := range reqs {
114+ if r != nil && r .Ts != 0 {
115+ return r .Ts
76116 }
77- return
117+ }
118+ return 0
119+ }
120+
121+ func forwardedTxnMetaMutation (r * pb.Request , metaPrefix []byte ) (* pb.Mutation , bool ) {
122+ if r == nil {
123+ return nil , false
124+ }
125+ if r .Phase != pb .Phase_COMMIT && r .Phase != pb .Phase_ABORT {
126+ return nil , false
127+ }
128+ if len (r .Mutations ) == 0 || r .Mutations [0 ] == nil {
129+ return nil , false
130+ }
131+ if ! bytes .HasPrefix (r .Mutations [0 ].Key , metaPrefix ) {
132+ return nil , false
133+ }
134+ return r .Mutations [0 ], true
135+ }
136+
137+ func (i * Internal ) fillForwardedTxnCommitTS (reqs []* pb.Request , startTS uint64 ) error {
138+ type metaToUpdate struct {
139+ m * pb.Mutation
140+ meta kv.TxnMeta
78141 }
79142
80- for _ , r := range req .Requests {
81- if r .Ts == 0 {
82- r .Ts = i .clock .Next ()
143+ metaMutations := make ([]metaToUpdate , 0 , len (reqs ))
144+ prefix := []byte (kv .TxnMetaPrefix )
145+ for _ , r := range reqs {
146+ m , ok := forwardedTxnMetaMutation (r , prefix )
147+ if ! ok {
148+ continue
83149 }
150+ meta , err := kv .DecodeTxnMeta (m .Value )
151+ if err != nil {
152+ continue
153+ }
154+ if meta .CommitTS != 0 {
155+ continue
156+ }
157+ metaMutations = append (metaMutations , metaToUpdate {m : m , meta : meta })
158+ }
159+ if len (metaMutations ) == 0 {
160+ return nil
161+ }
162+
163+ commitTS := startTS + 1
164+ if commitTS == 0 {
165+ // Overflow: can't choose a commit timestamp strictly greater than startTS.
166+ return errors .WithStack (ErrTxnTimestampOverflow )
167+ }
168+ if i .clock != nil {
169+ i .clock .Observe (startTS )
170+ commitTS = i .clock .Next ()
171+ }
172+ if commitTS <= startTS {
173+ // Defensive: avoid writing an invalid CommitTS.
174+ return errors .WithStack (ErrTxnTimestampOverflow )
175+ }
176+
177+ for _ , item := range metaMutations {
178+ item .meta .CommitTS = commitTS
179+ item .m .Value = kv .EncodeTxnMeta (item .meta )
84180 }
181+ return nil
85182}
0 commit comments