Skip to content

Make Message and Client pools thread-safe using ConcurrentDictionary#180

Open
CrossV4 wants to merge 1 commit into
RiptideNetworking:mainfrom
CrossV4:main
Open

Make Message and Client pools thread-safe using ConcurrentDictionary#180
CrossV4 wants to merge 1 commit into
RiptideNetworking:mainfrom
CrossV4:main

Conversation

@CrossV4
Copy link
Copy Markdown

@CrossV4 CrossV4 commented May 23, 2026

Summary of Changes

I have developed a highly efficient, multi-threaded, and thread-safe networking architectural pattern for RiptideNetworking, specifically designed for high-concurrency projects like MMORPGs.

The Problem: Thread Contention & Race Conditions

When offloading network I/O operations to a dedicated background thread (e.g., executing Server.Update() asynchronously to keep the main game loop responsive), serious race conditions occur. The primary culprit is Riptide’s internal message pool (Message.Create(), Message.Release()). Accessing the message pool concurrently from both the Main Thread (Game Logic) and the Background Thread (Network Loop) corrupts internal byte arrays, leading to network desynchronization, corrupted data, and server crashes.

The Solution: The Command Pattern & Asynchronous Action Queue

To bridge the gap between the Main Thread and the Network Thread safely without expensive resource locking, I implemented a custom Command Pattern execution pipeline combined with a ConcurrentQueue.

  1. Incoming Request Queuing: Network message handlers running on the background thread no longer execute game logic or manipulate the message pool directly. Instead, they deserialize raw data into lightweight, immutable INetworkCommand structures and enqueue them into a thread-safe ConcurrentQueue.
public static readonly ConcurrentQueue<INetworkCommand> ACTION_QUEUE = new();
public interface INetworkCommand
{
    ushort ClientId { get; }
    void Execute();
}

private class PPromo : INetworkCommand
{
    public ushort ClientId { get; set; }
    public string promocode { get; set; }

    public void Execute()
    {
        if (CheckIsInList(ClientId, out Player player))
        {
            bool success = TryClaimPromoLocal(player, promocode);
            Message message = Message.Create(MessageSendMode.Reliable, ServerToClientId.PromoResult);
            message.AddBool(success);
            NetworkManager.Singleton.Server.Send(message, ClientId);
        }
    }
}
[MessageHandler((ushort)ClientToServerId.PromoRequest)]
private static void PROCMO_ACTIVATE(ushort fromClientId, Message message)
{
    NetworkManager.ACTION_QUEUE.Enqueue(new PPromo() { ClientId = fromClientId, promocode = message.GetString() });
}
  1. Main Thread Thread-Safe Execution & Frame Budgeting: The Main Thread dequeues and processes these commands sequentially during its standard Update() loop. To ensure server stability and prevent CPU spikes caused by a massive surge of packets, I integrated a Frame Budget Monitor using a Stopwatch. If processing takes longer than the allocated frame time budget (MaxFrameMs), the execution breaks, deferred actions are carried over to the next frame, and a warning is logged.
private readonly System.Diagnostics.Stopwatch SW = new();
private void Update()
{
        SW.Restart();
        while (ACTION_QUEUE.TryDequeue(out INetworkCommand CR))
        {
            CR.Execute();

            ACTION_REQUEST_COUNTER.AddOrUpdate(CR.ClientId, 0, (o, k) => (byte)(k - 1));

            // Frame Budgeting Control (e.g., ~33.3ms for a 30-Tick Server)
            if (SW.ElapsedMilliseconds > MaxFrameMs)
            {
                Debug.LogWarning($"{{{TICK_FORDEBUG}}} - [Performans] Bu frame'de {SW.ElapsedMilliseconds}ms harcandı! (Dikkat: {CR.GetType().Name}). Kalan istekleri bir sonrakine aktarılıyor.");
                break;
            }
        }
}
  1. Isolated Background Network Loop: With the main loop freed from networking I/O overhead, Server.Update() runs continuously on a high-priority background worker thread, throttled naturally to prevent CPU core exhaustion.
            SERVER_THREAD = new Thread(() => ServerThreadLoop())
            {
                Name = "MokSha-Rin Riptide Server",
                IsBackground = true
            };
            SERVER_THREAD.Start();
    private void ServerThreadLoop()
    {
        try
        {
            while (!GLOBAL_SOURCE.Token.IsCancellationRequested && !QUIT_FLAG)
            {
                // read Riptide packages and fill the actionqueue
                if (Server != null && Server.IsRunning)
                {
                    Server.Update();
                }

                //preventing cpu lock making the thread sleep for a tiny duration and also check for requesting server is going to shutdown
                Thread.Sleep(16);
                GLOBAL_SOURCE.Token.ThrowIfCancellationRequested();
            }
        }
        catch (Exception ex)
        {
            Debug.LogError($"{{{TICK_FORDEBUG}}} - [SERVER - CORE] Server error: {ex.Message}");
        }
        finally
        {
            Debug.Log($"{{{TICK_FORDEBUG}}} - [SERVER - CORE] Server has shutdown (Loop end).");
        }
    }
  1. Why we need this ?
public void Execute()
    {
        if (CheckIsInList(ClientId, out Player player))
        {
            bool success = TryClaimPromoLocal(player, promocode);
            Message message = Message.Create(MessageSendMode.Reliable, ServerToClientId.PromoResult);
            message.AddBool(success);
            NetworkManager.Singleton.Server.Send(message, ClientId);
        }
    }

Message message = Message.Create(MessageSendMode.Reliable, ServerToClientId.PromoResult);
Why This is Vital for the Community
By enforcing this design, Riptide’s pooling mechanism (Message.Create) is only ever touched from a single thread sequence during actual processing, completely eliminating Riptide-pool-related Multi-Thread crashes. I wanted to share this architectural solution with the Riptide community to help anyone struggling to transition their dedicated servers to an asynchronous, high-performance, and crash-proof multi-threaded environment. Hope it helps!

…and ConcurrentStack for avoiding calling Update function from another thread.
@tom-weiland
Copy link
Copy Markdown
Collaborator

Thanks for the PR, Erol!

Regarding the // Modified from Erol Bircan lines you added, I'd prefer to keep file headers limited to mentioning the license and the primary copyright to avoid "header bloat." The commit history (and git blame) already tracks individual contributions and modifications, and it does so much more precisely than manual comments like this do.

Additionally, a number of your code comments read like they were AI generated and they don't fit in with the rest of the code-base, and some of the XML doc comments (the /// ones) you added have typos/grammatical mistakes or don't make a whole lot of sense.

Could you please remove those lines from the headers and do an "AI tone"/grammar/typo pass on your comments? (If English is not your first language I can also try to find some time to help with this.) I’m happy to take another look at the logic changes once that's done 🙂

@CrossV4
Copy link
Copy Markdown
Author

CrossV4 commented Jun 1, 2026

uhh yess, i am a turkish guy i couldnt properly write a professional english so i wote the lines in turkish then i translate them with ai 😅😅.

Okay, i will remove the command lines and i will try to write it withouit ai (but i dont think i will writte them professionally)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants