@@ -166,16 +166,15 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
166166
167167 tracing::debug!(?namespace_id, %pool_name, ?actor_id, ?generation, "received outbound request");
168168
169- // Check pool
170169 let db = ctx.udb()?;
171- let (pool_res, namespace_res, preloaded_kv) = tokio::try_join!(
170+ let (namespace_res, pool_res, preloaded_kv) = tokio::try_join!(
171+ ctx.op(namespace::ops::get_global::Input {
172+ namespace_ids: vec![namespace_id],
173+ }),
172174 ctx.op(pegboard::ops::runner_config::get::Input {
173175 runners: vec![(namespace_id, pool_name.clone())],
174176 bypass_cache: false,
175177 }),
176- ctx.op(namespace::ops::get_global::Input {
177- namespace_ids: vec![namespace_id],
178- }),
179178 pegboard::actor_kv::preload::fetch_preloaded_kv(
180179 &db,
181180 ctx.config().pegboard(),
@@ -184,10 +183,6 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
184183 &actor_config.name,
185184 ),
186185 )?;
187- let Some(pool) = pool_res.into_iter().next() else {
188- tracing::debug!("pool does not exist, ending outbound handler");
189- return Ok(());
190- };
191186 let Some(namespace) = namespace_res.into_iter().next() else {
192187 tracing::error!("namespace not found, ending outbound handler");
193188 report_error(
@@ -199,20 +194,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
199194 .await;
200195 return Ok(());
201196 };
202-
203- let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![
204- protocol::CommandWrapper {
205- checkpoint,
206- inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
207- config: actor_config,
208- // Empty because request ids are ephemeral. This is intercepted by guard and
209- // populated before it reaches the envoy
210- hibernating_requests: Vec::new(),
211- preloaded_kv,
212- }),
213- },
214- ]))
215- .serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?;
197+ let Some(pool) = pool_res.into_iter().next() else {
198+ tracing::debug!("pool does not exist, ending outbound handler");
199+ return Ok(());
200+ };
216201
217202 let RunnerConfigKind::Serverless {
218203 url,
@@ -228,6 +213,20 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
228213 return Ok(());
229214 };
230215
216+ let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![
217+ protocol::CommandWrapper {
218+ checkpoint,
219+ inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
220+ config: actor_config,
221+ // Empty because request ids are ephemeral. This is intercepted by guard and
222+ // populated before it reaches the envoy
223+ hibernating_requests: Vec::new(),
224+ preloaded_kv,
225+ }),
226+ },
227+ ]))
228+ .serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?;
229+
231230 // Send ack to actor wf before starting an outbound req
232231 ctx.signal(pegboard::workflows::actor2::Allocated { generation })
233232 .to_workflow::<pegboard::workflows::actor2::Workflow>()
@@ -250,6 +249,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
250249 &url,
251250 headers,
252251 request_lifespan,
252+ ctx.config()
253+ .auth
254+ .as_ref()
255+ .map(|a| a.admin_token.read().as_str()),
253256 )
254257 .await;
255258
@@ -272,15 +275,13 @@ async fn serverless_outbound_req(
272275 url: &str,
273276 headers: HashMap<String, String>,
274277 request_lifespan: u32,
278+ token: Option<&str>,
275279) -> Result<()> {
276280 let current_dc = ctx.config().topology().current_dc()?;
277281 let mut term_signal = TermSignal::get();
278282
279- let token = if let Some(auth) = &ctx.config().auth {
280- Some((
281- X_RIVET_TOKEN,
282- HeaderValue::try_from(auth.admin_token.read())?,
283- ))
283+ let token = if let Some(token) = token {
284+ Some((X_RIVET_TOKEN, HeaderValue::try_from(token)?))
284285 } else {
285286 None
286287 };
0 commit comments