From b2ddf7d41b8399647d6a2abfbbfcc4bf3e0b7d3d Mon Sep 17 00:00:00 2001 From: Daniel Dyer Date: Sun, 29 Mar 2026 20:32:52 -0700 Subject: [PATCH] Harden agent/session observation lifecycles with scoped cleanup --- lib/application/agent-interpreter-provider.ts | 29 ++++++++++++++----- .../local-runtime-scenario-runner.ts | 4 +-- lib/composition/local-services.ts | 4 +-- .../observation/playwright-screen-observer.ts | 29 ++++++++++++++++--- 4 files changed, 50 insertions(+), 16 deletions(-) diff --git a/lib/application/agent-interpreter-provider.ts b/lib/application/agent-interpreter-provider.ts index dad4e5fe..3573b496 100644 --- a/lib/application/agent-interpreter-provider.ts +++ b/lib/application/agent-interpreter-provider.ts @@ -22,7 +22,7 @@ * - `disabled` when no agent capability is available (ci-batch profile) */ -import { Effect, Duration } from 'effect'; +import { Effect, Duration, Fiber } from 'effect'; import type { ResolutionTarget, ResolutionProposalDraft } from '../domain/types'; import type { StepAction } from '../domain/types'; import type { ScreenId, ElementId, PostureId, SnapshotTemplateId } from '../domain/identity'; @@ -609,13 +609,13 @@ export function withAgentTimeout( const budgetMs = options?.budgetMs ?? DEFAULT_AGENT_TIMEOUT_MS; const providerId = options?.provider ?? 'agent-timeout-wrapper'; - return (request) => Effect.runPromise(withAgentTimeoutEffect( + return (request) => Effect.runPromise(Effect.scoped(withAgentTimeoutEffect( Effect.tryPromise({ try: () => interpret(request), catch: (err) => err instanceof Error ? err : new Error(String(err)), }), { budgetMs, provider: providerId }, - )); + ))); } export function withAgentTimeoutEffect( @@ -624,11 +624,24 @@ export function withAgentTimeoutEffect( ): Effect.Effect { const budgetMs = options?.budgetMs ?? DEFAULT_AGENT_TIMEOUT_MS; const providerId = options?.provider ?? 'agent-timeout-wrapper'; - return interpretEffect.pipe( - Effect.timeout(Duration.millis(budgetMs)), - Effect.map((result) => result ?? timeoutFallbackResult(providerId, budgetMs)), - Effect.catchAll(() => Effect.succeed(timeoutFallbackResult(providerId, budgetMs))), - ); + return Effect.scoped(Effect.gen(function* () { + const interpretationFiber = yield* Effect.forkScoped( + interpretEffect.pipe( + Effect.map((result) => ({ kind: 'result' as const, result })), + ), + ); + + const outcome = yield* Effect.raceFirst( + Fiber.join(interpretationFiber), + Effect.sleep(Duration.millis(budgetMs)).pipe(Effect.as({ kind: 'timeout' as const })), + ).pipe( + Effect.catchAll(() => Effect.succeed({ kind: 'timeout' as const })), + ); + + return outcome.kind === 'result' + ? outcome.result + : timeoutFallbackResult(providerId, budgetMs); + })); } /** diff --git a/lib/composition/local-runtime-scenario-runner.ts b/lib/composition/local-runtime-scenario-runner.ts index 34cd7deb..e9441ee1 100644 --- a/lib/composition/local-runtime-scenario-runner.ts +++ b/lib/composition/local-runtime-scenario-runner.ts @@ -85,7 +85,7 @@ function buildDefaultTranslator( function buildRunnerWithInterpreter(interpreterOverride?: AgentInterpreterProvider | undefined): RuntimeScenarioRunnerPort { return { runSteps(input) { - return Effect.gen(function* () { + return Effect.scoped(Effect.gen(function* () { const paths = createProjectPaths(input.rootDir, input.suiteRoot); const translationDisabled = Boolean(input.translationOptions?.disableTranslation); const cacheDisabled = Boolean(input.translationOptions?.disableTranslationCache); @@ -126,7 +126,7 @@ function buildRunnerWithInterpreter(interpreterOverride?: AgentInterpreterProvid ); return [...results]; - }); + })); }, }; } diff --git a/lib/composition/local-services.ts b/lib/composition/local-services.ts index 67e8659a..30817ef7 100644 --- a/lib/composition/local-services.ts +++ b/lib/composition/local-services.ts @@ -121,10 +121,10 @@ export function createLocalServiceContext(rootDir: string, options?: LocalServic posture, writeJournal: () => executionContext.writeJournal(), provide(program: Effect.Effect): Effect.Effect { - return Effect.provide( + return Effect.scoped(Effect.provide( program as Effect.Effect, layer, - ) as Effect.Effect; + )) as Effect.Effect; }, }; } diff --git a/lib/infrastructure/observation/playwright-screen-observer.ts b/lib/infrastructure/observation/playwright-screen-observer.ts index 213c7085..f345a1cf 100644 --- a/lib/infrastructure/observation/playwright-screen-observer.ts +++ b/lib/infrastructure/observation/playwright-screen-observer.ts @@ -104,9 +104,30 @@ async function observeScreen( * Reuses existing locator resolution, ARIA capture, and strategy matching. */ export function createPlaywrightScreenObserver(page: Page): ScreenObservationPort { return { - observe: (input) => Effect.tryPromise({ - try: () => observeScreen(page, input), - catch: (error) => new TesseractError('screen-observation-failed', `Screen observation failed: ${error}`, error), - }).pipe(Effect.withSpan('playwright-screen-observation')), + observe: (input) => Effect.scoped(Effect.gen(function* () { + const pageState = { closed: false }; + + const onClose = () => { + pageState.closed = true; + }; + + yield* Effect.acquireRelease( + Effect.sync(() => { + page.on('close', onClose); + }), + () => Effect.sync(() => { + page.off('close', onClose); + }), + ); + + if (pageState.closed) { + return yield* Effect.fail(new TesseractError('screen-observation-failed', 'Screen observation failed: page already closed.')); + } + + return yield* Effect.tryPromise({ + try: () => observeScreen(page, input), + catch: (error) => new TesseractError('screen-observation-failed', `Screen observation failed: ${error}`, error), + }); + })).pipe(Effect.withSpan('playwright-screen-observation')), }; }