diff --git a/server/json-rpc/src/handlers.rs b/server/json-rpc/src/handlers.rs index 23482a5f2bc..eda2ed67154 100644 --- a/server/json-rpc/src/handlers.rs +++ b/server/json-rpc/src/handlers.rs @@ -39,10 +39,46 @@ pub struct AppState { pub logger: Logger, } +/// Dispatch a single JSON-RPC request to the appropriate method handler. +/// +/// Returns `None` for notifications (requests without an id), since the +/// JSON-RPC 2.0 spec says notifications must not produce a response. +async fn dispatch_request( + state: &Arc>, + request: JsonRpcRequest, +) -> Option { + if !request.is_valid_version() { + return Some(JsonRpcResponse::invalid_request()); + } + + let id = request.id.clone().unwrap_or(JsonRpcId::Null); + + // Notifications (no id) should not produce a response per JSON-RPC 2.0 spec. + // But since all our methods have side effects, we still execute them; + // we just don't return the response. + let is_notification = request.id.is_none(); + + let response = match request.method.as_str() { + "subgraph_create" => handle_create(state, &request, id).await, + "subgraph_deploy" => handle_deploy(state, &request, id).await, + "subgraph_remove" => handle_remove(state, &request, id).await, + "subgraph_reassign" => handle_reassign(state, &request, id).await, + "subgraph_pause" => handle_pause(state, &request, id).await, + "subgraph_resume" => handle_resume(state, &request, id).await, + _ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()), + }; + + if is_notification { + None + } else { + Some(response) + } +} + /// Main JSON-RPC request handler. /// -/// Processes incoming JSON-RPC requests, dispatches to the appropriate method handler, -/// and returns the response. +/// Supports both single requests and batch requests (JSON arrays). +/// Per JSON-RPC 2.0 spec, an empty batch array returns an invalid request error. pub async fn jsonrpc_handler( State(state): State>>, ConnectInfo(remote_addr): ConnectInfo, @@ -56,45 +92,64 @@ pub async fn jsonrpc_handler( .unwrap_or("unset") } - // Parse the JSON-RPC request + let log_request = |method: &str, params: &Option| { + info!( + &state.logger, + "JSON-RPC call"; + "method" => method, + "params" => ?params, + "remote_addr" => %remote_addr, + "x_forwarded_for" => header(&headers, "x-forwarded-for"), + "x_real_ip" => header(&headers, "x-real-ip"), + "x_forwarded_proto" => header(&headers, "x-forwarded-proto") + ); + }; + + // Try batch (JSON array) first, then single request. + if let Ok(requests) = serde_json::from_str::>(&body) { + if requests.is_empty() { + let resp = serde_json::to_value(JsonRpcResponse::invalid_request()) + .expect("failed to serialize response"); + return (StatusCode::OK, Json(resp)); + } + + info!( + &state.logger, + "JSON-RPC batch request"; + "batch_size" => requests.len(), + "remote_addr" => %remote_addr, + ); + + let mut responses = Vec::new(); + for request in requests { + log_request(&request.method, &request.params); + if let Some(resp) = dispatch_request(&state, request).await { + responses.push(resp); + } + } + + let value = serde_json::to_value(responses).expect("failed to serialize responses"); + return (StatusCode::OK, Json(value)); + } + + // Single request let request: JsonRpcRequest = match serde_json::from_str(&body) { Ok(req) => req, Err(_) => { - return (StatusCode::OK, Json(JsonRpcResponse::parse_error())); + let resp = serde_json::to_value(JsonRpcResponse::parse_error()) + .expect("failed to serialize response"); + return (StatusCode::OK, Json(resp)); } }; - // Validate JSON-RPC version - if !request.is_valid_version() { - return (StatusCode::OK, Json(JsonRpcResponse::invalid_request())); - } - - let id = request.id.clone().unwrap_or(JsonRpcId::Null); - - // Log the method call - info!( - &state.logger, - "JSON-RPC call"; - "method" => &request.method, - "params" => ?request.params, - "remote_addr" => %remote_addr, - "x_forwarded_for" => header(&headers, "x-forwarded-for"), - "x_real_ip" => header(&headers, "x-real-ip"), - "x_forwarded_proto" => header(&headers, "x-forwarded-proto") - ); + log_request(&request.method, &request.params); - // Dispatch to the appropriate handler - let response = match request.method.as_str() { - "subgraph_create" => handle_create(&state, &request, id.clone()).await, - "subgraph_deploy" => handle_deploy(&state, &request, id.clone()).await, - "subgraph_remove" => handle_remove(&state, &request, id.clone()).await, - "subgraph_reassign" => handle_reassign(&state, &request, id.clone()).await, - "subgraph_pause" => handle_pause(&state, &request, id.clone()).await, - "subgraph_resume" => handle_resume(&state, &request, id.clone()).await, - _ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()), - }; + let response = dispatch_request(&state, request) + .await + .unwrap_or_else(|| JsonRpcResponse::success(JsonRpcId::Null, JsonValue::Null)); - (StatusCode::OK, Json(response)) + let value = serde_json::to_value(response).expect("failed to serialize response"); + (StatusCode::OK, Json(value)) } /// Parse parameters from a JSON-RPC request. diff --git a/server/json-rpc/src/jsonrpc.rs b/server/json-rpc/src/jsonrpc.rs index 3d2603db151..e8a3da155a2 100644 --- a/server/json-rpc/src/jsonrpc.rs +++ b/server/json-rpc/src/jsonrpc.rs @@ -195,4 +195,59 @@ mod tests { assert!(json.contains(r#""id":"req-1""#)); assert!(!json.contains("result")); } + + #[test] + fn deserialize_batch_request() { + let json = r#"[ + {"jsonrpc":"2.0","method":"subgraph_create","id":1}, + {"jsonrpc":"2.0","method":"subgraph_remove","id":2} + ]"#; + let requests: Vec = serde_json::from_str(json).unwrap(); + assert_eq!(requests.len(), 2); + assert_eq!(requests[0].method, "subgraph_create"); + assert_eq!(requests[0].id, Some(JsonRpcId::Number(1))); + assert_eq!(requests[1].method, "subgraph_remove"); + assert_eq!(requests[1].id, Some(JsonRpcId::Number(2))); + } + + #[test] + fn serialize_batch_response() { + let responses = vec![ + JsonRpcResponse::success(JsonRpcId::Number(1), serde_json::json!({"ok": true})), + JsonRpcResponse::error( + JsonRpcId::Number(2), + JsonRpcError::new(-32601, "Method not found"), + ), + ]; + let json = serde_json::to_string(&responses).unwrap(); + // Must be a JSON array + assert!(json.starts_with('[')); + assert!(json.ends_with(']')); + // Both responses present + let parsed: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.len(), 2); + assert_eq!(parsed[0]["id"], 1); + assert_eq!(parsed[1]["id"], 2); + } + + #[test] + fn batch_with_notification_omits_response() { + // A notification has no id — when filtered, the batch response should + // contain fewer entries than the batch request. + let json = r#"[ + {"jsonrpc":"2.0","method":"subgraph_create","id":1}, + {"jsonrpc":"2.0","method":"subgraph_remove"} + ]"#; + let requests: Vec = serde_json::from_str(json).unwrap(); + assert_eq!(requests.len(), 2); + assert!(requests[0].id.is_some()); + assert!(requests[1].id.is_none()); + } + + #[test] + fn empty_batch_is_valid_json_array() { + let json = "[]"; + let requests: Vec = serde_json::from_str(json).unwrap(); + assert!(requests.is_empty()); + } }