Skip to content

Commit a91a6e5

Browse files
authored
fix(envoy): require runner config for ws conn (#4612)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 2dec26f commit a91a6e5

3 files changed

Lines changed: 40 additions & 29 deletions

File tree

engine/artifacts/errors/ws.no_runner_config.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use scc::HashMap;
1818
use universaldb::prelude::*;
1919
use vbare::OwnedVersionedData;
2020

21-
use crate::{metrics, utils::UrlData};
21+
use crate::{errors, metrics, utils::UrlData};
2222

2323
pub struct Conn {
2424
pub namespace_id: Id,
@@ -54,6 +54,20 @@ pub async fn init_conn(
5454
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())
5555
.with_context(|| format!("namespace not found: {}", namespace_name))?;
5656

57+
let pool_res = ctx
58+
.op(pegboard::ops::runner_config::get::Input {
59+
runners: vec![(namespace.namespace_id, pool_name.clone())],
60+
bypass_cache: false,
61+
})
62+
.await?;
63+
64+
let Some(pool) = pool_res.into_iter().next() else {
65+
return Err(errors::WsError::NoRunnerConfig {
66+
pool_name: pool_name.clone(),
67+
}
68+
.build());
69+
};
70+
5771
tracing::debug!(namespace_id=?namespace.namespace_id, "new envoy connection");
5872

5973
metrics::CONNECTION_TOTAL
@@ -67,27 +81,20 @@ pub async fn init_conn(
6781
.with_label_values(&[namespace.namespace_id.to_string().as_str(), &pool_name])
6882
.observe(start.elapsed().as_secs_f64());
6983

84+
let serverless_drain_grace_period = if let RunnerConfigKind::Serverless {
85+
drain_grace_period,
86+
..
87+
} = &pool.config.kind
88+
{
89+
Some(*drain_grace_period as i64)
90+
} else {
91+
None
92+
};
93+
7094
let udb = ctx.udb()?;
71-
let (is_serverless, mut missed_commands, _) = tokio::try_join!(
95+
let (_, mut missed_commands) = tokio::try_join!(
7296
// Send init packet as soon as possible
7397
async {
74-
let pool_res = ctx
75-
.op(pegboard::ops::runner_config::get::Input {
76-
runners: vec![(namespace.namespace_id, pool_name.clone())],
77-
bypass_cache: false,
78-
})
79-
.await?;
80-
81-
let serverless_drain_grace_period = pool_res.first().and_then(|c| {
82-
if let RunnerConfigKind::Serverless {
83-
drain_grace_period, ..
84-
} = &c.config.kind
85-
{
86-
Some(*drain_grace_period as i64)
87-
} else {
88-
None
89-
}
90-
});
9198
let pb = ctx.config().pegboard();
9299

93100
// Send init packet
@@ -104,9 +111,7 @@ pub async fn init_conn(
104111
let init_msg_serialized = init_msg.serialize(protocol_version)?;
105112
ws_handle
106113
.send(Message::Binary(init_msg_serialized.into()))
107-
.await?;
108-
109-
anyhow::Ok(serverless_drain_grace_period.is_some())
114+
.await
110115
},
111116
udb.run(|tx| {
112117
let namespace_id = namespace.namespace_id;
@@ -279,12 +284,6 @@ pub async fn init_conn(
279284
}
280285
})
281286
.custom_instrument(tracing::info_span!("envoy_init_tx")),
282-
ctx.op(
283-
pegboard::ops::runner_config::ensure_normal_if_missing::Input {
284-
namespace_id: namespace.namespace_id,
285-
name: pool_name.clone(),
286-
}
287-
),
288287
)?;
289288

290289
// Send missed commands (must be after init packet)
@@ -318,6 +317,8 @@ pub async fn init_conn(
318317
.await?;
319318
}
320319

320+
let is_serverless = serverless_drain_grace_period.is_some();
321+
321322
if is_serverless {
322323
report_success(ctx, namespace.namespace_id, &pool_name).await;
323324
}
@@ -329,7 +330,7 @@ pub async fn init_conn(
329330
protocol_version,
330331
ws_handle,
331332
authorized_tunnel_routes: HashMap::new(),
332-
is_serverless: false,
333+
is_serverless,
333334
last_rtt: AtomicU32::new(0),
334335
last_ping_ts: AtomicI64::new(util::timestamp::now()),
335336
}))

engine/packages/pegboard-envoy/src/errors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ pub enum WsError {
1414
"The Rivet Engine is migrating. The websocket should attempt to reconnect as soon as possible."
1515
)]
1616
GoingAway,
17+
#[error(
18+
"no_runner_config",
19+
"Must create a runner config before connecting an envoy with pool name {pool_name:?}."
20+
)]
21+
NoRunnerConfig { pool_name: String },
1722
#[error("timed_out", "Ping timed out.")]
1823
TimedOut,
1924
#[error(

0 commit comments

Comments
 (0)