Skip to content

Commit cfcac6f

Browse files
committed
chore: tunnel auth
1 parent d52f9f7 commit cfcac6f

11 files changed

Lines changed: 371 additions & 34 deletions

File tree

CLAUDE.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
216216
- Any behavior, protocol handling, or test coverage added to one runner should be mirrored in the other runner in the same change whenever possible.
217217
- When parity cannot be completed in the same change, explicitly document the gap and add a follow-up task.
218218

219+
### Trust Boundaries
220+
- Treat `client <-> engine` as untrusted.
221+
- Treat `envoy <-> pegboard-envoy` as untrusted.
222+
- Treat traffic inside the engine over `nats`, `fdb`, and other internal backends as trusted.
223+
- Treat `gateway`, `api`, `pegboard-envoy`, `nats`, `fdb`, and similar engine-internal services as one trusted internal boundary once traffic is inside the engine.
224+
- Validate and authorize all client-originated data at the engine edge before it reaches trusted internal systems.
225+
- Validate and authorize all envoy-originated data at `pegboard-envoy` before it reaches trusted internal systems.
226+
219227
### Important Patterns
220228

221229
**Error Handling**

Cargo.lock

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ members = [
4646
"engine/packages/universaldb",
4747
"engine/packages/universalpubsub",
4848
"engine/packages/util",
49+
"engine/packages/util-serde",
4950
"engine/packages/util-id",
5051
"engine/packages/workflow-worker",
5152
"engine/sdks/rust/api-full",
@@ -491,6 +492,9 @@ members = [
491492
[workspace.dependencies.rivet-util-id]
492493
path = "engine/packages/util-id"
493494

495+
[workspace.dependencies.rivet-util-serde]
496+
path = "engine/packages/util-serde"
497+
494498
[workspace.dependencies.rivet-workflow-worker]
495499
path = "engine/packages/workflow-worker"
496500

engine/packages/pegboard-envoy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ rivet-metrics.workspace = true
2727
rivet-envoy-protocol.workspace = true
2828
rivet-runtime.workspace = true
2929
rivet-types.workspace = true
30+
scc.workspace = true
3031
serde_bare.workspace = true
3132
serde_json.workspace = true
3233
serde.workspace = true

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use rivet_data::converted::{ActorNameKeyData, MetadataKeyData};
1515
use rivet_envoy_protocol::{self as protocol, versioned};
1616
use rivet_guard_core::WebSocketHandle;
1717
use rivet_types::runner_configs::RunnerConfigKind;
18+
use scc::HashMap;
1819
use universaldb::prelude::*;
1920
use vbare::OwnedVersionedData;
2021

@@ -26,6 +27,7 @@ pub struct Conn {
2627
pub envoy_key: String,
2728
pub protocol_version: u16,
2829
pub ws_handle: WebSocketHandle,
30+
pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
2931
pub is_serverless: bool,
3032
pub last_rtt: AtomicU32,
3133
/// Timestamp (epoch ms) of the last pong received from the envoy.
@@ -101,6 +103,7 @@ pub async fn init_conn(
101103
envoy_key,
102104
protocol_version,
103105
ws_handle,
106+
authorized_tunnel_routes: HashMap::new(),
104107
is_serverless: false,
105108
last_rtt: AtomicU32::new(0),
106109
last_ping_ts: AtomicI64::new(util::timestamp::now()),
@@ -114,7 +117,6 @@ pub async fn init_conn(
114117

115118
Ok(Arc::new(conn))
116119
}
117-
118120
#[tracing::instrument(skip_all)]
119121
pub async fn handle_init(
120122
ctx: &StandaloneCtx,

engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ async fn handle_message(
162162
}
163163
protocol::ToEnvoyConn::ToEnvoyAckEvents(x) => protocol::ToEnvoy::ToEnvoyAckEvents(x),
164164
protocol::ToEnvoyConn::ToEnvoyTunnelMessage(x) => {
165+
let _ = conn
166+
.authorized_tunnel_routes
167+
.insert_async((x.message_id.gateway_id, x.message_id.request_id), ())
168+
.await;
165169
protocol::ToEnvoy::ToEnvoyTunnelMessage(x)
166170
}
167171
};

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 110 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ use pegboard::actor_kv;
88
use pegboard::pubsub_subjects::GatewayReceiverSubject;
99
use rivet_envoy_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
1010
use rivet_guard_core::websocket_handle::WebSocketReceiver;
11+
use scc::HashMap;
1112
use std::sync::{Arc, atomic::Ordering};
1213
use tokio::sync::{Mutex, MutexGuard, watch};
1314
use universaldb::utils::end_of_key_range;
14-
use universalpubsub::PublishOpts;
15+
use universalpubsub::{PubSub, PublishOpts};
1516
use vbare::OwnedVersionedData;
1617

1718
use crate::{LifecycleResult, actor_event_demuxer::ActorEventDemuxer, conn::Conn, errors};
@@ -366,7 +367,7 @@ async fn handle_message(
366367
}
367368
}
368369
protocol::ToRivet::ToRivetTunnelMessage(tunnel_msg) => {
369-
handle_tunnel_message(&ctx, tunnel_msg)
370+
handle_tunnel_message(&ctx, conn, tunnel_msg)
370371
.await
371372
.context("failed to handle tunnel message")?;
372373
}
@@ -447,16 +448,41 @@ async fn ack_commands(
447448
/// Handles a tunnel message received from the envoy websocket.
///
/// Thin wrapper that gathers the inputs from `ctx` and `conn` and delegates to
/// `forward_tunnel_message`: the UPS instance, the configured maximum response
/// payload size, and this connection's `authorized_tunnel_routes` map.
///
/// Returns an error if the UPS instance cannot be obtained, or whatever error
/// `forward_tunnel_message` returns.
#[tracing::instrument(skip_all)]
448449
async fn handle_tunnel_message(
449450
ctx: &StandaloneCtx,
451+
conn: &Conn,
452+
msg: protocol::ToRivetTunnelMessage,
453+
) -> Result<()> {
454+
forward_tunnel_message(
455+
&ctx.ups().context("failed to get UPS instance for tunnel message")?,
456+
ctx.config().pegboard().envoy_max_response_payload_size(),
457+
&conn.authorized_tunnel_routes,
458+
msg,
459+
)
460+
.await
461+
}
462+
463+
async fn forward_tunnel_message(
464+
ups: &PubSub,
465+
max_payload_size: usize,
466+
authorized_tunnel_routes: &HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
450467
msg: protocol::ToRivetTunnelMessage,
451468
) -> Result<()> {
452469
// Extract inner data length before consuming msg
453470
let inner_data_len = tunnel_message_inner_data_len(&msg.message_kind);
454471

455472
// Enforce incoming payload size
456-
if inner_data_len > ctx.config().pegboard().envoy_max_response_payload_size() {
473+
if inner_data_len > max_payload_size {
457474
return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build());
458475
}
459476

477+
if !authorized_tunnel_routes
478+
.contains_async(&(msg.message_id.gateway_id, msg.message_id.request_id))
479+
.await
480+
{
481+
return Err(
482+
errors::WsError::InvalidPacket("unauthorized tunnel message".to_string()).build(),
483+
);
484+
}
485+
460486
let gateway_reply_to = GatewayReceiverSubject::new(msg.message_id.gateway_id).to_string();
461487
let msg_serialized =
462488
versioned::ToGateway::wrap_latest(protocol::ToGateway::ToRivetTunnelMessage(msg))
@@ -470,9 +496,7 @@ async fn handle_tunnel_message(
470496
);
471497

472498
// Publish message to UPS
473-
ctx.ups()
474-
.context("failed to get UPS instance for tunnel message")?
475-
.publish(&gateway_reply_to, &msg_serialized, PublishOpts::one())
499+
ups.publish(&gateway_reply_to, &msg_serialized, PublishOpts::one())
476500
.await
477501
.with_context(|| {
478502
format!(
@@ -500,6 +524,86 @@ fn tunnel_message_inner_data_len(kind: &protocol::ToRivetTunnelMessageKind) -> u
500524
}
501525
}
502526

527+
#[cfg(test)]
528+
mod tests {
529+
use std::sync::Arc;
530+
use std::time::Duration;
531+
532+
use super::*;
533+
use universalpubsub::driver::memory::MemoryDriver;
534+
use universalpubsub::NextOutput;
535+
536+
/// Builds a `PubSub` backed by a `MemoryDriver` created from `channel`.
fn test_pubsub(channel: &str) -> PubSub {
537+
PubSub::new(Arc::new(MemoryDriver::new(channel.to_string())))
538+
}
539+
540+
/// Builds a `ToRivetTunnelMessage` fixture for the given gateway/request ids.
///
/// The message uses `message_index` 0 and the `ToRivetResponseAbort` kind.
fn test_message(gateway_id: [u8; 4], request_id: [u8; 4]) -> protocol::ToRivetTunnelMessage {
541+
protocol::ToRivetTunnelMessage {
542+
message_id: protocol::MessageId {
543+
gateway_id,
544+
request_id,
545+
message_index: 0,
546+
},
547+
message_kind: protocol::ToRivetTunnelMessageKind::ToRivetResponseAbort,
548+
}
549+
}
550+
551+
/// Checks that a tunnel message whose `(gateway_id, request_id)` pair is not in
/// the authorized map is rejected with an "unauthorized tunnel message" error
/// and that nothing is delivered to the gateway subject.
#[tokio::test]
552+
async fn rejects_unissued_tunnel_message_pairs() {
553+
let pubsub = test_pubsub("pegboard-envoy-ws-to-tunnel-test-reject");
554+
let gateway_id = [1, 2, 3, 4];
555+
let request_id = [5, 6, 7, 8];
556+
// Subscribe before forwarding so a (wrongly) published message would be seen.
let mut sub = pubsub
557+
.subscribe(&GatewayReceiverSubject::new(gateway_id).to_string())
558+
.await
559+
.unwrap();
560+
// Left empty on purpose: no pair has been authorized.
let authorized_tunnel_routes = HashMap::new();
561+
562+
let err = forward_tunnel_message(
563+
&pubsub,
564+
1024,
565+
&authorized_tunnel_routes,
566+
test_message(gateway_id, request_id),
567+
)
568+
.await
569+
.unwrap_err();
570+
assert!(err.to_string().contains("unauthorized tunnel message"));
571+
572+
// The timeout elapsing (an `Err` from `timeout`) is the expected outcome:
// no message arrived on the subscription within 50 ms.
let recv = tokio::time::timeout(Duration::from_millis(50), sub.next()).await;
573+
assert!(recv.is_err());
574+
}
575+
576+
/// Checks that a tunnel message whose `(gateway_id, request_id)` pair has been
/// inserted into the authorized map is forwarded and arrives on the gateway
/// subject's subscription.
#[tokio::test]
577+
async fn republishes_issued_tunnel_message_pairs() {
578+
let pubsub = test_pubsub("pegboard-envoy-ws-to-tunnel-test-allow");
579+
let gateway_id = [9, 10, 11, 12];
580+
let request_id = [13, 14, 15, 16];
581+
// Subscribe before forwarding so the published message is observed.
let mut sub = pubsub
582+
.subscribe(&GatewayReceiverSubject::new(gateway_id).to_string())
583+
.await
584+
.unwrap();
585+
let authorized_tunnel_routes = HashMap::new();
586+
// Authorize the pair up front; the insert result is intentionally ignored.
let _ = authorized_tunnel_routes
587+
.insert_async((gateway_id, request_id), ())
588+
.await;
589+
590+
forward_tunnel_message(
591+
&pubsub,
592+
1024,
593+
&authorized_tunnel_routes,
594+
test_message(gateway_id, request_id),
595+
)
596+
.await
597+
.unwrap();
598+
599+
// Wait up to one second for delivery; the first unwrap fails on timeout,
// the second on an error from `sub.next()`.
let msg = tokio::time::timeout(Duration::from_secs(1), sub.next())
600+
.await
601+
.unwrap()
602+
.unwrap();
603+
assert!(matches!(msg, NextOutput::Message(_)));
604+
}
605+
}
606+
503607
async fn send_actor_kv_error(conn: &Conn, request_id: u32, message: &str) -> Result<()> {
504608
let res_msg = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyKvResponse(
505609
protocol::ToEnvoyKvResponse {

engine/packages/pegboard-runner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ rivet-metrics.workspace = true
2828
rivet-runner-protocol.workspace = true
2929
rivet-runtime.workspace = true
3030
rivet-types.workspace = true
31+
scc.workspace = true
3132
serde_bare.workspace = true
3233
serde_json.workspace = true
3334
serde.workspace = true

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use rivet_data::converted::{ActorNameKeyData, MetadataKeyData};
1616
use rivet_guard_core::WebSocketHandle;
1717
use rivet_runner_protocol::{self as protocol, versioned};
1818
use rivet_types::runner_configs::RunnerConfigKind;
19+
use scc::HashMap;
1920
use universaldb::prelude::*;
2021
use vbare::OwnedVersionedData;
2122

@@ -29,6 +30,7 @@ pub struct Conn {
2930
pub workflow_id: Id,
3031
pub protocol_version: u16,
3132
pub ws_handle: WebSocketHandle,
33+
pub authorized_tunnel_routes: HashMap<(protocol::mk2::GatewayId, protocol::mk2::RequestId), ()>,
3234
pub last_rtt: AtomicU32,
3335
/// Timestamp (epoch ms) of the last pong received from the runner.
3436
pub last_ping_ts: AtomicI64,
@@ -188,6 +190,7 @@ pub async fn init_conn(
188190
workflow_id,
189191
protocol_version,
190192
ws_handle,
193+
authorized_tunnel_routes: HashMap::new(),
191194
last_rtt: AtomicU32::new(0),
192195
last_ping_ts: AtomicI64::new(util::timestamp::now()),
193196
});
@@ -213,7 +216,6 @@ pub async fn init_conn(
213216

214217
Ok(conn)
215218
}
216-
217219
enum Init {
218220
Mk2(protocol::mk2::ToServerInit),
219221
Mk1(protocol::ToServerInit),

engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ async fn handle_message_mk2(
158158
protocol::mk2::ToClient::ToClientAckEvents(x)
159159
}
160160
protocol::mk2::ToRunner::ToClientTunnelMessage(x) => {
161+
let _ = conn
162+
.authorized_tunnel_routes
163+
.insert_async((x.message_id.gateway_id, x.message_id.request_id), ())
164+
.await;
161165
protocol::mk2::ToClient::ToClientTunnelMessage(x)
162166
}
163167
};
@@ -250,6 +254,10 @@ async fn handle_message_mk1(
250254
protocol::ToRunner::ToClientAckEvents(x) => protocol::ToClient::ToClientAckEvents(x),
251255
protocol::ToRunner::ToClientKvResponse(x) => protocol::ToClient::ToClientKvResponse(x),
252256
protocol::ToRunner::ToClientTunnelMessage(x) => {
257+
let _ = conn
258+
.authorized_tunnel_routes
259+
.insert_async((x.message_id.gateway_id, x.message_id.request_id), ())
260+
.await;
253261
protocol::ToClient::ToClientTunnelMessage(x)
254262
}
255263
};

0 commit comments

Comments
 (0)