Skip to content

Commit 6c674af

Browse files
committed
INTERNAL: Do not cancel operations when node is removed from ZK but still alive.
1 parent f24a6c9 commit 6c674af

11 files changed

Lines changed: 133 additions & 5 deletions

src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public final class ArcusKetamaNodeLocator extends SpyObject implements NodeLocat
4242

4343
private final TreeMap<Long, SortedSet<MemcachedNode>> ketamaNodes;
4444
private final Collection<MemcachedNode> allNodes;
45+
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<>();
4546

4647
/* ENABLE_MIGRATION if */
4748
private TreeMap<Long, SortedSet<MemcachedNode>> ketamaAlterNodes;
@@ -238,6 +239,10 @@ public void update(Collection<MemcachedNode> toAttach,
238239
for (MemcachedNode node : toDelete) {
239240
allNodes.remove(node);
240241
removeHash(node);
242+
if (node.hasOp() && node.isActive()) {
243+
delayedClosingNodes.add(node);
244+
continue;
245+
}
241246
try {
242247
node.closeChannel();
243248
} catch (IOException e) {
@@ -256,6 +261,14 @@ public void update(Collection<MemcachedNode> toAttach,
256261
}
257262
}
258263

264+
public Collection<MemcachedNode> getDelayedClosingNodes() {
265+
return Collections.unmodifiableCollection(delayedClosingNodes);
266+
}
267+
268+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
269+
delayedClosingNodes.removeAll(closedNodes);
270+
}
271+
259272
private Long getKetamaHashPoint(byte[] digest, int h) {
260273
return ((long) (digest[3 + h * 4] & 0xFF) << 24)
261274
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
@@ -452,6 +465,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
452465
for (MemcachedNode node : toDelete) {
453466
alterNodes.remove(node);
454467
removeHashOfAlter(node);
468+
if (node.hasOp() && node.isActive()) {
469+
delayedClosingNodes.add(node);
470+
continue;
471+
}
455472
try {
456473
node.closeChannel();
457474
} catch (IOException e) {

src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeL
4444
private final TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaGroups;
4545
private final ConcurrentHashMap<String, MemcachedReplicaGroup> allGroups;
4646
private final Collection<MemcachedNode> allNodes;
47+
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<>();
4748

4849
/* ENABLE_MIGRATION if */
4950
private TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaAlterGroups;
@@ -263,6 +264,10 @@ public void update(Collection<MemcachedNode> toAttach,
263264
for (MemcachedNode node : toDelete) {
264265
allNodes.remove(node);
265266
removeNodeFromGroup(node);
267+
if (node.hasOp() && node.isActive()) {
268+
delayedClosingNodes.add(node);
269+
continue;
270+
}
266271
try {
267272
node.closeChannel();
268273
} catch (IOException e) {
@@ -299,6 +304,14 @@ public void update(Collection<MemcachedNode> toAttach,
299304
}
300305
}
301306

307+
public Collection<MemcachedNode> getDelayedClosingNodes() {
308+
return Collections.unmodifiableCollection(delayedClosingNodes);
309+
}
310+
311+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
312+
delayedClosingNodes.removeAll(closedNodes);
313+
}
314+
302315
public void switchoverReplGroup(MemcachedReplicaGroup group) {
303316
lock.lock();
304317
group.changeRole();
@@ -590,6 +603,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
590603
removeHashOfAlter(mrg);
591604
}
592605
}
606+
if (node.hasOp() && node.isActive()) {
607+
delayedClosingNodes.add(node);
608+
continue;
609+
}
593610
try {
594611
node.closeChannel();
595612
} catch (IOException e) {

src/main/java/net/spy/memcached/ArrayModNodeLocator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Arrays;
2222
import java.util.Collection;
23+
import java.util.HashSet;
2324
import java.util.Iterator;
2425
import java.util.List;
2526

@@ -76,6 +77,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
7677
throw new UnsupportedOperationException("update not supported");
7778
}
7879

80+
public Collection<MemcachedNode> getDelayedClosingNodes() {
81+
return new HashSet<MemcachedNode>();
82+
}
83+
84+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
85+
// do NOT throw UnsupportedOperationException here for test codes.
86+
}
87+
7988
/* ENABLE_MIGRATION if */
8089
public Collection<MemcachedNode> getAlterAll() {
8190
return new ArrayList<>();

src/main/java/net/spy/memcached/KetamaNodeLocator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Collection;
2222
import java.util.Collections;
23+
import java.util.HashSet;
2324
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Map;
@@ -144,6 +145,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
144145
throw new UnsupportedOperationException("update not supported");
145146
}
146147

148+
public Collection<MemcachedNode> getDelayedClosingNodes() {
149+
return new HashSet<MemcachedNode>();
150+
}
151+
152+
public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
153+
// do NOT throw UnsupportedOperationException here for test codes.
154+
}
155+
147156
public SortedMap<Long, MemcachedNode> getKetamaNodes() {
148157
return Collections.unmodifiableSortedMap(ketamaNodes);
149158
}

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ public void handleIO() throws IOException {
297297
}
298298
}
299299

300+
// Deal with the memcached nodes that removed from ZK but has operation in queue.
301+
handleDelayedClosingNodes();
302+
300303
// Deal with the memcached server group that's been added by CacheManager.
301304
handleCacheNodesChange();
302305

@@ -323,12 +326,18 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
323326
}
324327
/* ENABLE_MIGRATION end */
325328

