Skip to content

Commit 85d514f

Browse files
committed
chore: tunnel auth
1 parent d52f9f7 commit 85d514f

13 files changed

Lines changed: 354 additions & 37 deletions

File tree

CLAUDE.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
216216
- Any behavior, protocol handling, or test coverage added to one runner should be mirrored in the other runner in the same change whenever possible.
217217
- When parity cannot be completed in the same change, explicitly document the gap and add a follow-up task.
218218

219+
### Trust Boundaries
220+
- Treat `client <-> engine` as untrusted.
221+
- Treat `envoy <-> pegboard-envoy` as untrusted.
222+
- Treat traffic inside the engine over `nats`, `fdb`, and other internal backends as trusted.
223+
- Treat `gateway`, `api`, `pegboard-envoy`, `nats`, `fdb`, and similar engine-internal services as one trusted internal boundary once traffic is inside the engine.
224+
- Validate and authorize all client-originated data at the engine edge before it reaches trusted internal systems.
225+
- Validate and authorize all envoy-originated data at `pegboard-envoy` before it reaches trusted internal systems.
226+
219227
### Important Patterns
220228

221229
**Error Handling**

Cargo.lock

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ members = [
4646
"engine/packages/universaldb",
4747
"engine/packages/universalpubsub",
4848
"engine/packages/util",
49+
"engine/packages/util-serde",
4950
"engine/packages/util-id",
5051
"engine/packages/workflow-worker",
5152
"engine/sdks/rust/api-full",
@@ -491,6 +492,9 @@ members = [
491492
[workspace.dependencies.rivet-util-id]
492493
path = "engine/packages/util-id"
493494

495+
[workspace.dependencies.rivet-util-serde]
496+
path = "engine/packages/util-serde"
497+
494498
[workspace.dependencies.rivet-workflow-worker]
495499
path = "engine/packages/workflow-worker"
496500

engine/packages/pegboard-envoy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ rivet-metrics.workspace = true
2727
rivet-envoy-protocol.workspace = true
2828
rivet-runtime.workspace = true
2929
rivet-types.workspace = true
30+
scc.workspace = true
3031
serde_bare.workspace = true
3132
serde_json.workspace = true
3233
serde.workspace = true

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use rivet_data::converted::{ActorNameKeyData, MetadataKeyData};
1515
use rivet_envoy_protocol::{self as protocol, versioned};
1616
use rivet_guard_core::WebSocketHandle;
1717
use rivet_types::runner_configs::RunnerConfigKind;
18+
use scc::HashMap;
1819
use universaldb::prelude::*;
1920
use vbare::OwnedVersionedData;
2021

@@ -26,6 +27,7 @@ pub struct Conn {
2627
pub envoy_key: String,
2728
pub protocol_version: u16,
2829
pub ws_handle: WebSocketHandle,
30+
pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
2931
pub is_serverless: bool,
3032
pub last_rtt: AtomicU32,
3133
/// Timestamp (epoch ms) of the last pong received from the envoy.
@@ -101,6 +103,7 @@ pub async fn init_conn(
101103
envoy_key,
102104
protocol_version,
103105
ws_handle,
106+
authorized_tunnel_routes: HashMap::new(),
104107
is_serverless: false,
105108
last_rtt: AtomicU32::new(0),
106109
last_ping_ts: AtomicI64::new(util::timestamp::now()),
@@ -114,7 +117,6 @@ pub async fn init_conn(
114117

115118
Ok(Arc::new(conn))
116119
}
117-
118120
#[tracing::instrument(skip_all)]
119121
pub async fn handle_init(
120122
ctx: &StandaloneCtx,

engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ async fn handle_message(
162162
}
163163
protocol::ToEnvoyConn::ToEnvoyAckEvents(x) => protocol::ToEnvoy::ToEnvoyAckEvents(x),
164164
protocol::ToEnvoyConn::ToEnvoyTunnelMessage(x) => {
165+
let _ = conn
166+
.authorized_tunnel_routes
167+
.insert_async((x.message_id.gateway_id, x.message_id.request_id), ())
168+
.await;
165169
protocol::ToEnvoy::ToEnvoyTunnelMessage(x)
166170
}
167171
};

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ use pegboard::actor_kv;
88
use pegboard::pubsub_subjects::GatewayReceiverSubject;
99
use rivet_envoy_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
1010
use rivet_guard_core::websocket_handle::WebSocketReceiver;
11+
use scc::HashMap;
1112
use std::sync::{Arc, atomic::Ordering};
1213
use tokio::sync::{Mutex, MutexGuard, watch};
1314
use universaldb::utils::end_of_key_range;
14-
use universalpubsub::PublishOpts;
15+
use universalpubsub::{PubSub, PublishOpts};
1516
use vbare::OwnedVersionedData;
1617

1718
use crate::{LifecycleResult, actor_event_demuxer::ActorEventDemuxer, conn::Conn, errors};
@@ -366,7 +367,12 @@ async fn handle_message(
366367
}
367368
}
368369
protocol::ToRivet::ToRivetTunnelMessage(tunnel_msg) => {
369-
handle_tunnel_message(&ctx, tunnel_msg)
370+
handle_tunnel_message(
371+
&ctx.ups().context("failed to get UPS instance for tunnel message")?,
372+
ctx.config().pegboard().envoy_max_response_payload_size(),
373+
&conn.authorized_tunnel_routes,
374+
tunnel_msg,
375+
)
370376
.await
371377
.context("failed to handle tunnel message")?;
372378
}
@@ -446,17 +452,28 @@ async fn ack_commands(
446452

447453
#[tracing::instrument(skip_all)]
448454
async fn handle_tunnel_message(
449-
ctx: &StandaloneCtx,
455+
ups: &PubSub,
456+
max_payload_size: usize,
457+
authorized_tunnel_routes: &HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
450458
msg: protocol::ToRivetTunnelMessage,
451459
) -> Result<()> {
452460
// Extract inner data length before consuming msg
453461
let inner_data_len = tunnel_message_inner_data_len(&msg.message_kind);
454462

455463
// Enforce incoming payload size
456-
if inner_data_len > ctx.config().pegboard().envoy_max_response_payload_size() {
464+
if inner_data_len > max_payload_size {
457465
return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build());
458466
}
459467

468+
if !authorized_tunnel_routes
469+
.contains_async(&(msg.message_id.gateway_id, msg.message_id.request_id))
470+
.await
471+
{
472+
return Err(
473+
errors::WsError::InvalidPacket("unauthorized tunnel message".to_string()).build(),
474+
);
475+
}
476+
460477
let gateway_reply_to = GatewayReceiverSubject::new(msg.message_id.gateway_id).to_string();
461478
let msg_serialized =
462479
versioned::ToGateway::wrap_latest(protocol::ToGateway::ToRivetTunnelMessage(msg))
@@ -470,9 +487,7 @@ async fn handle_tunnel_message(
470487
);
471488

472489
// Publish message to UPS
473-
ctx.ups()
474-
.context("failed to get UPS instance for tunnel message")?
475-
.publish(&gateway_reply_to, &msg_serialized, PublishOpts::one())
490+
ups.publish(&gateway_reply_to, &msg_serialized, PublishOpts::one())
476491
.await
477492
.with_context(|| {
478493
format!(
@@ -500,6 +515,10 @@ fn tunnel_message_inner_data_len(kind: &protocol::ToRivetTunnelMessageKind) -> u
500515
}
501516
}
502517

518+
#[cfg(test)]
519+
#[path = "../tests/support/ws_to_tunnel_task.rs"]
520+
mod tests;
521+
503522
async fn send_actor_kv_error(conn: &Conn, request_id: u32, message: &str) -> Result<()> {
504523
let res_msg = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyKvResponse(
505524
protocol::ToEnvoyKvResponse {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use pegboard::pubsub_subjects::GatewayReceiverSubject;
4+
use rivet_envoy_protocol as protocol;
5+
use scc::HashMap;
6+
use universalpubsub::{NextOutput, PubSub, driver::memory::MemoryDriver};
7+
8+
use super::handle_tunnel_message;
9+
10+
fn memory_pubsub(channel: &str) -> PubSub {
11+
PubSub::new(Arc::new(MemoryDriver::new(channel.to_string())))
12+
}
13+
14+
/// Creates a `ToRivetResponseAbort` tunnel message for the given
/// gateway/request pair, with `message_index` fixed at 0.
fn response_abort_message(
    gateway_id: protocol::GatewayId,
    request_id: protocol::RequestId,
) -> protocol::ToRivetTunnelMessage {
    let message_id = protocol::MessageId {
        gateway_id,
        request_id,
        message_index: 0,
    };
    let message_kind = protocol::ToRivetTunnelMessageKind::ToRivetResponseAbort;

    protocol::ToRivetTunnelMessage {
        message_id,
        message_kind,
    }
}
27+
28+
#[tokio::test]
29+
async fn rejects_unissued_tunnel_message_pairs() {
30+
let pubsub = memory_pubsub("pegboard-envoy-ws-to-tunnel-test-reject");
31+
let gateway_id = [1, 2, 3, 4];
32+
let request_id = [5, 6, 7, 8];
33+
let mut sub = pubsub
34+
.subscribe(&GatewayReceiverSubject::new(gateway_id).to_string())
35+
.await
36+
.unwrap();
37+
let authorized_tunnel_routes = HashMap::new();
38+
39+
let err = handle_tunnel_message(
40+
&pubsub,
41+
1024,
42+
&authorized_tunnel_routes,
43+
response_abort_message(gateway_id, request_id),
44+
)
45+
.await
46+
.unwrap_err();
47+
assert!(err.to_string().contains("unauthorized tunnel message"));
48+
49+
let recv = tokio::time::timeout(Duration::from_millis(50), sub.next()).await;
50+
assert!(recv.is_err());
51+
}
52+
53+
#[tokio::test]
54+
async fn republishes_issued_tunnel_message_pairs() {
55+
let pubsub = memory_pubsub("pegboard-envoy-ws-to-tunnel-test-allow");
56+
let gateway_id = [9, 10, 11, 12];
57+
let request_id = [13, 14, 15, 16];
58+
let mut sub = pubsub
59+
.subscribe(&GatewayReceiverSubject::new(gateway_id).to_string())
60+
.await
61+
.unwrap();
62+
let authorized_tunnel_routes = HashMap::new();
63+
let _ = authorized_tunnel_routes
64+
.insert_async((gateway_id, request_id), ())
65+
.await;
66+
67+
handle_tunnel_message(
68+
&pubsub,
69+
1024,
70+
&authorized_tunnel_routes,
71+
response_abort_message(gateway_id, request_id),
72+
)
73+
.await
74+
.unwrap();
75+
76+
let msg = tokio::time::timeout(Duration::from_secs(1), sub.next())
77+
.await
78+
.unwrap()
79+
.unwrap();
80+
assert!(matches!(msg, NextOutput::Message(_)));
81+
}

engine/packages/pegboard-runner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ rivet-metrics.workspace = true
2828
rivet-runner-protocol.workspace = true
2929
rivet-runtime.workspace = true
3030
rivet-types.workspace = true
31+
scc.workspace = true
3132
serde_bare.workspace = true
3233
serde_json.workspace = true
3334
serde.workspace = true

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use rivet_data::converted::{ActorNameKeyData, MetadataKeyData};
1616
use rivet_guard_core::WebSocketHandle;
1717
use rivet_runner_protocol::{self as protocol, versioned};
1818
use rivet_types::runner_configs::RunnerConfigKind;
19+
use scc::HashMap;
1920
use universaldb::prelude::*;
2021
use vbare::OwnedVersionedData;
2122

@@ -29,6 +30,7 @@ pub struct Conn {
2930
pub workflow_id: Id,
3031
pub protocol_version: u16,
3132
pub ws_handle: WebSocketHandle,
33+
pub authorized_tunnel_routes: HashMap<(protocol::mk2::GatewayId, protocol::mk2::RequestId), ()>,
3234
pub last_rtt: AtomicU32,
3335
/// Timestamp (epoch ms) of the last pong received from the runner.
3436
pub last_ping_ts: AtomicI64,
@@ -188,6 +190,7 @@ pub async fn init_conn(
188190
workflow_id,
189191
protocol_version,
190192
ws_handle,
193+
authorized_tunnel_routes: HashMap::new(),
191194
last_rtt: AtomicU32::new(0),
192195
last_ping_ts: AtomicI64::new(util::timestamp::now()),
193196
});
@@ -213,7 +216,6 @@ pub async fn init_conn(
213216

214217
Ok(conn)
215218
}
216-
217219
enum Init {
218220
Mk2(protocol::mk2::ToServerInit),
219221
Mk1(protocol::ToServerInit),

0 commit comments

Comments
 (0)