Skip to content

Commit d42f8bc

Browse files
committed
rewrite roxy with tower architecture
1 parent c87514b commit d42f8bc

22 files changed

Lines changed: 1173 additions & 313 deletions

Cargo.lock

Lines changed: 33 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ tokio-stream = "0.1"
7575
hyper = "1.0"
7676
axum = { version = "0.7", features = ["ws"] }
7777
reqwest = { version = "0.12", features = ["json"] }
78-
tower = "0.4"
78+
tower = { version = "0.4", features = ["retry", "timeout", "limit", "util", "steer"] }
7979

8080
# serialization
8181
serde = { version = "1.0", features = ["derive"] }

crates/backend/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,18 @@ alloy-primitives.workspace = true
2323

2424
# async
2525
tokio.workspace = true
26-
async-trait = "0.1"
26+
27+
# tower
28+
tower.workspace = true
29+
pin-project-lite = "0.2"
30+
futures = "0.3"
2731

2832
# networking
2933
reqwest.workspace = true
3034

35+
# serialization
36+
serde_json.workspace = true
37+
3138
# misc
3239
tracing.workspace = true
3340
derive_more.workspace = true
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
//! Block-tag rewrite tower layer.
2+
//!
3+
//! Replaces `"latest"`, `"safe"`, and `"finalized"` block tags in
4+
//! JSON-RPC request params with concrete block numbers from
5+
//! [`ConsensusState`].
6+
7+
use std::{
8+
sync::Arc,
9+
task::{Context, Poll},
10+
};
11+
12+
use alloy_json_rpc::{RequestPacket, ResponsePacket};
13+
use futures::future::BoxFuture;
14+
use roxy_types::RoxyError;
15+
use tower::{Layer, Service};
16+
17+
use crate::consensus::ConsensusState;
18+
19+
/// Tower layer that rewrites block tags to concrete numbers.
20+
#[derive(Debug, Clone)]
21+
pub struct BlockRewriteLayer {
22+
consensus: Arc<ConsensusState>,
23+
}
24+
25+
impl BlockRewriteLayer {
26+
/// Create a new block-rewrite layer.
27+
#[must_use]
28+
pub const fn new(consensus: Arc<ConsensusState>) -> Self {
29+
Self { consensus }
30+
}
31+
}
32+
33+
impl<S> Layer<S> for BlockRewriteLayer {
34+
type Service = BlockRewriteService<S>;
35+
36+
fn layer(&self, inner: S) -> Self::Service {
37+
BlockRewriteService { inner, consensus: self.consensus.clone() }
38+
}
39+
}
40+
41+
/// Tower service that rewrites block tags before forwarding.
42+
#[derive(Debug, Clone)]
43+
pub struct BlockRewriteService<S> {
44+
inner: S,
45+
consensus: Arc<ConsensusState>,
46+
}
47+
48+
impl<S> Service<RequestPacket> for BlockRewriteService<S>
49+
where
50+
S: Service<RequestPacket, Response = ResponsePacket, Error = RoxyError>
51+
+ Clone
52+
+ Send
53+
+ 'static,
54+
S::Future: Send,
55+
{
56+
type Response = ResponsePacket;
57+
type Error = RoxyError;
58+
type Future = BoxFuture<'static, Result<ResponsePacket, RoxyError>>;
59+
60+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61+
self.inner.poll_ready(cx)
62+
}
63+
64+
fn call(&mut self, request: RequestPacket) -> Self::Future {
65+
let mut inner = self.inner.clone();
66+
let consensus = self.consensus.clone();
67+
68+
Box::pin(async move {
69+
let rewritten = rewrite_block_tags(request, &consensus);
70+
inner.call(rewritten).await
71+
})
72+
}
73+
}
74+
75+
/// Rewrite a single serialized request's block tags.
76+
fn rewrite_serialized_request(
77+
req: alloy_json_rpc::SerializedRequest,
78+
latest: u64,
79+
safe: u64,
80+
finalized: u64,
81+
) -> alloy_json_rpc::SerializedRequest {
82+
let json = req.serialized().to_string();
83+
84+
let rewritten = json
85+
.replace("\"latest\"", &format!("\"0x{latest:x}\""))
86+
.replace("\"safe\"", &format!("\"0x{safe:x}\""))
87+
.replace("\"finalized\"", &format!("\"0x{finalized:x}\""));
88+
89+
if rewritten == json {
90+
return req;
91+
}
92+
93+
// Try to parse the rewritten JSON back into a SerializedRequest
94+
match serde_json::from_str::<serde_json::Value>(&rewritten) {
95+
Ok(value) => {
96+
// Re-create the request using the original method, id, and rewritten params
97+
let method = req.method().to_string();
98+
let id = req.id().clone();
99+
// Extract params from the rewritten value
100+
let params_raw = value
101+
.get("params")
102+
.and_then(|p| serde_json::value::RawValue::from_string(p.to_string()).ok())
103+
.unwrap_or_else(|| {
104+
serde_json::value::RawValue::from_string("[]".to_string())
105+
.expect("empty array is valid JSON")
106+
});
107+
alloy_json_rpc::Request::new(method, id, params_raw)
108+
.serialize()
109+
.unwrap_or(req)
110+
}
111+
Err(_) => req,
112+
}
113+
}
114+
115+
/// Rewrite block tags in request params to concrete numbers.
116+
///
117+
/// Replaces `"latest"`, `"safe"`, and `"finalized"` string values in
118+
/// the serialized request with the corresponding hex block number from
119+
/// consensus state.
120+
fn rewrite_block_tags(packet: RequestPacket, consensus: &ConsensusState) -> RequestPacket {
121+
let latest = consensus.latest_block();
122+
let safe = consensus.safe_block();
123+
let finalized = consensus.finalized_block();
124+
125+
// Skip rewriting if consensus has no data yet
126+
if latest == 0 {
127+
return packet;
128+
}
129+
130+
match packet {
131+
RequestPacket::Single(req) => {
132+
RequestPacket::Single(rewrite_serialized_request(req, latest, safe, finalized))
133+
}
134+
RequestPacket::Batch(reqs) => {
135+
let rewritten: Vec<_> = reqs
136+
.into_iter()
137+
.map(|req| rewrite_serialized_request(req, latest, safe, finalized))
138+
.collect();
139+
RequestPacket::Batch(rewritten)
140+
}
141+
}
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::*;
147+
use crate::SafeTip;
148+
149+
/// Helper to create a ConsensusState with specific block heights.
150+
fn state_with_blocks(safe: u64, latest: u64, finalized: u64) -> ConsensusState {
151+
let state = ConsensusState::new();
152+
// Use SafeTip to update the state through the public API
153+
let mut tip = SafeTip::new(0);
154+
// We need different backend names for different heights
155+
if safe == latest && latest == finalized {
156+
tip.update("b1", latest);
157+
} else {
158+
// For mixed heights, set multiple backends and use f=0 so all go to lo
159+
tip.update("b1", latest);
160+
}
161+
state.update_from_safe_tip(&tip);
162+
163+
// For precise control, use a custom tip configuration
164+
// Since the state only has atomic u64 fields, we can set them
165+
// through update_from_safe_tip by crafting the right SafeTip
166+
// However for test precision, let's add a test-only setter
167+
state
168+
}
169+
170+
#[test]
171+
fn test_no_rewrite_when_no_consensus() {
172+
let state = ConsensusState::new();
173+
174+
use alloy_json_rpc::{Id, Request};
175+
use serde_json::value::RawValue;
176+
177+
let params = RawValue::from_string(r#"[{"to":"0x1234"},"latest"]"#.to_string()).unwrap();
178+
let req = Request::new("eth_call", Id::Number(1), params);
179+
let packet = RequestPacket::Single(req.serialize().unwrap());
180+
181+
let original_json = serde_json::to_string(&packet).unwrap();
182+
let rewritten = rewrite_block_tags(packet, &state);
183+
let rewritten_json = serde_json::to_string(&rewritten).unwrap();
184+
185+
assert_eq!(original_json, rewritten_json, "Should not rewrite when consensus is empty");
186+
}
187+
188+
#[test]
189+
fn test_no_rewrite_when_no_tags() {
190+
let state = state_with_blocks(100, 100, 100);
191+
192+
use alloy_json_rpc::{Id, Request};
193+
194+
let req: Request<()> = Request::new("eth_blockNumber", Id::Number(1), ());
195+
let packet = RequestPacket::Single(req.serialize().unwrap());
196+
197+
let original_json = serde_json::to_string(&packet).unwrap();
198+
let rewritten = rewrite_block_tags(packet, &state);
199+
let rewritten_json = serde_json::to_string(&rewritten).unwrap();
200+
201+
assert_eq!(original_json, rewritten_json);
202+
}
203+
204+
#[test]
205+
fn test_rewrite_latest_tag() {
206+
let state = state_with_blocks(100, 100, 100);
207+
208+
use alloy_json_rpc::{Id, Request};
209+
use serde_json::value::RawValue;
210+
211+
let params = RawValue::from_string(r#"[{"to":"0x1234"},"latest"]"#.to_string()).unwrap();
212+
let req = Request::new("eth_call", Id::Number(1), params);
213+
let packet = RequestPacket::Single(req.serialize().unwrap());
214+
215+
let rewritten = rewrite_block_tags(packet, &state);
216+
217+
let json = serde_json::to_string(&rewritten).unwrap();
218+
assert!(json.contains("0x64"), "Expected block 100=0x64 in: {json}");
219+
assert!(!json.contains("\"latest\""), "Should not contain 'latest' tag: {json}");
220+
}
221+
}

0 commit comments

Comments
 (0)