Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/lib/errorHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,5 +220,9 @@ export function setupProcessErrorHandlers(io) {
if (io) {
emitErrorEvent(io, serverError);
}

// Process is in undefined state after uncaught exception — must exit.
// Use a short delay to allow the socket event to flush before exiting.
setTimeout(() => process.exit(1), 100);
});
}
9 changes: 8 additions & 1 deletion server/services/agentActionExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,11 @@ async function executeMoltworldInteract(client, account, params) {
* Listens to scheduler events and executes actions
*/
export function init() {
scheduleEvents.on('execute', async ({ scheduleId, schedule, timestamp }) => {
// Prevent duplicate listeners; safe to re-register after listeners are removed
if (scheduleEvents.listenerCount('execute') > 0) return;

scheduleEvents.on('execute', ({ scheduleId, schedule, timestamp }) => {
(async () => {
Comment on lines +572 to +576
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplicate-listener guard uses scheduleEvents.listenerCount('execute') > 0, but scheduleEvents has other legitimate listeners (e.g. server/services/socket.js listens to 'execute' to broadcast). If that listener is registered before this init runs, this function will return early and scheduled actions will never execute. Prefer a module-local initialized flag, or track/remove only this module’s specific handler (e.g. keep a reference and check listeners('execute').includes(handler)).

Copilot uses AI. Check for mistakes.
console.log(`⚡ Executing scheduled action: ${schedule.action.type} (${scheduleId})`);

// Get account with full credentials
Expand Down Expand Up @@ -657,6 +661,9 @@ export function init() {
timestamp: new Date().toISOString(),
durationMs: Date.now() - startTime
});
})().catch(err => {
console.error(`❌ Unhandled error in execute listener for schedule ${scheduleId}: ${err.message}`);
});
});

console.log('⚡ Agent action executor initialized');
Expand Down
8 changes: 6 additions & 2 deletions server/services/agentManagement.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,19 @@ export async function terminateAgent(agentId) {
// Kill the process
agent.process.kill('SIGTERM');

// Give it a moment, then force kill if needed
setTimeout(() => {
// Give it a moment, then force kill if still running
const killTimer = setTimeout(() => {
if (activeAgents.has(agentId)) {
agent.process.kill('SIGKILL');
unregisterSpawnedAgent(agent.pid);
activeAgents.delete(agentId);
}
}, 5000);

// Store the timer so the close handler can clear it when the process exits cleanly
const agentEntry = activeAgents.get(agentId);
if (agentEntry) agentEntry.killTimer = killTimer;

Comment on lines +106 to +109
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

killTimer is stored on the agent entry but is never cleared anywhere (no .killTimer references in the repo). If the process exits quickly and the close handler is still doing async cleanup, this timer can still fire and run the SIGKILL/unregister/delete path unnecessarily. Either clear the timeout immediately when termination succeeds (e.g. in the process close handler in agentCliSpawning.js), or don’t store it here and instead clear it at the start of close handling.

Suggested change
// Store the timer so the close handler can clear it when the process exits cleanly
const agentEntry = activeAgents.get(agentId);
if (agentEntry) agentEntry.killTimer = killTimer;
// Store the timer so it can be cleared when the process exits cleanly
const agentEntry = activeAgents.get(agentId);
if (agentEntry) {
agentEntry.killTimer = killTimer;
}
const clearKillTimer = () => {
const currentAgentEntry = activeAgents.get(agentId);
if (currentAgentEntry?.killTimer) {
clearTimeout(currentAgentEntry.killTimer);
delete currentAgentEntry.killTimer;
return;
}
clearTimeout(killTimer);
};
agent.process.once('close', clearKillTimer);
agent.process.once('exit', clearKillTimer);

Copilot uses AI. Check for mistakes.
return { success: true, agentId };
}

Expand Down
65 changes: 20 additions & 45 deletions server/services/cosRunnerClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ const COS_RUNNER_URL = process.env.COS_RUNNER_URL || 'http://localhost:5558';

// Socket.IO client for real-time events
let socket = null;
let eventHandlers = new Map();
// Map of event name -> array of handlers (supports multiple listeners per event)
const eventHandlers = new Map();

/**
* Initialize connection to CoS Runner
Expand All @@ -26,72 +27,46 @@ export function initCosRunnerConnection() {
reconnectionDelay: 1000
});

const dispatch = (event, data) => {
const handlers = eventHandlers.get(event);
if (handlers) handlers.forEach(h => h(data));
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispatch calls each registered handler directly. If any handler throws/rejects synchronously, the exception will bubble out of the Socket.IO event callback and can crash the process or prevent other handlers from running. Wrap each handler invocation in a try/catch (and optionally schedule async handlers with Promise.resolve().catch(...)) so one bad listener can’t take down the client.

Suggested change
if (handlers) handlers.forEach(h => h(data));
if (!handlers) return;
handlers.forEach((h) => {
try {
Promise.resolve(h(data)).catch((err) => {
console.error(`CoS Runner handler rejected for event "${event}":`, err);
});
} catch (err) {
console.error(`CoS Runner handler failed for event "${event}":`, err);
}
});

Copilot uses AI. Check for mistakes.
};

socket.on('connect', () => {
console.log('🔌 Connected to CoS Runner');
// Emit reconnect event so services can sync their state
const handler = eventHandlers.get('connection:ready');
if (handler) handler();
dispatch('connection:ready', undefined);
});

socket.on('disconnect', () => {
console.log('🔌 Disconnected from CoS Runner');
const handler = eventHandlers.get('connection:lost');
if (handler) handler();
dispatch('connection:lost', undefined);
});

socket.on('connect_error', (err) => {
console.error(`🔌 CoS Runner connection error: ${err.message}`);
});

// Forward events to registered handlers
socket.on('agent:output', (data) => {
const handler = eventHandlers.get('agent:output');
if (handler) handler(data);
});

socket.on('agent:completed', (data) => {
const handler = eventHandlers.get('agent:completed');
if (handler) handler(data);
});

socket.on('agent:error', (data) => {
const handler = eventHandlers.get('agent:error');
if (handler) handler(data);
});

socket.on('agent:btw', (data) => {
const handler = eventHandlers.get('agent:btw');
if (handler) handler(data);
});
socket.on('agent:output', (data) => dispatch('agent:output', data));
socket.on('agent:completed', (data) => dispatch('agent:completed', data));
socket.on('agent:error', (data) => dispatch('agent:error', data));
socket.on('agent:btw', (data) => dispatch('agent:btw', data));

// Batch orphaned agents event (startup cleanup)
socket.on('agents:orphaned', (data) => {
const handler = eventHandlers.get('agents:orphaned');
if (handler) handler(data);
});
socket.on('agents:orphaned', (data) => dispatch('agents:orphaned', data));

// Forward devtools run events to registered handlers
socket.on('run:data', (data) => {
const handler = eventHandlers.get('run:data');
if (handler) handler(data);
});

socket.on('run:complete', (data) => {
const handler = eventHandlers.get('run:complete');
if (handler) handler(data);
});

socket.on('run:error', (data) => {
const handler = eventHandlers.get('run:error');
if (handler) handler(data);
});
socket.on('run:data', (data) => dispatch('run:data', data));
socket.on('run:complete', (data) => dispatch('run:complete', data));
socket.on('run:error', (data) => dispatch('run:error', data));
}

/**
 * Register an event handler for a CoS Runner event.
 * Multiple handlers may be registered for the same event; all of them
 * are invoked (in registration order) when the event is dispatched.
 *
 * @param {string} event - Event name (e.g. 'agent:output', 'run:complete').
 * @param {Function} handler - Callback invoked with the event payload.
 */
export function onCosRunnerEvent(event, handler) {
  // Lazily create the handler list for this event, then append.
  if (!eventHandlers.has(event)) eventHandlers.set(event, []);
  eventHandlers.get(event).push(handler);
}
Comment on lines +65 to 70
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change alters onCosRunnerEvent from single-handler-per-event to multi-handler-per-event, but the existing test suite doesn’t assert the new behavior (e.g. that multiple handlers for the same event all run, and that a throwing handler doesn’t prevent others if you add per-handler isolation). Adding targeted tests here would help prevent regressions in event fan-out semantics.

Copilot uses AI. Check for mistakes.

/**
Expand Down
8 changes: 4 additions & 4 deletions server/services/dataManager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { readdir, stat, rm, mkdir, writeFile as fsWriteFile } from 'fs/promises';
import { join, relative } from 'path';
import { join, relative, resolve } from 'path';
import { existsSync } from 'fs';
import { execFile } from 'child_process';
import { promisify } from 'util';
Expand Down Expand Up @@ -169,9 +169,9 @@ export async function purgeCategory(categoryKey, options = {}) {
if (!existsSync(dirPath)) throw new Error(`Category directory not found: ${categoryKey}`);

if (options.subPath) {
if (!SAFE_NAME.test(options.subPath.replace(/\//g, ''))) throw new Error('Invalid subpath');
const targetPath = join(dirPath, options.subPath);
if (!targetPath.startsWith(dirPath)) throw new Error('Path traversal not allowed');
const resolvedTarget = resolve(join(dirPath, options.subPath));
if (!resolvedTarget.startsWith(resolve(dirPath))) throw new Error('Invalid subPath');
const targetPath = resolvedTarget;
Comment on lines +172 to +174
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path containment check is vulnerable because startsWith(resolve(dirPath)) can be bypassed by sibling paths with the same prefix (e.g. /data/cosmic startsWith /data/cos). Also subPath: '.' resolves to the category root and would delete the entire category directory. Use a boundary-safe containment check (e.g. const rel = relative(dirPath, resolvedTarget) and reject if rel.startsWith('..') || path.isAbsolute(rel)), and explicitly reject resolvedTarget === resolve(dirPath) if the intent is to only allow deleting children.

Copilot uses AI. Check for mistakes.
await rm(targetPath, { recursive: true, force: true });
} else {
const entries = await readdir(dirPath).catch(() => []);
Expand Down
4 changes: 2 additions & 2 deletions server/services/dataSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ async function applyGoalsRemote(remoteData) {

// Merge top-level metadata (birthDate, lifeExpectancy, timeHorizons) via LWW
// Use the most recent goal's updatedAt as proxy for file freshness
const localMaxTs = Math.max(0, ...(local.goals || []).map(g => new Date(g.updatedAt || 0).getTime()));
const remoteMaxTs = Math.max(0, ...(remoteData.goals || []).map(g => new Date(g.updatedAt || 0).getTime()));
const localMaxTs = (local.goals || []).reduce((max, g) => Math.max(max, new Date(g.updatedAt || 0).getTime()), 0);
const remoteMaxTs = (remoteData.goals || []).reduce((max, g) => Math.max(max, new Date(g.updatedAt || 0).getTime()), 0);
const metaSource = remoteMaxTs > localMaxTs ? remoteData : local;

const merged = {
Expand Down
8 changes: 6 additions & 2 deletions server/services/feeds.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,14 @@ const parseFeed = (xml) => {

/**
 * Return all feeds, each annotated with its unread-item count.
 *
 * Unread counts are pre-computed in a single pass over the items —
 * O(I) instead of the O(F×I) cost of filtering the full item list
 * once per feed.
 *
 * @returns {Promise<Array<Object>>} Feed records, each spread with an
 *   added numeric `unreadCount` property (0 when the feed has no
 *   unread items).
 */
export async function getFeeds() {
  const data = await store.load();

  // One pass over items: feedId -> count of unread items.
  const unreadCounts = new Map();
  for (const item of data.items) {
    if (!item.read) {
      unreadCounts.set(item.feedId, (unreadCounts.get(item.feedId) ?? 0) + 1);
    }
  }

  return data.feeds.map(feed => ({
    ...feed,
    // ?? 0 covers feeds with no unread items (Map.get returns undefined).
    unreadCount: unreadCounts.get(feed.id) ?? 0
  }));
}

Expand Down
11 changes: 2 additions & 9 deletions server/services/memoryEmbeddings.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,11 @@ export function setEmbeddingConfig(config) {
export async function checkAvailability() {
await initConfig();
const config = getConfig();
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 5000);

const response = await fetch(`${config.embeddingEndpoint.replace('/v1/embeddings', '/v1/models')}`, {
method: 'GET',
signal: controller.signal
}).catch(err => {
clearTimeout(timeout);
return { ok: false, _err: err.message };
});

clearTimeout(timeout);
signal: AbortSignal.timeout(5000)
}).catch(err => ({ ok: false, _err: err.message }));

if (!response.ok) {
return { available: false, error: response._err || `LM Studio returned ${response.status}`, endpoint: config.embeddingEndpoint };
Expand Down
22 changes: 12 additions & 10 deletions server/services/memoryRetriever.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ export async function getRelevantMemories(task, options = {}) {
});
}

for (const result of searchResults.memories) {
const mem = await getMemory(result.id);
const fetchedMems = await Promise.all(searchResults.memories.map(r => getMemory(r.id)));
for (let i = 0; i < searchResults.memories.length; i++) {
const result = searchResults.memories[i];
const mem = fetchedMems[i];
if (mem) {
const tokens = estimateTokens(mem.content);
if (tokenCount + tokens <= maxTokens) {
Expand All @@ -69,10 +71,10 @@ export async function getRelevantMemories(task, options = {}) {
limit: 5
});

for (const pref of preferences.memories) {
if (memories.some(m => m.id === pref.id)) continue;

const mem = await getMemory(pref.id);
const prefIds = preferences.memories.filter(p => !memories.some(m => m.id === p.id)).map(p => p.id);
const prefMems = await Promise.all(prefIds.map(id => getMemory(id)));
for (let i = 0; i < prefIds.length; i++) {
const mem = prefMems[i];
if (mem) {
const tokens = estimateTokens(mem.content);
if (tokenCount + tokens <= maxTokens) {
Expand All @@ -97,10 +99,10 @@ export async function getRelevantMemories(task, options = {}) {
limit: 5
});

for (const appMem of appMemories.memories) {
if (memories.some(m => m.id === appMem.id)) continue;

const mem = await getMemory(appMem.id);
const appIds = appMemories.memories.filter(a => !memories.some(m => m.id === a.id)).map(a => a.id);
const appMems = await Promise.all(appIds.map(id => getMemory(id)));
for (let i = 0; i < appIds.length; i++) {
const mem = appMems[i];
if (mem) {
const tokens = estimateTokens(mem.content);
if (tokenCount + tokens <= maxTokens) {
Expand Down