Skip to content

Commit 697c08b

Browse files
amingclawdevclaude
andcommitted
fix(pipeline): P0 — guarantee _finishPipeline in all paths + wire up _selfHealAndRetry
Bug #1: Pipeline never finishes (running stays true forever) - Wrap main _runPipeline body in try/finally safety net - Replace manual `running = false` with _finishPipeline() calls in: - early returns (no queries, no AI provider) - consecutive error abort - startPipeline .catch() handler - stopPipeline() - Make _finishPipeline idempotent to prevent double history-save Bug #2: _selfHealAndRetry was dead code, never called - Wire up at search script failure point (scriptResult.success=false) - Wire up at low-result anomaly (<3 results, possible Cloudflare block) - Add max 2 attempts per source to prevent infinite loops - Capture browser screenshot for AI diagnosis when possible - Log clearly when self-heal is triggered/succeeds/fails Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bf1b7f7 commit 697c08b

1 file changed

Lines changed: 100 additions & 29 deletions

File tree

assets/agents/job-seek/lib/searchPipeline.js

Lines changed: 100 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ function startPipeline(sessionId, config, direction, profile) {
466466
_pageOffsets: { ...(history.pageOffsets || {}) }, // restore page offsets per query key
467467
_sourceQualified: {}, // source → number of qualified jobs (persisted across rounds)
468468
_sourceResultCount: {}, // source → number of results fetched (persisted across rounds)
469+
_selfHealAttempts: {}, // source → number of self-heal attempts (max 2 per source)
469470
stoppedAt: null
470471
};
471472

