Skip to content

Commit e79d372

Browse files
authored
IGNITE-26959: Fix thin client continuous query cleanup on disconnect (#12861)
1 parent 81dda22 commit e79d372

4 files changed

Lines changed: 50 additions & 11 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
9999
private ClientRequestHandler handler;
100100

101101
/** Handle registry. */
102-
private final ClientResourceRegistry resReg = new ClientResourceRegistry();
102+
private final ClientResourceRegistry resReg;
103103

104104
/** Max cursors. */
105105
private final int maxCursors;
@@ -151,6 +151,7 @@ public ClientConnectionContext(
151151
this.maxCursors = maxCursors;
152152
maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection();
153153
maxActiveComputeTasks = thinCfg.getMaxActiveComputeTasksPerConnection();
154+
resReg = new ClientResourceRegistry(ctx.log(ClientResourceRegistry.class));
154155
}
155156

156157
/**
@@ -267,7 +268,7 @@ public void incrementCursors() {
267268
}
268269

269270
/**
270-
* Increments the cursor count.
271+
* Decrement the cursor count.
271272
*/
272273
public void decrementCursors() {
273274
curCnt.decrementAndGet();

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientResourceRegistry.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.concurrent.atomic.AtomicLong;
23+
import org.apache.ignite.IgniteLogger;
2324

2425
/**
2526
* Per-connection resource registry.
@@ -31,6 +32,17 @@ public class ClientResourceRegistry {
3132
/** ID generator. */
3233
private final AtomicLong idGen = new AtomicLong();
3334

35+
/** Logger. */
36+
private final IgniteLogger log;
37+
38+
/**
39+
* Logger for cleanup errors logging.
40+
* @param log Logger.
41+
*/
42+
public ClientResourceRegistry(IgniteLogger log) {
43+
this.log = log;
44+
}
45+
3446
/**
3547
* Allocates server handle for an object.
3648
*
@@ -85,8 +97,23 @@ public void release(long hnd) {
8597
* Cleans all handles and closes all ClientCloseableResources.
8698
*/
8799
public void clean() {
88-
for (Map.Entry e : res.entrySet())
89-
closeIfNeeded(e.getValue());
100+
for (Map.Entry<Long, Object> e : res.entrySet()) {
101+
Long id = e.getKey();
102+
Object obj = e.getValue();
103+
104+
try {
105+
closeIfNeeded(obj);
106+
}
107+
catch (Exception ex) {
108+
if (log != null && log.isDebugEnabled())
109+
log.debug("Failed to close client resource on disconnect [id=" + id +
110+
", res=" + obj +
111+
", err=" + ex.getClass().getSimpleName() + ": " + ex.getMessage() + ']');
112+
}
113+
finally {
114+
res.remove(id, obj);
115+
}
116+
}
90117
}
91118

92119
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheQueryContinuousHandle.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ public void startNotifications(long id) {
8686
@Override public void close() {
8787
if (closeGuard.compareAndSet(false, true)) {
8888
assert cur != null;
89-
cur.close();
90-
91-
ctx.decrementCursors();
89+
try {
90+
cur.close();
91+
}
92+
finally {
93+
ctx.decrementCursors();
94+
}
9295
}
9396
}
9497
}

modules/platforms/cpp/thin-client/src/impl/notification_handler.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,11 @@ namespace ignite
240240
{
241241
disconnected = true;
242242

243+
// Clear the queue as we won't be able to process these notifications after disconnect.
244+
queue.clear();
245+
243246
if (handler.IsValid())
244-
return common::SP_ThreadPoolTask(new DisconnectedTask(handler));
247+
handler.Get()->OnDisconnected();
245248

246249
return common::SP_ThreadPoolTask();
247250
}
@@ -258,13 +261,18 @@ namespace ignite
258261
"Internal error: handler is already set for the notification");
259262

260263
handler = handler0;
264+
265+
// If we are already disconnected, then there is no point in processing notifications.
266+
if (disconnected) {
267+
queue.clear();
268+
handler.Get()->OnDisconnected();
269+
return;
270+
}
271+
261272
for (MessageQueue::iterator it = queue.begin(); it != queue.end(); ++it)
262273
handler.Get()->OnNotification(*it);
263274

264275
queue.clear();
265-
266-
if (disconnected)
267-
handler.Get()->OnDisconnected();
268276
}
269277

270278
private:

0 commit comments

Comments
 (0)