Skip to content

Commit b851c0f

Browse files
committed
fix(websocket): harden shutdown and runtime integration (avoid abort on exit)
1 parent e61c6b0 commit b851c0f

6 files changed

Lines changed: 266 additions & 65 deletions

File tree

include/vix/websocket/App.hpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
#ifndef VIX_WEBSOCKET_APP_HPP
1414
#define VIX_WEBSOCKET_APP_HPP
1515

16+
#include <atomic>
1617
#include <functional>
1718
#include <memory>
19+
#include <mutex>
1820
#include <string>
1921
#include <vector>
2022

@@ -59,6 +61,8 @@ namespace vix::websocket
5961
const std::string &configPath,
6062
std::shared_ptr<vix::executor::RuntimeExecutor> executor);
6163

64+
~App() noexcept;
65+
6266
App(const App &) = delete;
6367
App &operator=(const App &) = delete;
6468
App(App &&) = delete;
@@ -79,9 +83,11 @@ namespace vix::websocket
7983
void run_blocking();
8084

8185
/**
82-
* @brief Stop the WebSocket server.
86+
* @brief Stop the WebSocket app and shared executor.
87+
*
88+
* Safe to call multiple times.
8389
*/
84-
void stop();
90+
void stop() noexcept;
8591

8692
/**
8793
* @brief Access the underlying WebSocket server.
@@ -128,6 +134,14 @@ namespace vix::websocket
128134
*/
129135
void install_dispatcher();
130136

137+
/**
138+
* @brief Dispatch one typed message to all registered handlers.
139+
*/
140+
void dispatch_typed_message(
141+
Session &session,
142+
const std::string &type,
143+
const vix::json::kvs &payload);
144+
131145
private:
132146
/** @brief Application configuration source. */
133147
vix::config::Config config_;
@@ -140,6 +154,12 @@ namespace vix::websocket
140154

141155
/** @brief Registered typed routes. */
142156
std::vector<Route> routes_;
157+
158+
/** @brief Protects stop() from concurrent/double shutdown. */
159+
mutable std::mutex stopMutex_;
160+
161+
/** @brief Indicates whether shutdown was already requested. */
162+
std::atomic<bool> stopped_{false};
143163
};
144164

145165
} // namespace vix::websocket

include/vix/websocket/AttachedRuntime.hpp

Lines changed: 95 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
#define VIX_ATTACHED_RUNTIME_HPP
1616

1717
#include <atomic>
18-
#include <chrono>
1918
#include <filesystem>
2019
#include <memory>
2120
#include <mutex>
22-
#include <thread>
2321
#include <utility>
2422

