Skip to content

Commit 4bfd057

Browse files
clean up
1 parent 5bb8f80 commit 4bfd057

5 files changed

Lines changed: 13 additions & 92 deletions

File tree

packages/socket.io-adapter/lib/cluster-adapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ export abstract class ClusterAdapter extends Adapter {
482482
opts: BroadcastOptions,
483483
clientCountCallback: (clientCount: number) => void,
484484
ack: (...args: any[]) => void,
485-
): { cleanup: () => void } {
485+
) {
486486
const onlyLocal = opts?.flags?.local;
487487
if (!onlyLocal) {
488488
const requestId = randomId();

packages/socket.io-adapter/lib/in-memory-adapter.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ export class Adapter extends EventEmitter {
199199
opts: BroadcastOptions,
200200
clientCountCallback: (clientCount: number) => void,
201201
ack: (...args: any[]) => void,
202-
): { cleanup: () => void } {
202+
) {
203203
const flags = opts.flags || {};
204204
const packetOpts = {
205205
preEncoded: true,
@@ -214,15 +214,12 @@ export class Adapter extends EventEmitter {
214214
const encodedPackets = this._encode(packet, packetOpts);
215215

216216
let clientCount = 0;
217-
const sentToSockets: any[] = [];
218217

219218
this.apply(opts, (socket) => {
220219
// track the total number of acknowledgements that are expected
221220
clientCount++;
222221
// call the ack callback for each client response
223222
socket.acks.set(packet.id, ack);
224-
// track sockets for cleanup on timeout
225-
sentToSockets.push(socket);
226223

227224
if (typeof socket.notifyOutgoingListeners === "function") {
228225
socket.notifyOutgoingListeners(packet);
@@ -233,17 +230,11 @@ export class Adapter extends EventEmitter {
233230

234231
clientCountCallback(clientCount);
235232

236-
const packetId = packet.id;
237233
return {
238234
cleanup: () => {
239-
// remove pending acks from all sockets on timeout
240-
for (const socket of sentToSockets) {
241-
if (socket.acks) {
242-
socket.acks.delete(packetId);
243-
}
244-
}
245-
// clear array to release socket references
246-
sentToSockets.length = 0;
235+
this.apply(opts, (socket) => {
236+
socket.acks.delete(packet.id);
237+
});
247238
},
248239
};
249240
}

packages/socket.io/lib/broadcast-operator.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,14 +232,11 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
232232
const ack = data.pop() as (...args: any[]) => void;
233233
let timedOut = false;
234234
let responses: any[] = [];
235-
let cleanup: (() => void) | undefined;
235+
let cleanupPendingAcks: (() => void) | undefined;
236236

237237
const timer = setTimeout(() => {
238238
timedOut = true;
239-
// cleanup pending acks to prevent memory leak
240-
if (cleanup) {
241-
cleanup();
242-
}
239+
cleanupPendingAcks?.();
243240
ack.apply(this, [
244241
new Error("operation has timed out"),
245242
this.flags.expectSingleResponse ? null : responses,
@@ -284,9 +281,8 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
284281
},
285282
);
286283

287-
// store cleanup function for timeout handler
288284
if (result && typeof result.cleanup === "function") {
289-
cleanup = result.cleanup;
285+
cleanupPendingAcks = result.cleanup;
290286
}
291287

292288
this.adapter.serverCount().then((serverCount) => {

packages/socket.io/test/messaging-many.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,11 @@ describe("messaging many", () => {
534534
// @ts-ignore
535535
expect(err.responses).to.contain(1, 2);
536536

537+
for (const [, serverSocket] of io.of("/").sockets) {
538+
// @ts-ignore accessing private acks map to verify cleanup
539+
expect(serverSocket.acks.size).to.be(0);
540+
}
541+
537542
success(done, io, socket1, socket2, socket3);
538543
}
539544
});

packages/socket.io/test/socket-timeout.ts

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -84,75 +84,4 @@ describe("timeout", () => {
8484
success(done, io, client);
8585
});
8686
});
87-
88-
it("should cleanup pending acks on broadcast timeout (memory leak fix)", (done) => {
89-
const io = new Server(0);
90-
const client = createClient(io, "/");
91-
92-
// Client does not acknowledge the event (simulates timeout scenario)
93-
client.on("test-event", () => {
94-
// intentionally not calling the callback
95-
});
96-
97-
io.on("connection", async (socket) => {
98-
socket.join("test-room");
99-
100-
// Get initial acks count (cast to any to access private property in test)
101-
const initialAcksSize = (socket as any).acks.size;
102-
103-
try {
104-
await io.timeout(50).to("test-room").emitWithAck("test-event", "data");
105-
expect().fail("should have timed out");
106-
} catch (err) {
107-
expect(err).to.be.an(Error);
108-
109-
// After timeout, acks should be cleaned up (no memory leak)
110-
// Wait a bit for cleanup to complete
111-
setTimeout(() => {
112-
expect((socket as any).acks.size).to.be(initialAcksSize);
113-
success(done, io, client);
114-
}, 10);
115-
}
116-
});
117-
});
118-
119-
it("should cleanup pending acks on broadcast timeout with multiple clients", (done) => {
120-
const io = new Server(0);
121-
const client1 = createClient(io, "/");
122-
const client2 = createClient(io, "/");
123-
124-
let connectedSockets: any[] = [];
125-
126-
// Clients do not acknowledge
127-
client1.on("test-event", () => {});
128-
client2.on("test-event", () => {});
129-
130-
io.on("connection", (socket) => {
131-
socket.join("test-room");
132-
connectedSockets.push(socket);
133-
134-
if (connectedSockets.length === 2) {
135-
runTest();
136-
}
137-
});
138-
139-
async function runTest() {
140-
const initialAcksSizes = connectedSockets.map((s) => s.acks.size);
141-
142-
try {
143-
await io.timeout(50).to("test-room").emitWithAck("test-event", "data");
144-
expect().fail("should have timed out");
145-
} catch (err) {
146-
expect(err).to.be.an(Error);
147-
148-
setTimeout(() => {
149-
// All sockets should have their acks cleaned up
150-
connectedSockets.forEach((socket, i) => {
151-
expect(socket.acks.size).to.be(initialAcksSizes[i]);
152-
});
153-
success(done, io, client1, client2);
154-
}, 10);
155-
}
156-
}
157-
});
15887
});

0 commit comments

Comments
 (0)