Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
948d33e
fix(workflow-executor): operation activity log targets the acted reco…
Jun 4, 2026
fb7662d
refactor(workflow): extract OperationStepExecutor and migrate MCP to …
Jun 4, 2026
0ef60ae
refactor(workflow): address review findings on activity-log operation
Jun 4, 2026
4632412
refactor(workflow): extract activity-log audit into AgentWithLog
Jun 4, 2026
981cdd3
chore(workflow): address AgentWithLog review nits
Jun 4, 2026
63318a2
fix(workflow): fail loud when a collection schema is not cached
Jun 4, 2026
d9f5c31
refactor(workflow): fold schema cache-or-fetch into getOrLoad
Jun 4, 2026
8ba46f1
refactor(workflow): replace SchemaCache.getOrLoad with per-run Schema…
Jun 5, 2026
6d1bae2
refactor(workflow): drop errorMessage from ActivityLogPort.markFailed
Jun 5, 2026
f3fdeb4
test(workflow): cover beforeCall-throws in AgentWithLog audit
Jun 5, 2026
b69167d
refactor(workflow): drop dead schemaCache field from ExecutionContext
Jun 5, 2026
8ac4573
docs(workflow): drop markFailed comment
Jun 8, 2026
3eb8f30
refactor(workflow): build AgentWithLog in the factory, hide raw agent…
Jun 8, 2026
3b44f36
docs(workflow): drop comments leaking AgentWithLog internals
Jun 8, 2026
68e07c5
Merge remote-tracking branch 'origin/feat/prd-214-server-step-mapper'…
Jun 8, 2026
a7f536a
refactor(workflow): drop dead raw ports from ExecutionContext
Jun 8, 2026
6efee0f
docs(workflow): flag getActionFormInfo as unaudited, fix stale agentP…
Jun 8, 2026
ec949b6
Merge remote-tracking branch 'origin/feat/prd-214-server-step-mapper'…
Jun 8, 2026
e4e4186
refactor(workflow): thread WriteOptions through AgentWithLog audit
Jun 8, 2026
37f7048
refactor(workflow): extract ActivityLogger, decouple audit from Agent…
Jun 8, 2026
e41c488
refactor(workflow): rename ActivityLogger→ActivityLog, run→track, nam…
Jun 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ src/
- **Boundary errors** (`extends Error`) — Thrown outside step execution, at the HTTP or Runner layer (e.g. `RunNotFoundError`, `PendingDataNotFoundError`, `ConfigurationError`). Caught by the HTTP server and translated into HTTP status codes (404, 400, etc.). These intentionally do NOT extend `WorkflowExecutorError` to prevent `base-step-executor` from catching them as step failures.
- **Dual error messages** — `WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`.
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` is called before `runWithActivityLog()` so neither cache hits nor uncertain-state errors emit activity log entries. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe.
- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` runs before `doExecute()` so neither cache hits nor uncertain-state errors reach the activity log emitted by `AgentWithLog`. The `executing` write-ahead marker is saved in the `beforeCall` thunk the executor passes to `AgentWithLog`'s write methods (run after `createPending`, just before the side effect) so an activity-log creation failure never leaves an orphan `executing` marker. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations).
- **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when *no* field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import type {
CreateActivityLogArgs,
} from '../ports/activity-log-port';
import type { Logger } from '../ports/logger-port';
import type {
ActivityLogAction,
ActivityLogsServiceInterface,
} from '@forestadmin/forestadmin-client';
import type { ActivityLogsServiceInterface } from '@forestadmin/forestadmin-client';

import { serializeRecordId } from './record-id-serializer';
import withRetry from './with-retry';
Expand All @@ -30,7 +27,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
this.service.createActivityLog({
forestServerToken: this.forestServerToken,
renderingId: String(args.renderingId),
action: args.action as ActivityLogAction,
action: args.action,
type: args.type,
// The lib writes this value verbatim into relationships.collection.data.id
// (JSON:API). The Forest server audit-trail API expects the numeric collectionId.
Expand Down Expand Up @@ -76,7 +73,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
});
}