2523
#include <vix/app/App.hpp>
@@ -36,49 +34,54 @@ namespace vix::websocket
3634
/**
3735
* @brief Runs a WebSocket server alongside an HTTP app, with shared lifecycle.
3836
*
39-
* Starts the WebSocket server on a dedicated thread and stops it when the HTTP
40-
* app requests shutdown or when this object is destroyed.
37+
* Starts the WebSocket server immediately and coordinates shutdown with the attached HTTP
38+
* application.
39+
*
40+
* Important lifecycle rule:
41+
* - The HTTP shutdown callback only requests an asynchronous stop.
42+
* - The final blocking shutdown is executed from an external safe control path.
4143
*/
4244
class AttachedRuntime
4345
{
4446
public:
4547
/**
4648
* @brief Attach a WebSocket server to an existing HTTP app.
4749
*
48-
* Starts the WebSocket server in a background thread and registers an app
49-
* shutdown callback to stop the WebSocket runtime.
50+
* Starts the WebSocket server immediately and registers an app shutdown
51+
* callback that requests a non-blocking WebSocket stop.
5052
*
5153
* @param app HTTP application instance.
5254
* @param ws WebSocket server instance.
55+
* @param exec Shared runtime executor.
5356
*/
54-
AttachedRuntime(vix::App &app, vix::websocket::Server &ws)
55-
: app_(app), ws_(ws), wsThread_(), stopped_(false)
57+
AttachedRuntime(
58+
vix::App &app,
59+
vix::websocket::Server &ws,
60+
std::shared_ptr<vix::executor::RuntimeExecutor> exec)
61+
: app_(app),
62+
ws_(ws),
63+
exec_(std::move(exec)),
64+
stop_requested_(false),
65+
finalized_(false)
5666
{
57-
wsThread_ = std::thread(
58-
[this]()
59-
{
60-
vix::utils::console_wait_banner();
61-
ws_.start();
62-
63-
while (!stopped_.load(std::memory_order_relaxed))
64-
{
65-
std::this_thread::sleep_for(std::chrono::milliseconds(250));
66-
}
67-
});
67+
vix::utils::console_wait_banner();
68+
ws_.start();
6869

6970
app_.set_shutdown_callback(
7071
[this]()
7172
{
72-
stop();
73+
request_stop();
7374
});
7475
}
7576

7677
/**
77-
* @brief Stop the runtime if still running.
78+
* @brief Finalize shutdown if still needed.
79+
*
80+
* This destructor is defensive and idempotent.
7881
*/
79-
~AttachedRuntime()
82+
~AttachedRuntime() noexcept
8083
{
81-
stop();
84+
finalize_shutdown();
8285
}
8386

8487
AttachedRuntime(const AttachedRuntime &) = delete;
@@ -87,23 +90,66 @@ namespace vix::websocket
8790
AttachedRuntime &operator=(AttachedRuntime &&) = delete;
8891

8992
/**
90-
* @brief Stop the WebSocket server and join the worker thread.
93+
* @brief Request an asynchronous WebSocket shutdown.
9194
*
92-
* Safe to call multiple times.
95+
* This function is idempotent and intentionally non-blocking.
96+
* It is safe to call from a shutdown callback running inside a server
97+
* worker thread.
9398
*/
94-
void stop() noexcept
99+
void request_stop() noexcept
95100
{
96101
bool expected = false;
97-
if (!stopped_.compare_exchange_strong(expected, true))
102+
if (!stop_requested_.compare_exchange_strong(
103+
expected,
104+
true,
105+
std::memory_order_acq_rel,
106+
std::memory_order_acquire))
98107
{
99108
return;
100109
}
101110

102-
ws_.stop();
111+
try
112+
{
113+
ws_.stop_async();
114+
}
115+
catch (...)
116+
{
117+
}
118+
}
119+
120+
/**
121+
* @brief Perform the final blocking shutdown exactly once.
122+
*
123+
* Order matters:
124+
* 1. stop websocket blocking
125+
* 2. stop shared executor blocking
126+
*/
127+
void finalize_shutdown() noexcept
128+
{
129+
std::lock_guard<std::mutex> lock(finalize_mutex_);
103130

104-
if (wsThread_.joinable())
131+
if (finalized_.exchange(true, std::memory_order_acq_rel))
132+
{
133+
return;
134+
}
135+
136+
try
137+
{
138+
ws_.stop();
139+
}
140+
catch (...)
141+
{
142+
}
143+
144+
try
145+
{
146+
if (exec_)
147+
{
148+
exec_->stop();
149+
}
150+
}
151+
catch (...)
105152
{
106-
wsThread_.join();
107153
}
108154
}
109155

@@ -114,11 +160,17 @@ namespace vix::websocket
114160
/** @brief Attached WebSocket server. */
115161
vix::websocket::Server &ws_;
116162

117-
/** @brief Background thread driving the WebSocket runtime. */
118-
std::thread wsThread_;
163+
/** @brief Shared runtime executor. */
164+
std::shared_ptr<vix::executor::RuntimeExecutor> exec_;
119165

120-
/** @brief Idempotent stop flag. */
121-
std::atomic<bool> stopped_;
166+
/** @brief Idempotent asynchronous stop flag. */
167+
std::atomic<bool> stop_requested_;
168+
169+
/** @brief Ensures final shutdown happens once. */
170+
std::atomic<bool> finalized_;
171+
172+
/** @brief Protects final shutdown sequence. */
173+
std::mutex finalize_mutex_;
122174
};
123175

