Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions crates/bashkit-js/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex;

// ---------------------------------------------------------------------------
// Shared tokio runtime + concurrency limiter for JS tool callbacks (issue #982).
// A single multi-thread runtime is created lazily and reused for every callback
// invocation, replacing the previous pattern of spawning an unbounded number of
// OS threads each with its own single-threaded runtime. A semaphore caps the
// maximum number of concurrent in-flight callbacks to prevent DoS.
// ---------------------------------------------------------------------------
const MAX_CONCURRENT_TOOL_CALLBACKS: usize = 10;

fn callback_runtime() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.expect("failed to create shared callback runtime")
})
}

fn callback_semaphore() -> &'static tokio::sync::Semaphore {
use std::sync::OnceLock;
static SEM: OnceLock<tokio::sync::Semaphore> = OnceLock::new();
SEM.get_or_init(|| tokio::sync::Semaphore::new(MAX_CONCURRENT_TOOL_CALLBACKS))
}

// ============================================================================
// MontyObject <-> JSON conversion
// ============================================================================
Expand Down Expand Up @@ -1064,20 +1091,21 @@ impl ScriptedTool {
});
let request_str = serde_json::to_string(&request).map_err(|e| e.to_string())?;

// Use a dedicated thread so the TSFN can dispatch to the JS event loop.
// The main thread must NOT be blocked (use async `execute`, not `executeSync`).
// Dispatch the TSFN call on the shared callback runtime with a
// concurrency semaphore to prevent unbounded thread/task creation
// (see issue #982).
let tsfn_clone = tsfn.clone();
let tool_name_clone = tool_name.clone();
let rt = callback_runtime();
let sem = callback_semaphore();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = match rt {
Ok(rt) => rt
.block_on(tsfn_clone.call_async((request_str,)))
rt.spawn(async move {
let result = match sem.acquire().await {
Ok(_permit) => tsfn_clone
.call_async((request_str,))
.await
.map_err(|e| format!("{}: {}", tool_name_clone, e)),
Err(e) => Err(format!("{}: runtime error: {}", tool_name_clone, e)),
Err(e) => Err(format!("{}: semaphore error: {}", tool_name_clone, e)),
};
let _ = tx.send(result);
});
Expand Down
Loading