async markFailed(handle: ActivityLogHandle, errorMessage: string): Promise<void> {
async markFailed(handle: ActivityLogHandle): Promise<void> {
return this.drainer.track(async () => {
try {
await withRetry(
Expand All @@ -92,7 +89,6 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
} catch (err) {
this.logger.error('activity log mark-as-failed failed', {
handleId: handle.id,
stepErrorMessage: errorMessage,
error: extractErrorMessage(err),
});
}
Expand Down
11 changes: 11 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ export class AgentPortError extends WorkflowExecutorError {
}
}

// Invariant guard: the agent port reads a collection's schema (for its primary keys) from the
// cache, which the executor must populate via getCollectionSchema before any record access.
export class SchemaNotCachedError extends WorkflowExecutorError {
constructor(collectionName: string) {
super(
`Collection schema for "${collectionName}" was not loaded before access — call getCollectionSchema first`,
'An error occurred while accessing your data. Please try again.',
);
}
}

export class WorkflowPortError extends WorkflowExecutorError {
constructor(operation: string, cause: unknown) {
super(
Expand Down
47 changes: 47 additions & 0 deletions packages/workflow-executor/src/executors/activity-log.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { ActivityLogPort, CreateActivityLogArgs } from '../ports/activity-log-port';
import type { StepUser } from '../types/execution-context';

// The activity-log target minus renderingId, which track() stamps centrally.
export type AuditTarget = Omit<CreateActivityLogArgs, 'renderingId'>;

export type TrackOptions<T> = {
operation: () => Promise<T>;
// Runs between createPending and the operation — the executor's write-ahead marker. Optional:
// read operations have no marker to persist.
beforeCall?: () => Promise<void>;
};

// Runs an operation while recording an activity-log entry around it (pending → success/failed).
// It both executes `operation` and owns the activity-log transitions, so callers never touch the
// ActivityLogPort directly. `beforeCall` runs after createPending, just before the operation, so
// an audit-creation failure never leaves an orphan write-ahead marker.
export default class ActivityLog {
private readonly activityLogPort: ActivityLogPort;

private readonly user: StepUser;

constructor(activityLogPort: ActivityLogPort, user: StepUser) {
this.activityLogPort = activityLogPort;
this.user = user;
}

async track<T>(target: AuditTarget, { operation, beforeCall }: TrackOptions<T>): Promise<T> {
const handle = await this.activityLogPort.createPending({
renderingId: this.user.renderingId,
...target,
});

try {
if (beforeCall) await beforeCall();
const result = await operation();
void this.activityLogPort.markSucceeded(handle);

return result;
} catch (err) {
// The step error is logged/surfaced by base-step-executor when rethrown, so the audit
// transition only needs the handle.
void this.activityLogPort.markFailed(handle);
throw err;
}
}
}
106 changes: 106 additions & 0 deletions packages/workflow-executor/src/executors/agent-with-log.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import type ActivityLog from './activity-log';
import type {
AgentPort,
ExecuteActionQuery,
GetActionFormInfoQuery,
GetRecordQuery,
GetRelatedDataQuery,
GetSingleRelatedDataQuery,
UpdateRecordQuery,
} from '../ports/agent-port';
import type SchemaResolver from '../schema-resolver';
import type { StepUser } from '../types/execution-context';
import type { RecordData } from '../types/validated/collection';

type WriteOptions = { beforeCall: () => Promise<void> };

export interface AgentWithLogDeps {
agentPort: AgentPort;
schemaResolver: SchemaResolver;
user: StepUser;
activityLog: ActivityLog;
}

// Wraps AgentPort and runs each data-access call through the ActivityLog so it records an
// activity-log entry. The audit target is derived from the call: the numeric collectionId is
// resolved from the call's collection name, the recordId from its id. Idempotency stays in the
// executors: write methods forward a `beforeCall` thunk (the executor's write-ahead marker).
export default class AgentWithLog {
private readonly agentPort: AgentPort;

private readonly schemaResolver: SchemaResolver;

private readonly user: StepUser;

private readonly activityLog: ActivityLog;

constructor(deps: AgentWithLogDeps) {
this.agentPort = deps.agentPort;
this.schemaResolver = deps.schemaResolver;
this.user = deps.user;
this.activityLog = deps.activityLog;
}

async getRecord(query: GetRecordQuery): Promise<RecordData> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.activityLog.track(
{ action: 'index', type: 'read', collectionId, recordId: query.id },
{ operation: () => this.agentPort.getRecord(query, this.user) },
);
}

async getRelatedData(query: GetRelatedDataQuery): Promise<RecordData[]> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.activityLog.track(
{ action: 'listRelatedData', type: 'read', collectionId, recordId: query.id },
{ operation: () => this.agentPort.getRelatedData(query, this.user) },
);
}

async getSingleRelatedData(query: GetSingleRelatedDataQuery): Promise<RecordData | null> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.activityLog.track(
{ action: 'listRelatedData', type: 'read', collectionId, recordId: query.id },
{ operation: () => this.agentPort.getSingleRelatedData(query, this.user) },
);
}

async updateRecord(query: UpdateRecordQuery, opts: WriteOptions): Promise<RecordData> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.activityLog.track(
{ action: 'update', type: 'write', collectionId, recordId: query.id },
{
operation: () => this.agentPort.updateRecord(query, this.user),
beforeCall: opts.beforeCall,
},
);
}

async executeAction(query: ExecuteActionQuery, opts: WriteOptions): Promise<unknown> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.activityLog.track(
{ action: 'action', type: 'write', collectionId, recordId: query.id },
{
operation: () => this.agentPort.executeAction(query, this.user),
beforeCall: opts.beforeCall,
},
);
}

// Unaudited passthrough: form-info is a read-only probe (does this action have a form?),
// not a data access, so unlike the methods above it records NO activity-log entry.
getActionFormInfo(query: GetActionFormInfoQuery): Promise<{ hasForm: boolean }> {
return this.agentPort.getActionFormInfo(query, this.user);
}

private async resolveCollectionId(collectionName: string): Promise<string> {
const schema = await this.schemaResolver.resolve(collectionName);

return schema.collectionId;
}
}
49 changes: 3 additions & 46 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { CreateActivityLogArgs } from '../ports/activity-log-port';
import type { AgentPort } from '../ports/agent-port';
import type {
ExecutionContext,
IStepExecutor,
Expand Down Expand Up @@ -35,11 +33,8 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
{
protected readonly context: ExecutionContext<TStep>;

protected readonly agentPort: AgentPort;

constructor(context: ExecutionContext<TStep>) {
this.context = context;
this.agentPort = context.agentPort;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless

}

async execute(): Promise<StepExecutionResult> {
Expand All @@ -51,8 +46,8 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
});

try {
// Idempotency guard — mutating executors override this. Called before runWithActivityLog
// so that cache hits and uncertain-state errors never emit an activity log entry.
// Idempotency guard — mutating executors override this. Runs before doExecute so a cache
// hit or uncertain-state error short-circuits before any side effect.
const cached = await this.checkIdempotency();

if (cached) {
Expand All @@ -64,7 +59,7 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
return cached;
}

const result = await this.runWithActivityLog();
const result = await this.runWithTimeout();

this.context.logger.info('Step execution completed', {
...this.logCtx,
Expand Down Expand Up @@ -113,44 +108,6 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
return Promise.resolve(null);
}

// Return null when the frontend performs the action (e.g. TriggerAction without
// executionType=FullyAutomated) — the front logs on its side. Override when the
// executor itself calls the agent.
protected buildActivityLogArgs(): CreateActivityLogArgs | null {
return null;
}

private async runWithActivityLog(): Promise<StepExecutionResult> {
const args = this.buildActivityLogArgs();
if (!args) return this.runWithTimeout();

const handle = await this.context.activityLogPort.createPending(args);

let result: StepExecutionResult;

try {
result = await this.runWithTimeout();
} catch (err) {
// Use userMessage (not the technical message) — errorMessage is rendered to end-users
// in the Forest Admin UI. Privacy: no collection/field/AI internals in the audit trail.
const errorMessage =
err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error';
void this.context.activityLogPort.markFailed(handle, errorMessage);
throw err;
}

if (result.stepOutcome.status === 'error') {
void this.context.activityLogPort.markFailed(
handle,
result.stepOutcome.error ?? 'Step failed',
);
} else {
void this.context.activityLogPort.markSucceeded(handle);
}

return result;
}

// Promise.race doesn't abort the losing branch — it keeps running in the background. The .catch()
// on execPromise must be attached BEFORE the race so a late rejection doesn't trigger
// UnhandledPromiseRejection. Late resolutions are silently discarded.
Expand Down
Loading
Loading