124176
} // namespace vix::websocket
@@ -148,16 +200,18 @@ namespace vix
148200
/**
149201
* @brief Run HTTP and WebSocket servers together in blocking mode.
150202
*
151-
* Registers WebSocket OpenAPI docs once, starts the WebSocket runtime,
152-
* then starts the HTTP server and waits until shutdown.
203+
* Registers WebSocket OpenAPI docs once, starts the attached WebSocket
204+
* runtime, then starts the HTTP server and waits until shutdown.
153205
*
154206
* @param app HTTP application instance.
155207
* @param ws WebSocket server instance.
208+
* @param exec Shared runtime executor.
156209
* @param port HTTP listening port.
157210
*/
158211
inline void run_http_and_ws(
159212
vix::App &app,
160213
vix::websocket::Server &ws,
214+
std::shared_ptr<vix::executor::RuntimeExecutor> exec,
161215
int port = 8080)
162216
{
163217
register_ws_openapi_docs_once();
@@ -168,7 +222,7 @@ namespace vix
168222
vix::openapi::register_openapi_and_docs(*r, "Vix API", "1.33.0");
169223
}
170224

171-
vix::websocket::AttachedRuntime runtime{app, ws};
225+
vix::websocket::AttachedRuntime runtime{app, ws, exec};
172226

173227
app.listen(
174228
port,
@@ -192,6 +246,8 @@ namespace vix
192246

193247
app.wait();
194248
app.close();
249+
250+
runtime.finalize_shutdown();
195251
}
196252

197253
/**
@@ -223,7 +279,7 @@ namespace vix
223279

224280
fn(app, ws);
225281

226-
run_http_and_ws(app, ws, port);
282+
run_http_and_ws(app, ws, exec, port);
227283
}
228284

229285
/**
@@ -240,4 +296,4 @@ namespace vix
240296

241297
} // namespace vix
242298

243-
#endif
299+
#endif // VIX_ATTACHED_RUNTIME_HPP

include/vix/websocket/server.hpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,37 @@ namespace vix::websocket
217217
}
218218

219219
/**
220-
* @brief Stop the WebSocket engine and join its internal threads.
220+
* @brief Request an asynchronous stop of the WebSocket engine.
221+
*
222+
* This function is non-blocking and only signals shutdown to the internal
223+
* low-level engine. It does not join worker threads.
221224
*/
222-
void stop()
225+
void stop_async()
223226
{
224227
engine_.stop_async();
228+
}
229+
230+
/**
231+
* @brief Join internal WebSocket worker threads.
232+
*
233+
* This function blocks until the low-level engine threads have exited.
234+
*/
235+
void join()
236+
{
225237
engine_.join_threads();
226238
}
227239

240+
/**
241+
* @brief Stop the WebSocket engine and join its internal threads.
242+
*
243+
* This is the blocking shutdown path.
244+
*/
245+
void stop()
246+
{
247+
stop_async();
248+
join();
249+
}
250+
228251
/**
229252
* @brief Start the server and block until stop is requested.
230253
*/

include/vix/websocket/websocket.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ namespace vix::websocket
176176

177177
/** @brief Effective bound port, useful when binding to port 0. */
178178
std::atomic<int> boundPort_{0};
179+
180+
/** @brief Protects the join phase against concurrent callers. */
181+
mutable std::mutex joinMutex_;
182+
183+
/** @brief Ensures IO worker threads are joined only once. */
184+
std::atomic<bool> threadsJoined_{false};
179185
};
180186

181187
} // namespace vix::websocket

0 commit comments

Comments
 (0)