@@ -478,14 +479,14 @@ function startPipeline(sessionId, config, direction, profile) {
478479

479480
_pipelines.set(sessionId, pipeline);
480481

481-
// Run asynchronously
482+
// Run asynchronously — _finishPipeline is guaranteed via try/finally inside _runPipeline,
483+
// but add a safety catch here in case something goes wrong before the try block.
482484
_runPipeline(sessionId).catch(err => {
483485
console.error(`[searchPipeline] Error in pipeline ${sessionId}:`, err.message);
484486
const p = _pipelines.get(sessionId);
485487
if (p) {
486-
p.running = false;
487-
p.progress.phase = 'error';
488488
p.progress.errors.push(err.message);
489+
_finishPipeline(sessionId, 'error');
489490
}
490491
});
491492

@@ -537,23 +538,24 @@ async function _runPipeline(sessionId) {
537538
_log(`Queries: ${queries.map(q => `[${q.source}] "${q.query}" @ ${q.location || 'remote'}${q.pageOffset ? ` (page ${q.pageOffset})` : ''}`).join(' | ')}`);
538539

539540
if (queries.length === 0) {
540-
pipeline.running = false;
541-
pipeline.progress.phase = 'error';
542541
pipeline.progress.errors.push('No job title set — cannot search');
543542
_log('ERROR: No job title set — cannot search');
543+
_finishPipeline(sessionId, 'error');
544544
return;
545545
}
546546

547547
// AI pre-check: algorithm fallback removed, AI is required
548548
if (!config.aiMatcher && !config.aiInvoke) {
549-
pipeline.running = false;
550-
pipeline.progress.phase = 'error';
551549
pipeline.progress.errors.push('No AI provider configured — AI matching is required');
552550
_log('ERROR: No AI provider — cannot start (algorithm fallback removed)');
551+
_finishPipeline(sessionId, 'error');
553552
return;
554553
}
555554

556555
// ── Search + Match (merged): process each job inline as it's found ──
556+
// Wrap in try/finally to guarantee _finishPipeline is called even on unexpected errors
557+
try {
558+
557559
pipeline.progress.phase = 'searching';
558560
let _totalFetched = 0; // total listings fetched across all sources (replaces allListings.length)
559561
if (previouslySeen.size > 0) {
@@ -1000,9 +1002,8 @@ async function _runPipeline(sessionId) {
10001002
// Abort pipeline if too many consecutive errors (e.g. browser died, AI provider down)
10011003
if (_consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
10021004
_log(`ABORT: ${MAX_CONSECUTIVE_ERRORS} consecutive errors — stopping pipeline`);
1003-
pipeline.running = false;
1004-
pipeline.progress.phase = 'error';
10051005
pipeline.progress.errors.push(`Pipeline aborted: ${MAX_CONSECUTIVE_ERRORS} consecutive failures`);
1006+
_finishPipeline(sessionId, 'error');
10061007
}
10071008
}
10081009
}
@@ -1068,31 +1069,84 @@ async function _runPipeline(sessionId) {
10681069
}));
10691070
_log(`[${q.source}] Platform tool returned ${listings.length} results`);
10701071

1071-
// ── Low/zero result anomaly: mark cell error so user can rebuild manually ──
1072+
// ── Low/zero result anomaly: attempt self-heal or mark for rebuild ──
10721073
const LOW_RESULT_THRESHOLD = 3;
10731074
if (listings.length < LOW_RESULT_THRESHOLD) {
1074-
_log(`⚠ [${q.source}] Suspiciously low results (${listings.length}) — marking for rebuild`);
1075-
dashboardServer.updatePlatformCell(sessionId, platformTool.id, {
1076-
cell: 'search', status: 'error',
1077-
message: `Only ${listings.length} result(s) for "${q.query}" — please Rebuild search tool`
1078-
});
1075+
_log(`⚠ [${q.source}] Suspiciously low results (${listings.length}) — attempting self-heal`);
1076+
1077+
// Try self-heal for low-result anomaly (may indicate Cloudflare block)
1078+
if (config.aiInvoke && !(pipeline._selfHealAttempts?.[q.source] >= 2)) {
1079+
pipeline._selfHealAttempts = pipeline._selfHealAttempts || {};
1080+
pipeline._selfHealAttempts[q.source] = (pipeline._selfHealAttempts[q.source] || 0) + 1;
1081+
const anomalyMsg = `Search returned only ${listings.length} result(s) — possible Cloudflare block or broken selector`;
1082+
const healedListings = await _selfHealAndRetry(
1083+
sessionId, platformTool, q, config, anomalyMsg, null, _log
1084+
);
1085+
if (healedListings && healedListings.length > listings.length) {
1086+
_log(`✓ [${q.source}] Self-heal improved results: ${listings.length}${healedListings.length}`);
1087+
listings = healedListings;
1088+
}
1089+
}
1090+
1091+
// Still low after heal attempt — mark for manual rebuild
1092+
if (listings.length < LOW_RESULT_THRESHOLD) {
1093+
dashboardServer.updatePlatformCell(sessionId, platformTool.id, {
1094+
cell: 'search', status: 'error',
1095+
message: `Only ${listings.length} result(s) for "${q.query}" — please Rebuild search tool`
1096+
});
1097+
}
10791098
}
10801099
} else {
10811100
const errMsg = scriptResult.error || 'unknown error';
10821101
_log(`✗ [${q.source}] Platform tool failed: ${errMsg}`);
1083-
_failedSources.add(q.source);
1084-
dashboardServer.updatePlatformCell(sessionId, platformTool.id, {
1085-
cell: 'search', status: 'error',
1086-
message: `Search failed: ${errMsg} — please Rebuild search tool`
1087-
});
1088-
dashboardServer.updatePipelineProgress(sessionId, {
1089-
phase: 'taskFailed',
1090-
title: `${platformTool.name} — Search failed`,
1091-
company: '', platform: q.source, failPhase: 'search',
1092-
error: errMsg,
1093-
at: new Date().toISOString(), currentJob: null
1094-
});
1095-
pipeline.progress.errors.push(`[${q.source}] Search tool failed: ${errMsg}`);
1102+
1103+
// ── Self-heal: attempt AI-driven script repair + retry ──
1104+
let healed = false;
1105+
if (config.aiInvoke && !(pipeline._selfHealAttempts?.[q.source] >= 2)) {
1106+
pipeline._selfHealAttempts = pipeline._selfHealAttempts || {};
1107+
pipeline._selfHealAttempts[q.source] = (pipeline._selfHealAttempts[q.source] || 0) + 1;
1108+
_log(`🔧 [${q.source}] Self-heal attempt ${pipeline._selfHealAttempts[q.source]}/2...`);
1109+
1110+
// Try to get a screenshot for better AI diagnosis
1111+
let screenshot = null;
1112+
try {
1113+
const platform = getPlatformStore().getPlatform(sessionId, platformTool.id);
1114+
if (platform?._browserId) {
1115+
const toolClient = require('./core/toolServiceClient');
1116+
const ssResult = await toolClient.executeTool('page_screenshot', {
1117+
browserId: platform._browserId, pageIndex: platform._pageIndex || 0
1118+
});
1119+
screenshot = ssResult?.screenshot || ssResult?.result || null;
1120+
}
1121+
} catch (_) { /* screenshot is optional */ }
1122+
1123+
const healedListings = await _selfHealAndRetry(
1124+
sessionId, platformTool, q, config, errMsg, screenshot, _log
1125+
);
1126+
if (healedListings && healedListings.length > 0) {
1127+
healed = true;
1128+
listings = healedListings;
1129+
_log(`✓ [${q.source}] Self-heal succeeded: ${healedListings.length} results after repair`);
1130+
} else {
1131+
_log(`✗ [${q.source}] Self-heal failed — marking source as failed`);
1132+
}
1133+
}
1134+
1135+
if (!healed) {
1136+
_failedSources.add(q.source);
1137+
dashboardServer.updatePlatformCell(sessionId, platformTool.id, {
1138+
cell: 'search', status: 'error',
1139+
message: `Search failed: ${errMsg} — please Rebuild search tool`
1140+
});
1141+
dashboardServer.updatePipelineProgress(sessionId, {
1142+
phase: 'taskFailed',
1143+
title: `${platformTool.name} — Search failed`,
1144+
company: '', platform: q.source, failPhase: 'search',
1145+
error: errMsg,
1146+
at: new Date().toISOString(), currentJob: null
1147+
});
1148+
pipeline.progress.errors.push(`[${q.source}] Search tool failed: ${errMsg}`);
1149+
}
10961150
}
10971151
} else {
10981152
_log(`[${q.source}] No platform tool available — skipped`);
@@ -1231,11 +1285,24 @@ async function _runPipeline(sessionId) {
12311285
const finalSummary = Object.entries(_sourceQualified).map(([s, n]) => `${s}: ${n}/${config.targetCount}`).join(', ');
12321286
_log(`Final (${pipeline._searchRound} round${pipeline._searchRound > 1 ? 's' : ''}): ${p.qualified} qualified — ${finalSummary || 'none'}`);
12331287
_finishPipeline(sessionId, allMet ? 'completed' : 'done');
1288+
1289+
} finally {
1290+
// Safety net: if pipeline is still marked as running (e.g. uncaught exception skipped
1291+
// all _finishPipeline calls), force-finish to prevent the workflow step from polling forever.
1292+
if (pipeline.running) {
1293+
console.error(`[searchPipeline] Safety net: pipeline ${sessionId} still running after _runPipeline — force finishing`);
1294+
_finishPipeline(sessionId, 'error');
1295+
}
1296+
}
12341297
}
12351298

12361299
function _finishPipeline(sessionId, reason) {
12371300
const pipeline = _pipelines.get(sessionId);
12381301
if (!pipeline) return;
1302+
1303+
// Idempotent: skip if already finished (prevents double history-save)
1304+
if (!pipeline.running && pipeline.stoppedAt) return;
1305+
12391306
pipeline.running = false;
12401307
pipeline.progress.phase = reason;
12411308
pipeline.stoppedAt = new Date().toISOString();
@@ -1264,7 +1331,11 @@ function _finishPipeline(sessionId, reason) {
12641331
function stopPipeline(sessionId) {
12651332
const pipeline = _pipelines.get(sessionId);
12661333
if (!pipeline) return { error: 'No pipeline found' };
1267-
pipeline.running = false;
1334+
// Mark as not running — the _runPipeline loop will detect this and call _finishPipeline('stopped').
1335+
// Also call _finishPipeline here as a safety net in case the loop already exited.
1336+
if (pipeline.running) {
1337+
_finishPipeline(sessionId, 'stopped');
1338+
}
12681339
return { stopped: true };
12691340
}
12701341

0 commit comments

Comments
 (0)