Skip to content

Commit d144d79

Browse files
committed
fix(gateway): pin firecrawl async status requests to tracked account
1 parent 8e7a9f2 commit d144d79

1 file changed

Lines changed: 221 additions & 75 deletions

File tree

crates/gateway/src/lib.rs

Lines changed: 221 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,13 @@ struct WebhookQuery {
237237

238238
include!("webhook.rs");
239239

240+
#[derive(Clone)]
241+
struct PinnedAsyncRouteSelection {
242+
account: ProviderAccount,
243+
egress_proxy: Option<EgressProxy>,
244+
selection_reason: String,
245+
}
246+
240247
async fn proxy(
241248
State(state): State<GatewayState>,
242249
method: Method,
@@ -289,83 +296,98 @@ async fn proxy_request(
289296
body: body.to_vec(),
290297
received_at: time::OffsetDateTime::now_utc(),
291298
};
299+
let pinned_async_route =
300+
resolve_pinned_async_route(&state, provider_id, &method, route.upstream_path.as_str())
301+
.await?;
292302

293303
let max_retries = state.scheduler.config().max_retries;
304+
let using_pinned_async_route = pinned_async_route.is_some();
294305
let mut excluded_account_ids: Vec<String> = Vec::new();
295306
let mut excluded_routes: Vec<RouteRetryExclusion> = Vec::new();
296307
let mut last_error: Option<GatewayError> = None;
297308
let mut last_retryable_response: Option<DeferredRetryableResponse> = None;
298309

299310
for attempt in 0..=max_retries {
300-
let excluded_account_refs = excluded_account_ids
301-
.iter()
302-
.map(String::as_str)
303-
.collect::<Vec<_>>();
304-
let excluded_route_refs = excluded_routes
305-
.iter()
306-
.map(|route| SchedulerRouteExclusion {
307-
account_id: route.account_id.as_str(),
308-
proxy_id: route.proxy_id.as_deref(),
309-
})
310-
.collect::<Vec<_>>();
311-
let selection = match state
312-
.scheduler
313-
.select_route_excluding(
314-
&state.storage,
315-
provider_id,
316-
SchedulerExclusions {
317-
account_ids: &excluded_account_refs,
318-
routes: &excluded_route_refs,
319-
},
320-
)
321-
.await
322-
{
323-
Ok(sel) => sel,
324-
Err(err) => {
325-
if attempt > 0 {
326-
if let Some(pending) = last_retryable_response.take() {
327-
let DeferredRetryableResponse {
328-
account,
329-
egress_proxy,
330-
upstream,
331-
response_class,
332-
latency_ms,
333-
selection_reason,
334-
plan_url,
335-
route_upstream_path,
336-
webhook_secret,
337-
attempts,
338-
} = pending;
339-
return finalize_upstream_response(
340-
&state,
341-
FinalizeUpstreamResponseArgs {
342-
provider: provider.as_str(),
343-
provider_id,
344-
upstream_path: &route_upstream_path,
345-
request_envelope: &request_envelope,
346-
plan_url: &plan_url,
347-
account: &account,
348-
egress_proxy: egress_proxy.as_ref(),
349-
response_class,
350-
latency_ms,
351-
selection_reason: &selection_reason,
352-
attempts,
353-
platform_api_key: &platform_api_key,
354-
webhook_secret,
355-
outcome_recorded: true,
356-
},
357-
upstream,
358-
)
359-
.await;
311+
let (selection_reason, account, egress_proxy) =
312+
if let Some(pinned_route) = &pinned_async_route {
313+
(
314+
pinned_route.selection_reason.clone(),
315+
pinned_route.account.clone(),
316+
pinned_route.egress_proxy.clone(),
317+
)
318+
} else {
319+
let excluded_account_refs = excluded_account_ids
320+
.iter()
321+
.map(String::as_str)
322+
.collect::<Vec<_>>();
323+
let excluded_route_refs = excluded_routes
324+
.iter()
325+
.map(|route| SchedulerRouteExclusion {
326+
account_id: route.account_id.as_str(),
327+
proxy_id: route.proxy_id.as_deref(),
328+
})
329+
.collect::<Vec<_>>();
330+
let selection = match state
331+
.scheduler
332+
.select_route_excluding(
333+
&state.storage,
334+
provider_id,
335+
SchedulerExclusions {
336+
account_ids: &excluded_account_refs,
337+
routes: &excluded_route_refs,
338+
},
339+
)
340+
.await
341+
{
342+
Ok(selection) => selection,
343+
Err(err) => {
344+
if attempt > 0 {
345+
if let Some(pending) = last_retryable_response.take() {
346+
let DeferredRetryableResponse {
347+
account,
348+
egress_proxy,
349+
upstream,
350+
response_class,
351+
latency_ms,
352+
selection_reason,
353+
plan_url,
354+
route_upstream_path,
355+
webhook_secret,
356+
attempts,
357+
} = pending;
358+
return finalize_upstream_response(
359+
&state,
360+
FinalizeUpstreamResponseArgs {
361+
provider: provider.as_str(),
362+
provider_id,
363+
upstream_path: &route_upstream_path,
364+
request_envelope: &request_envelope,
365+
plan_url: &plan_url,
366+
account: &account,
367+
egress_proxy: egress_proxy.as_ref(),
368+
response_class,
369+
latency_ms,
370+
selection_reason: &selection_reason,
371+
attempts,
372+
platform_api_key: &platform_api_key,
373+
webhook_secret,
374+
outcome_recorded: true,
375+
},
376+
upstream,
377+
)
378+
.await;
379+
}
380+
break;
381+
}
382+
return Err(err.into());
360383
}
361-
break;
362-
}
363-
return Err(err.into());
364-
}
365-
};
366-
let selection_reason = selection.selection_reason;
367-
let account = selection.account;
368-
let egress_proxy = selection.egress_proxy;
384+
};
385+
(
386+
selection.selection_reason,
387+
selection.account,
388+
selection.egress_proxy,
389+
)
390+
};
369391
let egress_mode = if egress_proxy.is_some() {
370392
"proxy"
371393
} else {
@@ -478,13 +500,21 @@ async fn proxy_request(
478500
provider = %provider_id,
479501
provider_account_id = %account.id,
480502
attempt,
503+
pinned_async_route = using_pinned_async_route,
481504
error = %error_message,
482-
"transport error, retrying with different route"
505+
"{}",
506+
if using_pinned_async_route {
507+
"transport error, retrying tracked async route"
508+
} else {
509+
"transport error, retrying with different route"
510+
}
483511
);
484-
excluded_routes.push(RouteRetryExclusion {
485-
account_id: account.id,
486-
proxy_id: egress_proxy.as_ref().map(|value| value.id.clone()),
487-
});
512+
if !using_pinned_async_route {
513+
excluded_routes.push(RouteRetryExclusion {
514+
account_id: account.id,
515+
proxy_id: egress_proxy.as_ref().map(|value| value.id.clone()),
516+
});
517+
}
488518
last_error = Some(error);
489519
continue;
490520
}
@@ -539,7 +569,13 @@ async fn proxy_request(
539569
provider_account_id = %account.id,
540570
status = upstream.status.as_u16(),
541571
attempt,
542-
"retryable response, trying next account"
572+
pinned_async_route = using_pinned_async_route,
573+
"{}",
574+
if using_pinned_async_route {
575+
"retryable response, retrying tracked async route"
576+
} else {
577+
"retryable response, trying next account"
578+
}
543579
);
544580
if let Err(storage_error) = record_provider_account_outcome(
545581
&state,
@@ -584,7 +620,9 @@ async fn proxy_request(
584620
webhook_secret,
585621
attempts: attempt + 1,
586622
});
587-
excluded_account_ids.push(excluded_account_id);
623+
if !using_pinned_async_route {
624+
excluded_account_ids.push(excluded_account_id);
625+
}
588626
continue;
589627
}
590628

@@ -614,6 +652,114 @@ async fn proxy_request(
614652
Err(last_error.unwrap_or_else(|| GatewayError::ProviderUnavailable(provider_id)))
615653
}
616654

655+
async fn resolve_pinned_async_route(
656+
state: &GatewayState,
657+
provider_id: ProviderId,
658+
method: &Method,
659+
upstream_path: &str,
660+
) -> Result<Option<PinnedAsyncRouteSelection>, GatewayError> {
661+
if provider_id != ProviderId::Firecrawl || *method != Method::GET {
662+
return Ok(None);
663+
}
664+
let Some((route, upstream_job_id)) = parse_firecrawl_async_status_path(upstream_path) else {
665+
return Ok(None);
666+
};
667+
let Some(job) = state
668+
.storage
669+
.find_provider_async_job_by_upstream_id(provider_id, upstream_job_id)
670+
.await
671+
.map_err(GatewayError::WebhookStorage)?
672+
else {
673+
info!(
674+
provider = %provider_id,
675+
route,
676+
upstream_job_id,
677+
"firecrawl async status request not pinned because no tracked job was found"
678+
);
679+
return Ok(None);
680+
};
681+
let Some(account_id) = job.provider_account_id.as_deref() else {
682+
return Err(GatewayError::Provider(ProviderError::InvalidRoute(
683+
format!(
684+
"tracked async job `{}` is missing provider account binding",
685+
job.id
686+
),
687+
)));
688+
};
689+
let Some(account) = state
690+
.storage
691+
.find_provider_account(account_id)
692+
.await
693+
.map_err(GatewayError::WebhookStorage)?
694+
else {
695+
return Err(GatewayError::Provider(ProviderError::InvalidRoute(
696+
format!(
697+
"tracked async job `{}` provider account `{account_id}` was not found",
698+
job.id
699+
),
700+
)));
701+
};
702+
let egress_proxy = match job.egress_proxy_id.as_deref() {
703+
Some(proxy_id) => {
704+
let proxy = state
705+
.storage
706+
.find_egress_proxy(proxy_id)
707+
.await
708+
.map_err(GatewayError::WebhookStorage)?;
709+
if proxy.is_none() {
710+
return Err(GatewayError::Provider(ProviderError::InvalidRoute(
711+
format!(
712+
"tracked async job `{}` egress proxy `{proxy_id}` was not found",
713+
job.id
714+
),
715+
)));
716+
}
717+
proxy
718+
}
719+
None => None,
720+
};
721+
let selection_reason = match &egress_proxy {
722+
Some(proxy) => format!(
723+
"async_job:{} upstream_job:{} account:{} egress:proxy id:{} target:{}",
724+
job.id,
725+
job.upstream_job_id,
726+
account.id,
727+
proxy.id,
728+
summarize_proxy_url(&proxy.proxy_url)
729+
),
730+
None => format!(
731+
"async_job:{} upstream_job:{} account:{} egress:direct reason:tracked_async_job",
732+
job.id, job.upstream_job_id, account.id
733+
),
734+
};
735+
info!(
736+
async_job_id = job.id.as_str(),
737+
upstream_job_id = job.upstream_job_id.as_str(),
738+
provider_account_id = account.id.as_str(),
739+
egress_proxy_id = egress_proxy.as_ref().map(|value| value.id.as_str()),
740+
selection_reason = selection_reason.as_str(),
741+
"firecrawl async status request pinned to tracked route"
742+
);
743+
Ok(Some(PinnedAsyncRouteSelection {
744+
account,
745+
egress_proxy,
746+
selection_reason,
747+
}))
748+
}
749+
750+
/// Splits a Firecrawl async status path into `(route, job_id)`.
///
/// Leading and trailing `/` are ignored. The path matches when it is one of
/// the routes `v2/crawl` or `v2/batch/scrape` followed by `/` and exactly one
/// non-empty segment (the job id). Anything else — an unknown route, a missing
/// job id, or extra segments after the job id — yields `None`.
fn parse_firecrawl_async_status_path(upstream_path: &str) -> Option<(&str, &str)> {
    const STATUS_ROUTES: [&str; 2] = ["v2/crawl", "v2/batch/scrape"];

    let trimmed = upstream_path.trim_matches('/');
    for route in STATUS_ROUTES {
        let remainder = trimmed
            .strip_prefix(route)
            .and_then(|rest| rest.strip_prefix('/'));
        if let Some(job_id) = remainder {
            // The first route whose prefix matches decides the outcome; the
            // job id must be a single, non-empty path segment.
            if job_id.is_empty() || job_id.contains('/') {
                return None;
            }
            return Some((route, job_id));
        }
    }
    None
}
762+
617763
async fn forward_request(
618764
client: &Client,
619765
method: &Method,

0 commit comments

Comments
 (0)