Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions engine/packages/pegboard-gateway/src/keepalive_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ pub async fn task(
request_id: protocol::RequestId,
mut keepalive_abort_rx: watch::Receiver<()>,
) -> Result<LifecycleResult> {
let mut ping_interval = tokio::time::interval(Duration::from_millis(
(ctx.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.try_into()?,
));
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id,
gateway_id,
request_id,
})
.await?;
shared_state.keepalive_hws(request_id).await?;

let ping_interval_ms = (ctx
.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.max(1);
let mut ping_interval =
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ impl PegboardGateway {
})
.await?
{
if actor.runner_id.is_some() {
if !actor.sleeping && actor.runner_id.is_some() {
tracing::debug!("actor became ready during hibernation");

return Ok(HibernationResult::Continue);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@
let pegboard_config = config.pegboard();
Self(Arc::new(SharedStateInner {
ups,
gateway_id,

Check warning on line 151 in engine/packages/pegboard-gateway/src/shared_state.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard-gateway/src/shared_state.rs
receiver_subject,
in_flight_requests: HashMap::new(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
hws_message_ack_timeout: Duration::from_millis(
Expand Down
23 changes: 16 additions & 7 deletions engine/packages/pegboard-gateway2/src/keepalive_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ pub async fn task(
request_id: protocol::RequestId,
mut keepalive_abort_rx: watch::Receiver<()>,
) -> Result<LifecycleResult> {
let mut ping_interval = tokio::time::interval(Duration::from_millis(
(ctx.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.try_into()?,
));
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id,
gateway_id,
request_id,
})
.await?;
shared_state.keepalive_hws(request_id).await?;

let ping_interval_ms = (ctx
.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.max(1);
let mut ping_interval =
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ impl PegboardGateway2 {
})
.await?
{
if actor.envoy_key.is_some() {
if !actor.sleeping && actor.envoy_key.is_some() {
tracing::debug!("actor became ready during hibernation");

return Ok(HibernationResult::Continue);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@
let pegboard_config = config.pegboard();
Self(Arc::new(SharedStateInner {
ups,
gateway_id,

Check warning on line 98 in engine/packages/pegboard-gateway2/src/shared_state.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard-gateway2/src/shared_state.rs
receiver_subject,
in_flight_requests: HashMap::new(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
hws_message_ack_timeout: Duration::from_millis(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_delete(
ctx: &OperationCtx,
input: &Input,
) -> Result<()> {
tracing::info!(
actor_id=%input.actor_id,
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
request_id=%protocol::util::id_to_string(&input.request_id),
"deleting hibernating request"
);

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
Expand Down
13 changes: 11 additions & 2 deletions engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub async fn pegboard_actor_hibernating_request_list(
.pegboard()
.hibernating_request_eligible_threshold();

ctx.udb()?
let res = ctx
.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());

Expand Down Expand Up @@ -61,5 +62,13 @@ pub async fn pegboard_actor_hibernating_request_list(
.await
})
.custom_instrument(tracing::info_span!("hibernating_request_list_tx"))
.await
.await?;

tracing::info!(
actor_id=%input.actor_id,
count=res.len(),
"listed hibernating requests"
);

Ok(res)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_upsert(
ctx: &OperationCtx,
input: &Input,
) -> Result<()> {
tracing::info!(
actor_id=%input.actor_id,
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
request_id=%protocol::util::id_to_string(&input.request_id),
"upserting hibernating request"
);

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
Expand Down
73 changes: 47 additions & 26 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,46 +1319,68 @@ pub async fn insert_and_send_commands(
input: &InsertAndSendCommandsInput,
) -> Result<()> {
let mut state = ctx.state::<State>()?;
let mut commands = input.commands.clone();

for command in &mut commands {
if let protocol::mk2::Command::CommandStartActor(start) = command {
start.hibernating_requests = ctx
.op(crate::ops::actor::hibernating_request::list::Input {
actor_id: input.actor_id,
})
.await?
.into_iter()
.map(|req| protocol::mk2::HibernatingRequest {
gateway_id: req.gateway_id,
request_id: req.request_id,
})
.collect();
}
}

let runner_state = state.runner_state.get_or_insert_default();
let old_last_command_idx = runner_state.last_command_idx;
runner_state.last_command_idx += input.commands.len() as i64;
runner_state.last_command_idx += commands.len() as i64;

// This does not have to be part of its own activity because the txn is idempotent
let last_command_idx = runner_state.last_command_idx;
let commands_for_tx = commands.clone();
ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
.run(|tx| {
let commands_for_tx = commands_for_tx.clone();

tx.write(
&keys::runner::ActorLastCommandIdxKey::new(
input.runner_id,
input.actor_id,
input.generation,
),
last_command_idx,
)?;
async move {
let tx = tx.with_subspace(keys::subspace());

for (i, command) in input.commands.iter().enumerate() {
tx.write(
&keys::runner::ActorCommandKey::new(
&keys::runner::ActorLastCommandIdxKey::new(
input.runner_id,
input.actor_id,
input.generation,
old_last_command_idx + i as i64 + 1,
),
match command {
protocol::mk2::Command::CommandStartActor(x) => {
protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone())
}
protocol::mk2::Command::CommandStopActor => {
protocol::mk2::ActorCommandKeyData::CommandStopActor
}
},
last_command_idx,
)?;
}

Ok(())
for (i, command) in commands_for_tx.iter().enumerate() {
tx.write(
&keys::runner::ActorCommandKey::new(
input.runner_id,
input.actor_id,
input.generation,
old_last_command_idx + i as i64 + 1,
),
match command {
protocol::mk2::Command::CommandStartActor(x) => {
protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone())
}
protocol::mk2::Command::CommandStopActor => {
protocol::mk2::ActorCommandKeyData::CommandStopActor
}
},
)?;
}

Ok(())
}
})
.await?;

Expand All @@ -1367,8 +1389,7 @@ pub async fn insert_and_send_commands(

let message_serialized =
versioned::ToRunnerMk2::wrap_latest(protocol::mk2::ToRunner::ToClientCommands(
input
.commands
commands
.iter()
.enumerate()
.map(|(i, command)| protocol::mk2::CommandWrapper {
Expand Down
93 changes: 62 additions & 31 deletions engine/packages/pegboard/src/workflows/actor2/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,18 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
.await?;
}
Allocation::Serverful { envoy_key } => {
let hibernating_requests = ctx
.op(crate::ops::actor::hibernating_request::list::Input {
actor_id: state.actor_id,
})
.await?
.into_iter()
.map(|req| protocol::HibernatingRequest {
gateway_id: req.gateway_id,
request_id: req.request_id,
})
.collect();

let command = protocol::Command::CommandStartActor(protocol::CommandStartActor {
config: protocol::ActorConfig {
name: state.name.clone(),
Expand All @@ -362,9 +374,7 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
.as_ref()
.and_then(|x| BASE64_STANDARD.decode(x).ok()),
},
// Empty because request ids are ephemeral. This is intercepted by guard and
// populated before it reaches the runner
hibernating_requests: Vec::new(),
hibernating_requests,
preloaded_kv: None,
});

Expand Down Expand Up @@ -901,45 +911,67 @@ pub async fn insert_and_send_commands(
let old_last_command_idx = state.envoy_last_command_idx;
let namespace_id = state.namespace_id;
let actor_id = state.actor_id;
let mut commands = input.commands.clone();

for command in &mut commands {
if let protocol::Command::CommandStartActor(start) = command {
start.hibernating_requests = ctx
.op(crate::ops::actor::hibernating_request::list::Input { actor_id })
.await?
.into_iter()
.map(|req| protocol::HibernatingRequest {
gateway_id: req.gateway_id,
request_id: req.request_id,
})
.collect();
}
}

let commands_for_tx = commands.clone();

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
.run(|tx| {
let commands_for_tx = commands_for_tx.clone();

async move {
let tx = tx.with_subspace(keys::subspace());

for (i, command) in commands_for_tx.iter().enumerate() {
tx.write(
&keys::envoy::ActorCommandKey::new(
namespace_id,
input.envoy_key.clone(),
actor_id,
input.generation,
old_last_command_idx + i as i64 + 1,
),
match command {
protocol::Command::CommandStartActor(x) => {
protocol::ActorCommandKeyData::CommandStartActor(x.clone())
}
protocol::Command::CommandStopActor(x) => {
protocol::ActorCommandKeyData::CommandStopActor(x.clone())
}
},
)?;
}

for (i, command) in input.commands.iter().enumerate() {
tx.write(
&keys::envoy::ActorCommandKey::new(
&keys::envoy::ActorLastCommandIdxKey::new(
namespace_id,
input.envoy_key.clone(),
actor_id,
input.generation,
old_last_command_idx + i as i64 + 1,
),
match command {
protocol::Command::CommandStartActor(x) => {
protocol::ActorCommandKeyData::CommandStartActor(x.clone())
}
protocol::Command::CommandStopActor(x) => {
protocol::ActorCommandKeyData::CommandStopActor(x.clone())
}
},
old_last_command_idx + commands_for_tx.len() as i64,
)?;
}

tx.write(
&keys::envoy::ActorLastCommandIdxKey::new(
namespace_id,
input.envoy_key.clone(),
actor_id,
input.generation,
),
old_last_command_idx + input.commands.len() as i64,
)?;

Ok(())
Ok(())
}
})
.await?;

state.envoy_last_command_idx += input.commands.len() as i64;
state.envoy_last_command_idx += commands.len() as i64;

let receiver_subject = crate::pubsub_subjects::EnvoyReceiverSubject::new(
state.namespace_id,
Expand All @@ -949,8 +981,7 @@ pub async fn insert_and_send_commands(

let message_serialized =
versioned::ToEnvoyConn::wrap_latest(protocol::ToEnvoyConn::ToEnvoyCommands(
input
.commands
commands
.iter()
.enumerate()
.map(|(i, command)| protocol::CommandWrapper {
Expand Down
2 changes: 1 addition & 1 deletion engine/sdks/rust/envoy-client/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn single_connection(
) -> anyhow::Result<Option<crate::utils::ParsedCloseReason>> {
let url = ws_url(shared);
let protocols = {
let mut p = vec!["rivet".to_string()];
let mut p = vec!["rivet".to_string(), "rivet_target.envoy".to_string()];
if let Some(token) = &shared.config.token {
p.push(format!("rivet_token.{token}"));
}
Expand Down
Loading