diff --git a/Maple2.Server.Core/Network/Session.cs b/Maple2.Server.Core/Network/Session.cs index 31fdac0a..554a4cc4 100644 --- a/Maple2.Server.Core/Network/Session.cs +++ b/Maple2.Server.Core/Network/Session.cs @@ -52,6 +52,7 @@ public abstract class Session : IDisposable { // Send queue for non-blocking sends private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>()); + private Thread sendWorkerThread = null!; public long AccountId { get; protected set; } public long CharacterId { get; protected set; } @@ -85,11 +86,11 @@ protected Session(TcpClient tcpClient) { recvCipher = new MapleCipher.Decryptor(VERSION, riv, BLOCK_IV); // Start send worker thread - var sendWorkerThread1 = new Thread(SendWorker) { + sendWorkerThread = new Thread(SendWorker) { Name = $"SendWorker-{name}", IsBackground = true, }; - sendWorkerThread1.Start(); + sendWorkerThread.Start(); } ~Session() => Dispose(false); @@ -133,6 +134,14 @@ public void Disconnect([CallerMemberName] string caller = "", [CallerLineNumber] if (Interlocked.Exchange(ref disconnecting, 1) == 1) return; Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line); + + // Drain the send queue before disposing — ensures queued packets (e.g. migration) + // are delivered to the client before the connection is closed. + try { sendQueue.CompleteAdding(); } catch (ObjectDisposedException) { } + try { sendWorkerThread.Join(STOP_TIMEOUT); } catch (Exception ex) { + Logger.Debug(ex, "SendWorker drain join failed"); + } + Dispose(); } @@ -320,7 +329,7 @@ private void SendInternal(byte[] packet, int length) { } private void SendRaw(ByteWriter packet) { - if (disposed || disconnecting == 1) return; + if (disposed) return; try { // Use async write with timeout to prevent indefinite blocking @@ -359,12 +368,12 @@ private void SendRaw(ByteWriter packet) { private void SendWorker() { try { foreach ((byte[] packet, int length) in sendQueue.GetConsumingEnumerable()) { - if (disposed || disconnecting == 1) break; + if (disposed) break; // Encrypt outside lock, then send with timeout PoolByteWriter encryptedPacket; lock (sendCipher) { - if (disposed || disconnecting == 1) break; + if (disposed) break; encryptedPacket = sendCipher.Encrypt(packet, 0, length); } try { diff --git a/Maple2.Server.Game/Manager/DungeonManager.cs b/Maple2.Server.Game/Manager/DungeonManager.cs index 52291f66..71a72549 100644 --- a/Maple2.Server.Game/Manager/DungeonManager.cs +++ b/Maple2.Server.Game/Manager/DungeonManager.cs @@ -609,6 +609,7 @@ private void MigrateToDungeon() { } catch (RpcException ex) { session.Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default)); session.Send(NoticePacket.Disconnect(new InterfaceText(ex.Message))); + } finally { session.Disconnect(); } } diff --git a/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs b/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs index 9ab7ec2d..c50d3413 100644 --- a/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs +++ b/Maple2.Server.Game/PacketHandlers/ChannelHandler.cs @@ -38,7 +38,6 @@ public override void Handle(GameSession session, IByteReader packet) { var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port); session.Send(MigrationPacket.GameToGame(endpoint, response.Token, session.Field?.MapId ?? 0)); session.State = SessionState.ChangeChannel; - session.Disconnect(); } catch (RpcException ex) { Logger.Error(ex, "Failed to migrate to channel {Channel}", channel); @@ -47,6 +46,8 @@ public override void Handle(GameSession session, IByteReader packet) { // Update the client with the latest channel list. ChannelsResponse response = World.Channels(new ChannelsRequest()); session.Send(ChannelPacket.Load(response.Channels)); + } finally { + session.Disconnect(); } } } diff --git a/Maple2.Server.Game/PacketHandlers/QuitHandler.cs b/Maple2.Server.Game/PacketHandlers/QuitHandler.cs index e17150ea..ce012fd9 100644 --- a/Maple2.Server.Game/PacketHandlers/QuitHandler.cs +++ b/Maple2.Server.Game/PacketHandlers/QuitHandler.cs @@ -46,15 +46,11 @@ public override void Handle(GameSession session, IByteReader packet) { MigrateOutResponse response = World.MigrateOut(request); var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port); session.Send(MigrationPacket.GameToLogin(endpoint, response.Token)); - // Do NOT disconnect here — let the client close the TCP connection after - // receiving the migration packet. Calling Disconnect() immediately would - // set disconnecting=1, causing SendWorker to drop the queued packet. - // The natural TCP close will trigger the full Dispose chain (leave field, - // update PlayerInfo, save state, etc.). } catch (RpcException ex) { Logger.Error(ex, "MigrateOut failed for account={AccountId} char={CharacterId}", session.AccountId, session.CharacterId); session.Send(MigrationPacket.GameToLoginError(s_move_err_default)); + } finally { session.Disconnect(); } } diff --git a/Maple2.Server.Game/Session/GameSession.cs b/Maple2.Server.Game/Session/GameSession.cs index b2fc41db..1935a24a 100644 --- a/Maple2.Server.Game/Session/GameSession.cs +++ b/Maple2.Server.Game/Session/GameSession.cs @@ -702,6 +702,7 @@ public void Migrate(int mapId, long ownerId = 0) { } catch (RpcException ex) { Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default)); Send(NoticePacket.Disconnect(new InterfaceText(ex.Message))); + } finally { Disconnect(); } } diff --git a/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs b/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs index e2282d32..51b3fe9c 100644 --- a/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs +++ b/Maple2.Server.Login/PacketHandlers/CharacterManagementHandler.cs @@ -96,6 +96,7 @@ private void HandleSelect(LoginSession session, IByteReader packet) { MigrateOutResponse response = World.MigrateOut(request); var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port); session.Send(MigrationPacket.LoginToGame(endpoint, response.Token, character.MapId)); + session.Disconnect(); } catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable) { session.Send(MigrationPacket.LoginToGameError(s_move_err_no_server, ex.Message)); } catch (RpcException ex) when (ex.StatusCode == StatusCode.ResourceExhausted) { diff --git a/Maple2.Server.Tests/Tools/EventQueueTests.cs b/Maple2.Server.Tests/Tools/EventQueueTests.cs index a36ae0ee..14f90065 100644 --- a/Maple2.Server.Tests/Tools/EventQueueTests.cs +++ b/Maple2.Server.Tests/Tools/EventQueueTests.cs @@ -62,8 +62,13 @@ public void ScheduleRepeated_SkipFirst_SkipsInitialExecution() { queue.ScheduleRepeated(() => count++, TimeSpan.FromMilliseconds(20), skipFirst: true); queue.InvokeAll(); Assert.That(count, Is.EqualTo(0)); - Thread.Sleep(25); - queue.InvokeAll(); + + // Wait with retry to handle timing imprecision on CI environments + int maxRetries = 10; + for (int i = 0; i < maxRetries && count == 0; i++) { + Thread.Sleep(10); + queue.InvokeAll(); + } Assert.That(count, Is.EqualTo(1)); } }