Skip to content

Commit 426dc84

Browse files
Drain send queue on disconnect and ensure session cleanup (#630)
* Drain send queue on disconnect and ensure session cleanup Make send worker a field and drain the send queue during disconnect by calling sendQueue.CompleteAdding() and joining the worker thread, allowing queued packets (e.g. migration) to be delivered before close. Remove disconnecting checks in SendRaw/SendWorker so the worker can finish draining even when a disconnect is in progress. Move/ensure session.Disconnect() calls into finally blocks across several handlers (DungeonManager, ChannelHandler, QuitHandler, GameSession, CharacterManagementHandler) to guarantee cleanup. Also fix the send worker thread variable usage when starting the thread. * Make EventQueueTests timing robust on CI Replace a single Thread.Sleep(25) with a retry loop that sleeps 10ms and calls queue.InvokeAll up to 10 times until the scheduled action runs. This reduces flakiness by handling timing imprecision in CI environments.
1 parent 20fcd63 commit 426dc84

7 files changed

Lines changed: 27 additions & 13 deletions

File tree

Maple2.Server.Core/Network/Session.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public abstract class Session : IDisposable {
5252

5353
// Send queue for non-blocking sends
5454
private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>());
55+
private Thread sendWorkerThread = null!;
5556

5657
public long AccountId { get; protected set; }
5758
public long CharacterId { get; protected set; }
@@ -85,11 +86,11 @@ protected Session(TcpClient tcpClient) {
8586
recvCipher = new MapleCipher.Decryptor(VERSION, riv, BLOCK_IV);
8687

8788
// Start send worker thread
88-
var sendWorkerThread1 = new Thread(SendWorker) {
89+
sendWorkerThread = new Thread(SendWorker) {
8990
Name = $"SendWorker-{name}",
9091
IsBackground = true,
9192
};
92-
sendWorkerThread1.Start();
93+
sendWorkerThread.Start();
9394
}
9495

9596
~Session() => Dispose(false);
@@ -133,6 +134,14 @@ public void Disconnect([CallerMemberName] string caller = "", [CallerLineNumber]
133134

134135
if (Interlocked.Exchange(ref disconnecting, 1) == 1) return;
135136
Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line);
137+
138+
// Drain the send queue before disposing — ensures queued packets (e.g. migration)
139+
// are delivered to the client before the connection is closed.
140+
try { sendQueue.CompleteAdding(); } catch (ObjectDisposedException) { }
141+
try { sendWorkerThread.Join(STOP_TIMEOUT); } catch (Exception ex) {
142+
Logger.Debug(ex, "SendWorker drain join failed");
143+
}
144+
136145
Dispose();
137146
}
138147

@@ -320,7 +329,7 @@ private void SendInternal(byte[] packet, int length) {
320329
}
321330

322331
private void SendRaw(ByteWriter packet) {
323-
if (disposed || disconnecting == 1) return;
332+
if (disposed) return;
324333

325334
try {
326335
// Use async write with timeout to prevent indefinite blocking
@@ -359,12 +368,12 @@ private void SendRaw(ByteWriter packet) {
359368
private void SendWorker() {
360369
try {
361370
foreach ((byte[] packet, int length) in sendQueue.GetConsumingEnumerable()) {
362-
if (disposed || disconnecting == 1) break;
371+
if (disposed) break;
363372

364373
// Encrypt outside lock, then send with timeout
365374
PoolByteWriter encryptedPacket;
366375
lock (sendCipher) {
367-
if (disposed || disconnecting == 1) break;
376+
if (disposed) break;
368377
encryptedPacket = sendCipher.Encrypt(packet, 0, length);
369378
}
370379
try {

Maple2.Server.Game/Manager/DungeonManager.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ private void MigrateToDungeon() {
609609
} catch (RpcException ex) {
610610
session.Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default));
611611
session.Send(NoticePacket.Disconnect(new InterfaceText(ex.Message)));
612+
} finally {
612613
session.Disconnect();
613614
}
614615
}

Maple2.Server.Game/PacketHandlers/ChannelHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public override void Handle(GameSession session, IByteReader packet) {
3838
var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port);
3939
session.Send(MigrationPacket.GameToGame(endpoint, response.Token, session.Field?.MapId ?? 0));
4040
session.State = SessionState.ChangeChannel;
41-
session.Disconnect();
4241
} catch (RpcException ex) {
4342
Logger.Error(ex, "Failed to migrate to channel {Channel}", channel);
4443

@@ -47,6 +46,8 @@ public override void Handle(GameSession session, IByteReader packet) {
4746
// Update the client with the latest channel list.
4847
ChannelsResponse response = World.Channels(new ChannelsRequest());
4948
session.Send(ChannelPacket.Load(response.Channels));
49+
} finally {
50+
session.Disconnect();
5051
}
5152
}
5253
}

Maple2.Server.Game/PacketHandlers/QuitHandler.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,11 @@ public override void Handle(GameSession session, IByteReader packet) {
4646
MigrateOutResponse response = World.MigrateOut(request);
4747
var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port);
4848
session.Send(MigrationPacket.GameToLogin(endpoint, response.Token));
49-
// Do NOT disconnect here — let the client close the TCP connection after
50-
// receiving the migration packet. Calling Disconnect() immediately would
51-
// set disconnecting=1, causing SendWorker to drop the queued packet.
52-
// The natural TCP close will trigger the full Dispose chain (leave field,
53-
// update PlayerInfo, save state, etc.).
5449
} catch (RpcException ex) {
5550
Logger.Error(ex, "MigrateOut failed for account={AccountId} char={CharacterId}",
5651
session.AccountId, session.CharacterId);
5752
session.Send(MigrationPacket.GameToLoginError(s_move_err_default));
53+
} finally {
5854
session.Disconnect();
5955
}
6056
}

Maple2.Server.Game/Session/GameSession.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,7 @@ public void Migrate(int mapId, long ownerId = 0) {
702702
} catch (RpcException ex) {
703703
Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default));
704704
Send(NoticePacket.Disconnect(new InterfaceText(ex.Message)));
705+
} finally {
705706
Disconnect();
706707
}
707708
}

Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ private void HandleSelect(LoginSession session, IByteReader packet) {
9696
MigrateOutResponse response = World.MigrateOut(request);
9797
var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port);
9898
session.Send(MigrationPacket.LoginToGame(endpoint, response.Token, character.MapId));
99+
session.Disconnect();
99100
} catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable) {
100101
session.Send(MigrationPacket.LoginToGameError(s_move_err_no_server, ex.Message));
101102
} catch (RpcException ex) when (ex.StatusCode == StatusCode.ResourceExhausted) {

Maple2.Server.Tests/Tools/EventQueueTests.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,13 @@ public void ScheduleRepeated_SkipFirst_SkipsInitialExecution() {
6262
queue.ScheduleRepeated(() => count++, TimeSpan.FromMilliseconds(20), skipFirst: true);
6363
queue.InvokeAll();
6464
Assert.That(count, Is.EqualTo(0));
65-
Thread.Sleep(25);
66-
queue.InvokeAll();
65+
66+
// Wait with retry to handle timing imprecision on CI environments
67+
int maxRetries = 10;
68+
for (int i = 0; i < maxRetries && count == 0; i++) {
69+
Thread.Sleep(10);
70+
queue.InvokeAll();
71+
}
6772
Assert.That(count, Is.EqualTo(1));
6873
}
6974
}

0 commit comments

Comments
 (0)