1+ import type { LoaderFunctionArgs } from "@remix-run/server-runtime" ;
12import { json } from "@remix-run/server-runtime" ;
23import { tryCatch } from "@trigger.dev/core/utils" ;
4+ import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas" ;
35import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3" ;
46import { z } from "zod" ;
7+ import { $replica } from "~/db.server" ;
8+ import { authenticateApiRequest } from "~/services/apiAuth.server" ;
59import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server" ;
610import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server" ;
711import { ServiceValidationError } from "~/v3/services/common.server" ;
12+ import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server" ;
13+ import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server" ;
814
915const ParamsSchema = z . object ( {
1016 runId : z . string ( ) ,
1117} ) ;
1218
19+ // Phase A6 — fixes the pre-existing route bug where GET on this URL
20+ // returned a Remix "no loader" 400. The route only exposed PUT (update);
21+ // GET had no handler. Returns `{ metadata, metadataType }` from either
22+ // the Postgres row or the mollifier buffer snapshot.
23+ export async function loader ( { request, params } : LoaderFunctionArgs ) {
24+ const authenticationResult = await authenticateApiRequest ( request ) ;
25+ if ( ! authenticationResult ) {
26+ return json ( { error : "Invalid or Missing API Key" } , { status : 401 } ) ;
27+ }
28+
29+ const parsed = ParamsSchema . safeParse ( params ) ;
30+ if ( ! parsed . success ) {
31+ return json ( { error : "Invalid or missing run ID" } , { status : 400 } ) ;
32+ }
33+
34+ const env = authenticationResult . environment ;
35+
36+ const pgRun = await $replica . taskRun . findFirst ( {
37+ where : { friendlyId : parsed . data . runId , runtimeEnvironmentId : env . id } ,
38+ select : { metadata : true , metadataType : true } ,
39+ } ) ;
40+ if ( pgRun ) {
41+ return json ( { metadata : pgRun . metadata , metadataType : pgRun . metadataType } , { status : 200 } ) ;
42+ }
43+
44+ const buffered = await findRunByIdWithMollifierFallback ( {
45+ runId : parsed . data . runId ,
46+ environmentId : env . id ,
47+ organizationId : env . organizationId ,
48+ } ) ;
49+ if ( buffered ) {
50+ return json (
51+ {
52+ metadata : buffered . metadata ?? null ,
53+ metadataType : buffered . metadataType ?? "application/json" ,
54+ } ,
55+ { status : 200 }
56+ ) ;
57+ }
58+
59+ return json ( { error : "Run not found" } , { status : 404 } ) ;
60+ }
61+
62+ // Route parent/root operations to the existing PG service by directly
63+ // invoking it against the parent/root runId. The service ingests via
64+ // its batching worker, which targets PG by id. If the parent/root is
65+ // itself buffered we recurse through our buffered-mutation helper.
66+ // `_ingestion_only` flag: a synthetic body that has the operations
67+ // promoted to top-level `operations` so the service applies them to
68+ // `targetRunId` directly.
69+ async function routeOperationsToRun (
70+ targetRunId : string | undefined ,
71+ operations : RunMetadataChangeOperation [ ] | undefined ,
72+ env : { id : string ; organizationId : string }
73+ ) : Promise < void > {
74+ if ( ! targetRunId || ! operations || operations . length === 0 ) return ;
75+
76+ // Try PG first via the existing service (this is how parent/root
77+ // operations have always landed; preserve that).
78+ const [ error ] = await tryCatch (
79+ updateMetadataService . call (
80+ targetRunId ,
81+ { operations } ,
82+ { id : env . id , organizationId : env . organizationId } as unknown as Parameters <
83+ typeof updateMetadataService . call
84+ > [ 2 ]
85+ )
86+ ) ;
87+ if ( ! error ) return ;
88+
89+ // PG service threw — could be "Cannot update metadata for a completed
90+ // run" or similar. If the target is buffered, route operations to its
91+ // snapshot too. Best-effort; do not surface this failure to the
92+ // caller — the parent/root ops are auxiliary.
93+ await applyMetadataMutationToBufferedRun ( {
94+ runId : targetRunId ,
95+ body : { operations } ,
96+ } ) ;
97+ }
98+
1399const { action } = createActionApiRoute (
14100 {
15101 params : ParamsSchema ,
@@ -18,23 +104,72 @@ const { action } = createActionApiRoute(
18104 method : "PUT" ,
19105 } ,
20106 async ( { authentication, body, params } ) => {
21- const [ error , result ] = await tryCatch (
22- updateMetadataService . call ( params . runId , body , authentication . environment )
23- ) ;
107+ const env = authentication . environment ;
108+ const runId = params . runId ;
24109
25- if ( error ) {
26- if ( error instanceof ServiceValidationError ) {
27- return json ( { error : error . message } , { status : error . status ?? 422 } ) ;
110+ // PG-canonical path. If the run is in PG, the existing service
111+ // owns the full request shape including parent/root operations,
112+ // metadataVersion CAS, batching, validation — none of which the
113+ // buffer side needs to reimplement.
114+ const [ pgError , pgResult ] = await tryCatch (
115+ updateMetadataService . call ( runId , body , env )
116+ ) ;
117+ if ( pgError ) {
118+ if ( pgError instanceof ServiceValidationError ) {
119+ return json ( { error : pgError . message } , { status : pgError . status ?? 422 } ) ;
28120 }
29-
30121 return json ( { error : "Internal Server Error" } , { status : 500 } ) ;
31122 }
123+ if ( pgResult ) {
124+ return json ( pgResult , { status : 200 } ) ;
125+ }
32126
33- if ( ! result ) {
127+ // PG miss. Target run is either buffered or genuinely absent.
128+ const bufferOutcome = await applyMetadataMutationToBufferedRun ( {
129+ runId,
130+ body : { metadata : body . metadata , operations : body . operations } ,
131+ } ) ;
132+
133+ if ( bufferOutcome . kind === "not_found" ) {
34134 return json ( { error : "Task Run not found" } , { status : 404 } ) ;
35135 }
136+ if ( bufferOutcome . kind === "busy" ) {
137+ // Entry is materialising. Best path is to retry the PG call —
138+ // the row may be visible now. We don't waste a roundtrip in
139+ // the happy path, but a 503 here would be customer-visible
140+ // breakage for legitimately-burst workloads. Hand back 503 with
141+ // a retry hint; SDK retry policy converges.
142+ return json ( { error : "Run materialising, retry shortly" } , { status : 503 } ) ;
143+ }
144+ if ( bufferOutcome . kind === "version_exhausted" ) {
145+ // Pathological contention — many concurrent metadata writers on
146+ // the same buffered runId. Surface as 503 rather than silently
147+ // dropping the request.
148+ return json ( { error : "Metadata write contention; retry shortly" } , { status : 503 } ) ;
149+ }
150+
151+ // Buffered metadata mutation succeeded. Fan parent/root operations
152+ // out to their respective runs (parent/root are typically PG-
153+ // materialised by the time the child is buffered, so the existing
154+ // service handles them; if they're also buffered, the helper
155+ // recurses through the buffered mutation path).
156+ const bufferedEntry = await findRunByIdWithMollifierFallback ( {
157+ runId,
158+ environmentId : env . id ,
159+ organizationId : env . organizationId ,
160+ } ) ;
161+ if ( bufferedEntry ) {
162+ await Promise . all ( [
163+ routeOperationsToRun ( bufferedEntry . parentTaskRunId , body . parentOperations , env ) ,
164+ // The snapshot doesn't carry rootTaskRunId; fall back to parent
165+ // as a rough proxy (matches the existing service's nil-coalesce
166+ // behaviour where rootTaskRun defaults to the parent). Phase D
167+ // / future work could thread rootTaskRunId through the snapshot.
168+ routeOperationsToRun ( bufferedEntry . parentTaskRunId , body . rootOperations , env ) ,
169+ ] ) ;
170+ }
36171
37- return json ( result , { status : 200 } ) ;
172+ return json ( { metadata : bufferOutcome . newMetadata } , { status : 200 } ) ;
38173 }
39174) ;
40175
0 commit comments