Skip to content

Commit 81d9b12

Browse files
danielmillerpclaude
andcommitted
feat: add HTTP-proxy LangGraph checkpoint API
Agents no longer need a direct Postgres connection for LangGraph checkpointing. Instead, checkpoint operations are proxied through 5 new backend endpoints under /checkpoints (get-tuple, put, put-writes, list, delete-thread). Binary blob data is base64-encoded for JSON transport. Includes ORM models for the 4 checkpoint tables, Alembic migration, repository with composite-PK queries, use case layer, Pydantic schemas, and FastAPI routes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f436743 commit 81d9b12

12 files changed

Lines changed: 1551 additions & 13 deletions

File tree

agentex-ui/hooks/use-task-messages.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,27 @@ export function useSendMessage({
116116
throw new Error(response.error.message);
117117
}
118118

119-
queryClient.setQueryData<TaskMessagesData>(queryKey, data => ({
120-
messages: data?.messages || [],
121-
deltaAccumulator: data?.deltaAccumulator || null,
122-
rpcStatus: 'pending',
123-
}));
119+
// Refetch messages and spans now that the agent has finished processing
120+
await queryClient.invalidateQueries({ queryKey: taskMessagesKeys.byTaskId(taskId) });
121+
queryClient.invalidateQueries({ queryKey: ['spans', taskId] });
124122

125-
return (
126-
queryClient.getQueryData<TaskMessagesData>(queryKey) || {
127-
messages: [],
128-
deltaAccumulator: null,
129-
rpcStatus: 'pending',
130-
}
131-
);
123+
const finalMessages = await agentexClient.messages.list({
124+
task_id: taskId,
125+
});
126+
127+
const chronologicalMessages = finalMessages.slice().reverse();
128+
129+
queryClient.setQueryData<TaskMessagesData>(queryKey, {
130+
messages: chronologicalMessages,
131+
deltaAccumulator: null,
132+
rpcStatus: 'success',
133+
});
134+
135+
return {
136+
messages: chronologicalMessages,
137+
deltaAccumulator: null,
138+
rpcStatus: 'success',
139+
};
132140
}
133141