329+
if (node.isActive()) {
330+
// if a memcached node is removed from ZK but can still serve operations, do NOT cancel it.
331+
// operations that remain in operation queue will be processed until connection is lost.
332+
// once all remaining operations are processed, client will close connection.
333+
// if connection is lost before remaining operations are processed,
334+
// all of them will be canceled after connection is lost.
335+
continue;
336+
}
337+
326338
// removing node is not related to failure mode.
327339
// so, cancel operations regardless of failure mode.
328-
String cause = "node removed.";
329-
cancelOperations(node.destroyReadQueue(false), cause);
330-
cancelOperations(node.destroyWriteQueue(false), cause);
331-
cancelOperations(node.destroyInputQueue(), cause);
340+
cancelAllOperations(node, "node removed.");
332341
}
333342
}
334343

@@ -706,6 +715,38 @@ public void complete() {
706715
getLogger().debug("Added %s to writeQ of %s", op, node);
707716
}
708717

718+
// Handle the memcached nodes that removed from ZK but has operation in queue.
719+
void handleDelayedClosingNodes() {
720+
Collection<MemcachedNode> closingNodes = locator.getDelayedClosingNodes();
721+
if (closingNodes.isEmpty()) {
722+
return;
723+
}
724+
725+
Collection<MemcachedNode> closedNodes = new HashSet<>();
726+
for (MemcachedNode node : closingNodes) {
727+
boolean isConnected = node.isConnected();
728+
boolean hasOp = node.hasOp();
729+
730+
if (isConnected && !hasOp) {
731+
try {
732+
node.closeChannel();
733+
} catch (IOException e) {
734+
getLogger().error("Failed to closeChannel the node : " + node);
735+
}
736+
} else if (!isConnected && hasOp) {
737+
cancelAllOperations(node, "connection lost after node removed.");
738+
} else {
739+
continue;
740+
}
741+
742+
closedNodes.add(node);
743+
}
744+
745+
if (!closedNodes.isEmpty()) {
746+
locator.updateDelayedClosingNodes(closedNodes);
747+
}
748+
}
749+
709750
// Handle the memcached server group that's been added by CacheManager.
710751
void handleCacheNodesChange() throws IOException {
711752
/* ENABLE_MIGRATION if */
@@ -1279,6 +1320,12 @@ private void cancelOperations(Collection<Operation> ops, String cause) {
12791320
}
12801321
}
12811322

1323+
private void cancelAllOperations(MemcachedNode node, String cause) {
1324+
cancelOperations(node.destroyReadQueue(false), cause);
1325+
cancelOperations(node.destroyWriteQueue(false), cause);
1326+
cancelOperations(node.destroyInputQueue(), cause);
1327+
}
1328+
12821329
private void redistributeOperations(Collection<Operation> ops, String cause) {
12831330
for (Operation op : ops) {
12841331
if (op instanceof KeyedOperation) {

src/main/java/net/spy/memcached/MemcachedNode.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ public interface MemcachedNode {
105105
*/
106106
boolean hasWriteOp();
107107

108+
/**
109+
* True if any operation is in operation queue.
110+
*/
111+
boolean hasOp();
112+
108113
/**
109114
* Add an operation to the queue. Authentication operations should
110115
* never be added to the queue, but this is not checked.

src/main/java/net/spy/memcached/MemcachedNodeROImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ public boolean isConnected() {
130130
return root.isConnected();
131131
}
132132

133+
public boolean hasOp() {
134+
return root.hasOp();
135+
}
136+
133137
public boolean isActive() {
134138
return root.isActive();
135139
}

src/main/java/net/spy/memcached/NodeLocator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@ public interface NodeLocator {
5858
*/
5959
void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode> toDelete);
6060

61+
/**
62+
* Get all memcached nodes that removed from ZK but has operation in queue.
63+
* Note that this feature is only available in ArcusKetamaNodeLocator.
64+
*/
65+
Collection<MemcachedNode> getDelayedClosingNodes();
66+
67+
/**
68+
* Update all memcached nodes that removed from ZK but has operation in queue.
69+
* Note that this feature is only available in ArcusKetamaNodeLocator.
70+
*/
71+
void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes);
72+
6173
/* ENABLE_MIGRATION if */
6274
/**
6375
* Get all alter memcached nodes.

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,10 @@ public final boolean hasWriteOp() {
292292
return !(optimizedOp == null && writeQ.isEmpty());
293293
}
294294

295+
public final boolean hasOp() {
296+
return hasReadOp() || hasWriteOp() || !inputQueue.isEmpty();
297+
}
298+
295299
public final void addOpToInputQ(Operation op) {
296300
op.setHandlingNode(this);
297301
op.initialize();

src/test/java/net/spy/memcached/MemcachedNodeROImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void testReadOnliness() throws Exception {
5656
Set<String> acceptable = new HashSet<>(Arrays.asList(
5757
"toString", "getSocketAddress", "getBytesRemainingToWrite",
5858
"getReconnectCount", "getSelectionOps", "getNodeName", "hasReadOp",
59-
"hasWriteOp", "isActive", "isConnected"));
59+
"hasWriteOp", "hasOp", "isActive", "isConnected"));
6060

6161
for (Method meth : MemcachedNode.class.getMethods()) {
6262
if (acceptable.contains(meth.getName())) {

0 commit comments

Comments
 (0)