Skip to content

fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1)#1628

Open
Scra3 wants to merge 18 commits into
feat/prd-214-server-step-mapperfrom
fix/prd-442-activity-log-target
Open

fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1)#1628
Scra3 wants to merge 18 commits into
feat/prd-214-server-step-mapperfrom
fix/prd-442-activity-log-target

Conversation

@Scra3
Copy link
Copy Markdown
Member

@Scra3 Scra3 commented Jun 4, 2026

Context

fixes PRD-442

A workflow step's operation activity-log entry (audit trail) pointed at the run's trigger record, even when the step acted on a record loaded earlier in the run — possibly in another collection. Root cause: the audit args were built before doExecute() (so the acted record wasn't resolved yet) and used context.collectionId + baseRecordRef.

Changes

Audit moved into a dedicated AgentWithLog wrapper. Activity-log responsibility left BaseStepExecutor / OperationStepExecutor (deleted) and now lives in AgentWithLog, which wraps AgentPort and emits one audit entry per data-access / write call (pending → succeeded/failed). The target is derived from the call itself — recordId = query.id, collectionId resolved from the call's collection — so each entry points at the acted record and its own collection, and fires only when the operation actually happens. Covers read / update / trigger-action / load-related; the MCP step uses the generic logged() (its side effect is a tool invocation, audited against the run's base record).

Acted collection's numeric id. CollectionSchema gains a required collectionId, resolved from the live schema via the new SchemaCache.getOrLoad(name, loader) (lazy cache-or-fetch, decoupled from WorkflowPort). The audit is not optional, so a missing id fails loud at the boundary (DomainValidationError).

Fail-loud schema resolution. AgentClientAgentPort.resolveSchema now throws SchemaNotCachedError instead of returning a synthetic fallback schema (which had a wrong primaryKeyFields / collectionId). Unreachable in the normal flow — AgentWithLog resolves (and caches) the schema right before delegating — so it's a true invariant guard.

Idempotency unchanged, ordering preserved. Idempotency stays entirely in the executors. The executing write-ahead marker is saved in a beforeCall thunk that AgentWithLog runs after createPending, before the side effect — so an audit-creation failure never leaves an orphan executing marker, and AgentWithLog stays ignorant of run state.

Activity-log failures surface as user-facing step errors; markFailed carries a local-only diagnostic message (the server status endpoint accepts just { status }).

⚠️ Coordinated deploy (cross-repo)

The orchestrator must include collectionId in every getCollectionSchema response. Until it does, steps on that collection fail at schema parse — so this must ship in tandem with the orchestrator change.

Scope

Bug #1 (wrong target), plus the confirmation-flow premature / duplicate symptom of #2: logging at the operation point means a confirmation step logs nothing while awaiting input, nothing on reject, and exactly once on confirm (covered by tests). Remaining #2 work: exactly-once across network retries (orchestrator-side dedup of identical (runId, stepIndex) outcomes).

