Skip to content

Commit 9191b42

Browse files
committed
fix(pegboard): persist and replay hibernating requests
1 parent 9550912 commit 9191b42

12 files changed

Lines changed: 171 additions & 78 deletions

File tree

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,22 @@ pub async fn task(
2020
request_id: protocol::RequestId,
2121
mut keepalive_abort_rx: watch::Receiver<()>,
2222
) -> Result<LifecycleResult> {
23-
let mut ping_interval = tokio::time::interval(Duration::from_millis(
24-
(ctx.config()
25-
.pegboard()
26-
.hibernating_request_eligible_threshold()
27-
/ 2)
28-
.try_into()?,
29-
));
23+
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
24+
actor_id,
25+
gateway_id,
26+
request_id,
27+
})
28+
.await?;
29+
shared_state.keepalive_hws(request_id).await?;
30+
31+
let ping_interval_ms = (ctx
32+
.config()
33+
.pegboard()
34+
.hibernating_request_eligible_threshold()
35+
/ 2)
36+
.max(1);
37+
let mut ping_interval =
38+
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
3039
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3140

3241
loop {

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ impl PegboardGateway {
826826
})
827827
.await?
828828
{
829-
if actor.runner_id.is_some() {
829+
if !actor.sleeping && actor.runner_id.is_some() {
830830
tracing::debug!("actor became ready during hibernation");
831831

832832
return Ok(HibernationResult::Continue);

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl SharedState {
151151
gateway_id,
152152
receiver_subject,
153153
in_flight_requests: HashMap::new(),
154-
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
154+
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
155155
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
156156
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
157157
hws_message_ack_timeout: Duration::from_millis(

engine/packages/pegboard-gateway2/src/keepalive_task.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,22 @@ pub async fn task(
2020
request_id: protocol::RequestId,
2121
mut keepalive_abort_rx: watch::Receiver<()>,
2222
) -> Result<LifecycleResult> {
23-
let mut ping_interval = tokio::time::interval(Duration::from_millis(
24-
(ctx.config()
25-
.pegboard()
26-
.hibernating_request_eligible_threshold()
27-
/ 2)
28-
.try_into()?,
29-
));
23+
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
24+
actor_id,
25+
gateway_id,
26+
request_id,
27+
})
28+
.await?;
29+
shared_state.keepalive_hws(request_id).await?;
30+
31+
let ping_interval_ms = (ctx
32+
.config()
33+
.pegboard()
34+
.hibernating_request_eligible_threshold()
35+
/ 2)
36+
.max(1);
37+
let mut ping_interval =
38+
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
3039
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3140

3241
loop {

engine/packages/pegboard-gateway2/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ impl PegboardGateway2 {
832832
})
833833
.await?
834834
{
835-
if actor.envoy_key.is_some() {
835+
if !actor.sleeping && actor.envoy_key.is_some() {
836836
tracing::debug!("actor became ready during hibernation");
837837

838838
return Ok(HibernationResult::Continue);

engine/packages/pegboard-gateway2/src/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl SharedState {
9898
gateway_id,
9999
receiver_subject,
100100
in_flight_requests: HashMap::new(),
101-
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
101+
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
102102
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
103103
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
104104
hws_message_ack_timeout: Duration::from_millis(

engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_delete(
1616
ctx: &OperationCtx,
1717
input: &Input,
1818
) -> Result<()> {
19+
tracing::info!(
20+
actor_id=%input.actor_id,
21+
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
22+
request_id=%protocol::util::id_to_string(&input.request_id),
23+
"deleting hibernating request"
24+
);
25+
1926
ctx.udb()?
2027
.run(|tx| async move {
2128
let tx = tx.with_subspace(keys::subspace());

engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ pub async fn pegboard_actor_hibernating_request_list(
2727
.pegboard()
2828
.hibernating_request_eligible_threshold();
2929

30-
ctx.udb()?
30+
let res = ctx
31+
.udb()?
3132
.run(|tx| async move {
3233
let tx = tx.with_subspace(keys::subspace());
3334

@@ -61,5 +62,13 @@ pub async fn pegboard_actor_hibernating_request_list(
6162
.await
6263
})
6364
.custom_instrument(tracing::info_span!("hibernating_request_list_tx"))
64-
.await
65+
.await?;
66+
67+
tracing::info!(
68+
actor_id=%input.actor_id,
69+
count=res.len(),
70+
"listed hibernating requests"
71+
);
72+
73+
Ok(res)
6574
}

engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_upsert(
1616
ctx: &OperationCtx,
1717
input: &Input,
1818
) -> Result<()> {
19+
tracing::info!(
20+
actor_id=%input.actor_id,
21+
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
22+
request_id=%protocol::util::id_to_string(&input.request_id),
23+
"upserting hibernating request"
24+
);
25+
1926
ctx.udb()?
2027
.run(|tx| async move {
2128
let tx = tx.with_subspace(keys::subspace());

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,46 +1319,68 @@ pub async fn insert_and_send_commands(
13191319
input: &InsertAndSendCommandsInput,
13201320
) -> Result<()> {
13211321
let mut state = ctx.state::<State>()?;
1322+
let mut commands = input.commands.clone();
1323+
1324+
for command in &mut commands {
1325+
if let protocol::mk2::Command::CommandStartActor(start) = command {
1326+
start.hibernating_requests = ctx
1327+
.op(crate::ops::actor::hibernating_request::list::Input {
1328+
actor_id: input.actor_id,
1329+
})
1330+
.await?
1331+
.into_iter()
1332+
.map(|req| protocol::mk2::HibernatingRequest {
1333+
gateway_id: req.gateway_id,
1334+
request_id: req.request_id,
1335+
})
1336+
.collect();
1337+
}
1338+
}
13221339

13231340
let runner_state = state.runner_state.get_or_insert_default();
13241341
let old_last_command_idx = runner_state.last_command_idx;
1325-
runner_state.last_command_idx += input.commands.len() as i64;
1342+
runner_state.last_command_idx += commands.len() as i64;
13261343

13271344
// This does not have to be part of its own activity because the txn is idempotent
13281345
let last_command_idx = runner_state.last_command_idx;
1346+
let commands_for_tx = commands.clone();
13291347
ctx.udb()?
1330-
.run(|tx| async move {
1331-
let tx = tx.with_subspace(keys::subspace());
1348+
.run(|tx| {
1349+
let commands_for_tx = commands_for_tx.clone();
13321350

1333-
tx.write(
1334-
&keys::runner::ActorLastCommandIdxKey::new(
1335-
input.runner_id,
1336-
input.actor_id,
1337-
input.generation,
1338-
),
1339-
last_command_idx,
1340-
)?;
1351+
async move {
1352+
let tx = tx.with_subspace(keys::subspace());
13411353

1342-
for (i, command) in input.commands.iter().enumerate() {
13431354
tx.write(
1344-
&keys::runner::ActorCommandKey::new(
1355+
&keys::runner::ActorLastCommandIdxKey::new(
13451356
input.runner_id,
13461357
input.actor_id,
13471358
input.generation,
1348-
old_last_command_idx + i as i64 + 1,
13491359
),
1350-
match command {
1351-
protocol::mk2::Command::CommandStartActor(x) => {
1352-
protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone())
1353-
}
1354-
protocol::mk2::Command::CommandStopActor => {
1355-
protocol::mk2::ActorCommandKeyData::CommandStopActor
1356-
}
1357-
},
1360+
last_command_idx,
13581361
)?;
1359-
}
13601362

1361-
Ok(())
1363+
for (i, command) in commands_for_tx.iter().enumerate() {
1364+
tx.write(
1365+
&keys::runner::ActorCommandKey::new(
1366+
input.runner_id,
1367+
input.actor_id,
1368+
input.generation,
1369+
old_last_command_idx + i as i64 + 1,
1370+
),
1371+
match command {
1372+
protocol::mk2::Command::CommandStartActor(x) => {
1373+
protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone())
1374+
}
1375+
protocol::mk2::Command::CommandStopActor => {
1376+
protocol::mk2::ActorCommandKeyData::CommandStopActor
1377+
}
1378+
},
1379+
)?;
1380+
}
1381+
1382+
Ok(())
1383+
}
13621384
})
13631385
.await?;
13641386

@@ -1367,8 +1389,7 @@ pub async fn insert_and_send_commands(
13671389

13681390
let message_serialized =
13691391
versioned::ToRunnerMk2::wrap_latest(protocol::mk2::ToRunner::ToClientCommands(
1370-
input
1371-
.commands
1392+
commands
13721393
.iter()
13731394
.enumerate()
13741395
.map(|(i, command)| protocol::mk2::CommandWrapper {

0 commit comments

Comments
 (0)