From c862fec3b1186ae436a84281cfc0f3161e40d175 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Tue, 14 Apr 2026 15:14:57 -0700 Subject: [PATCH] fix(pegboard): persist and replay hibernating requests --- .../pegboard-gateway/src/keepalive_task.rs | 23 +++-- engine/packages/pegboard-gateway/src/lib.rs | 2 +- .../pegboard-gateway/src/shared_state.rs | 2 +- .../pegboard-gateway2/src/keepalive_task.rs | 23 +++-- engine/packages/pegboard-gateway2/src/lib.rs | 2 +- .../pegboard-gateway2/src/shared_state.rs | 2 +- .../ops/actor/hibernating_request/delete.rs | 7 ++ .../src/ops/actor/hibernating_request/list.rs | 13 ++- .../ops/actor/hibernating_request/upsert.rs | 7 ++ .../pegboard/src/workflows/actor/runtime.rs | 73 +++++++++------ .../pegboard/src/workflows/actor2/runtime.rs | 93 ++++++++++++------- .../sdks/rust/envoy-client/src/connection.rs | 2 +- 12 files changed, 171 insertions(+), 78 deletions(-) diff --git a/engine/packages/pegboard-gateway/src/keepalive_task.rs b/engine/packages/pegboard-gateway/src/keepalive_task.rs index 3ea3378956..2cfdd3ee34 100644 --- a/engine/packages/pegboard-gateway/src/keepalive_task.rs +++ b/engine/packages/pegboard-gateway/src/keepalive_task.rs @@ -20,13 +20,22 @@ pub async fn task( request_id: protocol::RequestId, mut keepalive_abort_rx: watch::Receiver<()>, ) -> Result { - let mut ping_interval = tokio::time::interval(Duration::from_millis( - (ctx.config() - .pegboard() - .hibernating_request_eligible_threshold() - / 2) - .try_into()?, - )); + ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input { + actor_id, + gateway_id, + request_id, + }) + .await?; + shared_state.keepalive_hws(request_id).await?; + + let ping_interval_ms = (ctx + .config() + .pegboard() + .hibernating_request_eligible_threshold() + / 2) + .max(1); + let mut ping_interval = + tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?)); ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index 6e0c073d44..49b8378afd 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -826,7 +826,7 @@ impl PegboardGateway { }) .await? { - if actor.runner_id.is_some() { + if !actor.sleeping && actor.runner_id.is_some() { tracing::debug!("actor became ready during hibernation"); return Ok(HibernationResult::Continue); diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index d504f74961..59dd44348b 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -151,7 +151,7 @@ impl SharedState { gateway_id, receiver_subject, in_flight_requests: HashMap::new(), - hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(), + hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1), gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()), tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(), hws_message_ack_timeout: Duration::from_millis( diff --git a/engine/packages/pegboard-gateway2/src/keepalive_task.rs b/engine/packages/pegboard-gateway2/src/keepalive_task.rs index 099ba798f5..6ba0454327 100644 --- a/engine/packages/pegboard-gateway2/src/keepalive_task.rs +++ b/engine/packages/pegboard-gateway2/src/keepalive_task.rs @@ -20,13 +20,22 @@ pub async fn task( request_id: protocol::RequestId, mut keepalive_abort_rx: watch::Receiver<()>, ) -> Result { - let mut ping_interval = tokio::time::interval(Duration::from_millis( - (ctx.config() - .pegboard() - .hibernating_request_eligible_threshold() - / 2) - .try_into()?, - )); + ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input { + actor_id, + gateway_id, + request_id, + }) + .await?; + shared_state.keepalive_hws(request_id).await?; + + let ping_interval_ms = (ctx + .config() + .pegboard() + .hibernating_request_eligible_threshold() + / 2) + .max(1); + let mut ping_interval = + tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?)); ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { diff --git a/engine/packages/pegboard-gateway2/src/lib.rs b/engine/packages/pegboard-gateway2/src/lib.rs index b70d4c961a..8742a5c69d 100644 --- a/engine/packages/pegboard-gateway2/src/lib.rs +++ b/engine/packages/pegboard-gateway2/src/lib.rs @@ -832,7 +832,7 @@ impl PegboardGateway2 { }) .await? { - if actor.envoy_key.is_some() { + if !actor.sleeping && actor.envoy_key.is_some() { tracing::debug!("actor became ready during hibernation"); return Ok(HibernationResult::Continue); diff --git a/engine/packages/pegboard-gateway2/src/shared_state.rs b/engine/packages/pegboard-gateway2/src/shared_state.rs index 8bb009c32a..47bca6f349 100644 --- a/engine/packages/pegboard-gateway2/src/shared_state.rs +++ b/engine/packages/pegboard-gateway2/src/shared_state.rs @@ -98,7 +98,7 @@ impl SharedState { gateway_id, receiver_subject, in_flight_requests: HashMap::new(), - hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(), + hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1), gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()), tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(), hws_message_ack_timeout: Duration::from_millis( diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs index 09f1a96bd1..b83883d565 100644 --- a/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs @@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_delete( ctx: &OperationCtx, input: &Input, ) -> Result<()> { + tracing::info!( + actor_id=%input.actor_id, + gateway_id=%protocol::util::id_to_string(&input.gateway_id), + request_id=%protocol::util::id_to_string(&input.request_id), + "deleting hibernating request" + ); + ctx.udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs index 0ff8f98da8..b8541321d1 100644 --- a/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs @@ -27,7 +27,8 @@ pub async fn pegboard_actor_hibernating_request_list( .pegboard() .hibernating_request_eligible_threshold(); - ctx.udb()? + let res = ctx + .udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); @@ -61,5 +62,13 @@ pub async fn pegboard_actor_hibernating_request_list( .await }) .custom_instrument(tracing::info_span!("hibernating_request_list_tx")) - .await + .await?; + + tracing::info!( + actor_id=%input.actor_id, + count=res.len(), + "listed hibernating requests" + ); + + Ok(res) } diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs index 6fb0981a41..f3f936101a 100644 --- a/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs @@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_upsert( ctx: &OperationCtx, input: &Input, ) -> Result<()> { + tracing::info!( + actor_id=%input.actor_id, + gateway_id=%protocol::util::id_to_string(&input.gateway_id), + request_id=%protocol::util::id_to_string(&input.request_id), + "upserting hibernating request" + ); + ctx.udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index f9c45854b6..125d00df73 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -1319,46 +1319,68 @@ pub async fn insert_and_send_commands( input: &InsertAndSendCommandsInput, ) -> Result<()> { let mut state = ctx.state::()?; + let mut commands = input.commands.clone(); + + for command in &mut commands { + if let protocol::mk2::Command::CommandStartActor(start) = command { + start.hibernating_requests = ctx + .op(crate::ops::actor::hibernating_request::list::Input { + actor_id: input.actor_id, + }) + .await? + .into_iter() + .map(|req| protocol::mk2::HibernatingRequest { + gateway_id: req.gateway_id, + request_id: req.request_id, + }) + .collect(); + } + } let runner_state = state.runner_state.get_or_insert_default(); let old_last_command_idx = runner_state.last_command_idx; - runner_state.last_command_idx += input.commands.len() as i64; + runner_state.last_command_idx += commands.len() as i64; // This does not have to be part of its own activity because the txn is idempotent let last_command_idx = runner_state.last_command_idx; + let commands_for_tx = commands.clone(); ctx.udb()? - .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + .run(|tx| { + let commands_for_tx = commands_for_tx.clone(); - tx.write( - &keys::runner::ActorLastCommandIdxKey::new( - input.runner_id, - input.actor_id, - input.generation, - ), - last_command_idx, - )?; + async move { + let tx = tx.with_subspace(keys::subspace()); - for (i, command) in input.commands.iter().enumerate() { tx.write( - &keys::runner::ActorCommandKey::new( + &keys::runner::ActorLastCommandIdxKey::new( input.runner_id, input.actor_id, input.generation, - old_last_command_idx + i as i64 + 1, ), - match command { - protocol::mk2::Command::CommandStartActor(x) => { - protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone()) - } - protocol::mk2::Command::CommandStopActor => { - protocol::mk2::ActorCommandKeyData::CommandStopActor - } - }, + last_command_idx, )?; - } - Ok(()) + for (i, command) in commands_for_tx.iter().enumerate() { + tx.write( + &keys::runner::ActorCommandKey::new( + input.runner_id, + input.actor_id, + input.generation, + old_last_command_idx + i as i64 + 1, + ), + match command { + protocol::mk2::Command::CommandStartActor(x) => { + protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone()) + } + protocol::mk2::Command::CommandStopActor => { + protocol::mk2::ActorCommandKeyData::CommandStopActor + } + }, + )?; + } + + Ok(()) + } }) .await?; @@ -1367,8 +1389,7 @@ pub async fn insert_and_send_commands( let message_serialized = versioned::ToRunnerMk2::wrap_latest(protocol::mk2::ToRunner::ToClientCommands( - input - .commands + commands .iter() .enumerate() .map(|(i, command)| protocol::mk2::CommandWrapper { diff --git a/engine/packages/pegboard/src/workflows/actor2/runtime.rs b/engine/packages/pegboard/src/workflows/actor2/runtime.rs index eff05c0367..63c606a9bf 100644 --- a/engine/packages/pegboard/src/workflows/actor2/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor2/runtime.rs @@ -352,6 +352,18 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu .await?; } Allocation::Serverful { envoy_key } => { + let hibernating_requests = ctx + .op(crate::ops::actor::hibernating_request::list::Input { + actor_id: state.actor_id, + }) + .await? + .into_iter() + .map(|req| protocol::HibernatingRequest { + gateway_id: req.gateway_id, + request_id: req.request_id, + }) + .collect(); + let command = protocol::Command::CommandStartActor(protocol::CommandStartActor { config: protocol::ActorConfig { name: state.name.clone(), @@ -362,9 +374,7 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu .as_ref() .and_then(|x| BASE64_STANDARD.decode(x).ok()), }, - // Empty because request ids are ephemeral. This is intercepted by guard and - // populated before it reaches the runner - hibernating_requests: Vec::new(), + hibernating_requests, preloaded_kv: None, }); @@ -901,45 +911,67 @@ pub async fn insert_and_send_commands( let old_last_command_idx = state.envoy_last_command_idx; let namespace_id = state.namespace_id; let actor_id = state.actor_id; + let mut commands = input.commands.clone(); + + for command in &mut commands { + if let protocol::Command::CommandStartActor(start) = command { + start.hibernating_requests = ctx + .op(crate::ops::actor::hibernating_request::list::Input { actor_id }) + .await? + .into_iter() + .map(|req| protocol::HibernatingRequest { + gateway_id: req.gateway_id, + request_id: req.request_id, + }) + .collect(); + } + } + + let commands_for_tx = commands.clone(); + ctx.udb()? - .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + .run(|tx| { + let commands_for_tx = commands_for_tx.clone(); + + async move { + let tx = tx.with_subspace(keys::subspace()); + + for (i, command) in commands_for_tx.iter().enumerate() { + tx.write( + &keys::envoy::ActorCommandKey::new( + namespace_id, + input.envoy_key.clone(), + actor_id, + input.generation, + old_last_command_idx + i as i64 + 1, + ), + match command { + protocol::Command::CommandStartActor(x) => { + protocol::ActorCommandKeyData::CommandStartActor(x.clone()) + } + protocol::Command::CommandStopActor(x) => { + protocol::ActorCommandKeyData::CommandStopActor(x.clone()) + } + }, + )?; + } - for (i, command) in input.commands.iter().enumerate() { tx.write( - &keys::envoy::ActorCommandKey::new( + &keys::envoy::ActorLastCommandIdxKey::new( namespace_id, input.envoy_key.clone(), actor_id, input.generation, - old_last_command_idx + i as i64 + 1, ), - match command { - protocol::Command::CommandStartActor(x) => { - protocol::ActorCommandKeyData::CommandStartActor(x.clone()) - } - protocol::Command::CommandStopActor(x) => { - protocol::ActorCommandKeyData::CommandStopActor(x.clone()) - } - }, + old_last_command_idx + commands_for_tx.len() as i64, )?; - } - - tx.write( - &keys::envoy::ActorLastCommandIdxKey::new( - namespace_id, - input.envoy_key.clone(), - actor_id, - input.generation, - ), - old_last_command_idx + input.commands.len() as i64, - )?; - Ok(()) + Ok(()) + } }) .await?; - state.envoy_last_command_idx += input.commands.len() as i64; + state.envoy_last_command_idx += commands.len() as i64; let receiver_subject = crate::pubsub_subjects::EnvoyReceiverSubject::new( state.namespace_id, @@ -949,8 +981,7 @@ pub async fn insert_and_send_commands( let message_serialized = versioned::ToEnvoyConn::wrap_latest(protocol::ToEnvoyConn::ToEnvoyCommands( - input - .commands + commands .iter() .enumerate() .map(|(i, command)| protocol::CommandWrapper { diff --git a/engine/sdks/rust/envoy-client/src/connection.rs b/engine/sdks/rust/envoy-client/src/connection.rs index 30f979192a..925f28cf59 100644 --- a/engine/sdks/rust/envoy-client/src/connection.rs +++ b/engine/sdks/rust/envoy-client/src/connection.rs @@ -74,7 +74,7 @@ async fn single_connection( ) -> anyhow::Result> { let url = ws_url(shared); let protocols = { - let mut p = vec!["rivet".to_string()]; + let mut p = vec!["rivet".to_string(), "rivet_target.envoy".to_string()]; if let Some(token) = &shared.config.token { p.push(format!("rivet_token.{token}")); }