Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions packages/agent/src/server/agent-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ vi.mock("@anthropic-ai/claude-agent-sdk", async (importOriginal) => ({
interface TestableServer {
getInitialPromptOverride(run: TaskRun): string | null;
getClearedPendingUserState(run: TaskRun | null): string[] | null;
isAwaitingFirstUserMessage(run: TaskRun | null): boolean;
buildResumePromptWithMessage(
context: {
conversationSummary: string;
sandboxContext: string;
checkpointApplied: boolean;
},
userPrompt: Array<{ type: string; text?: string }>,
): Array<{ type: string; text?: string }>;
clearPendingInitialPromptState(
payload: JwtPayload,
run: TaskRun | null,
Expand Down Expand Up @@ -1100,6 +1109,56 @@ describe("AgentServer HTTP Mode", () => {
});
});

describe("warm resume defers the first turn to the user's message", () => {
  // `isAwaitingFirstUserMessage` keys off `state.await_user_message === true`.
  it("treats a run with await_user_message as warm", () => {
    const s = createServer() as unknown as TestableServer;
    expect(
      s.isAwaitingFirstUserMessage({
        state: { await_user_message: true },
      } as unknown as TaskRun),
    ).toBe(true);
  });

  it("does not treat a normal (non-warm) run as awaiting a first message", () => {
    const s = createServer() as unknown as TestableServer;
    // A plain resume (resume_from_run_id only) must still prompt immediately.
    expect(
      s.isAwaitingFirstUserMessage({
        state: { resume_from_run_id: "prev" },
      } as unknown as TaskRun),
    ).toBe(false);
    expect(s.isAwaitingFirstUserMessage({ state: {} } as TaskRun)).toBe(
      false,
    );
    expect(s.isAwaitingFirstUserMessage(null)).toBe(false);
  });

  it("merges resumed history with the user's message into a single turn", () => {
    const s = createServer() as unknown as TestableServer;
    const blocks = s.buildResumePromptWithMessage(
      {
        conversationSummary: "**User**: what's my pageview count",
        sandboxContext: "sandbox-context-sentinel",
        checkpointApplied: false,
      },
      [{ type: "text", text: "break down by a country" }],
    );

    // Preamble first (so the frontend's resume-context filter hides it), the
    // user's own message stays a distinct block, and a closing instruction.
    // Pin both the count and the order so a reshuffle cannot pass unnoticed.
    expect(blocks).toHaveLength(3);
    const [preamble, userMessage, closing] = blocks;

    expect(preamble?.text).toMatch(
      /^You are resuming a previous conversation\./,
    );
    expect(preamble?.text).toContain("sandbox-context-sentinel");
    expect(preamble?.text).toContain("what's my pageview count");
    expect(userMessage).toEqual({
      type: "text",
      text: "break down by a country",
    });
    expect(closing?.text).toContain(
      "Respond to the user's new message above",
    );
  });
});

describe("runtime adapter selection", () => {
it("defaults to claude when no runtime adapter is configured", () => {
const s = createServer();
Expand Down
230 changes: 153 additions & 77 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ interface ActiveSession {
pendingHandoffGitState?: HandoffLocalGitState;
}

/**
 * Resume data returned by `prepareResumeContext` and consumed by
 * `buildResumePromptWithMessage` when a resume prompt is assembled.
 */
interface PreparedResumeContext {
  /** Prior conversation as rendered by `formatConversationForResume`; empty when there is no resume state. */
  conversationSummary: string;
  /** Sentence telling the agent whether the workspace was restored from a checkpoint; empty when there is no resume state. */
  sandboxContext: string;
  /** True only when the latest git checkpoint was applied without throwing. */
  checkpointApplied: boolean;
}

function getTaskRunStateString(
taskRun: TaskRun | null,
key: string,
Expand Down Expand Up @@ -258,6 +264,13 @@ export class AgentServer {
private readonly evaluatedPrUrls = new Set<string>();
private lastReportedBranch: string | null = null;
private resumeState: ResumeState | null = null;
// Set on a warm (pre-provisioned) resume boot: the conversation history is
// hydrated but NOT prompted, so the agent stays silent until the user's first
// real message. That message's turn (see the user_message command) prepends
// this context, so the agent answers the message with full continuity in a
// single turn instead of speaking a "continue where you left off" turn into
// an empty room.
private pendingResumeContext: PreparedResumeContext | null = null;
// Guards against concurrent session initialization. autoInitializeSession() and
// the GET /events SSE handler can both call initializeSession() — the SSE connection
// often arrives while newSession() is still awaited (this.session is still null),
Expand Down Expand Up @@ -687,19 +700,28 @@ export class AgentServer {
? params.artifacts.length
: 0,
});
const prompt = await this.buildPromptFromContentAndArtifacts({
const userPrompt = await this.buildPromptFromContentAndArtifacts({
content: params.content as string | ContentBlock[] | undefined,
artifacts: Array.isArray(params.artifacts)
? (params.artifacts as TaskRunArtifact[])
: [],
taskId: this.session.payload.task_id,
runId: this.session.payload.run_id,
});
if (prompt.length === 0) {
if (userPrompt.length === 0) {
throw new Error("User message cannot be empty");
}
// A warm resume hydrated the conversation context without prompting; the
// first real message carries that history so the agent answers it with
// full continuity in a single turn (one-shot — cleared after use).
const resumedContext = this.pendingResumeContext;
this.pendingResumeContext = null;
const prompt = resumedContext
? this.buildResumePromptWithMessage(resumedContext, userPrompt)
: userPrompt;
this.logger.debug("Built user_message prompt", {
blockTypes: prompt.map((block) => block.type),
resumedContext: resumedContext !== null,
});
const promptPreview = promptBlocksToText(prompt);

Expand Down Expand Up @@ -1216,8 +1238,17 @@ export class AgentServer {
}
}

// Resume flow: if we have resume state, format conversation history as context
// Resume flow: if we have resume state, format conversation history as context.
if (this.resumeState && this.resumeState.conversation.length > 0) {
if (this.isAwaitingFirstUserMessage(taskRun)) {
// Warm pre-provision (no user is waiting yet): hydrate the conversation
// context + filesystem, but do NOT prompt. The resumed history rides
// along with the user's first real message (see the user_message
// command), so the agent never speaks before the user has sent anything.
this.pendingResumeContext = await this.prepareResumeContext(payload);
this.resumeState = null;
return;
}
await this.sendResumeMessage(payload, taskRun);
return;
}
Expand Down Expand Up @@ -1281,95 +1312,140 @@ export class AgentServer {
}
}

/**
 * Reports whether the run is a warm (pre-provisioned) resume that should stay
 * idle until its first user message arrives.
 *
 * @param taskRun - The run to inspect; `null` is treated as "not awaiting".
 * @returns `true` only when `taskRun.state` is an object whose
 *   `await_user_message` property is strictly `true`.
 */
private isAwaitingFirstUserMessage(taskRun: TaskRun | null): boolean {
  const runState: unknown = taskRun?.state;
  if (typeof runState === "object" && runState !== null) {
    // Strict comparison: truthy-but-not-true values do not count as warm.
    return (runState as Record<string, unknown>).await_user_message === true;
  }
  return false;
}

/**
 * Apply the resume checkpoint (filesystem) and build the conversation-history
 * preamble. Split out of `sendResumeMessage` so a warm boot can hydrate this
 * context without prompting — the agent stays silent until the first real user
 * message, which then prepends the returned context via
 * `buildResumePromptWithMessage`.
 *
 * A failure to apply the git checkpoint is logged and tolerated: the returned
 * context then reports `checkpointApplied: false` and tells the agent it is
 * starting from a fresh environment.
 *
 * @param payload - JWT payload supplying the task and run ids for the
 *   checkpoint tracker.
 * @returns The formatted conversation summary, the sandbox-status sentence and
 *   whether the checkpoint was applied. All fields are empty/false when there
 *   is no resume state.
 */
private async prepareResumeContext(
  payload: JwtPayload,
): Promise<PreparedResumeContext> {
  // Snapshot once: the field is nulled by other code paths, and the logging
  // below runs after an await, so it must not re-read `this.resumeState`.
  const resumeState = this.resumeState;
  if (!resumeState) {
    return {
      conversationSummary: "",
      sandboxContext: "",
      checkpointApplied: false,
    };
  }

  const conversationSummary = formatConversationForResume(
    resumeState.conversation,
  );
  const checkpoint = resumeState.latestGitCheckpoint;

  let checkpointApplied = false;
  if (checkpoint && this.config.repositoryPath && this.posthogAPI) {
    try {
      const checkpointTracker = new HandoffCheckpointTracker({
        repositoryPath: this.config.repositoryPath,
        taskId: payload.task_id,
        runId: payload.run_id,
        apiClient: this.posthogAPI,
        logger: this.logger.child("HandoffCheckpoint"),
      });
      const metrics = await checkpointTracker.applyFromHandoff(checkpoint);
      checkpointApplied = true;
      this.logger.debug("Git checkpoint applied", {
        branch: checkpoint.branch,
        head: checkpoint.head,
        packBytes: metrics.packBytes,
        indexBytes: metrics.indexBytes,
        totalBytes: metrics.totalBytes,
      });
    } catch (error) {
      // Best effort: resume continues without the restored workspace.
      this.logger.warn("Failed to apply git checkpoint", {
        error: error instanceof Error ? error.message : String(error),
        branch: checkpoint.branch,
      });
    }
  }

  const sandboxContext = checkpointApplied
    ? "The workspace environment (all files, packages, and code changes) has been fully restored from the latest checkpoint."
    : "The workspace from the previous session was not restored from a checkpoint, so you are starting with a fresh environment. Your conversation history is fully preserved below.";

  return { conversationSummary, sandboxContext, checkpointApplied };
}

/**
 * Wraps the user's message in resume context and returns the result as
 * separate content blocks: a preamble (sandbox status + conversation
 * history), the user's own blocks untouched, and a closing instruction.
 * Used both when a pending message exists at resume time and when a warm
 * resume's first real message arrives.
 *
 * @param context - Prepared resume context supplying the preamble text.
 * @param userPrompt - The user's message blocks, spliced in unchanged.
 * @returns The preamble block, then `userPrompt`, then the closing block.
 */
private buildResumePromptWithMessage(
  context: PreparedResumeContext,
  userPrompt: ContentBlock[],
): ContentBlock[] {
  const { sandboxContext, conversationSummary } = context;

  // Sections are separated by blank lines; the trailing empty entry keeps the
  // "\n\n" that follows the final heading.
  const preamble: ContentBlock = {
    type: "text",
    text: [
      `You are resuming a previous conversation. ${sandboxContext}`,
      "Here is the conversation history from the previous session:",
      conversationSummary,
      "The user has sent a new message:",
      "",
    ].join("\n\n"),
  };

  const closing: ContentBlock = {
    type: "text",
    text: "\n\nRespond to the user's new message above. You have full context from the previous session.",
  };

  return [preamble, ...userPrompt, closing];
}

private async sendResumeMessage(
payload: JwtPayload,
taskRun: TaskRun | null,
): Promise<void> {
if (!this.session || !this.resumeState) return;

try {
const conversationSummary = formatConversationForResume(
this.resumeState.conversation,
);

let checkpointApplied = false;
if (
this.resumeState.latestGitCheckpoint &&
this.config.repositoryPath &&
this.posthogAPI
) {
try {
const checkpointTracker = new HandoffCheckpointTracker({
repositoryPath: this.config.repositoryPath,
taskId: payload.task_id,
runId: payload.run_id,
apiClient: this.posthogAPI,
logger: this.logger.child("HandoffCheckpoint"),
});
const metrics = await checkpointTracker.applyFromHandoff(
this.resumeState.latestGitCheckpoint,
);
checkpointApplied = true;
this.logger.debug("Git checkpoint applied", {
branch: this.resumeState.latestGitCheckpoint.branch,
head: this.resumeState.latestGitCheckpoint.head,
packBytes: metrics.packBytes,
indexBytes: metrics.indexBytes,
totalBytes: metrics.totalBytes,
});
} catch (error) {
this.logger.warn("Failed to apply git checkpoint", {
error: error instanceof Error ? error.message : String(error),
branch: this.resumeState.latestGitCheckpoint.branch,
});
}
}
const conversationTurns = this.resumeState.conversation.length;
const hasGitCheckpoint = !!this.resumeState.latestGitCheckpoint;
const gitCheckpointBranch =
this.resumeState.latestGitCheckpoint?.branch ?? null;

const context = await this.prepareResumeContext(payload);
const pendingUserPrompt = await this.getPendingUserPrompt(taskRun);

const sandboxContext = checkpointApplied
? `The workspace environment (all files, packages, and code changes) has been fully restored from the latest checkpoint.`
: `The workspace from the previous session was not restored from a checkpoint, so you are starting with a fresh environment. Your conversation history is fully preserved below.`;

let resumePromptBlocks: ContentBlock[];
if (pendingUserPrompt?.length) {
resumePromptBlocks = [
{
type: "text",
text:
`You are resuming a previous conversation. ${sandboxContext}\n\n` +
`Here is the conversation history from the previous session:\n\n` +
`${conversationSummary}\n\n` +
`The user has sent a new message:\n\n`,
},
...pendingUserPrompt,
{
type: "text",
text: "\n\nRespond to the user's new message above. You have full context from the previous session.",
},
];
} else {
resumePromptBlocks = [
{
type: "text",
text:
`You are resuming a previous conversation. ${sandboxContext}\n\n` +
`Here is the conversation history from the previous session:\n\n` +
`${conversationSummary}\n\n` +
`Continue from where you left off. The user is waiting for your response.`,
},
];
}
const resumePromptBlocks: ContentBlock[] = pendingUserPrompt?.length
? this.buildResumePromptWithMessage(context, pendingUserPrompt)
: [
{
type: "text",
text:
`You are resuming a previous conversation. ${context.sandboxContext}\n\n` +
`Here is the conversation history from the previous session:\n\n` +
`${context.conversationSummary}\n\n` +
`Continue from where you left off. The user is waiting for your response.`,
},
];

this.logger.debug("Sending resume message", {
taskId: payload.task_id,
conversationTurns: this.resumeState.conversation.length,
conversationTurns,
promptLength: promptBlocksToText(resumePromptBlocks).length,
hasPendingUserMessage: !!pendingUserPrompt?.length,
checkpointApplied,
hasGitCheckpoint: !!this.resumeState.latestGitCheckpoint,
gitCheckpointBranch:
this.resumeState.latestGitCheckpoint?.branch ?? null,
checkpointApplied: context.checkpointApplied,
hasGitCheckpoint,
gitCheckpointBranch,
});

// Clear resume state so it's not reused
Expand Down
Loading