From c0b30d1cb891aa108438a05a15563746f21c92eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=82ngelo=20Tadeucci?= Date: Sun, 8 Feb 2026 22:23:47 -0300 Subject: [PATCH 1/2] Drain send queue on disconnect and ensure session cleanup Make send worker a field and drain the send queue during disconnect by calling sendQueue.CompleteAdding() and joining the worker thread, allowing queued packets (e.g. migration) to be delivered before close. Remove disconnecting checks in SendRaw/SendWorker so the worker can finish draining even when a disconnect is in progress. Move/ensure session.Disconnect() calls into finally blocks across several handlers (DungeonManager, ChannelHandler, QuitHandler, GameSession, CharacterManagementHandler) to guarantee cleanup. Also fix the send worker thread variable usage when starting the thread. --- Maple2.Server.Core/Network/Session.cs | 19 ++++++++++++++----- Maple2.Server.Game/Manager/DungeonManager.cs | 1 + .../PacketHandlers/ChannelHandler.cs | 3 ++- .../PacketHandlers/QuitHandler.cs | 6 +----- Maple2.Server.Game/Session/GameSession.cs | 1 + .../CharacterManagementHandler.cs | 1 + 6 files changed, 20 insertions(+), 11 deletions(-) diff --git a/Maple2.Server.Core/Network/Session.cs b/Maple2.Server.Core/Network/Session.cs index 31fdac0a..554a4cc4 100644 --- a/Maple2.Server.Core/Network/Session.cs +++ b/Maple2.Server.Core/Network/Session.cs @@ -52,6 +52,7 @@ public abstract class Session : IDisposable { // Send queue for non-blocking sends private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>()); + private Thread sendWorkerThread = null!; public long AccountId { get; protected set; } public long CharacterId { get; protected set; } @@ -85,11 +86,11 @@ protected Session(TcpClient tcpClient) { recvCipher = new MapleCipher.Decryptor(VERSION, riv, BLOCK_IV); // Start send worker thread - var sendWorkerThread1 = new Thread(SendWorker) { + sendWorkerThread = new Thread(SendWorker) { Name = $"SendWorker-{name}", IsBackground = true, }; - sendWorkerThread1.Start(); + sendWorkerThread.Start(); } ~Session() => Dispose(false); @@ -133,6 +134,14 @@ public void Disconnect([CallerMemberName] string caller = "", [CallerLineNumber] if (Interlocked.Exchange(ref disconnecting, 1) == 1) return; Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line); + + // Drain the send queue before disposing — ensures queued packets (e.g. migration) + // are delivered to the client before the connection is closed. + try { sendQueue.CompleteAdding(); } catch (ObjectDisposedException) { } + try { sendWorkerThread.Join(STOP_TIMEOUT); } catch (Exception ex) { + Logger.Debug(ex, "SendWorker drain join failed"); + } + Dispose(); } @@ -320,7 +329,7 @@ private void SendInternal(byte[] packet, int length) { } private void SendRaw(ByteWriter packet) { - if (disposed || disconnecting == 1) return; + if (disposed) return; try { // Use async write with timeout to prevent indefinite blocking @@ -359,12 +368,12 @@ private void SendRaw(ByteWriter packet) { private void SendWorker() { try { foreach ((byte[] packet, int length) in sendQueue.GetConsumingEnumerable()) { - if (disposed || disconnecting == 1) break; + if (disposed) break; // Encrypt outside lock, then send with timeout PoolByteWriter encryptedPacket; lock (sendCipher) { - if (disposed || disconnecting == 1) break; + if (disposed) break; encryptedPacket = sendCipher.Encrypt(packet, 0, length); } try { diff --git a/Maple2.Server.Game/Manager/DungeonManager.cs b/Maple2.Server.Game/Manager/DungeonManager.cs index 52291f66..71a72549 100644 --- a/Maple2.Server.Game/Manager/DungeonManager.cs +++ b/Maple2.Server.Game/Manager/DungeonManager.cs @@ -609,6 +609,7 @@ private void MigrateToDungeon() { } catch (RpcException ex) { session.Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default)); session.Send(NoticePacket.Disconnect(new InterfaceText(ex.Message))); + } finally { session.Disconnect(); } } diff --git a/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs b/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs index 9ab7ec2d..c50d3413 100644 --- a/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs +++ b/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs @@ -38,7 +38,6 @@ public override void Handle(GameSession session, IByteReader packet) { var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port); session.Send(MigrationPacket.GameToGame(endpoint, response.Token, session.Field?.MapId ?? 0)); session.State = SessionState.ChangeChannel; - session.Disconnect(); } catch (RpcException ex) { Logger.Error(ex, "Failed to migrate to channel {Channel}", channel); @@ -47,6 +46,8 @@ public override void Handle(GameSession session, IByteReader packet) { // Update the client with the latest channel list. ChannelsResponse response = World.Channels(new ChannelsRequest()); session.Send(ChannelPacket.Load(response.Channels)); + } finally { + session.Disconnect(); } } } diff --git a/Maple2.Server.Game/PacketHandlers/QuitHandler.cs b/Maple2.Server.Game/PacketHandlers/QuitHandler.cs index e17150ea..ce012fd9 100644 --- a/Maple2.Server.Game/PacketHandlers/QuitHandler.cs +++ b/Maple2.Server.Game/PacketHandlers/QuitHandler.cs @@ -46,15 +46,11 @@ public override void Handle(GameSession session, IByteReader packet) { MigrateOutResponse response = World.MigrateOut(request); var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port); session.Send(MigrationPacket.GameToLogin(endpoint, response.Token)); - // Do NOT disconnect here — let the client close the TCP connection after - // receiving the migration packet. Calling Disconnect() immediately would - // set disconnecting=1, causing SendWorker to drop the queued packet. - // The natural TCP close will trigger the full Dispose chain (leave field, - // update PlayerInfo, save state, etc.). } catch (RpcException ex) { Logger.Error(ex, "MigrateOut failed for account={AccountId} char={CharacterId}", session.AccountId, session.CharacterId); session.Send(MigrationPacket.GameToLoginError(s_move_err_default)); + } finally { session.Disconnect(); } } diff --git a/Maple2.Server.Game/Session/GameSession.cs b/Maple2.Server.Game/Session/GameSession.cs index b2fc41db..1935a24a 100644 --- a/Maple2.Server.Game/Session/GameSession.cs +++ b/Maple2.Server.Game/Session/GameSession.cs @@ -702,6 +702,7 @@ public void Migrate(int mapId, long ownerId = 0) { } catch (RpcException ex) { Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default)); Send(NoticePacket.Disconnect(new InterfaceText(ex.Message))); + } finally { Disconnect(); } } diff --git a/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs b/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs index e2282d32..51b3fe9c 100644 --- a/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs +++ b/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs @@ -96,6 +96,7 @@ private void HandleSelect(LoginSession session, IByteReader packet) { MigrateOutResponse response = World.MigrateOut(request); var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port); session.Send(MigrationPacket.LoginToGame(endpoint, response.Token, character.MapId)); + session.Disconnect(); } catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable) { session.Send(MigrationPacket.LoginToGameError(s_move_err_no_server, ex.Message)); } catch (RpcException ex) when (ex.StatusCode == StatusCode.ResourceExhausted) { From 78ec723b7af806040328cfef8c96bc5ad1e868d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=82ngelo=20Tadeucci?= Date: Sun, 8 Feb 2026 22:33:49 -0300 Subject: [PATCH 2/2] Make EventQueueTests timing robust on CI Replace a single Thread.Sleep(25) with a retry loop that sleeps 10ms and calls queue.InvokeAll up to 10 times until the scheduled action runs. This reduces flakiness by handling timing imprecision in CI environments. --- Maple2.Server.Tests/Tools/EventQueueTests.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Maple2.Server.Tests/Tools/EventQueueTests.cs b/Maple2.Server.Tests/Tools/EventQueueTests.cs index a36ae0ee..14f90065 100644 --- a/Maple2.Server.Tests/Tools/EventQueueTests.cs +++ b/Maple2.Server.Tests/Tools/EventQueueTests.cs @@ -62,8 +62,13 @@ public void ScheduleRepeated_SkipFirst_SkipsInitialExecution() { queue.ScheduleRepeated(() => count++, TimeSpan.FromMilliseconds(20), skipFirst: true); queue.InvokeAll(); Assert.That(count, Is.EqualTo(0)); - Thread.Sleep(25); - queue.InvokeAll(); + + // Wait with retry to handle timing imprecision on CI environments + int maxRetries = 10; + for (int i = 0; i < maxRetries && count == 0; i++) { + Thread.Sleep(10); + queue.InvokeAll(); + } Assert.That(count, Is.EqualTo(1)); } }