Skip to content

Commit 0d98bc8

Browse files
committed
FEATURE: Add CompletableFuture Pipeline API
1 parent 011382e commit 0d98bc8

12 files changed

Lines changed: 1210 additions & 0 deletions

src/main/java/net/spy/memcached/OperationFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,4 +531,15 @@ BTreeInsertAndGetOperation bopInsertAndGet(String key,
531531
BTreeInsertAndGet<?> get, byte[] dataToInsert,
532532
OperationCallback cb);
533533

534+
/**
535+
* Create a pipeline operation for executing multiple commands in batch.
536+
*
537+
* @param ops the list of operations to execute
538+
* @param keys the unique keys related to the pipe operation
539+
* @param cb the status callback
540+
* @return a new pipeline operation
541+
*/
542+
Operation pipeline(List<KeyedOperation> ops, List<String> keys,
543+
OperationCallback cb);
544+
534545
}

src/main/java/net/spy/memcached/ops/APIType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public enum APIType {
6666
SETATTR(OperationType.WRITE), GETATTR(OperationType.READ),
6767

6868
// Other API
69+
PIPE(OperationType.WRITE),
6970
FLUSH(OperationType.WRITE),
7071
STATS(OperationType.ETC),
7172
VERSION(OperationType.ETC),

src/main/java/net/spy/memcached/ops/BaseOperationFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,16 @@ public Operation cloneMultiOperation(KeyedOperation op, MemcachedNode node,
151151
} else if (op instanceof BTreeSortMergeGetOperation) {
152152
final BTreeSMGet<?> smGet = ((BTreeSortMergeGetOperation) op).getSMGet();
153153
return bopsmget(smGet.clone(node, redirectKeys), (BTreeSortMergeGetOperation.Callback) mcb);
154+
} else if (op instanceof PipelineOperation) {
155+
List<KeyedOperation> originalOps = ((PipelineOperation) op).getOps();
156+
List<KeyedOperation> redirectOps = new ArrayList<>();
157+
for (KeyedOperation originalOp : originalOps) {
158+
// PipelineOperation's internal op has only one key.
159+
if (redirectKeys.contains(originalOp.getKeys().iterator().next())) {
160+
redirectOps.add(originalOp);
161+
}
162+
}
163+
return pipeline(redirectOps, redirectKeys, mcb);
154164
} else {
155165
assert false : "Unhandled operation type: " + op.getClass();
156166
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package net.spy.memcached.ops;
2+
3+
import java.util.List;
4+
5+
public interface PipelineOperation extends KeyedOperation {
6+
7+
List<KeyedOperation> getOps();
8+
9+
interface Callback extends OperationCallback {
10+
void gotStatus(Operation op, OperationStatus status);
11+
}
12+
}

src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package net.spy.memcached.protocol.ascii;
1818

1919
import java.util.Collection;
20+
import java.util.List;
2021

2122
import javax.security.sasl.SaslClient;
2223

@@ -66,9 +67,11 @@
6667
import net.spy.memcached.ops.GetAttrOperation;
6768
import net.spy.memcached.ops.GetOperation;
6869
import net.spy.memcached.ops.GetsOperation;
70+
import net.spy.memcached.ops.KeyedOperation;
6971
import net.spy.memcached.ops.Mutator;
7072
import net.spy.memcached.ops.MutatorOperation;
7173
import net.spy.memcached.ops.NoopOperation;
74+
import net.spy.memcached.ops.Operation;
7275
import net.spy.memcached.ops.OperationCallback;
7376
import net.spy.memcached.ops.SASLAuthOperation;
7477
import net.spy.memcached.ops.SASLMechsOperation;
@@ -299,4 +302,10 @@ public BTreeInsertAndGetOperation bopInsertAndGet(String key,
299302
return new BTreeInsertAndGetOperationImpl(key, get, dataToInsert, cb);
300303
}
301304

305+
@Override
306+
public Operation pipeline(List<KeyedOperation> ops, List<String> keys,
307+
OperationCallback cb) {
308+
return new PipelineOperationImpl(ops, keys, cb);
309+
}
310+
302311
}
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* arcus-java-client : Arcus Java client
3+
* Copyright 2010-2014 NAVER Corp.
4+
* Copyright 2014-present JaM2in Co., Ltd.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package net.spy.memcached.protocol.ascii;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.List;
22+
23+
import net.spy.memcached.ops.APIType;
24+
import net.spy.memcached.ops.KeyedOperation;
25+
import net.spy.memcached.ops.Operation;
26+
import net.spy.memcached.ops.OperationCallback;
27+
import net.spy.memcached.ops.OperationState;
28+
import net.spy.memcached.ops.OperationStatus;
29+
import net.spy.memcached.ops.OperationType;
30+
import net.spy.memcached.ops.PipelineOperation;
31+
import net.spy.memcached.ops.StatusCode;
32+
33+
/**
34+
* Operation for executing multiple commands as pipeline.
35+
*/
36+
public final class PipelineOperationImpl extends OperationImpl implements PipelineOperation {
37+
38+
// PIPE RESPONSES
39+
private static final OperationStatus END =
40+
new OperationStatus(true, "END", StatusCode.SUCCESS);
41+
private static final OperationStatus FAILED_END =
42+
new OperationStatus(false, "FAILED_END", StatusCode.ERR_FAILED_END);
43+
44+
// EACH COMMAND'S SUCCEED RESPONSES
45+
private static final OperationStatus CREATED_STORED =
46+
new OperationStatus(true, "CREATED_STORED", StatusCode.SUCCESS);
47+
private static final OperationStatus STORED =
48+
new OperationStatus(true, "STORED", StatusCode.SUCCESS);
49+
private static final OperationStatus REPLACED =
50+
new OperationStatus(true, "REPLACED", StatusCode.SUCCESS);
51+
private static final OperationStatus UPDATED =
52+
new OperationStatus(true, "UPDATED", StatusCode.SUCCESS);
53+
private static final OperationStatus EXIST =
54+
new OperationStatus(true, "EXIST", StatusCode.EXIST);
55+
private static final OperationStatus NOT_EXIST =
56+
new OperationStatus(true, "NOT_EXIST", StatusCode.NOT_EXIST);
57+
private static final OperationStatus DELETED =
58+
new OperationStatus(true, "DELETED", StatusCode.SUCCESS);
59+
private static final OperationStatus DELETED_DROPPED =
60+
new OperationStatus(true, "DELETED_DROPPED", StatusCode.SUCCESS);
61+
62+
// EACH COMMAND'S FAILED RESPONSES
63+
private static final OperationStatus NOT_FOUND =
64+
new OperationStatus(false, "NOT_FOUND", StatusCode.ERR_NOT_FOUND);
65+
private static final OperationStatus NOT_FOUND_ELEMENT =
66+
new OperationStatus(false, "NOT_FOUND_ELEMENT", StatusCode.ERR_NOT_FOUND_ELEMENT);
67+
private static final OperationStatus NOTHING_TO_UPDATE =
68+
new OperationStatus(false, "NOTHING_TO_UPDATE", StatusCode.ERR_NOTHING_TO_UPDATE);
69+
private static final OperationStatus ELEMENT_EXISTS =
70+
new OperationStatus(false, "ELEMENT_EXISTS", StatusCode.ERR_ELEMENT_EXISTS);
71+
private static final OperationStatus OVERFLOWED =
72+
new OperationStatus(false, "OVERFLOWED", StatusCode.ERR_OVERFLOWED);
73+
private static final OperationStatus OUT_OF_RANGE =
74+
new OperationStatus(false, "OUT_OF_RANGE", StatusCode.ERR_OUT_OF_RANGE);
75+
private static final OperationStatus TYPE_MISMATCH =
76+
new OperationStatus(false, "TYPE_MISMATCH", StatusCode.ERR_TYPE_MISMATCH);
77+
private static final OperationStatus BKEY_MISMATCH =
78+
new OperationStatus(false, "BKEY_MISMATCH", StatusCode.ERR_BKEY_MISMATCH);
79+
private static final OperationStatus EFLAG_MISMATCH =
80+
new OperationStatus(false, "EFLAG_MISMATCH", StatusCode.ERR_EFLAG_MISMATCH);
81+
private static final OperationStatus UNREADABLE =
82+
new OperationStatus(false, "UNREADABLE", StatusCode.ERR_UNREADABLE);
83+
84+
private final List<KeyedOperation> ops;
85+
private final List<String> keys;
86+
private final PipelineOperation.Callback cb;
87+
88+
private int responseIndex = 0;
89+
private boolean expectingResponse = false;
90+
private boolean successAll = true;
91+
92+
/**
93+
* @param ops each command's operation to be pipelined
94+
* @param keys keys involved in this pipeline operation without duplicate
95+
* @param cb callback for this pipeline operation
96+
*/
97+
public PipelineOperationImpl(List<KeyedOperation> ops, List<String> keys,
98+
OperationCallback cb) {
99+
super(cb);
100+
if (ops == null || ops.isEmpty()) {
101+
throw new IllegalArgumentException("Ops cannot be null or empty");
102+
}
103+
this.ops = ops;
104+
this.keys = keys;
105+
this.cb = (PipelineOperation.Callback) cb;
106+
setAPIType(APIType.PIPE);
107+
setOperationType(OperationType.WRITE);
108+
}
109+
110+
/**
111+
* Make a pipelined command buffer using each command's buffer.
112+
*/
113+
@Override
114+
public void initialize() {
115+
// 1) Initialize operations and collect each buffers
116+
// to handle switchover/redirect single key situations,
117+
// make buffer from responseIndex Operation
118+
int opCount = ops.size() - responseIndex;
119+
ByteBuffer[] buffers = new ByteBuffer[opCount];
120+
int bufferCount = 0;
121+
for (int i = responseIndex; i < ops.size(); i++) {
122+
Operation op = ops.get(i);
123+
op.initialize();
124+
ByteBuffer buffer = op.getBuffer();
125+
if (buffer != null && buffer.hasRemaining()) {
126+
buffers[bufferCount++] = buffer;
127+
}
128+
}
129+
130+
// 2) Remove "pipe" from the last command buffer
131+
if (bufferCount > 0) {
132+
buffers[bufferCount - 1] = removePipeFromLastBuffer(buffers[bufferCount - 1]);
133+
}
134+
135+
// 3) Create a concatenated pipedBuffer
136+
int totalSize = 0;
137+
for (int i = 0; i < bufferCount; i++) {
138+
totalSize += buffers[i].remaining();
139+
}
140+
141+
ByteBuffer pipedBuffer = ByteBuffer.allocate(totalSize);
142+
for (int i = 0; i < bufferCount; i++) {
143+
pipedBuffer.put(buffers[i]);
144+
}
145+
146+
pipedBuffer.flip();
147+
setBuffer(pipedBuffer);
148+
}
149+
150+
private static ByteBuffer removePipeFromLastBuffer(ByteBuffer buffer) {
151+
byte[] bufferBytes = new byte[buffer.remaining()];
152+
buffer.mark();
153+
buffer.get(bufferBytes);
154+
buffer.reset();
155+
156+
String command = new String(bufferBytes);
157+
String modifiedCommand = command.replaceAll("\\s+pipe\\r\\n", "\r\n");
158+
byte[] modifiedBytes = modifiedCommand.getBytes();
159+
ByteBuffer newBuffer = ByteBuffer.allocate(modifiedBytes.length);
160+
newBuffer.put(modifiedBytes);
161+
newBuffer.flip();
162+
return newBuffer;
163+
}
164+
165+
@Override
166+
public void handleLine(String line) {
167+
168+
/* ENABLE_REPLICATION if */
169+
if (hasSwitchedOver(line)) {
170+
prepareSwitchover(line);
171+
return;
172+
}
173+
/* ENABLE_REPLICATION end */
174+
175+
/* ENABLE_MIGRATION if */
176+
if (hasNotMyKey(line)) {
177+
String key = ops.get(responseIndex).getKeys().iterator().next();
178+
if (isBulkOperation()) {
179+
addRedirectMultiKeyOperation(line, key);
180+
responseIndex++;
181+
} else {
182+
// Only one NOT_MY_KEY is provided in response of
183+
// single key piped operation when redirection.
184+
addRedirectSingleKeyOperation(line, key);
185+
transitionState(OperationState.REDIRECT);
186+
}
187+
return;
188+
}
189+
/* ENABLE_MIGRATION end */
190+
191+
/*
192+
RESPONSE <count>\r\n
193+
<status of the 1st pipelined command>\r\n
194+
[ ... ]
195+
<status of the last pipelined command>\r\n
196+
END|PIPE_ERROR <error_string>\r\n
197+
*/
198+
if (line.startsWith("END")) {
199+
/* ENABLE_MIGRATION if */
200+
if (needRedirect()) {
201+
transitionState(OperationState.REDIRECT);
202+
return;
203+
}
204+
/* ENABLE_MIGRATION end */
205+
206+
OperationStatus status = successAll ? END : FAILED_END;
207+
complete(status);
208+
} else if (line.startsWith("PIPE_ERROR")) {
209+
String errorMessage = line.substring(11);
210+
OperationStatus status =
211+
new OperationStatus(false, errorMessage, StatusCode.ERR_INTERNAL);
212+
complete(status);
213+
} else if (line.startsWith("RESPONSE ")) {
214+
expectingResponse = true;
215+
responseIndex = 0;
216+
} else if (expectingResponse) {
217+
// Handle status line for each command
218+
OperationStatus status = parseStatusLine(line);
219+
if (!status.isSuccess()) {
220+
successAll = false;
221+
}
222+
223+
// Notify callback with current response index
224+
cb.gotStatus(ops.get(responseIndex), status);
225+
responseIndex++;
226+
} else {
227+
// Handle single command response (non-pipe case)
228+
// When only one command or last command without "pipe", server sends direct status
229+
OperationStatus status = parseStatusLine(line);
230+
if (!status.isSuccess()) {
231+
successAll = false;
232+
}
233+
234+
// Notify callback for single command
235+
cb.gotStatus(ops.get(0), status);
236+
237+
// Complete the operation immediately for single command
238+
complete(successAll ? END : FAILED_END);
239+
}
240+
}
241+
242+
private OperationStatus parseStatusLine(String line) {
243+
return matchStatus(line,
244+
END, FAILED_END, CREATED_STORED, STORED, REPLACED, UPDATED,
245+
EXIST, NOT_EXIST, DELETED, DELETED_DROPPED, NOT_FOUND, NOT_FOUND_ELEMENT,
246+
NOTHING_TO_UPDATE, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
247+
TYPE_MISMATCH, BKEY_MISMATCH, EFLAG_MISMATCH, UNREADABLE);
248+
}
249+
250+
@Override
251+
public boolean isPipeOperation() {
252+
return ops.size() > 1;
253+
}
254+
255+
@Override
256+
public boolean isBulkOperation() {
257+
return keys.size() > 1;
258+
}
259+
260+
@Override
261+
public List<String> getKeys() {
262+
return keys;
263+
}
264+
265+
@Override
266+
public List<KeyedOperation> getOps() {
267+
return ops;
268+
}
269+
}

src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package net.spy.memcached.protocol.binary;
1818

1919
import java.util.Collection;
20+
import java.util.List;
2021

2122
import javax.security.sasl.SaslClient;
2223

@@ -67,9 +68,11 @@
6768
import net.spy.memcached.ops.GetOperation;
6869
import net.spy.memcached.ops.GetOperation.Callback;
6970
import net.spy.memcached.ops.GetsOperation;
71+
import net.spy.memcached.ops.KeyedOperation;
7072
import net.spy.memcached.ops.Mutator;
7173
import net.spy.memcached.ops.MutatorOperation;
7274
import net.spy.memcached.ops.NoopOperation;
75+
import net.spy.memcached.ops.Operation;
7376
import net.spy.memcached.ops.OperationCallback;
7477
import net.spy.memcached.ops.SASLAuthOperation;
7578
import net.spy.memcached.ops.SASLMechsOperation;
@@ -329,4 +332,11 @@ public BTreeInsertAndGetOperation bopInsertAndGet(String key,
329332
"BTree insert and get operation is not supported in binary protocol yet.");
330333
}
331334

335+
@Override
336+
public Operation pipeline(List<KeyedOperation> ops, List<String> keys,
337+
OperationCallback cb) {
338+
throw new RuntimeException(
339+
"Pipeline operation is not supported in binary protocol yet.");
340+
}
341+
332342
}

0 commit comments

Comments
 (0)