134142
case 'sync': {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""add_langgraph_checkpoint_tables
2+
3+
Revision ID: d1a6cde41b3f
4+
Revises: d024851e790c
5+
Create Date: 2026-02-11 08:02:10.739927
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
from sqlalchemy.dialects import postgresql
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = 'd1a6cde41b3f'
16+
down_revision: Union[str, None] = 'd024851e790c'
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
# checkpoint_migrations
23+
op.create_table('checkpoint_migrations',
24+
sa.Column('v', sa.Integer(), nullable=False),
25+
sa.PrimaryKeyConstraint('v')
26+
)
27+
28+
# checkpoints
29+
op.create_table('checkpoints',
30+
sa.Column('thread_id', sa.Text(), nullable=False),
31+
sa.Column('checkpoint_ns', sa.Text(), server_default='', nullable=False),
32+
sa.Column('checkpoint_id', sa.Text(), nullable=False),
33+
sa.Column('parent_checkpoint_id', sa.Text(), nullable=True),
34+
sa.Column('type', sa.Text(), nullable=True),
35+
sa.Column('checkpoint', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
36+
sa.Column('metadata', postgresql.JSONB(astext_type=sa.Text()), server_default='{}', nullable=False),
37+
sa.PrimaryKeyConstraint('thread_id', 'checkpoint_ns', 'checkpoint_id')
38+
)
39+
op.create_index('checkpoints_thread_id_idx', 'checkpoints', ['thread_id'], unique=False)
40+
41+
# checkpoint_blobs
42+
op.create_table('checkpoint_blobs',
43+
sa.Column('thread_id', sa.Text(), nullable=False),
44+
sa.Column('checkpoint_ns', sa.Text(), server_default='', nullable=False),
45+
sa.Column('channel', sa.Text(), nullable=False),
46+
sa.Column('version', sa.Text(), nullable=False),
47+
sa.Column('type', sa.Text(), nullable=False),
48+
sa.Column('blob', sa.LargeBinary(), nullable=True),
49+
sa.PrimaryKeyConstraint('thread_id', 'checkpoint_ns', 'channel', 'version')
50+
)
51+
op.create_index('checkpoint_blobs_thread_id_idx', 'checkpoint_blobs', ['thread_id'], unique=False)
52+
53+
# checkpoint_writes
54+
op.create_table('checkpoint_writes',
55+
sa.Column('thread_id', sa.Text(), nullable=False),
56+
sa.Column('checkpoint_ns', sa.Text(), server_default='', nullable=False),
57+
sa.Column('checkpoint_id', sa.Text(), nullable=False),
58+
sa.Column('task_id', sa.Text(), nullable=False),
59+
sa.Column('idx', sa.Integer(), nullable=False),
60+
sa.Column('channel', sa.Text(), nullable=False),
61+
sa.Column('type', sa.Text(), nullable=True),
62+
sa.Column('blob', sa.LargeBinary(), nullable=False),
63+
sa.Column('task_path', sa.Text(), server_default='', nullable=False),
64+
sa.PrimaryKeyConstraint('thread_id', 'checkpoint_ns', 'checkpoint_id', 'task_id', 'idx')
65+
)
66+
op.create_index('checkpoint_writes_thread_id_idx', 'checkpoint_writes', ['thread_id'], unique=False)
67+
68+
# Pre-populate checkpoint_migrations so LangGraph sees all its
69+
# internal migrations as already applied and skips setup().
70+
op.execute(
71+
sa.text(
72+
"INSERT INTO checkpoint_migrations (v) VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9)"
73+
)
74+
)
75+
76+
77+
def downgrade() -> None:
78+
op.drop_index('checkpoint_writes_thread_id_idx', table_name='checkpoint_writes')
79+
op.drop_table('checkpoint_writes')
80+
op.drop_index('checkpoint_blobs_thread_id_idx', table_name='checkpoint_blobs')
81+
op.drop_table('checkpoint_blobs')
82+
op.drop_index('checkpoints_thread_id_idx', table_name='checkpoints')
83+
op.drop_table('checkpoints')
84+
op.drop_table('checkpoint_migrations')

agentex/database/migrations/migration_history.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
24429f13b8bd -> d024851e790c (head), add_performance_indexes
1+
d024851e790c -> d1a6cde41b3f (head), add_langgraph_checkpoint_tables
2+
24429f13b8bd -> d024851e790c, add_performance_indexes
23
a5d67f2d7356 -> 24429f13b8bd, add agent input type
34
329fbafa4ff9 -> a5d67f2d7356, add unhealthy status
45
d7addd4229e8 -> 329fbafa4ff9, change_default_acp_to_async

agentex/src/adapters/orm.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
DateTime,
66
ForeignKey,
77
Index,
8+
Integer,
9+
LargeBinary,
810
String,
911
Text,
1012
func,
@@ -213,3 +215,56 @@ class DeploymentHistoryORM(BaseORM):
213215
"commit_hash",
214216
),
215217
)
218+
219+
220+
# LangGraph checkpoint tables
221+
# These mirror the schema from langgraph.checkpoint.postgres so that
222+
# tables are created via Alembic migrations rather than at agent runtime.
223+
224+
225+
class CheckpointMigrationORM(BaseORM):
226+
__tablename__ = "checkpoint_migrations"
227+
v = Column(Integer, primary_key=True)
228+
229+
230+
class CheckpointORM(BaseORM):
231+
__tablename__ = "checkpoints"
232+
thread_id = Column(Text, nullable=False, primary_key=True)
233+
checkpoint_ns = Column(Text, nullable=False, primary_key=True, server_default="")
234+
checkpoint_id = Column(Text, nullable=False, primary_key=True)
235+
parent_checkpoint_id = Column(Text, nullable=True)
236+
type = Column(Text, nullable=True)
237+
checkpoint = Column(JSONB, nullable=False)
238+
metadata_ = Column("metadata", JSONB, nullable=False, server_default="{}")
239+
__table_args__ = (
240+
Index("checkpoints_thread_id_idx", "thread_id"),
241+
)
242+
243+
244+
class CheckpointBlobORM(BaseORM):
245+
__tablename__ = "checkpoint_blobs"
246+
thread_id = Column(Text, nullable=False, primary_key=True)
247+
checkpoint_ns = Column(Text, nullable=False, primary_key=True, server_default="")
248+
channel = Column(Text, nullable=False, primary_key=True)
249+
version = Column(Text, nullable=False, primary_key=True)
250+
type = Column(Text, nullable=False)
251+
blob = Column(LargeBinary, nullable=True)
252+
__table_args__ = (
253+
Index("checkpoint_blobs_thread_id_idx", "thread_id"),
254+
)
255+
256+
257+
class CheckpointWriteORM(BaseORM):
258+
__tablename__ = "checkpoint_writes"
259+
thread_id = Column(Text, nullable=False, primary_key=True)
260+
checkpoint_ns = Column(Text, nullable=False, primary_key=True, server_default="")
261+
checkpoint_id = Column(Text, nullable=False, primary_key=True)
262+
task_id = Column(Text, nullable=False, primary_key=True)
263+
idx = Column(Integer, nullable=False, primary_key=True)
264+
channel = Column(Text, nullable=False)
265+
type = Column(Text, nullable=True)
266+
blob = Column(LargeBinary, nullable=False)
267+
task_path = Column(Text, nullable=False, server_default="")
268+
__table_args__ = (
269+
Index("checkpoint_writes_thread_id_idx", "thread_id"),
270+
)

agentex/src/api/app.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
agent_api_keys,
2020
agent_task_tracker,
2121
agents,
22+
checkpoints,
2223
deployment_history,
2324
events,
2425
messages,
@@ -183,6 +184,7 @@ async def handle_unexpected(request, exc):
183184
fastapi_app.include_router(agent_api_keys.router)
184185
fastapi_app.include_router(deployment_history.router)
185186
fastapi_app.include_router(schedules.router)
187+
fastapi_app.include_router(checkpoints.router)
186188

187189
# Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses.
188190
# This must be the outermost layer to bypass all middleware.

0 commit comments

Comments
 (0)