@@ -304,6 +304,9 @@ public void handleIO() throws IOException {
304304 }
305305 /* ENABLE_REPLICATION end */
306306
307+ // Deal with the memcached nodes that removed from ZK but has operation in queue.
308+ handleDelayedClosingNodes ();
309+
307310 // Deal with the memcached server group that's been added by CacheManager.
308311 handleCacheNodesChange ();
309312
@@ -330,12 +333,14 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
330333 }
331334 /* ENABLE_MIGRATION end */
332335
336+ if (node .isActive ()) {
337+ // if memcached node is removed from ZK but still can serve operations, do NOT cancel it.
338+ continue ;
339+ }
340+
333341 // removing node is not related to failure mode.
334342 // so, cancel operations regardless of failure mode.
335- String cause = "node removed." ;
336- cancelOperations (node .destroyReadQueue (false ), cause );
337- cancelOperations (node .destroyWriteQueue (false ), cause );
338- cancelOperations (node .destroyInputQueue (), cause );
343+ cancelAllOperations (node , "node removed." );
339344 }
340345 }
341346
@@ -680,6 +685,38 @@ public void complete() {
680685 addOperation (node , op );
681686 }
682687
688+ // Handle the memcached nodes that removed from ZK but has operation in queue.
689+ void handleDelayedClosingNodes () {
690+ Collection <MemcachedNode > closingNodes = locator .getDelayedClosingNodes ();
691+ if (closingNodes .isEmpty ()) {
692+ return ;
693+ }
694+
695+ Collection <MemcachedNode > closedNodes = new HashSet <MemcachedNode >();
696+ for (MemcachedNode node : closingNodes ) {
697+ boolean isActive = node .isActive ();
698+ boolean hasOp = node .hasOp ();
699+
700+ if (isActive && !hasOp ) {
701+ try {
702+ node .closeChannel ();
703+ } catch (IOException e ) {
704+ getLogger ().error ("Failed to closeChannel the node : " + node );
705+ }
706+ } else if (!isActive && hasOp ) {
707+ cancelAllOperations (node , "connection lost after node removed." );
708+ } else {
709+ continue ;
710+ }
711+
712+ closedNodes .add (node );
713+ }
714+
715+ if (!closedNodes .isEmpty ()) {
716+ locator .updateDelayedClosingNodes (closedNodes );
717+ }
718+ }
719+
683720 // Handle the memcached server group that's been added by CacheManager.
684721 void handleCacheNodesChange () throws IOException {
685722 /* ENABLE_MIGRATION if */
@@ -1225,6 +1262,12 @@ private void cancelOperations(Collection<Operation> ops, String cause) {
12251262 }
12261263 }
12271264
1265+ private void cancelAllOperations (MemcachedNode node , String cause ) {
1266+ cancelOperations (node .destroyReadQueue (false ), cause );
1267+ cancelOperations (node .destroyWriteQueue (false ), cause );
1268+ cancelOperations (node .destroyInputQueue (), cause );
1269+ }
1270+
12281271 private void redistributeOperations (Collection <Operation > ops , String cause ) {
12291272 for (Operation op : ops ) {
12301273 if (op instanceof KeyedOperation ) {
0 commit comments