Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 88 additions & 33 deletions server/json-rpc/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,46 @@ pub struct AppState<R> {
pub logger: Logger,
}

/// Dispatch a single JSON-RPC request to the appropriate method handler.
///
/// Returns `None` for notifications (requests without an id), since the
/// JSON-RPC 2.0 spec says notifications must not produce a response.
async fn dispatch_request<R: SubgraphRegistrar>(
state: &Arc<AppState<R>>,
request: JsonRpcRequest,
) -> Option<JsonRpcResponse> {
if !request.is_valid_version() {
return Some(JsonRpcResponse::invalid_request());
}

let id = request.id.clone().unwrap_or(JsonRpcId::Null);

// Notifications (no id) should not produce a response per JSON-RPC 2.0 spec.
// But since all our methods have side effects, we still execute them;
// we just don't return the response.
let is_notification = request.id.is_none();

let response = match request.method.as_str() {
"subgraph_create" => handle_create(state, &request, id).await,
"subgraph_deploy" => handle_deploy(state, &request, id).await,
"subgraph_remove" => handle_remove(state, &request, id).await,
"subgraph_reassign" => handle_reassign(state, &request, id).await,
"subgraph_pause" => handle_pause(state, &request, id).await,
"subgraph_resume" => handle_resume(state, &request, id).await,
_ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()),
};

if is_notification {
None
} else {
Some(response)
}
}

/// Main JSON-RPC request handler.
///
/// Processes incoming JSON-RPC requests, dispatches to the appropriate method handler,
/// and returns the response.
/// Supports both single requests and batch requests (JSON arrays).
/// Per JSON-RPC 2.0 spec, an empty batch array returns an invalid request error.
pub async fn jsonrpc_handler<R: SubgraphRegistrar>(
State(state): State<Arc<AppState<R>>>,
ConnectInfo(remote_addr): ConnectInfo<SocketAddr>,
Expand All @@ -56,45 +92,64 @@ pub async fn jsonrpc_handler<R: SubgraphRegistrar>(
.unwrap_or("unset")
}

// Parse the JSON-RPC request
let log_request = |method: &str, params: &Option<JsonValue>| {
info!(
&state.logger,
"JSON-RPC call";
"method" => method,
"params" => ?params,
"remote_addr" => %remote_addr,
"x_forwarded_for" => header(&headers, "x-forwarded-for"),
"x_real_ip" => header(&headers, "x-real-ip"),
"x_forwarded_proto" => header(&headers, "x-forwarded-proto")
);
};

// Try batch (JSON array) first, then single request.
if let Ok(requests) = serde_json::from_str::<Vec<JsonRpcRequest>>(&body) {
if requests.is_empty() {
let resp = serde_json::to_value(JsonRpcResponse::invalid_request())
.expect("failed to serialize response");
return (StatusCode::OK, Json(resp));
}

info!(
&state.logger,
"JSON-RPC batch request";
"batch_size" => requests.len(),
"remote_addr" => %remote_addr,
);

let mut responses = Vec::new();
for request in requests {
log_request(&request.method, &request.params);
if let Some(resp) = dispatch_request(&state, request).await {
responses.push(resp);
}
}

let value = serde_json::to_value(responses).expect("failed to serialize responses");
return (StatusCode::OK, Json(value));
}

// Single request
let request: JsonRpcRequest = match serde_json::from_str(&body) {
Ok(req) => req,
Err(_) => {
return (StatusCode::OK, Json(JsonRpcResponse::parse_error()));
let resp = serde_json::to_value(JsonRpcResponse::parse_error())
.expect("failed to serialize response");
return (StatusCode::OK, Json(resp));
}
};

// Validate JSON-RPC version
if !request.is_valid_version() {
return (StatusCode::OK, Json(JsonRpcResponse::invalid_request()));
}

let id = request.id.clone().unwrap_or(JsonRpcId::Null);

// Log the method call
info!(
&state.logger,
"JSON-RPC call";
"method" => &request.method,
"params" => ?request.params,
"remote_addr" => %remote_addr,
"x_forwarded_for" => header(&headers, "x-forwarded-for"),
"x_real_ip" => header(&headers, "x-real-ip"),
"x_forwarded_proto" => header(&headers, "x-forwarded-proto")
);
log_request(&request.method, &request.params);

// Dispatch to the appropriate handler
let response = match request.method.as_str() {
"subgraph_create" => handle_create(&state, &request, id.clone()).await,
"subgraph_deploy" => handle_deploy(&state, &request, id.clone()).await,
"subgraph_remove" => handle_remove(&state, &request, id.clone()).await,
"subgraph_reassign" => handle_reassign(&state, &request, id.clone()).await,
"subgraph_pause" => handle_pause(&state, &request, id.clone()).await,
"subgraph_resume" => handle_resume(&state, &request, id.clone()).await,
_ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()),
};
let response = dispatch_request(&state, request)
.await
.unwrap_or_else(|| JsonRpcResponse::success(JsonRpcId::Null, JsonValue::Null));

(StatusCode::OK, Json(response))
let value = serde_json::to_value(response).expect("failed to serialize response");
(StatusCode::OK, Json(value))
}

/// Parse parameters from a JSON-RPC request.
Expand Down
55 changes: 55 additions & 0 deletions server/json-rpc/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,59 @@ mod tests {
assert!(json.contains(r#""id":"req-1""#));
assert!(!json.contains("result"));
}

#[test]
fn deserialize_batch_request() {
let json = r#"[
{"jsonrpc":"2.0","method":"subgraph_create","id":1},
{"jsonrpc":"2.0","method":"subgraph_remove","id":2}
]"#;
let requests: Vec<JsonRpcRequest> = serde_json::from_str(json).unwrap();
assert_eq!(requests.len(), 2);
assert_eq!(requests[0].method, "subgraph_create");
assert_eq!(requests[0].id, Some(JsonRpcId::Number(1)));
assert_eq!(requests[1].method, "subgraph_remove");
assert_eq!(requests[1].id, Some(JsonRpcId::Number(2)));
}

#[test]
fn serialize_batch_response() {
let responses = vec![
JsonRpcResponse::success(JsonRpcId::Number(1), serde_json::json!({"ok": true})),
JsonRpcResponse::error(
JsonRpcId::Number(2),
JsonRpcError::new(-32601, "Method not found"),
),
];
let json = serde_json::to_string(&responses).unwrap();
// Must be a JSON array
assert!(json.starts_with('['));
assert!(json.ends_with(']'));
// Both responses present
let parsed: Vec<serde_json::Value> = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.len(), 2);
assert_eq!(parsed[0]["id"], 1);
assert_eq!(parsed[1]["id"], 2);
}

#[test]
fn batch_with_notification_omits_response() {
// A notification has no id — when filtered, the batch response should
// contain fewer entries than the batch request.
let json = r#"[
{"jsonrpc":"2.0","method":"subgraph_create","id":1},
{"jsonrpc":"2.0","method":"subgraph_remove"}
]"#;
let requests: Vec<JsonRpcRequest> = serde_json::from_str(json).unwrap();
assert_eq!(requests.len(), 2);
assert!(requests[0].id.is_some());
assert!(requests[1].id.is_none());
}

#[test]
fn empty_batch_is_valid_json_array() {
let json = "[]";
let requests: Vec<JsonRpcRequest> = serde_json::from_str(json).unwrap();
assert!(requests.is_empty());
}
}