Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions Maple2.Server.Core/Network/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class Session : IDisposable {

// Send queue for non-blocking sends
private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>());
private Thread sendWorkerThread = null!;

public long AccountId { get; protected set; }
public long CharacterId { get; protected set; }
Expand Down Expand Up @@ -85,11 +86,11 @@ protected Session(TcpClient tcpClient) {
recvCipher = new MapleCipher.Decryptor(VERSION, riv, BLOCK_IV);

// Start send worker thread
var sendWorkerThread1 = new Thread(SendWorker) {
sendWorkerThread = new Thread(SendWorker) {
Name = $"SendWorker-{name}",
IsBackground = true,
};
sendWorkerThread1.Start();
sendWorkerThread.Start();
}

~Session() => Dispose(false);
Expand Down Expand Up @@ -133,6 +134,14 @@ public void Disconnect([CallerMemberName] string caller = "", [CallerLineNumber]

if (Interlocked.Exchange(ref disconnecting, 1) == 1) return;
Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line);

// Drain the send queue before disposing — ensures queued packets (e.g. migration)
// are delivered to the client before the connection is closed.
try { sendQueue.CompleteAdding(); } catch (ObjectDisposedException) { }
try { sendWorkerThread.Join(STOP_TIMEOUT); } catch (Exception ex) {
Logger.Debug(ex, "SendWorker drain join failed");
}

Dispose();
}

Expand Down Expand Up @@ -320,7 +329,7 @@ private void SendInternal(byte[] packet, int length) {
}

private void SendRaw(ByteWriter packet) {
if (disposed || disconnecting == 1) return;
if (disposed) return;

try {
// Use async write with timeout to prevent indefinite blocking
Expand Down Expand Up @@ -359,12 +368,12 @@ private void SendRaw(ByteWriter packet) {
private void SendWorker() {
try {
foreach ((byte[] packet, int length) in sendQueue.GetConsumingEnumerable()) {
if (disposed || disconnecting == 1) break;
if (disposed) break;

// Encrypt outside lock, then send with timeout
PoolByteWriter encryptedPacket;
lock (sendCipher) {
if (disposed || disconnecting == 1) break;
if (disposed) break;
encryptedPacket = sendCipher.Encrypt(packet, 0, length);
}
try {
Expand Down
1 change: 1 addition & 0 deletions Maple2.Server.Game/Manager/DungeonManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ private void MigrateToDungeon() {
} catch (RpcException ex) {
session.Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default));
session.Send(NoticePacket.Disconnect(new InterfaceText(ex.Message)));
} finally {
session.Disconnect();
}
}
Expand Down
3 changes: 2 additions & 1 deletion Maple2.Server.Game/PacketHandlers/ChannelHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public override void Handle(GameSession session, IByteReader packet) {
var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port);
session.Send(MigrationPacket.GameToGame(endpoint, response.Token, session.Field?.MapId ?? 0));
session.State = SessionState.ChangeChannel;
session.Disconnect();
} catch (RpcException ex) {
Logger.Error(ex, "Failed to migrate to channel {Channel}", channel);

Expand All @@ -47,6 +46,8 @@ public override void Handle(GameSession session, IByteReader packet) {
// Update the client with the latest channel list.
ChannelsResponse response = World.Channels(new ChannelsRequest());
session.Send(ChannelPacket.Load(response.Channels));
} finally {
session.Disconnect();
}
}
}
6 changes: 1 addition & 5 deletions Maple2.Server.Game/PacketHandlers/QuitHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ public override void Handle(GameSession session, IByteReader packet) {
MigrateOutResponse response = World.MigrateOut(request);
var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port);
session.Send(MigrationPacket.GameToLogin(endpoint, response.Token));
// Do NOT disconnect here — let the client close the TCP connection after
// receiving the migration packet. Calling Disconnect() immediately would
// set disconnecting=1, causing SendWorker to drop the queued packet.
// The natural TCP close will trigger the full Dispose chain (leave field,
// update PlayerInfo, save state, etc.).
} catch (RpcException ex) {
Logger.Error(ex, "MigrateOut failed for account={AccountId} char={CharacterId}",
session.AccountId, session.CharacterId);
session.Send(MigrationPacket.GameToLoginError(s_move_err_default));
} finally {
session.Disconnect();
}
}
Expand Down
1 change: 1 addition & 0 deletions Maple2.Server.Game/Session/GameSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ public void Migrate(int mapId, long ownerId = 0) {
} catch (RpcException ex) {
Send(MigrationPacket.GameToGameError(MigrationError.s_move_err_default));
Send(NoticePacket.Disconnect(new InterfaceText(ex.Message)));
} finally {
Disconnect();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private void HandleSelect(LoginSession session, IByteReader packet) {
MigrateOutResponse response = World.MigrateOut(request);
var endpoint = new IPEndPoint(IPAddress.Parse(response.IpAddress), response.Port);
session.Send(MigrationPacket.LoginToGame(endpoint, response.Token, character.MapId));
session.Disconnect();
} catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable) {
session.Send(MigrationPacket.LoginToGameError(s_move_err_no_server, ex.Message));
} catch (RpcException ex) when (ex.StatusCode == StatusCode.ResourceExhausted) {
Expand Down
9 changes: 7 additions & 2 deletions Maple2.Server.Tests/Tools/EventQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ public void ScheduleRepeated_SkipFirst_SkipsInitialExecution() {
queue.ScheduleRepeated(() => count++, TimeSpan.FromMilliseconds(20), skipFirst: true);
queue.InvokeAll();
Assert.That(count, Is.EqualTo(0));
Thread.Sleep(25);
queue.InvokeAll();

// Wait with retry to handle timing imprecision on CI environments
int maxRetries = 10;
for (int i = 0; i < maxRetries && count == 0; i++) {
Thread.Sleep(10);
queue.InvokeAll();
}
Assert.That(count, Is.EqualTo(1));
}
}
Loading