Verification

  • yarn workspace @forestadmin/workflow-executor build
  • 922 tests — incl. a cross-collection repro (trigger on customers, act on orders#99 → audit targets orders, not the trigger), per-executor target assertions, AgentWithLog unit tests (beforeCall ordering, createPending-throws, userMessage vs 'Unexpected error'), reject-emits-no-log, and getOrLoad hit/miss ✅
  • lint: 0 errors ✅

🤖 Generated with Claude Code

Note

Target the acted record in operation activity logs for workflow executor steps

  • Introduces AgentWithLog (agent-with-log.ts), a wrapper around AgentPort that emits per-operation activity log entries (pending → succeeded/failed) with collectionId resolved from the collection schema and recordId from the query target.
  • Removes the step-level buildActivityLogArgs()/runWithActivityLog() hooks from BaseStepExecutor; activity logging now happens at the individual data-access call level inside each executor.
  • Adds SchemaResolver (schema-resolver.ts) to lazily fetch and cache collection schemas per run, and extends CollectionSchemaSchema to require a non-empty collectionId field.
  • StepExecutorFactory.buildContext now constructs both SchemaResolver and AgentWithLog and injects them into the execution context, replacing the schemaCache and direct agentPort access previously used by executors.
  • markFailed on ActivityLogPort no longer accepts an errorMessage parameter; write operations pass a beforeCall thunk to persist idempotency state immediately before the side effect.
  • Risk: collectionId is now a required field on collection schemas — any schema missing it will fail Zod validation.

Changes since #1628 opened

  • Removed agentPort, workflowPort, and activityLogPort properties from ExecutionContext interface and removed the cached agent field from BaseStepExecutor class [a7f536a]
  • Updated all step executor implementations to access agent through this.context.agent instead of this.agent [a7f536a]
  • Updated test helpers and test cases to remove port properties from mock contexts and accept port overrides as separate parameters [a7f536a]
  • Documented that AgentWithLog.getActionFormInfo is an unaudited passthrough method for read-only form information [6efee0f]

Macroscope summarized ec949b6.

@linear-code
Copy link
Copy Markdown

linear-code Bot commented Jun 4, 2026

PRD-442

@qltysh
Copy link
Copy Markdown

qltysh Bot commented Jun 4, 2026

4 new issues

Tool Category Rule Count
qlty Structure Function with high complexity (count = 11): execute 2
qlty Duplication Found 21 lines of similar code in 2 locations (mass = 103) 1
qlty Structure Function with many returns (count = 5): execute 1

@qltysh
Copy link
Copy Markdown

qltysh Bot commented Jun 4, 2026

Qlty


Coverage Impact

Unable to calculate total coverage change because base branch coverage was not found.

Modified Files with Diff Coverage (12)

RatingFile% DiffUncovered Line #s
New Coverage rating: A
packages/workflow-executor/src/executors/step-executor-factory.ts100.0%
New Coverage rating: A
...ow-executor/src/executors/load-related-record-step-executor.ts100.0%
New Coverage rating: A
...s/workflow-executor/src/executors/read-record-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/record-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/base-step-executor.ts100.0%
New Coverage rating: A
...workflow-executor/src/executors/update-record-step-executor.ts100.0%
New Coverage rating: A
...-executor/src/executors/trigger-record-action-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/errors.ts50.0%266
New Coverage rating: A
packages/workflow-executor/src/index.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/mcp-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/agent-with-log.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/schema-resolver.ts100.0%
Total98.4%
🤖 Increase coverage with AI coding...
In the `fix/prd-442-activity-log-target` branch, add test coverage for this new code:

- `packages/workflow-executor/src/errors.ts` -- Line 266

🚦 See full report on Qlty Cloud »

🛟 Help
  • Diff Coverage: Coverage for added or modified lines of code (excludes deleted files). Learn more.

  • Total Coverage: Coverage for the whole repository, calculated as the sum of all File Coverage. Learn more.

  • File Coverage: Covered Lines divided by Covered Lines plus Missed Lines. (Excludes non-executable lines including blank lines and comments.)

    • Indirect Changes: Changes to File Coverage for files that were not modified in this PR. Learn more.

@Scra3 Scra3 force-pushed the fix/prd-442-activity-log-target branch from bdff84b to e8f778b Compare June 4, 2026 15:01
return (
cached ?? {
collectionName,
collectionId: collectionName,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not works

const cached = this.schemaCache.get(collectionName);

if (!cached) {
// eslint-disable-next-line no-console
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should never occur because schema is already loaded at this point

// `load` is injected so the cache stays decoupled from the orchestrator port.
async getOrLoad(
collectionName: string,
load: () => Promise<CollectionSchema>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to accept a load arg ? is it not always the same logic for getting the schema ?

This means you could access a cached value that was not loaded from the same loader as the one you provide

Copy link
Copy Markdown
Member Author

@Scra3 Scra3 Jun 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduce a schemaResolver to avoid to pass this ugly getOrLoad

} catch (err) {
const errorMessage =
err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error';
void this.activityLogPort.markFailed(handle, errorMessage);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If beforeCall throws, we markFailed though the side effect never ran — the audit then shows a failed write on a record that was never touched. Intended, or should we only markFailed around run()?


private async resolveCollectionId(collectionName: string): Promise<string> {
const schema = await this.schemaCache.getOrLoad(collectionName, () =>
this.workflowPort.getCollectionSchema(collectionName, this.runId),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncovered (qlty): a test with a real SchemaCache miss would exercise the workflowPort loader path here.

Copy link
Copy Markdown
Member

@nbouliol nbouliol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code LGTM, needs manual testing

alban bertolini and others added 11 commits June 8, 2026 11:36
…rd (PRD-442 #1)

The operation audit-trail entry pointed at the run's trigger record, even
when the step acted on a record loaded earlier in the run — possibly in
another collection. Root cause: buildActivityLogArgs ran before doExecute
(so the acted record wasn't resolved yet) and used context.collectionId +
baseRecordRef.

Record executors now emit the entry at the point of the agent call, via a
new base helper withActivityLog(args, fn), targeting selectedRecordRef and
its collection's numeric id (read from the live schema). CollectionSchema
gains a required collectionId — the audit is not optional, so a missing id
fails loud at the boundary (DomainValidationError); the orchestrator must
include it in getCollectionSchema responses (coordinated deploy).

Scope: bug #1 (wrong target) only. As a consequence of logging at the
operation point, a confirmation step no longer logs before the write — the
remaining #2 work (exactly-once / no-log-on-reject, MCP step) is a follow-up.
The MCP executor still uses context.collectionId via the existing wrapper.

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…logOperation

Move the activity-log machinery out of BaseStepExecutor into a new
OperationStepExecutor (Base -> Operation -> Record/Mcp). It owns the
`operation` descriptor, `logOperation(record, fn)` and the now-private
`withActivityLog` helper, so the base class no longer carries any audit
concern.

McpStepExecutor now extends OperationStepExecutor and logs its tool call
against the run base record via context.collectionId. RecordStepExecutor
overrides operationCollectionId to resolve the acted record's own
collection id.

Relocate the activity-log tests from base-step-executor.test.ts to a new
operation-step-executor.test.ts.

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Tighten operation typing: CreateActivityLogArgs now uses the lib's
  ActivityLogAction/ActivityLogType (removes the `as ActivityLogAction`
  cast); extract OperationDescriptor (Pick) reused by the MCP executor.
  collectionId is now required on CreateActivityLogArgs.
- Drop the dead `errorMessage` param from ActivityLogPort.markFailed: the
  server status endpoint only accepts { status }, so the message went
  nowhere. The failure cause is still logged by BaseStepExecutor.
- Move the `executing` write-ahead marker inside the logOperation callback
  (after createPending, before the side effect) in update/trigger/mcp so an
  activity-log creation failure never leaves an orphan executing marker.
- Fix stale comment in base-step-executor + CLAUDE.md idempotency section.
- Reword the misleading MCP formatting-fallback log line.
- Tests: cross-collection activity-log target for update + trigger, log on
  the load-related awaiting-input branch, no-orphan-marker on createPending
  failure, and markFailed single-arg assertions.

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Move the activity-log audit out of OperationStepExecutor into a dedicated
AgentWithLog collaborator that wraps AgentPort. Each data-access call now
emits one audit entry (pending -> success/failed); the action/type are
intrinsic to the method and the target (collectionId, recordId) is derived
from the call, so the per-executor `operation` descriptor and the
`operationCollectionId` hook are gone.

Idempotency stays in the executors. Write methods take a `beforeCall` thunk
run between createPending and the side effect, so the executor persists its
`executing` write-ahead marker in the correct order (createPending ->
executing -> write) without AgentWithLog knowing anything about idempotency.
MCP (whose side effect is a tool invocation, not an AgentPort call) uses the
generic `logged()` method.

Also restore the local-only `errorMessage` on markFailed: it never reaches
the server (status endpoint accepts only { status }) but is kept for the
adapter's diagnostic log line.

- delete OperationStepExecutor; RecordStepExecutor + McpStepExecutor extend
  BaseStepExecutor directly
- extract shared loadCollectionSchema helper (reused by getCollectionSchema
  and AgentWithLog collectionId resolution)
- getActionFormInfo / probe stay on the raw agentPort (intentionally not audited)
- new agent-with-log.test.ts (audit per method, beforeCall ordering,
  createPending-throws, userMessage vs 'Unexpected error'); relocate the
  audit suite out of the deleted operation-step-executor.test.ts

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- AgentWithLog: replace LoggedOperation with AuditTarget
  (Omit<CreateActivityLogArgs,'renderingId'>), reused by logged() and
  audit() so the audit-arg shape stays anchored to the port type.
- CLAUDE.md: update the idempotency section to reference AgentWithLog and
  the beforeCall thunk (the deleted OperationStepExecutor.logOperation).
- tests: assert stepOutcome.error (audit-creation failure message) on the
  no-orphan-marker case; pin getSingleRelatedData (xToOne) audit action.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
resolveSchema fabricated a CollectionSchema on cache miss, including a
buggy `collectionId: cached.collectionId` (cached is undefined in that
branch). The fallback is unreachable in the normal flow — AgentWithLog
resolves (and caches, via the shared SchemaCache) the schema right before
delegating to the agent port — so throw a SchemaNotCachedError instead of
returning a fabricated schema with a wrong primaryKeyFields/collectionId.

- new SchemaNotCachedError (domain error)
- agent-client-agent-port test: fallback case becomes a throw assertion

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Move the cache-or-fetch logic onto SchemaCache as getOrLoad(name, load):
the cache owns "check cache, else load via the injected thunk, then store",
while staying decoupled from WorkflowPort (the loader closure captures both
the port and the per-step runId). Removes the free loadCollectionSchema
helper; RecordStepExecutor.getCollectionSchema and AgentWithLog.resolveCollectionId
both go through getOrLoad.

Also pin the confirmation-flow audit behavior (PRD-442 #2 premature/duplicate
symptom, fixed as a consequence of logging at the operation point): a rejected
step emits no activity-log entry. Plus direct getOrLoad unit tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…Resolver

Both getOrLoad callers rewrote the same loader closure, only because the
process-wide SchemaCache singleton can't hold the per-run runId that
getCollectionSchema needs. Bind (schemaCache, workflowPort, runId) once in a
SchemaResolver instead, and make SchemaCache a pure store again. The resolver
feeds the same shared cache AgentClientAgentPort reads from, preserving the
SchemaNotCachedError invariant. AgentWithLogDeps drops from 6 to 4 fields.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The step error is already logged/surfaced by base-step-executor when rethrown,
so threading it into markFailed only enriched a rare double-fault diagnostic.
Drop the param: markFailed(handle) is now symmetric with markSucceeded, and
AgentWithLog no longer computes a userMessage/'Unexpected error' string.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
When beforeCall (the write-ahead marker save) throws, the datasource write
must not run and the pending entry is resolved to failed — never left
orphan-pending. Documents the intended behavior raised in review.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
After the SchemaResolver extraction no executor reads context.schemaCache —
all schema access goes through context.schemaResolver. Exposing the raw cache
next to the resolver advertised a second, unsafe path (a direct .get() bypasses
the resolver's read-through fetch, re-opening the SchemaNotCachedError footgun).
The factory keeps cfg.schemaCache to wire the resolver and the agent-port.

Also soften the SchemaResolver header comment (the guard is unreachable under
normal TTLs, not absolutely) and note markFailed must be called on a rethrow path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@Scra3 Scra3 force-pushed the fix/prd-442-activity-log-target branch from 8bb1ba6 to b69167d Compare June 8, 2026 09:42
workflowPort: cfg.workflowPort,
runStore: cfg.runStore,
schemaCache: cfg.schemaCache,
schemaResolver: new SchemaResolver(cfg.schemaCache, cfg.workflowPort, step.runId),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schemaCache is only used by schema resolver. No need to inject schema cache

alban bertolini and others added 6 commits June 8, 2026 11:44
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…Port

AgentWithLog is now constructed in step-executor-factory and injected via
ExecutionContext (symmetric with schemaResolver), so base-step-executor just
reads context.agent. getActionFormInfo moves onto AgentWithLog as an unaudited
passthrough, so executors never reach for the raw AgentPort.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… into fix/prd-442-activity-log-target

# Conflicts:
#	packages/workflow-executor/src/adapters/agent-client-agent-port.ts
#	packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts
#	packages/workflow-executor/test/executors/read-record-step-executor.test.ts
agentPort, activityLogPort and workflowPort had no production reader on the
context: AgentWithLog and SchemaResolver hold their own injected copies, built
by the factory. Executors reach data only through context.agent and schemas
through context.schemaResolver. Test makeContext helpers take the raw ports as
helper inputs to build those, without exposing them on the context.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ort mention

Address review nits on the AgentWithLog encapsulation:
- comment getActionFormInfo as the lone unaudited passthrough (read-only probe),
  so its difference from the audited data-access methods is visible at a glance
- fix executeOnExecutor JSDoc that still said "via agentPort" (now context.agent)
- assert markFailed is not called in the getActionFormInfo passthrough test

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

constructor(context: ExecutionContext<TStep>) {
this.context = context;
this.agentPort = context.agentPort;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless

readonly baseRecordRef: RecordRef;
readonly stepDefinition: TStep;
readonly model: BaseChatModel;
readonly agentPort: AgentPort;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

readonly stepDefinition: TStep;
readonly model: BaseChatModel;
readonly agentPort: AgentPort;
readonly workflowPort: WorkflowPort;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

readonly incomingPendingData?: unknown;
readonly stepTimeoutMs?: number;
readonly aiInvokeTimeoutMs?: number;
readonly activityLogPort: ActivityLogPort;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

… into fix/prd-442-activity-log-target

# Conflicts:
#	packages/workflow-executor/src/types/execution-context.ts
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants