From a2d62e402f004df53fda64905882173efaa19af0 Mon Sep 17 00:00:00 2001 From: Adam Eivy Date: Thu, 9 Apr 2026 21:45:43 -0700 Subject: [PATCH] bugs perf: better audit remediation --- server/lib/errorHandler.js | 4 ++ server/services/agentActionExecutor.js | 9 +++- server/services/agentManagement.js | 8 +++- server/services/cosRunnerClient.js | 65 ++++++++------------------ server/services/dataManager.js | 8 ++-- server/services/dataSync.js | 4 +- server/services/feeds.js | 8 +++- server/services/memoryEmbeddings.js | 11 +---- server/services/memoryRetriever.js | 22 +++++---- 9 files changed, 64 insertions(+), 75 deletions(-) diff --git a/server/lib/errorHandler.js b/server/lib/errorHandler.js index 96b9a809f..b57155f22 100644 --- a/server/lib/errorHandler.js +++ b/server/lib/errorHandler.js @@ -220,5 +220,9 @@ export function setupProcessErrorHandlers(io) { if (io) { emitErrorEvent(io, serverError); } + + // Process is in undefined state after uncaught exception — must exit. + // Use a short delay to allow the socket event to flush before exiting. + setTimeout(() => process.exit(1), 100); }); } diff --git a/server/services/agentActionExecutor.js b/server/services/agentActionExecutor.js index 577cc1f9b..e793bbe95 100644 --- a/server/services/agentActionExecutor.js +++ b/server/services/agentActionExecutor.js @@ -569,7 +569,11 @@ async function executeMoltworldInteract(client, account, params) { * Listens to scheduler events and executes actions */ export function init() { - scheduleEvents.on('execute', async ({ scheduleId, schedule, timestamp }) => { + // Prevent duplicate listeners; safe to re-register after listeners are removed + if (scheduleEvents.listenerCount('execute') > 0) return; + + scheduleEvents.on('execute', ({ scheduleId, schedule, timestamp }) => { + (async () => { console.log(`⚡ Executing scheduled action: ${schedule.action.type} (${scheduleId})`); // Get account with full credentials @@ -657,6 +661,9 @@ export function init() { timestamp: new Date().toISOString(), durationMs: Date.now() - startTime }); + })().catch(err => { + console.error(`❌ Unhandled error in execute listener for schedule ${scheduleId}: ${err.message}`); + }); }); console.log('⚡ Agent action executor initialized'); diff --git a/server/services/agentManagement.js b/server/services/agentManagement.js index 31c68c8bf..2a9370dda 100644 --- a/server/services/agentManagement.js +++ b/server/services/agentManagement.js @@ -94,8 +94,8 @@ export async function terminateAgent(agentId) { // Kill the process agent.process.kill('SIGTERM'); - // Give it a moment, then force kill if needed - setTimeout(() => { + // Give it a moment, then force kill if still running + const killTimer = setTimeout(() => { if (activeAgents.has(agentId)) { agent.process.kill('SIGKILL'); unregisterSpawnedAgent(agent.pid); @@ -103,6 +103,10 @@ export async function terminateAgent(agentId) { } }, 5000); + // Store the timer so the close handler can clear it when the process exits cleanly + const agentEntry = activeAgents.get(agentId); + if (agentEntry) agentEntry.killTimer = killTimer; + return { success: true, agentId }; } diff --git a/server/services/cosRunnerClient.js b/server/services/cosRunnerClient.js index 89c7cf8da..e166709c1 100644 --- a/server/services/cosRunnerClient.js +++ b/server/services/cosRunnerClient.js @@ -12,7 +12,8 @@ const COS_RUNNER_URL = process.env.COS_RUNNER_URL || 'http://localhost:5558'; // Socket.IO client for real-time events let socket = null; -let eventHandlers = new Map(); +// Map of event name -> array of handlers (supports multiple listeners per event) +const eventHandlers = new Map(); /** * Initialize connection to CoS Runner @@ -26,17 +27,19 @@ export function initCosRunnerConnection() { reconnectionDelay: 1000 }); + const dispatch = (event, data) => { + const handlers = eventHandlers.get(event); + if (handlers) handlers.forEach(h => h(data)); + }; + socket.on('connect', () => { console.log('🔌 Connected to CoS Runner'); - // Emit reconnect event so services can sync their state - const handler = eventHandlers.get('connection:ready'); - if (handler) handler(); + dispatch('connection:ready', undefined); }); socket.on('disconnect', () => { console.log('🔌 Disconnected from CoS Runner'); - const handler = eventHandlers.get('connection:lost'); - if (handler) handler(); + dispatch('connection:lost', undefined); }); socket.on('connect_error', (err) => { @@ -44,54 +47,26 @@ export function initCosRunnerConnection() { }); // Forward events to registered handlers - socket.on('agent:output', (data) => { - const handler = eventHandlers.get('agent:output'); - if (handler) handler(data); - }); - - socket.on('agent:completed', (data) => { - const handler = eventHandlers.get('agent:completed'); - if (handler) handler(data); - }); - - socket.on('agent:error', (data) => { - const handler = eventHandlers.get('agent:error'); - if (handler) handler(data); - }); - - socket.on('agent:btw', (data) => { - const handler = eventHandlers.get('agent:btw'); - if (handler) handler(data); - }); + socket.on('agent:output', (data) => dispatch('agent:output', data)); + socket.on('agent:completed', (data) => dispatch('agent:completed', data)); + socket.on('agent:error', (data) => dispatch('agent:error', data)); + socket.on('agent:btw', (data) => dispatch('agent:btw', data)); // Batch orphaned agents event (startup cleanup) - socket.on('agents:orphaned', (data) => { - const handler = eventHandlers.get('agents:orphaned'); - if (handler) handler(data); - }); + socket.on('agents:orphaned', (data) => dispatch('agents:orphaned', data)); // Forward devtools run events to registered handlers - socket.on('run:data', (data) => { - const handler = eventHandlers.get('run:data'); - if (handler) handler(data); - }); - - socket.on('run:complete', (data) => { - const handler = eventHandlers.get('run:complete'); - if (handler) handler(data); - }); - - socket.on('run:error', (data) => { - const handler = eventHandlers.get('run:error'); - if (handler) handler(data); - }); + socket.on('run:data', (data) => dispatch('run:data', data)); + socket.on('run:complete', (data) => dispatch('run:complete', data)); + socket.on('run:error', (data) => dispatch('run:error', data)); } /** - * Register event handler + * Register event handler (multiple handlers per event are supported) */ export function onCosRunnerEvent(event, handler) { - eventHandlers.set(event, handler); + if (!eventHandlers.has(event)) eventHandlers.set(event, []); + eventHandlers.get(event).push(handler); } /** diff --git a/server/services/dataManager.js b/server/services/dataManager.js index 2f2c86061..d0ef83c25 100644 --- a/server/services/dataManager.js +++ b/server/services/dataManager.js @@ -1,5 +1,5 @@ import { readdir, stat, rm, mkdir, writeFile as fsWriteFile } from 'fs/promises'; -import { join, relative } from 'path'; +import { join, relative, resolve } from 'path'; import { existsSync } from 'fs'; import { execFile } from 'child_process'; import { promisify } from 'util'; @@ -169,9 +169,9 @@ export async function purgeCategory(categoryKey, options = {}) { if (!existsSync(dirPath)) throw new Error(`Category directory not found: ${categoryKey}`); if (options.subPath) { - if (!SAFE_NAME.test(options.subPath.replace(/\//g, ''))) throw new Error('Invalid subpath'); - const targetPath = join(dirPath, options.subPath); - if (!targetPath.startsWith(dirPath)) throw new Error('Path traversal not allowed'); + const resolvedTarget = resolve(join(dirPath, options.subPath)); + if (!resolvedTarget.startsWith(resolve(dirPath))) throw new Error('Invalid subPath'); + const targetPath = resolvedTarget; await rm(targetPath, { recursive: true, force: true }); } else { const entries = await readdir(dirPath).catch(() => []); diff --git a/server/services/dataSync.js b/server/services/dataSync.js index 4f3dbb6f9..8ec804143 100644 --- a/server/services/dataSync.js +++ b/server/services/dataSync.js @@ -158,8 +158,8 @@ async function applyGoalsRemote(remoteData) { // Merge top-level metadata (birthDate, lifeExpectancy, timeHorizons) via LWW // Use the most recent goal's updatedAt as proxy for file freshness - const localMaxTs = Math.max(0, ...(local.goals || []).map(g => new Date(g.updatedAt || 0).getTime())); - const remoteMaxTs = Math.max(0, ...(remoteData.goals || []).map(g => new Date(g.updatedAt || 0).getTime())); + const localMaxTs = (local.goals || []).reduce((max, g) => Math.max(max, new Date(g.updatedAt || 0).getTime()), 0); + const remoteMaxTs = (remoteData.goals || []).reduce((max, g) => Math.max(max, new Date(g.updatedAt || 0).getTime()), 0); const metaSource = remoteMaxTs > localMaxTs ? remoteData : local; const merged = { diff --git a/server/services/feeds.js b/server/services/feeds.js index ecde0af4e..78aa2d740 100644 --- a/server/services/feeds.js +++ b/server/services/feeds.js @@ -101,10 +101,14 @@ const parseFeed = (xml) => { export async function getFeeds() { const data = await store.load(); - // Attach unread counts + // Pre-compute unread counts in one pass: O(I) instead of O(F×I) + const unreadCounts = new Map(); + for (const item of data.items) { + if (!item.read) unreadCounts.set(item.feedId, (unreadCounts.get(item.feedId) || 0) + 1); + } return data.feeds.map(feed => ({ ...feed, - unreadCount: data.items.filter(i => i.feedId === feed.id && !i.read).length + unreadCount: unreadCounts.get(feed.id) || 0 })); } diff --git a/server/services/memoryEmbeddings.js b/server/services/memoryEmbeddings.js index a784fecd4..21a13113c 100644 --- a/server/services/memoryEmbeddings.js +++ b/server/services/memoryEmbeddings.js @@ -141,18 +141,11 @@ export function setEmbeddingConfig(config) { export async function checkAvailability() { await initConfig(); const config = getConfig(); - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), 5000); const response = await fetch(`${config.embeddingEndpoint.replace('/v1/embeddings', '/v1/models')}`, { method: 'GET', - signal: controller.signal - }).catch(err => { - clearTimeout(timeout); - return { ok: false, _err: err.message }; - }); - - clearTimeout(timeout); + signal: AbortSignal.timeout(5000) + }).catch(err => ({ ok: false, _err: err.message })); if (!response.ok) { return { available: false, error: response._err || `LM Studio returned ${response.status}`, endpoint: config.embeddingEndpoint }; diff --git a/server/services/memoryRetriever.js b/server/services/memoryRetriever.js index d3a3466ee..4a84fc434 100644 --- a/server/services/memoryRetriever.js +++ b/server/services/memoryRetriever.js @@ -44,8 +44,10 @@ export async function getRelevantMemories(task, options = {}) { }); } - for (const result of searchResults.memories) { - const mem = await getMemory(result.id); + const fetchedMems = await Promise.all(searchResults.memories.map(r => getMemory(r.id))); + for (let i = 0; i < searchResults.memories.length; i++) { + const result = searchResults.memories[i]; + const mem = fetchedMems[i]; if (mem) { const tokens = estimateTokens(mem.content); if (tokenCount + tokens <= maxTokens) { @@ -69,10 +71,10 @@ export async function getRelevantMemories(task, options = {}) { limit: 5 }); - for (const pref of preferences.memories) { - if (memories.some(m => m.id === pref.id)) continue; - - const mem = await getMemory(pref.id); + const prefIds = preferences.memories.filter(p => !memories.some(m => m.id === p.id)).map(p => p.id); + const prefMems = await Promise.all(prefIds.map(id => getMemory(id))); + for (let i = 0; i < prefIds.length; i++) { + const mem = prefMems[i]; if (mem) { const tokens = estimateTokens(mem.content); if (tokenCount + tokens <= maxTokens) { @@ -97,10 +99,10 @@ export async function getRelevantMemories(task, options = {}) { limit: 5 }); - for (const appMem of appMemories.memories) { - if (memories.some(m => m.id === appMem.id)) continue; - - const mem = await getMemory(appMem.id); + const appIds = appMemories.memories.filter(a => !memories.some(m => m.id === a.id)).map(a => a.id); + const appMems = await Promise.all(appIds.map(id => getMemory(id))); + for (let i = 0; i < appIds.length; i++) { + const mem = appMems[i]; if (mem) { const tokens = estimateTokens(mem.content); if (tokenCount + tokens <= maxTokens) {