From 7f30a3e8312743f99520a64dc5f1d8da989330c9 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 10 Mar 2026 10:10:25 -0700 Subject: [PATCH] server/json-rpc: Add JSON-RPC batch request support The migration from jsonrpsee to axum in commit 58b0f25 lost batch request support (JSON arrays of requests). Restore it by parsing both single and batch formats, dispatching each request individually, and returning an array of responses for batches per the JSON-RPC 2.0 spec. --- server/json-rpc/src/handlers.rs | 121 +++++++++++++++++++++++--------- server/json-rpc/src/jsonrpc.rs | 55 +++++++++++++++ 2 files changed, 143 insertions(+), 33 deletions(-) diff --git a/server/json-rpc/src/handlers.rs b/server/json-rpc/src/handlers.rs index 23482a5f2bc..eda2ed67154 100644 --- a/server/json-rpc/src/handlers.rs +++ b/server/json-rpc/src/handlers.rs @@ -39,10 +39,46 @@ pub struct AppState { pub logger: Logger, } +/// Dispatch a single JSON-RPC request to the appropriate method handler. +/// +/// Returns `None` for notifications (requests without an id), since the +/// JSON-RPC 2.0 spec says notifications must not produce a response. +async fn dispatch_request( + state: &Arc>, + request: JsonRpcRequest, +) -> Option { + if !request.is_valid_version() { + return Some(JsonRpcResponse::invalid_request()); + } + + let id = request.id.clone().unwrap_or(JsonRpcId::Null); + + // Notifications (no id) should not produce a response per JSON-RPC 2.0 spec. + // But since all our methods have side effects, we still execute them; + // we just don't return the response. + let is_notification = request.id.is_none(); + + let response = match request.method.as_str() { + "subgraph_create" => handle_create(state, &request, id).await, + "subgraph_deploy" => handle_deploy(state, &request, id).await, + "subgraph_remove" => handle_remove(state, &request, id).await, + "subgraph_reassign" => handle_reassign(state, &request, id).await, + "subgraph_pause" => handle_pause(state, &request, id).await, + "subgraph_resume" => handle_resume(state, &request, id).await, + _ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()), + }; + + if is_notification { + None + } else { + Some(response) + } +} + /// Main JSON-RPC request handler. /// -/// Processes incoming JSON-RPC requests, dispatches to the appropriate method handler, -/// and returns the response. +/// Supports both single requests and batch requests (JSON arrays). +/// Per JSON-RPC 2.0 spec, an empty batch array returns an invalid request error. pub async fn jsonrpc_handler( State(state): State>>, ConnectInfo(remote_addr): ConnectInfo, @@ -56,45 +92,64 @@ pub async fn jsonrpc_handler( .unwrap_or("unset") } - // Parse the JSON-RPC request + let log_request = |method: &str, params: &Option| { + info!( + &state.logger, + "JSON-RPC call"; + "method" => method, + "params" => ?params, + "remote_addr" => %remote_addr, + "x_forwarded_for" => header(&headers, "x-forwarded-for"), + "x_real_ip" => header(&headers, "x-real-ip"), + "x_forwarded_proto" => header(&headers, "x-forwarded-proto") + ); + }; + + // Try batch (JSON array) first, then single request. + if let Ok(requests) = serde_json::from_str::>(&body) { + if requests.is_empty() { + let resp = serde_json::to_value(JsonRpcResponse::invalid_request()) + .expect("failed to serialize response"); + return (StatusCode::OK, Json(resp)); + } + + info!( + &state.logger, + "JSON-RPC batch request"; + "batch_size" => requests.len(), + "remote_addr" => %remote_addr, + ); + + let mut responses = Vec::new(); + for request in requests { + log_request(&request.method, &request.params); + if let Some(resp) = dispatch_request(&state, request).await { + responses.push(resp); + } + } + + let value = serde_json::to_value(responses).expect("failed to serialize responses"); + return (StatusCode::OK, Json(value)); + } + + // Single request let request: JsonRpcRequest = match serde_json::from_str(&body) { Ok(req) => req, Err(_) => { - return (StatusCode::OK, Json(JsonRpcResponse::parse_error())); + let resp = serde_json::to_value(JsonRpcResponse::parse_error()) + .expect("failed to serialize response"); + return (StatusCode::OK, Json(resp)); } }; - // Validate JSON-RPC version - if !request.is_valid_version() { - return (StatusCode::OK, Json(JsonRpcResponse::invalid_request())); - } - - let id = request.id.clone().unwrap_or(JsonRpcId::Null); - - // Log the method call - info!( - &state.logger, - "JSON-RPC call"; - "method" => &request.method, - "params" => ?request.params, - "remote_addr" => %remote_addr, - "x_forwarded_for" => header(&headers, "x-forwarded-for"), - "x_real_ip" => header(&headers, "x-real-ip"), - "x_forwarded_proto" => header(&headers, "x-forwarded-proto") - ); + log_request(&request.method, &request.params); - // Dispatch to the appropriate handler - let response = match request.method.as_str() { - "subgraph_create" => handle_create(&state, &request, id.clone()).await, - "subgraph_deploy" => handle_deploy(&state, &request, id.clone()).await, - "subgraph_remove" => handle_remove(&state, &request, id.clone()).await, - "subgraph_reassign" => handle_reassign(&state, &request, id.clone()).await, - "subgraph_pause" => handle_pause(&state, &request, id.clone()).await, - "subgraph_resume" => handle_resume(&state, &request, id.clone()).await, - _ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()), - }; + let response = dispatch_request(&state, request) + .await + .unwrap_or_else(|| JsonRpcResponse::success(JsonRpcId::Null, JsonValue::Null)); - (StatusCode::OK, Json(response)) + let value = serde_json::to_value(response).expect("failed to serialize response"); + (StatusCode::OK, Json(value)) } /// Parse parameters from a JSON-RPC request. diff --git a/server/json-rpc/src/jsonrpc.rs b/server/json-rpc/src/jsonrpc.rs index 3d2603db151..e8a3da155a2 100644 --- a/server/json-rpc/src/jsonrpc.rs +++ b/server/json-rpc/src/jsonrpc.rs @@ -195,4 +195,59 @@ mod tests { assert!(json.contains(r#""id":"req-1""#)); assert!(!json.contains("result")); } + + #[test] + fn deserialize_batch_request() { + let json = r#"[ + {"jsonrpc":"2.0","method":"subgraph_create","id":1}, + {"jsonrpc":"2.0","method":"subgraph_remove","id":2} + ]"#; + let requests: Vec = serde_json::from_str(json).unwrap(); + assert_eq!(requests.len(), 2); + assert_eq!(requests[0].method, "subgraph_create"); + assert_eq!(requests[0].id, Some(JsonRpcId::Number(1))); + assert_eq!(requests[1].method, "subgraph_remove"); + assert_eq!(requests[1].id, Some(JsonRpcId::Number(2))); + } + + #[test] + fn serialize_batch_response() { + let responses = vec![ + JsonRpcResponse::success(JsonRpcId::Number(1), serde_json::json!({"ok": true})), + JsonRpcResponse::error( + JsonRpcId::Number(2), + JsonRpcError::new(-32601, "Method not found"), + ), + ]; + let json = serde_json::to_string(&responses).unwrap(); + // Must be a JSON array + assert!(json.starts_with('[')); + assert!(json.ends_with(']')); + // Both responses present + let parsed: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.len(), 2); + assert_eq!(parsed[0]["id"], 1); + assert_eq!(parsed[1]["id"], 2); + } + + #[test] + fn batch_with_notification_omits_response() { + // A notification has no id — when filtered, the batch response should + // contain fewer entries than the batch request. + let json = r#"[ + {"jsonrpc":"2.0","method":"subgraph_create","id":1}, + {"jsonrpc":"2.0","method":"subgraph_remove"} + ]"#; + let requests: Vec = serde_json::from_str(json).unwrap(); + assert_eq!(requests.len(), 2); + assert!(requests[0].id.is_some()); + assert!(requests[1].id.is_none()); + } + + #[test] + fn empty_batch_is_valid_json_array() { + let json = "[]"; + let requests: Vec = serde_json::from_str(json).unwrap(); + assert!(requests.is_empty()); + } }