diff --git a/src/api/instances.zig b/src/api/instances.zig index 1b61e59..aa1aa45 100644 --- a/src/api/instances.zig +++ b/src/api/instances.zig @@ -12,11 +12,13 @@ const local_binary = @import("../core/local_binary.zig"); const component_cli = @import("../core/component_cli.zig"); const integration_mod = @import("../core/integration.zig"); const launch_args_mod = @import("../core/launch_args.zig"); +const gateway_access = @import("../core/gateway_access.zig"); const managed_skills = @import("../managed_skills.zig"); const manifest_mod = @import("../core/manifest.zig"); const managed_cli = @import("managed_cli.zig"); const nullclaw_web_channel = @import("../core/nullclaw_web_channel.zig"); const query_api = @import("query.zig"); +const proxy_api = @import("proxy.zig"); const test_helpers = @import("../test_helpers.zig"); const instance_runtime = @import("instance_runtime.zig"); @@ -26,6 +28,8 @@ const jsonOk = helpers.jsonOk; const notFound = helpers.notFound; const badRequest = helpers.badRequest; const methodNotAllowed = helpers.methodNotAllowed; +const gateway_ready_timeout_ms: i64 = 15_000; +const gateway_ready_poll_ms: u64 = 100; // ─── Helpers ───────────────────────────────────────────────────────────────── @@ -884,6 +888,381 @@ fn writeJsonConfigValue(allocator: std.mem.Allocator, config_path: []const u8, v try out.writeAll("\n"); } +const GatewayAccess = struct { + token: []u8, + changed: bool, + + fn deinit(self: GatewayAccess, allocator: std.mem.Allocator) void { + allocator.free(self.token); + } +}; + +pub const GatewayProxyUpstream = struct { + port: u16, + token: []u8, + upstream_path: []const u8, + body: []const u8, + body_owned: bool = false, + event_stream: bool, + + pub fn deinit(self: GatewayProxyUpstream, allocator: std.mem.Allocator) void { + allocator.free(self.token); + if (self.body_owned) allocator.free(self.body); + } +}; + +pub const GatewayProxyPrepareResult = union(enum) { + no_match, + response: ApiResponse, + upstream: 
GatewayProxyUpstream, +}; + +fn ensureJsonObjectField( + allocator: std.mem.Allocator, + obj: *std.json.ObjectMap, + key: []const u8, + changed: *bool, +) !*std.json.ObjectMap { + const gop = try obj.getOrPut(allocator, key); + if (!gop.found_existing or gop.value_ptr.* != .object) { + gop.value_ptr.* = .{ .object = .empty }; + changed.* = true; + } + return &gop.value_ptr.object; +} + +fn setBoolField( + allocator: std.mem.Allocator, + obj: *std.json.ObjectMap, + key: []const u8, + value: bool, + changed: *bool, +) !void { + if (obj.get(key)) |existing| { + if (existing == .bool and existing.bool == value) return; + } + try obj.put(allocator, key, .{ .bool = value }); + changed.* = true; +} + +fn setIntegerAtLeast( + allocator: std.mem.Allocator, + obj: *std.json.ObjectMap, + key: []const u8, + minimum: i64, + changed: *bool, +) !void { + if (obj.get(key)) |existing| { + if (existing == .integer and existing.integer >= minimum) return; + } + try obj.put(allocator, key, .{ .integer = minimum }); + changed.* = true; +} + +fn ensureNullclawGatewayConfig( + allocator: std.mem.Allocator, + paths: paths_mod.Paths, + component: []const u8, + name: []const u8, +) !GatewayAccess { + if (!std.mem.eql(u8, component, "nullclaw")) return error.UnsupportedComponent; + + const config_path = try paths.instanceConfig(allocator, component, name); + defer allocator.free(config_path); + const file = try std_compat.fs.openFileAbsolute(config_path, .{}); + defer file.close(); + const contents = try file.readToEndAlloc(allocator, 4 * 1024 * 1024); + defer allocator.free(contents); + + var parsed = try std.json.parseFromSlice(std.json.Value, allocator, contents, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + if (parsed.value != .object) return error.InvalidConfig; + + const json_allocator = parsed.arena.allocator(); + var changed = false; + const root = &parsed.value.object; + const gateway_obj = try ensureJsonObjectField(json_allocator, 
root, "gateway", &changed); + const a2a_obj = try ensureJsonObjectField(json_allocator, root, "a2a", &changed); + + try setBoolField(json_allocator, gateway_obj, "require_pairing", true, &changed); + try setIntegerAtLeast(json_allocator, gateway_obj, "max_body_size_bytes", gateway_access.min_body_size, &changed); + try setIntegerAtLeast(json_allocator, gateway_obj, "request_timeout_secs", gateway_access.min_timeout_secs, &changed); + try setBoolField(json_allocator, a2a_obj, "enabled", true, &changed); + try setBoolField(json_allocator, a2a_obj, "multi_modal", true, &changed); + + const token = try gateway_access.ensurePairedToken(allocator, json_allocator, paths, component, name, gateway_obj, &changed); + errdefer allocator.free(token); + + if (changed) { + try writeJsonConfigValue(allocator, config_path, parsed.value); + } + + return .{ .token = token, .changed = changed }; +} + +fn resolveGatewayPort( + allocator: std.mem.Allocator, + paths: paths_mod.Paths, + manager: *manager_mod.Manager, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, +) ?u16 { + const snapshot = instance_runtime.resolve(allocator, paths, manager, component, name, entry); + if (snapshot.port != 0) return snapshot.port; + return instance_runtime.readPortFromConfig(allocator, paths, component, name, "gateway.port"); +} + +fn isRuntimeRunning( + allocator: std.mem.Allocator, + paths: paths_mod.Paths, + manager: *manager_mod.Manager, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, +) bool { + const snapshot = instance_runtime.resolve(allocator, paths, manager, component, name, entry); + return snapshot.status == .running; +} + +fn waitForRuntimeRunning( + allocator: std.mem.Allocator, + paths: paths_mod.Paths, + manager: *manager_mod.Manager, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, +) bool { + const deadline = std_compat.time.milliTimestamp() + gateway_ready_timeout_ms; + while (true) { + 
manager.tick(); + const snapshot = instance_runtime.resolve(allocator, paths, manager, component, name, entry); + switch (snapshot.status) { + .running => return true, + .failed, .stopped => return false, + else => {}, + } + if (std_compat.time.milliTimestamp() >= deadline) return false; + std_compat.thread.sleep(gateway_ready_poll_ms * std.time.ns_per_ms); + } +} + +fn gatewayNotReady() ApiResponse { + return .{ + .status = "503 Service Unavailable", + .content_type = "application/json", + .body = "{\"error\":\"nullclaw gateway is not ready\"}", + }; +} + +fn ensureInstanceRunning( + allocator: std.mem.Allocator, + s: *state_mod.State, + manager: *manager_mod.Manager, + paths: paths_mod.Paths, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, + config_changed: bool, +) ?ApiResponse { + if (config_changed and isRuntimeRunning(allocator, paths, manager, component, name, entry)) { + const restart_resp = handleRestart(allocator, s, manager, paths, component, name, ""); + if (!std.mem.eql(u8, restart_resp.status, "200 OK")) return restart_resp; + if (!waitForRuntimeRunning(allocator, paths, manager, component, name, entry)) return gatewayNotReady(); + return null; + } + if (!isRuntimeRunning(allocator, paths, manager, component, name, entry)) { + const start_resp = handleStart(allocator, s, manager, paths, component, name, ""); + if (!std.mem.eql(u8, start_resp.status, "200 OK")) return start_resp; + if (!waitForRuntimeRunning(allocator, paths, manager, component, name, entry)) return gatewayNotReady(); + } + return null; +} + +fn isSuccessStatus(status: []const u8) bool { + return status.len >= 1 and status[0] == '2'; +} + +fn handleGatewayProxy( + allocator: std.mem.Allocator, + s: *state_mod.State, + manager: *manager_mod.Manager, + paths: paths_mod.Paths, + component: []const u8, + name: []const u8, + method: []const u8, + upstream_path: []const u8, + body: []const u8, + event_stream: bool, +) ApiResponse { + if (!std.mem.eql(u8, 
component, "nullclaw")) { + return badRequest("{\"error\":\"gateway proxy routes are only supported for nullclaw instances\"}"); + } + const entry = s.getInstance(component, name) orelse return notFound(); + const access = ensureNullclawGatewayConfig(allocator, paths, component, name) catch |err| switch (err) { + error.UnsupportedComponent => return badRequest("{\"error\":\"unsupported component\"}"), + error.FileNotFound => return .{ .status = "404 Not Found", .content_type = "application/json", .body = "{\"error\":\"config not found\"}" }, + else => return helpers.serverError(), + }; + defer access.deinit(allocator); + + if (ensureInstanceRunning(allocator, s, manager, paths, component, name, entry, access.changed)) |resp| return resp; + + const port = resolveGatewayPort(allocator, paths, manager, component, name, entry) orelse + return .{ .status = "503 Service Unavailable", .content_type = "application/json", .body = "{\"error\":\"gateway port unavailable\"}" }; + const base_url = std.fmt.allocPrint(allocator, "http://127.0.0.1:{d}", .{port}) catch return helpers.serverError(); + defer allocator.free(base_url); + + const proxied = proxy_api.forward(allocator, .{ + .method = method, + .base_url = base_url, + .path = upstream_path, + .body = body, + .bearer_token = access.token, + .accept = if (event_stream) "text/event-stream" else null, + .unreachable_body = "{\"error\":\"nullclaw gateway unreachable\"}", + }); + return .{ + .status = proxied.status, + .content_type = if (event_stream and isSuccessStatus(proxied.status)) "text/event-stream" else proxied.content_type, + .body = proxied.body, + }; +} + +const GatewayProxyRoute = struct { + upstream_path: []const u8, + event_stream: bool, + body_mode: enum { raw, agent_stream_a2a }, +}; + +fn gatewayProxyRouteForAction(action: []const u8) ?GatewayProxyRoute { + if (std.mem.eql(u8, action, "agent-stream")) return .{ .upstream_path = "/a2a", .event_stream = true, .body_mode = .agent_stream_a2a }; + if 
(std.mem.eql(u8, action, "a2a")) return .{ .upstream_path = "/a2a", .event_stream = false, .body_mode = .raw }; + if (std.mem.eql(u8, action, "a2a-stream")) return .{ .upstream_path = "/a2a", .event_stream = true, .body_mode = .raw }; + if (std.mem.eql(u8, action, "transcribe")) return .{ .upstream_path = "/media/transcribe", .event_stream = false, .body_mode = .raw }; + return null; +} + +fn buildAgentStreamA2aBody(allocator: std.mem.Allocator, body: []const u8) ![]u8 { + const parsed = std.json.parseFromSlice(struct { + message: ?[]const u8 = null, + session_key: ?[]const u8 = null, + context_id: ?[]const u8 = null, + request_id: ?[]const u8 = null, + message_id: ?[]const u8 = null, + }, allocator, body, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return error.InvalidJson; + defer parsed.deinit(); + + const message = parsed.value.message orelse return error.MissingMessage; + if (message.len == 0) return error.MissingMessage; + + const now = std_compat.time.milliTimestamp(); + const request_id = parsed.value.request_id orelse ""; + const message_id = parsed.value.message_id orelse ""; + const context_id = parsed.value.context_id orelse (parsed.value.session_key orelse ""); + + var buf = std.array_list.Managed(u8).init(allocator); + errdefer buf.deinit(); + + try buf.appendSlice("{\"jsonrpc\":\"2.0\",\"id\":\""); + if (request_id.len > 0) { + try appendEscaped(&buf, request_id); + } else { + const generated = try std.fmt.allocPrint(allocator, "nullhub-agent-stream-{d}", .{now}); + defer allocator.free(generated); + try buf.appendSlice(generated); + } + try buf.appendSlice("\",\"method\":\"message/stream\",\"params\":{\"message\":{\"kind\":\"message\",\"role\":\"user\",\"messageId\":\""); + if (message_id.len > 0) { + try appendEscaped(&buf, message_id); + } else { + const generated = try std.fmt.allocPrint(allocator, "msg-nullhub-{d}", .{now}); + defer allocator.free(generated); + try buf.appendSlice(generated); + } + try 
buf.appendSlice("\""); + if (context_id.len > 0) { + try buf.appendSlice(",\"contextId\":\""); + try appendEscaped(&buf, context_id); + try buf.appendSlice("\""); + } + try buf.appendSlice(",\"parts\":[{\"kind\":\"text\",\"text\":\""); + try appendEscaped(&buf, message); + try buf.appendSlice("\"}]},\"configuration\":{\"acceptedOutputModes\":[\"text/plain\"]}}}"); + + return try buf.toOwnedSlice(); +} + +pub fn isGatewayProxyPath(target: []const u8) bool { + const parsed = parsePath(target) orelse return false; + const action = parsed.action orelse return false; + return gatewayProxyRouteForAction(action) != null; +} + +pub fn prepareGatewayProxy( + allocator: std.mem.Allocator, + s: *state_mod.State, + manager: *manager_mod.Manager, + paths: paths_mod.Paths, + method: []const u8, + target: []const u8, + body: []const u8, +) GatewayProxyPrepareResult { + const parsed = parsePath(target) orelse return .no_match; + const action = parsed.action orelse return .no_match; + const route = gatewayProxyRouteForAction(action) orelse return .no_match; + if (!std.mem.eql(u8, method, "POST")) return .{ .response = methodNotAllowed() }; + if (!std.mem.eql(u8, parsed.component, "nullclaw")) { + return .{ .response = badRequest("{\"error\":\"gateway proxy routes are only supported for nullclaw instances\"}") }; + } + + const proxy_body = switch (route.body_mode) { + .raw => body, + .agent_stream_a2a => buildAgentStreamA2aBody(allocator, body) catch |err| switch (err) { + error.InvalidJson => return .{ .response = badRequest("{\"error\":\"invalid JSON body\"}") }, + error.MissingMessage => return .{ .response = badRequest("{\"error\":\"message is required\"}") }, + else => return .{ .response = helpers.serverError() }, + }, + }; + const proxy_body_owned = route.body_mode == .agent_stream_a2a; + var proxy_body_transferred = false; + defer if (proxy_body_owned and !proxy_body_transferred) allocator.free(proxy_body); + + const entry = s.getInstance(parsed.component, parsed.name) 
orelse return .{ .response = notFound() }; + const access = ensureNullclawGatewayConfig(allocator, paths, parsed.component, parsed.name) catch |err| switch (err) { + error.UnsupportedComponent => return .{ .response = badRequest("{\"error\":\"unsupported component\"}") }, + error.FileNotFound => return .{ .response = .{ .status = "404 Not Found", .content_type = "application/json", .body = "{\"error\":\"config not found\"}" } }, + else => return .{ .response = helpers.serverError() }, + }; + errdefer access.deinit(allocator); + + if (ensureInstanceRunning(allocator, s, manager, paths, parsed.component, parsed.name, entry, access.changed)) |resp| { + access.deinit(allocator); + return .{ .response = resp }; + } + + const port = resolveGatewayPort(allocator, paths, manager, parsed.component, parsed.name, entry) orelse { + access.deinit(allocator); + return .{ .response = .{ .status = "503 Service Unavailable", .content_type = "application/json", .body = "{\"error\":\"gateway port unavailable\"}" } }; + }; + + proxy_body_transferred = true; + return .{ .upstream = .{ + .port = port, + .token = access.token, + .upstream_path = route.upstream_path, + .body = proxy_body, + .body_owned = proxy_body_owned, + .event_stream = route.event_stream, + } }; +} + const ProviderHealthConfig = struct { agents: ?struct { defaults: ?struct { @@ -4667,11 +5046,25 @@ pub fn dispatch( } if (std.mem.eql(u8, action, "agent-stream")) { if (!std.mem.eql(u8, method, "POST")) return methodNotAllowed(); - return .{ - .status = "501 Not Implemented", - .content_type = "application/json", - .body = "{\"error\":\"streaming agent sessions are not supported; use POST /agent\"}", + const proxy_body = buildAgentStreamA2aBody(allocator, body) catch |err| switch (err) { + error.InvalidJson => return badRequest("{\"error\":\"invalid JSON body\"}"), + error.MissingMessage => return badRequest("{\"error\":\"message is required\"}"), + else => return helpers.serverError(), }; + defer 
allocator.free(proxy_body); + return handleGatewayProxy(allocator, s, manager, paths, parsed.component, parsed.name, method, "/a2a", proxy_body, true); + } + if (std.mem.eql(u8, action, "a2a")) { + if (!std.mem.eql(u8, method, "POST")) return methodNotAllowed(); + return handleGatewayProxy(allocator, s, manager, paths, parsed.component, parsed.name, method, "/a2a", body, false); + } + if (std.mem.eql(u8, action, "a2a-stream")) { + if (!std.mem.eql(u8, method, "POST")) return methodNotAllowed(); + return handleGatewayProxy(allocator, s, manager, paths, parsed.component, parsed.name, method, "/a2a", body, true); + } + if (std.mem.eql(u8, action, "transcribe")) { + if (!std.mem.eql(u8, method, "POST")) return methodNotAllowed(); + return handleGatewayProxy(allocator, s, manager, paths, parsed.component, parsed.name, method, "/media/transcribe", body, false); } if (std.mem.eql(u8, action, "agent-sessions")) { return handleAgentSessions(allocator, s, paths, parsed.component, parsed.name, method, target); @@ -4823,6 +5216,78 @@ fn writeTestInstanceConfig( try file.writeAll("\n"); } +test "ensureNullclawGatewayConfig patches generic gateway capabilities" { + const allocator = std.testing.allocator; + var fixture = try test_helpers.TempPaths.init(allocator); + defer fixture.deinit(); + + try writeTestInstanceConfig( + allocator, + fixture.paths, + "nullclaw", + "hat", + "{\"gateway\":{\"port\":43123,\"max_body_size_bytes\":1024},\"a2a\":{\"enabled\":false},\"memory\":{\"profile\":\"minimal_none\",\"backend\":\"none\",\"auto_save\":false}}", + ); + + const access = try ensureNullclawGatewayConfig(allocator, fixture.paths, "nullclaw", "hat"); + defer access.deinit(allocator); + try std.testing.expect(std.mem.startsWith(u8, access.token, gateway_access.token_prefix)); + try std.testing.expect(access.changed); + + const config_path = try fixture.paths.instanceConfig(allocator, "nullclaw", "hat"); + defer allocator.free(config_path); + const file = try 
std_compat.fs.openFileAbsolute(config_path, .{}); + defer file.close(); + const bytes = try file.readToEndAlloc(allocator, 1024 * 1024); + defer allocator.free(bytes); + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, bytes, .{ .allocate = .alloc_always }); + defer parsed.deinit(); + + const gateway = parsed.value.object.get("gateway").?.object; + const a2a = parsed.value.object.get("a2a").?.object; + try std.testing.expect(gateway.get("require_pairing").?.bool); + try std.testing.expect(gateway.get("max_body_size_bytes").?.integer >= gateway_access.min_body_size); + try std.testing.expect(gateway.get("request_timeout_secs").?.integer >= gateway_access.min_timeout_secs); + try std.testing.expect(a2a.get("enabled").?.bool); + try std.testing.expect(a2a.get("multi_modal").?.bool); + + const expected_hash = try gateway_access.hashTokenAlloc(allocator, access.token); + defer allocator.free(expected_hash); + const paired_tokens = gateway.get("paired_tokens").?.array.items; + try std.testing.expectEqual(@as(usize, 1), paired_tokens.len); + try std.testing.expectEqualStrings(expected_hash, paired_tokens[0].string); + try std.testing.expect(!gateway_access.isNullhubToken(paired_tokens[0].string)); + + const token_path = try gateway_access.tokenPath(allocator, fixture.paths, "nullclaw", "hat"); + defer allocator.free(token_path); + const token_file = try std_compat.fs.openFileAbsolute(token_path, .{}); + defer token_file.close(); + const stored_token_bytes = try token_file.readToEndAlloc(allocator, 16 * 1024); + defer allocator.free(stored_token_bytes); + try std.testing.expectEqualStrings(access.token, std.mem.trim(u8, stored_token_bytes, " \t\r\n")); + + const access2 = try ensureNullclawGatewayConfig(allocator, fixture.paths, "nullclaw", "hat"); + defer access2.deinit(allocator); + try std.testing.expectEqualStrings(access.token, access2.token); + try std.testing.expect(!access2.changed); +} + +test "buildAgentStreamA2aBody translates managed agent 
request to A2A message stream" { + const allocator = std.testing.allocator; + const body = try buildAgentStreamA2aBody( + allocator, + "{\"message\":\"hello \\\"world\\\"\",\"session_key\":\"interview-1\",\"request_id\":\"req-1\",\"message_id\":\"msg-1\",\"provider\":\"ignored\"}", + ); + defer allocator.free(body); + + try std.testing.expect(std.mem.indexOf(u8, body, "\"method\":\"message/stream\"") != null); + try std.testing.expect(std.mem.indexOf(u8, body, "\"id\":\"req-1\"") != null); + try std.testing.expect(std.mem.indexOf(u8, body, "\"messageId\":\"msg-1\"") != null); + try std.testing.expect(std.mem.indexOf(u8, body, "\"contextId\":\"interview-1\"") != null); + try std.testing.expect(std.mem.indexOf(u8, body, "\"text\":\"hello \\\\\"world\\\\\"\"") != null); + try std.testing.expect(isGatewayProxyPath("/api/instances/nullclaw/hat/agent-stream")); +} + fn writeTestTrackerWorkflow( allocator: std.mem.Allocator, paths: paths_mod.Paths, @@ -7831,8 +8296,18 @@ test "dispatch routes agent invoke stream and sessions" { try std.testing.expectEqualStrings("200 OK", invoke_resp.status); try std.testing.expect(std.mem.indexOf(u8, invoke_resp.body, "\"response\":\"world\"") != null); - const stream_resp = dispatch(allocator, &s, &mctx.manager, &mctx.mutex, mctx.paths, "POST", "/api/instances/nullclaw/my-agent/agent-stream", "").?; - try std.testing.expectEqualStrings("501 Not Implemented", stream_resp.status); + const stream_resp = dispatch( + allocator, + &s, + &mctx.manager, + &mctx.mutex, + mctx.paths, + "POST", + "/api/instances/nullclaw/my-agent/agent-stream", + "{\"message\":\"hello\",\"session_key\":\"s-1\",\"provider\":\"openai\",\"model\":\"gpt-5\",\"temperature\":\"0.3\",\"agent\":\"helper\"}", + ).?; + try std.testing.expectEqualStrings("404 Not Found", stream_resp.status); + try std.testing.expect(std.mem.indexOf(u8, stream_resp.body, "config not found") != null); const list_resp = dispatch(allocator, &s, &mctx.manager, &mctx.mutex, mctx.paths, "GET", 
"/api/instances/nullclaw/my-agent/agent-sessions", "").?; defer allocator.free(list_resp.body); diff --git a/src/api/meta.zig b/src/api/meta.zig index 0313e21..18b9755 100644 --- a/src/api/meta.zig +++ b/src/api/meta.zig @@ -548,7 +548,7 @@ const routes = [_]RouteSpec{ .summary = "Create or update a component instance from wizard form data.", .auth_mode = "optional_bearer", .path_params = wizard_component_params[0..], - .body = "Wizard submission JSON.", + .body = "Wizard submission JSON. For nullclaw, optional generic NullHub hints such as nullhub_profile=stateless prepare memory=none, A2A multimodal, media-sized gateway limits, and a NullHub-managed gateway token.", .response = "Created instance payload or validation error.", }, .{ @@ -905,10 +905,44 @@ const routes = [_]RouteSpec{ .method = "POST", .path_template = "/api/instances/{component}/{name}/agent-stream", .category = "instances", - .summary = "Streaming agent turns are not supported through nullhub; this route returns 501.", + .summary = "Stream a managed nullclaw agent turn by translating the request to gateway A2A message/stream.", .auth_mode = "optional_bearer", .path_params = common_instance_params[0..], - .response = "Not implemented payload.", + .body = "JSON body with message and optional session_key. 
Provider/model overrides are not applied on this gateway-backed streaming route.", + .response = "text/event-stream A2A task events.", + }, + .{ + .id = "instances.a2a", + .method = "POST", + .path_template = "/api/instances/{component}/{name}/a2a", + .category = "instances", + .summary = "Proxy a JSON-RPC A2A request to a managed nullclaw gateway.", + .auth_mode = "optional_bearer", + .path_params = common_instance_params[0..], + .body = "A2A JSON-RPC request body.", + .response = "A2A JSON-RPC response payload.", + }, + .{ + .id = "instances.a2a.stream", + .method = "POST", + .path_template = "/api/instances/{component}/{name}/a2a-stream", + .category = "instances", + .summary = "Proxy a streaming A2A JSON-RPC request to a managed nullclaw gateway.", + .auth_mode = "optional_bearer", + .path_params = common_instance_params[0..], + .body = "A2A JSON-RPC request body using a streaming method.", + .response = "text/event-stream A2A response.", + }, + .{ + .id = "instances.transcribe", + .method = "POST", + .path_template = "/api/instances/{component}/{name}/transcribe", + .category = "instances", + .summary = "Transcribe audio through a managed nullclaw gateway.", + .auth_mode = "optional_bearer", + .path_params = common_instance_params[0..], + .body = "JSON body with audio_base64, mime_type, source, and optional language.", + .response = "Transcript JSON payload.", }, .{ .id = "instances.agent.sessions", diff --git a/src/api/proxy.zig b/src/api/proxy.zig index 7d9d11b..886e053 100644 --- a/src/api/proxy.zig +++ b/src/api/proxy.zig @@ -1,5 +1,6 @@ const std = @import("std"); const std_compat = @import("compat"); +const net_compat = @import("../net_compat.zig"); const Allocator = std.mem.Allocator; @@ -15,9 +16,16 @@ pub const ForwardOptions = struct { path: []const u8, body: []const u8, bearer_token: ?[]const u8 = null, + content_type: []const u8 = "application/json", + accept: ?[]const u8 = null, unreachable_body: []const u8 = "{\"error\":\"upstream 
unreachable\"}", }; +const LocalBaseUrl = struct { + host: []const u8, + port: u16, +}; + pub fn isPathInNamespace(target: []const u8, prefix: []const u8) bool { return std.mem.eql(u8, target, prefix) or (target.len > prefix.len and @@ -35,13 +43,23 @@ pub fn forward(allocator: Allocator, opts: ForwardOptions) Response { var auth_header: ?[]const u8 = null; defer if (auth_header) |value| allocator.free(value); - var header_buf: [1]std.http.Header = undefined; + var header_buf: [3]std.http.Header = undefined; + var header_count: usize = 0; + if (opts.body.len > 0 and opts.content_type.len > 0) { + header_buf[header_count] = .{ .name = "Content-Type", .value = opts.content_type }; + header_count += 1; + } + if (opts.accept) |accept| { + header_buf[header_count] = .{ .name = "Accept", .value = accept }; + header_count += 1; + } const extra_headers: []const std.http.Header = if (opts.bearer_token) |token| blk: { auth_header = std.fmt.allocPrint(allocator, "Bearer {s}", .{token}) catch return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; - header_buf[0] = .{ .name = "Authorization", .value = auth_header.? }; - break :blk header_buf[0..1]; - } else &.{}; + header_buf[header_count] = .{ .name = "Authorization", .value = auth_header.? 
}; + header_count += 1; + break :blk header_buf[0..header_count]; + } else header_buf[0..header_count]; var client: std.http.Client = .{ .allocator = allocator, .io = std_compat.io() }; defer client.deinit(); @@ -64,11 +82,184 @@ pub fn forward(allocator: Allocator, opts: ForwardOptions) Response { return .{ .status = mapStatus(@intFromEnum(result.status)), - .content_type = "application/json", + .content_type = if (@intFromEnum(result.status) >= 200 and @intFromEnum(result.status) < 300) (opts.accept orelse "application/json") else "application/json", .body = resp_body, }; } +pub fn forwardStream(allocator: Allocator, opts: ForwardOptions, downstream: std_compat.net.Stream, cors_headers: []const u8) !void { + _ = parseMethod(opts.method) orelse { + try writeDirectResponse(downstream, "405 Method Not Allowed", "application/json", "{\"error\":\"method not allowed\"}", cors_headers); + return; + }; + const base = parseLocalHttpBaseUrl(opts.base_url) orelse { + try writeDirectResponse(downstream, "502 Bad Gateway", "application/json", opts.unreachable_body, cors_headers); + return; + }; + + const upstream = std_compat.net.tcpConnectToHost(allocator, base.host, base.port) catch { + try writeDirectResponse(downstream, "502 Bad Gateway", "application/json", opts.unreachable_body, cors_headers); + return; + }; + defer upstream.close(); + + var auth_line: ?[]u8 = null; + defer if (auth_line) |line| allocator.free(line); + if (opts.bearer_token) |token| { + auth_line = try std.fmt.allocPrint(allocator, "Authorization: Bearer {s}\r\n", .{token}); + } + + var accept_line: ?[]u8 = null; + defer if (accept_line) |line| allocator.free(line); + if (opts.accept) |accept| { + accept_line = try std.fmt.allocPrint(allocator, "Accept: {s}\r\n", .{accept}); + } + + const content_type_line = if (opts.body.len > 0 and opts.content_type.len > 0) + try std.fmt.allocPrint(allocator, "Content-Type: {s}\r\n", .{opts.content_type}) + else + try allocator.dupe(u8, ""); + defer 
allocator.free(content_type_line); + + const header = try std.fmt.allocPrint( + allocator, + "{s} {s} HTTP/1.1\r\nHost: {s}:{d}\r\n{s}{s}{s}Content-Length: {d}\r\nConnection: close\r\n\r\n", + .{ + opts.method, + opts.path, + base.host, + base.port, + content_type_line, + accept_line orelse "", + auth_line orelse "", + opts.body.len, + }, + ); + defer allocator.free(header); + + try net_compat.streamWriteAll(upstream, header); + if (opts.body.len > 0) try net_compat.streamWriteAll(upstream, opts.body); + + var header_buf: [64 * 1024]u8 = undefined; + var header_len: usize = 0; + var header_end: ?usize = null; + while (header_end == null) { + if (header_len == header_buf.len) { + try writeDirectResponse(downstream, "502 Bad Gateway", "application/json", "{\"error\":\"upstream response headers too large\"}", cors_headers); + return; + } + const n = net_compat.streamRead(upstream, header_buf[header_len..]) catch { + try writeDirectResponse(downstream, "502 Bad Gateway", "application/json", opts.unreachable_body, cors_headers); + return; + }; + if (n == 0) { + try writeDirectResponse(downstream, "502 Bad Gateway", "application/json", opts.unreachable_body, cors_headers); + return; + } + header_len += n; + if (std.mem.indexOf(u8, header_buf[0..header_len], "\r\n\r\n")) |pos| { + header_end = pos; + } + } + + const end = header_end.?; + const upstream_headers = header_buf[0..end]; + const body_start = end + 4; + const status_code = parseHttpStatusCode(upstream_headers) orelse 502; + const status = mapStatus(@intCast(@min(status_code, 999))); + const content_type = extractHttpHeader(upstream_headers, "Content-Type") orelse + if (status_code >= 200 and status_code < 300) (opts.accept orelse "application/octet-stream") else "application/json"; + const upstream_chunked = if (extractHttpHeader(upstream_headers, "Transfer-Encoding")) |value| + headerValueContainsToken(value, "chunked") + else + false; + + try writeStreamingResponseHeaders(downstream, status, content_type, 
cors_headers, upstream_chunked); + if (header_len > body_start) { + try net_compat.streamWriteAll(downstream, header_buf[body_start..header_len]); + } + + var buf: [16 * 1024]u8 = undefined; + while (true) { + const n = net_compat.streamRead(upstream, &buf) catch return; + if (n == 0) return; + try net_compat.streamWriteAll(downstream, buf[0..n]); + } +} + +fn parseLocalHttpBaseUrl(base_url: []const u8) ?LocalBaseUrl { + const prefix = "http://"; + if (!std.mem.startsWith(u8, base_url, prefix)) return null; + const rest = base_url[prefix.len..]; + const host_port = if (std.mem.indexOfScalar(u8, rest, '/')) |slash| rest[0..slash] else rest; + const colon = std.mem.lastIndexOfScalar(u8, host_port, ':') orelse return null; + const host = host_port[0..colon]; + if (host.len == 0) return null; + const port = std.fmt.parseInt(u16, host_port[colon + 1 ..], 10) catch return null; + return .{ .host = host, .port = port }; +} + +fn parseHttpStatusCode(headers: []const u8) ?u16 { + const line_end = std.mem.indexOf(u8, headers, "\r\n") orelse headers.len; + const status_line = headers[0..line_end]; + var parts = std.mem.splitScalar(u8, status_line, ' '); + _ = parts.next() orelse return null; + const code_text = parts.next() orelse return null; + return std.fmt.parseInt(u16, code_text, 10) catch null; +} + +fn extractHttpHeader(headers: []const u8, name: []const u8) ?[]const u8 { + var lines = std.mem.splitSequence(u8, headers, "\r\n"); + _ = lines.next(); + while (lines.next()) |line| { + const colon = std.mem.indexOfScalar(u8, line, ':') orelse continue; + if (!std.ascii.eqlIgnoreCase(line[0..colon], name)) continue; + return std_compat.mem.trimLeft(u8, line[colon + 1 ..], " \t"); + } + return null; +} + +fn headerValueContainsToken(value: []const u8, token: []const u8) bool { + var parts = std.mem.splitScalar(u8, value, ','); + while (parts.next()) |part| { + const trimmed = std.mem.trim(u8, part, " \t"); + if (std.ascii.eqlIgnoreCase(trimmed, token)) return true; + } + 
/// Sends a complete, non-streamed HTTP/1.1 response on `stream`.
///
/// The status line, `Content-Type`, `Content-Length`, the caller-supplied
/// `cors_headers` block and `Connection: close` are assembled in a 4 KiB
/// scratch buffer. When the body also fits it goes out in the same write;
/// otherwise the head is sent first and the body follows in a second write.
/// Any error from the fixed writer or the stream is returned to the caller.
fn writeDirectResponse(stream: std_compat.net.Stream, status: []const u8, content_type: []const u8, body: []const u8, cors_headers: []const u8) !void {
    var scratch: [4096]u8 = undefined;
    var head: std.Io.Writer = .fixed(&scratch);

    try head.print("HTTP/1.1 {s}\r\n", .{status});
    try head.print("Content-Type: {s}\r\n", .{content_type});
    try head.print("Content-Length: {d}\r\n", .{body.len});
    try head.writeAll(cors_headers);
    try head.writeAll("Connection: close\r\n\r\n");

    const spare = scratch.len - head.buffered().len;
    const body_fits = body.len <= spare;
    if (body_fits) try head.writeAll(body);

    try net_compat.streamWriteAll(stream, head.buffered());
    // A body that did not fit is necessarily non-empty.
    if (!body_fits) try net_compat.streamWriteAll(stream, body);
}

/// Sends only the response head for a streamed reply; the caller writes the
/// body bytes afterwards.
///
/// No `Content-Length` is emitted. `Transfer-Encoding: chunked` is added only
/// when `transfer_encoding_chunked` is true. `Cache-Control: no-cache` and
/// `X-Accel-Buffering: no` are always included.
fn writeStreamingResponseHeaders(stream: std_compat.net.Stream, status: []const u8, content_type: []const u8, cors_headers: []const u8, transfer_encoding_chunked: bool) !void {
    var scratch: [4096]u8 = undefined;
    var head: std.Io.Writer = .fixed(&scratch);

    try head.print("HTTP/1.1 {s}\r\n", .{status});
    try head.print("Content-Type: {s}\r\n", .{content_type});
    try head.writeAll("Cache-Control: no-cache\r\n");
    try head.writeAll("X-Accel-Buffering: no\r\n");
    if (transfer_encoding_chunked) try head.writeAll("Transfer-Encoding: chunked\r\n");
    try head.writeAll(cors_headers);
    try head.writeAll("Connection: close\r\n\r\n");

    try net_compat.streamWriteAll(stream, head.buffered());
}

test "headerValueContainsToken detects comma-separated transfer encodings" {
    const Case = struct { value: []const u8, expected: bool };
    const cases = [_]Case{
        .{ .value = "gzip, chunked", .expected = true },
        .{ .value = " Chunked ", .expected = true },
        .{ .value = "gzip", .expected = false },
    };
    for (cases) |case| {
        try std.testing.expectEqual(case.expected, headerValueContainsToken(case.value, "chunked"));
    }
}
//! Local gateway access tokens.
//!
//! A token is `token_prefix` followed by 48 lowercase hex characters. The
//! plaintext is kept in `token_file` inside the instance directory; only its
//! SHA-256 hex digest is placed in the gateway's `paired_tokens` list.
const std = @import("std");
const std_compat = @import("compat");
const paths_mod = @import("paths.zig");

const Sha256 = std.crypto.hash.sha2.Sha256;

/// Prefix that marks a token as generated by this module.
pub const token_prefix = "nullhub-local-";
/// File name, relative to the instance directory, holding the plaintext token.
pub const token_file = ".nullhub-gateway-token";
/// Floor, in bytes, for the gateway request body limit (64 MiB).
pub const min_body_size: i64 = 64 * 1024 * 1024;
/// Floor, in seconds, for the gateway request timeout.
pub const min_timeout_secs: i64 = 120;

/// Generates a fresh token: `token_prefix` plus 24 random bytes rendered as
/// lowercase hex. Caller owns the returned slice.
pub fn generateToken(allocator: std.mem.Allocator) ![]u8 {
    var random_bytes: [24]u8 = undefined;
    std_compat.crypto.random.bytes(&random_bytes);
    const hex = std.fmt.bytesToHex(random_bytes, .lower);
    return std.mem.concat(allocator, u8, &.{ token_prefix, &hex });
}

/// Reports whether `token` carries `token_prefix`.
pub fn isNullhubToken(token: []const u8) bool {
    return std.mem.startsWith(u8, token, token_prefix);
}

/// Builds the path of the token file for the given instance.
/// Caller owns the returned slice.
pub fn tokenPath(
    allocator: std.mem.Allocator,
    paths: paths_mod.Paths,
    component: []const u8,
    name: []const u8,
) ![]u8 {
    const instance_dir = try paths.instanceDir(allocator, component, name);
    defer allocator.free(instance_dir);
    return std.fs.path.join(allocator, &.{ instance_dir, token_file });
}

/// Reads the stored token for an instance.
///
/// Returns null when the token file does not exist or when its trimmed
/// contents do not start with `token_prefix`. Other open/read errors are
/// propagated. Caller owns the returned slice.
pub fn readStoredToken(
    allocator: std.mem.Allocator,
    paths: paths_mod.Paths,
    component: []const u8,
    name: []const u8,
) !?[]u8 {
    const path = try tokenPath(allocator, paths, component, name);
    defer allocator.free(path);

    const file = std_compat.fs.openFileAbsolute(path, .{}) catch |err| switch (err) {
        error.FileNotFound => return null,
        else => return err,
    };
    defer file.close();

    // The raw buffer is always released here; the caller only ever receives a
    // right-sized copy, so there is a single ownership path to reason about.
    const contents = try file.readToEndAlloc(allocator, 16 * 1024);
    defer allocator.free(contents);

    const trimmed = std.mem.trim(u8, contents, " \t\r\n");
    if (!isNullhubToken(trimmed)) return null;
    return try allocator.dupe(u8, trimmed);
}

/// Writes `token` followed by a newline to the instance's token file,
/// truncating any previous contents.
///
/// Where the platform has an executable bit the file mode is set to 0o600.
/// A failure to do so is logged and otherwise ignored (best effort).
pub fn writeStoredToken(
    allocator: std.mem.Allocator,
    paths: paths_mod.Paths,
    component: []const u8,
    name: []const u8,
    token: []const u8,
) !void {
    const path = try tokenPath(allocator, paths, component, name);
    defer allocator.free(path);

    const file = try std_compat.fs.createFileAbsolute(path, .{ .truncate = true });
    defer file.close();
    // NOTE(review): the file exists with default permissions until chmod runs;
    // if the compat layer can take a mode at creation time, prefer that.
    if (comptime std_compat.fs.has_executable_bit) {
        file.chmod(0o600) catch |err| {
            std.log.warn("failed to restrict gateway token file permissions: {s}", .{@errorName(err)});
        };
    }
    try file.writeAll(token);
    try file.writeAll("\n");
}

/// Returns the SHA-256 digest of `token` as 64 lowercase hex characters.
/// Caller owns the returned slice.
pub fn hashTokenAlloc(allocator: std.mem.Allocator, token: []const u8) ![]u8 {
    var digest: [Sha256.digest_length]u8 = undefined;
    Sha256.hash(token, &digest, .{});
    const hex = std.fmt.bytesToHex(digest, .lower);
    return allocator.dupe(u8, &hex);
}

/// Makes sure the instance has a stored token and that `gateway_obj`'s
/// `paired_tokens` array lists that token's hash.
///
/// - The token is read from disk, or generated and written if absent.
/// - Plaintext entries carrying `token_prefix` are dropped from the array,
///   duplicate copies of the hash are collapsed, and the hash is appended if
///   missing. Unrelated entries are kept in order.
/// - A missing or non-array `paired_tokens` is replaced by `[hash]`.
/// - `changed.*` is set to true whenever `gateway_obj` was modified.
///
/// JSON values are allocated with `json_allocator`; the returned plaintext
/// token is allocated with `allocator` and owned by the caller.
pub fn ensurePairedToken(
    allocator: std.mem.Allocator,
    json_allocator: std.mem.Allocator,
    paths: paths_mod.Paths,
    component: []const u8,
    name: []const u8,
    gateway_obj: *std.json.ObjectMap,
    changed: *bool,
) ![]u8 {
    const token = (try readStoredToken(allocator, paths, component, name)) orelse blk: {
        const generated = try generateToken(allocator);
        errdefer allocator.free(generated);
        try writeStoredToken(allocator, paths, component, name, generated);
        break :blk generated;
    };
    errdefer allocator.free(token);

    const token_hash = try hashTokenAlloc(allocator, token);
    defer allocator.free(token_hash);

    if (gateway_obj.getPtr("paired_tokens")) |tokens_value| {
        if (tokens_value.* == .array) {
            var has_hash = false;
            var has_plaintext = false;
            for (tokens_value.array.items) |item| {
                if (item != .string) continue;
                if (std.mem.eql(u8, item.string, token_hash)) {
                    has_hash = true;
                } else if (isNullhubToken(item.string)) {
                    has_plaintext = true;
                }
            }

            // Already in the desired shape: leave the array untouched.
            if (has_hash and !has_plaintext) return token;

            var rebuilt = std.json.Array.init(json_allocator);
            var kept_hash = false;
            for (tokens_value.array.items) |item| {
                if (item == .string) {
                    if (isNullhubToken(item.string)) continue;
                    if (std.mem.eql(u8, item.string, token_hash)) {
                        if (kept_hash) continue;
                        kept_hash = true;
                    }
                }
                try rebuilt.append(item);
            }
            if (!kept_hash) {
                try rebuilt.append(.{ .string = try json_allocator.dupe(u8, token_hash) });
            }
            tokens_value.* = .{ .array = rebuilt };
            changed.* = true;
            return token;
        }
    }

    var fresh = std.json.Array.init(json_allocator);
    try fresh.append(.{ .string = try json_allocator.dupe(u8, token_hash) });
    try gateway_obj.put(json_allocator, "paired_tokens", .{ .array = fresh });
    changed.* = true;
    return token;
}
/// Runtime settings for a nullclaw instance, derived from nullhub-specific
/// hints in the install answers.
const NullclawRuntimeProfile = struct {
    /// True once any recognised hint was found in the answers.
    requested: bool = false,
    /// True when the hints ask for a memory-less instance.
    stateless: bool = false,
    gateway_require_pairing: bool = true,
    /// Defaults to, and is never parsed below, `gateway_access.min_body_size`.
    gateway_max_body_size_bytes: i64 = gateway_access.min_body_size,
    /// Defaults to, and is never parsed below, `gateway_access.min_timeout_secs`.
    gateway_request_timeout_secs: i64 = gateway_access.min_timeout_secs,
    a2a_enabled: bool = true,
    a2a_multi_modal: bool = true,
};

/// Reads `key` from `obj` as a boolean.
///
/// Accepts a JSON bool, or a string equal to "true"/"false" ignoring case.
/// Returns null when the key is absent or holds anything else.
fn boolField(obj: *std.json.ObjectMap, key: []const u8) ?bool {
    const value = obj.get(key) orelse return null;
    switch (value) {
        .bool => |flag| return flag,
        .string => |text| {
            if (std.ascii.eqlIgnoreCase(text, "true")) return true;
            if (std.ascii.eqlIgnoreCase(text, "false")) return false;
            return null;
        },
        else => return null,
    }
}

/// Reads `key` from `obj` as an `i64`.
///
/// Accepts a JSON integer, or a number-string/string that parses as a
/// base-10 `i64`. Returns null when the key is absent, holds another kind of
/// value, or the text does not parse.
fn integerField(obj: *std.json.ObjectMap, key: []const u8) ?i64 {
    const value = obj.get(key) orelse return null;
    const digits: []const u8 = switch (value) {
        .integer => |number| return number,
        .number_string, .string => |text| text,
        else => return null,
    };
    return std.fmt.parseInt(i64, digits, 10) catch null;
}
/// True when any of the three memory hints in `obj` asks for no persistence:
/// profile "minimal_none", backend "none", or auto-save explicitly false.
fn memoryHintsRequestStateless(
    obj: *std.json.ObjectMap,
    profile_key: []const u8,
    backend_key: []const u8,
    auto_save_key: []const u8,
) bool {
    return std.ascii.eqlIgnoreCase(stringField(obj, profile_key), "minimal_none") or
        std.ascii.eqlIgnoreCase(stringField(obj, backend_key), "none") or
        boolField(obj, auto_save_key) == false;
}

/// Copies gateway hints found in `obj` into `profile`, clamping the numeric
/// ones to the `gateway_access` minimums.
fn applyGatewayHints(
    profile: *NullclawRuntimeProfile,
    obj: *std.json.ObjectMap,
    pairing_key: []const u8,
    body_size_key: []const u8,
    timeout_key: []const u8,
) void {
    if (boolField(obj, pairing_key)) |require_pairing| {
        profile.requested = true;
        profile.gateway_require_pairing = require_pairing;
    }
    if (integerField(obj, body_size_key)) |bytes| {
        profile.requested = true;
        profile.gateway_max_body_size_bytes = @max(bytes, gateway_access.min_body_size);
    }
    if (integerField(obj, timeout_key)) |secs| {
        profile.requested = true;
        profile.gateway_request_timeout_secs = @max(secs, gateway_access.min_timeout_secs);
    }
}

/// Copies a2a hints found in `obj` into `profile`.
fn applyA2aHints(
    profile: *NullclawRuntimeProfile,
    obj: *std.json.ObjectMap,
    enabled_key: []const u8,
    multi_modal_key: []const u8,
) void {
    if (boolField(obj, enabled_key)) |enabled| {
        profile.requested = true;
        profile.a2a_enabled = enabled;
    }
    if (boolField(obj, multi_modal_key)) |multi_modal| {
        profile.requested = true;
        profile.a2a_multi_modal = multi_modal;
    }
}

/// Derives a `NullclawRuntimeProfile` from the install answers.
///
/// Returns the default profile when `component` is not "nullclaw", when
/// `answers_json` does not parse, or when its root is not an object.
/// Otherwise hints are read in this order, later ones overriding earlier:
/// profile name (`nullhub_profile`, `nullhub_runtime_profile`,
/// `runtime_profile`), `stateless`, flat `memory_*` keys, the nested `memory`
/// object, flat `gateway_*` / `a2a_*` keys, then the nested `gateway` and
/// `a2a` objects. Any recognised hint sets `requested`.
fn parseNullclawRuntimeProfile(
    allocator: std.mem.Allocator,
    component: []const u8,
    answers_json: []const u8,
) NullclawRuntimeProfile {
    if (!std.mem.eql(u8, component, "nullclaw")) return .{};

    var parsed = std.json.parseFromSlice(std.json.Value, allocator, answers_json, .{
        .allocate = .alloc_always,
        .ignore_unknown_fields = true,
    }) catch return .{};
    defer parsed.deinit();
    if (parsed.value != .object) return .{};

    const root = &parsed.value.object;
    var profile: NullclawRuntimeProfile = .{};

    // First non-empty of the two nullhub-specific names, else the generic one.
    const profile_name = for ([_][]const u8{ "nullhub_profile", "nullhub_runtime_profile" }) |key| {
        const candidate = stringField(root, key);
        if (candidate.len > 0) break candidate;
    } else stringField(root, "runtime_profile");

    var wants_stateless = std.ascii.eqlIgnoreCase(profile_name, "stateless") or
        std.ascii.eqlIgnoreCase(profile_name, "minimal_none");
    if (boolField(root, "stateless") == true) wants_stateless = true;
    if (memoryHintsRequestStateless(root, "memory_profile", "memory_backend", "memory_auto_save")) {
        wants_stateless = true;
    }
    if (root.getPtr("memory")) |memory_value| {
        if (memory_value.* == .object and
            memoryHintsRequestStateless(&memory_value.object, "profile", "backend", "auto_save"))
        {
            wants_stateless = true;
        }
    }
    if (wants_stateless) {
        profile.requested = true;
        profile.stateless = true;
    }

    // Flat keys first, nested objects second so the nested form wins.
    applyGatewayHints(&profile, root, "gateway_require_pairing", "gateway_max_body_size_bytes", "gateway_request_timeout_secs");
    applyA2aHints(&profile, root, "a2a_enabled", "a2a_multi_modal");
    if (root.getPtr("gateway")) |gateway_value| {
        if (gateway_value.* == .object) {
            applyGatewayHints(&profile, &gateway_value.object, "require_pairing", "max_body_size_bytes", "request_timeout_secs");
        }
    }
    if (root.getPtr("a2a")) |a2a_value| {
        if (a2a_value.* == .object) {
            applyA2aHints(&profile, &a2a_value.object, "enabled", "multi_modal");
        }
    }

    return profile;
}

/// Removes `fields` from the object stored under `object_key` in `root`, and
/// removes that object itself if it ends up empty. Does nothing when the key
/// is absent or its value is not an object.
fn stripObjectHintFields(root: *std.json.ObjectMap, object_key: []const u8, fields: []const []const u8) void {
    const entry = root.getPtr(object_key) orelse return;
    if (entry.* != .object) return;

    const nested = &entry.object;
    for (fields) |field| {
        _ = nested.orderedRemove(field);
    }
    if (nested.count() != 0) return;
    _ = root.orderedRemove(object_key);
}
/// Returns `answers_json` with every nullhub-only runtime-profile hint
/// removed, so the component binary never sees keys it does not know.
///
/// When `component` is not "nullclaw", when `profile.requested` is false, or
/// when the parsed root is not an object, the input slice itself is returned
/// (no allocation). Otherwise the result is newly allocated with `allocator`
/// and owned by the caller; callers can tell the cases apart by comparing
/// pointers. Parse errors are propagated.
fn stripNullhubRuntimeProfileHints(allocator: std.mem.Allocator, component: []const u8, answers_json: []const u8, profile: NullclawRuntimeProfile) ![]const u8 {
    const applies = std.mem.eql(u8, component, "nullclaw") and profile.requested;
    if (!applies) return answers_json;

    var parsed = try std.json.parseFromSlice(std.json.Value, allocator, answers_json, .{
        .allocate = .alloc_always,
        .ignore_unknown_fields = true,
    });
    defer parsed.deinit();
    if (parsed.value != .object) return answers_json;
    const root = &parsed.value.object;

    const flat_hints = [_][]const u8{
        "nullhub_profile",
        "nullhub_runtime_profile",
        "runtime_profile",
        "stateless",
        "memory_profile",
        "memory_backend",
        "memory_auto_save",
        "gateway_require_pairing",
        "gateway_max_body_size_bytes",
        "gateway_request_timeout_secs",
        "a2a_enabled",
        "a2a_multi_modal",
    };
    for (flat_hints) |key| {
        _ = root.orderedRemove(key);
    }

    const NestedHints = struct { object_key: []const u8, fields: []const []const u8 };
    const nested_hints = [_]NestedHints{
        .{ .object_key = "memory", .fields = &.{ "profile", "backend", "auto_save" } },
        .{ .object_key = "gateway", .fields = &.{ "require_pairing", "max_body_size_bytes", "request_timeout_secs" } },
        .{ .object_key = "a2a", .fields = &.{ "enabled", "multi_modal" } },
    };
    for (nested_hints) |hint| {
        stripObjectHintFields(root, hint.object_key, hint.fields);
    }

    return std.json.Stringify.valueAlloc(allocator, parsed.value, .{});
}

/// Sets `obj[key]` to a copy of `value` unless it already holds that exact
/// string; flags `changed` when a write happened.
fn setStringField(allocator: std.mem.Allocator, obj: *std.json.ObjectMap, key: []const u8, value: []const u8, changed: *bool) !void {
    const up_to_date = if (obj.get(key)) |current|
        current == .string and std.mem.eql(u8, current.string, value)
    else
        false;
    if (up_to_date) return;

    const owned = try allocator.dupe(u8, value);
    try obj.put(allocator, key, .{ .string = owned });
    changed.* = true;
}

/// Sets `obj[key]` to `value` unless it already holds that bool; flags
/// `changed` when a write happened.
fn setBoolField(allocator: std.mem.Allocator, obj: *std.json.ObjectMap, key: []const u8, value: bool, changed: *bool) !void {
    const up_to_date = if (obj.get(key)) |current|
        current == .bool and current.bool == value
    else
        false;
    if (up_to_date) return;

    try obj.put(allocator, key, .{ .bool = value });
    changed.* = true;
}

/// Raises `obj[key]` to `minimum` when it is missing, not an integer, or an
/// integer below `minimum`; an integer already at or above it is left alone.
/// Flags `changed` when a write happened.
fn setIntegerAtLeast(allocator: std.mem.Allocator, obj: *std.json.ObjectMap, key: []const u8, minimum: i64, changed: *bool) !void {
    const high_enough = if (obj.get(key)) |current|
        current == .integer and current.integer >= minimum
    else
        false;
    if (high_enough) return;

    try obj.put(allocator, key, .{ .integer = minimum });
    changed.* = true;
}
/// Applies a requested runtime profile to the instance's JSON config file.
///
/// Does nothing unless `component` is "nullclaw" and `profile.requested` is
/// set. A missing config file is treated as `{}`; a config whose root is not
/// an object yields `error.InvalidConfig`. The function then:
/// - writes the gateway pairing flag, body-size and timeout floors, and the
///   a2a flags from `profile`;
/// - when pairing is required, makes sure a paired token exists
///   (`gateway_access.ensurePairedToken`);
/// - when `profile.stateless`, forces the memory section to
///   profile "minimal_none", backend "none", auto_save false.
/// The file is rewritten (2-space indented, trailing newline) only if
/// something actually changed.
fn patchNullclawRuntimeProfileIntoConfig(
    allocator: std.mem.Allocator,
    p: paths_mod.Paths,
    component: []const u8,
    name: []const u8,
    profile: NullclawRuntimeProfile,
) !void {
    if (!std.mem.eql(u8, component, "nullclaw") or !profile.requested) return;

    const config_path = try p.instanceConfig(allocator, component, name);
    defer allocator.free(config_path);

    const contents = blk: {
        const file = std_compat.fs.openFileAbsolute(config_path, .{}) catch |err| switch (err) {
            error.FileNotFound => break :blk try allocator.dupe(u8, "{}"),
            else => return err,
        };
        defer file.close();
        break :blk try file.readToEndAlloc(allocator, MAX_CONFIG_BYTES);
    };
    defer allocator.free(contents);

    var parsed = try std.json.parseFromSlice(std.json.Value, allocator, contents, .{
        .allocate = .alloc_always,
        .ignore_unknown_fields = true,
    });
    defer parsed.deinit();
    if (parsed.value != .object) return error.InvalidConfig;

    const json_allocator = parsed.arena.allocator();
    const root = &parsed.value.object;
    var changed = false;

    // Create every section up front. Inserting a key into `root` may move the
    // map's storage, which would leave a previously obtained section pointer
    // referring to a stale copy (presumably ensureObjectInMap hands back a
    // pointer into the map — confirm against its definition).
    _ = try ensureObjectInMap(json_allocator, root, "gateway");
    _ = try ensureObjectInMap(json_allocator, root, "a2a");
    if (profile.stateless) _ = try ensureObjectInMap(json_allocator, root, "memory");

    // All keys exist now, so these lookups no longer insert into `root` and
    // the pointers remain valid for the rest of the function.
    const gateway_obj = try ensureObjectInMap(json_allocator, root, "gateway");
    const a2a_obj = try ensureObjectInMap(json_allocator, root, "a2a");

    try setBoolField(json_allocator, gateway_obj, "require_pairing", profile.gateway_require_pairing, &changed);
    try setIntegerAtLeast(json_allocator, gateway_obj, "max_body_size_bytes", profile.gateway_max_body_size_bytes, &changed);
    try setIntegerAtLeast(json_allocator, gateway_obj, "request_timeout_secs", profile.gateway_request_timeout_secs, &changed);
    try setBoolField(json_allocator, a2a_obj, "enabled", profile.a2a_enabled, &changed);
    try setBoolField(json_allocator, a2a_obj, "multi_modal", profile.a2a_multi_modal, &changed);

    if (profile.gateway_require_pairing) {
        // Only the side effects matter here (token file + paired_tokens entry);
        // the plaintext token itself is not needed.
        const token = try gateway_access.ensurePairedToken(allocator, json_allocator, p, component, name, gateway_obj, &changed);
        allocator.free(token);
    }

    if (profile.stateless) {
        const memory_obj = try ensureObjectInMap(json_allocator, root, "memory");
        try setStringField(json_allocator, memory_obj, "profile", "minimal_none", &changed);
        try setStringField(json_allocator, memory_obj, "backend", "none", &changed);
        try setBoolField(json_allocator, memory_obj, "auto_save", false, &changed);
    }

    if (!changed) return;

    const rendered = try std.json.Stringify.valueAlloc(allocator, parsed.value, .{
        .whitespace = .indent_2,
        .emit_null_optional_fields = false,
    });
    defer allocator.free(rendered);

    const out = try std_compat.fs.createFileAbsolute(config_path, .{ .truncate = true });
    defer out.close();
    try out.writeAll(rendered);
    try out.writeAll("\n");
}
// Hints understood only by nullhub must be parsed into the profile and then
// removed from the answers handed to the component, while ordinary answers
// such as the provider survive.
test "stripNullhubRuntimeProfileHints removes nullhub-only fields before component config generation" {
    const allocator = std.testing.allocator;
    const body =
        \\{"instance_name":"stateless-agent","provider":"openrouter","nullhub_profile":"stateless","memory_backend":"none","gateway_max_body_size_bytes":33554432,"a2a_multi_modal":true,"memory":{"backend":"none","auto_save":false},"gateway":{"max_body_size_bytes":33554432},"a2a":{"multi_modal":true}}
    ;
    const profile = parseNullclawRuntimeProfile(allocator, "nullclaw", body);
    try std.testing.expect(profile.requested);
    try std.testing.expect(profile.stateless);
    try std.testing.expect(profile.gateway_max_body_size_bytes >= 33_554_432);

    const stripped = try stripNullhubRuntimeProfileHints(allocator, "nullclaw", body, profile);
    defer allocator.free(stripped);

    const must_be_gone = [_][]const u8{
        "nullhub_profile",
        "memory_backend",
        "gateway_max_body_size_bytes",
        "\"memory\"",
        "\"gateway\"",
        "\"a2a\"",
    };
    for (must_be_gone) |needle| {
        try std.testing.expect(std.mem.indexOf(u8, stripped, needle) == null);
    }
    try std.testing.expect(std.mem.indexOf(u8, stripped, "\"provider\":\"openrouter\"") != null);
}

// Starting from a config with a small body limit, a2a disabled and a hybrid
// memory backend, the "stateless" profile must raise the gateway limits,
// enable a2a, switch memory off, and pair a freshly stored token by hash.
test "patchNullclawRuntimeProfileIntoConfig prepares stateless gateway config for generic instances" {
    const allocator = std.testing.allocator;
    var fixture = try test_helpers.TempPaths.init(allocator);
    defer fixture.deinit();
    try fixture.paths.ensureDirs();

    const comp_dir = try std.fs.path.join(allocator, &.{ fixture.paths.root, "instances", "nullclaw" });
    defer allocator.free(comp_dir);
    const inst_dir = try fixture.paths.instanceDir(allocator, "nullclaw", "stateless-agent");
    defer allocator.free(inst_dir);
    for ([_][]const u8{ comp_dir, inst_dir }) |dir| {
        std_compat.fs.makeDirAbsolute(dir) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };
    }

    const config_path = try fixture.paths.instanceConfig(allocator, "nullclaw", "stateless-agent");
    defer allocator.free(config_path);
    try writeFile(config_path,
        \\{"gateway":{"port":43123,"max_body_size_bytes":1024},"a2a":{"enabled":false},"memory":{"profile":"hybrid_keyword","backend":"hybrid","auto_save":true}}
    );

    const profile = parseNullclawRuntimeProfile(allocator, "nullclaw",
        \\{"nullhub_profile":"stateless"}
    );
    try patchNullclawRuntimeProfileIntoConfig(allocator, fixture.paths, "nullclaw", "stateless-agent", profile);

    // Re-read the patched config from disk.
    const config_file = try std_compat.fs.openFileAbsolute(config_path, .{});
    defer config_file.close();
    const config_bytes = try config_file.readToEndAlloc(allocator, 1024 * 1024);
    defer allocator.free(config_bytes);
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, config_bytes, .{ .allocate = .alloc_always });
    defer parsed.deinit();

    const root = parsed.value.object;
    const gateway = root.get("gateway").?.object;
    const a2a = root.get("a2a").?.object;
    const memory = root.get("memory").?.object;

    try std.testing.expect(gateway.get("require_pairing").?.bool);
    try std.testing.expect(gateway.get("max_body_size_bytes").?.integer >= gateway_access.min_body_size);
    try std.testing.expect(gateway.get("request_timeout_secs").?.integer >= gateway_access.min_timeout_secs);
    try std.testing.expect(a2a.get("enabled").?.bool);
    try std.testing.expect(a2a.get("multi_modal").?.bool);
    try std.testing.expectEqualStrings("minimal_none", memory.get("profile").?.string);
    try std.testing.expectEqualStrings("none", memory.get("backend").?.string);
    try std.testing.expect(!memory.get("auto_save").?.bool);

    // The plaintext token lives in its own file ...
    const token_path = try gateway_access.tokenPath(allocator, fixture.paths, "nullclaw", "stateless-agent");
    defer allocator.free(token_path);
    const stored = try std_compat.fs.openFileAbsolute(token_path, .{});
    defer stored.close();
    const stored_bytes = try stored.readToEndAlloc(allocator, 16 * 1024);
    defer allocator.free(stored_bytes);
    const token = std.mem.trim(u8, stored_bytes, " \t\r\n");
    try std.testing.expect(gateway_access.isNullhubToken(token));

    // ... and the config lists only its hash.
    const expected_hash = try gateway_access.hashTokenAlloc(allocator, token);
    defer allocator.free(expected_hash);
    const paired_tokens = gateway.get("paired_tokens").?.array.items;
    try std.testing.expectEqual(@as(usize, 1), paired_tokens.len);
    try std.testing.expectEqualStrings(expected_hash, paired_tokens[0].string);
    try std.testing.expect(!gateway_access.isNullhubToken(paired_tokens[0].string));
}
const std_compat = @import("compat"); const net_compat = @import("net_compat.zig"); const auth = @import("auth.zig"); const instances_api = @import("api/instances.zig"); +const proxy_api = @import("api/proxy.zig"); const platform = @import("core/platform.zig"); const components_api = @import("api/components.zig"); const config_api = @import("api/config.zig"); @@ -16,6 +17,7 @@ const mdns_mod = @import("mdns.zig"); const state_mod = @import("core/state.zig"); const integration_mod = @import("core/integration.zig"); const paths_mod = @import("core/paths.zig"); +const gateway_access = @import("core/gateway_access.zig"); const manager_mod = @import("supervisor/manager.zig"); const process_mod = @import("supervisor/process.zig"); const runtime_state_mod = @import("supervisor/runtime_state.zig"); @@ -35,7 +37,8 @@ const ui_assets = @import("ui_assets"); const version = @import("version.zig"); const test_helpers = @import("test_helpers.zig"); -const max_request_size: usize = 65_536; +const max_request_size: usize = 64 * 1024 * 1024; +const initial_request_buffer_size: usize = 64 * 1024; pub const Server = struct { allocator: std.mem.Allocator, @@ -459,7 +462,7 @@ pub const Server = struct { } fn handleConnection(self: *Server, conn: std_compat.net.Server.Connection, alloc: std.mem.Allocator) !void { - var req_buf: [max_request_size]u8 = undefined; + var req_buf: [initial_request_buffer_size]u8 = undefined; const n = net_compat.streamRead(conn.stream, &req_buf) catch return; if (n == 0) return; const raw = req_buf[0..n]; @@ -491,7 +494,16 @@ pub const Server = struct { } // Read remaining body if Content-Length indicates more data - const body = readBody(raw, n, conn.stream, alloc) catch return; + const body = readBody(raw, n, conn.stream, alloc) catch |err| { + if (err == error.RequestTooLarge) { + try sendResponse(conn.stream, .{ + .status = "413 Payload Too Large", + .content_type = "application/json", + .body = "{\"error\":\"request body too large\"}", + }, raw, 
self.host, self.port, extra_origins); + } + return; + }; // Handle OPTIONS preflight if (std.mem.eql(u8, method, "OPTIONS")) { @@ -515,6 +527,44 @@ pub const Server = struct { } } + if (instances_api.isGatewayProxyPath(target)) { + const prepared = blk: { + self.mutex.lock(); + defer self.mutex.unlock(); + break :blk instances_api.prepareGatewayProxy(alloc, self.state, self.manager, self.paths, method, target, body); + }; + switch (prepared) { + .no_match => {}, + .response => |response| { + try sendResponse(conn.stream, .{ .status = response.status, .content_type = response.content_type, .body = response.body }, raw, self.host, self.port, extra_origins); + return; + }, + .upstream => |upstream| { + defer upstream.deinit(alloc); + const cors_headers = try buildCorsHeaders(alloc, raw, self.host, self.port, extra_origins); + defer alloc.free(cors_headers); + const base_url = try std.fmt.allocPrint(alloc, "http://127.0.0.1:{d}", .{upstream.port}); + defer alloc.free(base_url); + const proxy_options = proxy_api.ForwardOptions{ + .method = method, + .base_url = base_url, + .path = upstream.upstream_path, + .body = upstream.body, + .bearer_token = upstream.token, + .accept = if (upstream.event_stream) "text/event-stream" else null, + .unreachable_body = "{\"error\":\"nullclaw gateway unreachable\"}", + }; + if (upstream.event_stream) { + try proxy_api.forwardStream(alloc, proxy_options, conn.stream, cors_headers); + } else { + const proxied = proxy_api.forward(alloc, proxy_options); + try sendResponse(conn.stream, .{ .status = proxied.status, .content_type = proxied.content_type, .body = proxied.body }, raw, self.host, self.port, extra_origins); + } + return; + }, + } + } + // Route dispatch (lock mutex so supervisor thread doesn't race) const response = if (routeWithoutServerMutex(target)) self.route(alloc, method, target, body) @@ -1452,12 +1502,12 @@ fn readBody(raw: []const u8, n: usize, stream: std_compat.net.Stream, alloc: std const header_end_pos = 
std.mem.indexOf(u8, raw, "\r\n\r\n") orelse return ""; const body_start = header_end_pos + 4; const body_received = n - body_start; + if (requestBodyExceedsLimit(content_length)) return error.RequestTooLarge; if (body_received >= content_length) { return raw[body_start .. body_start + content_length]; } // Need to read more data from the stream const total_size = body_start + content_length; - if (total_size > max_request_size) return error.RequestTooLarge; const full_buf = try alloc.alloc(u8, total_size); @memcpy(full_buf[0..n], raw); var total_read = n; @@ -1472,6 +1522,10 @@ fn readBody(raw: []const u8, n: usize, stream: std_compat.net.Stream, alloc: std return extractBody(raw); } +fn requestBodyExceedsLimit(content_length: usize) bool { + return content_length > max_request_size; +} + fn sendResponse(stream: std_compat.net.Stream, response: Response, raw_request: []const u8, bind_host: []const u8, port: u16, extra_origins: []const []const u8) !void { var buf: [4096]u8 = undefined; var writer: std.Io.Writer = .fixed(&buf); @@ -1548,6 +1602,13 @@ fn appendCorsHeaders(writer: anytype, raw_request: []const u8, bind_host: []cons try writer.writeAll("Vary: Origin\r\n"); } +fn buildCorsHeaders(allocator: std.mem.Allocator, raw_request: []const u8, bind_host: []const u8, port: u16, extra_origins: []const []const u8) ![]u8 { + var out: std.Io.Writer.Allocating = .init(allocator); + errdefer out.deinit(); + try appendCorsHeaders(&out.writer, raw_request, bind_host, port, extra_origins); + return try out.toOwnedSlice(); +} + fn allowedCorsOrigin(raw_request: []const u8, bind_host: []const u8, port: u16, extra_origins: []const []const u8) ?[]const u8 { const origin = extractHeader(raw_request, "Origin") orelse return null; if (!isAllowedCorsOrigin(origin, bind_host, port, extra_origins)) return null; @@ -2564,6 +2625,13 @@ test "contentType returns correct MIME type for .html" { try std.testing.expectEqualStrings("text/html", contentType("index.html")); } +test "initial 
request buffer stays small while media body limit remains high" { + try std.testing.expect(initial_request_buffer_size <= 128 * 1024); + try std.testing.expect(max_request_size >= @as(usize, @intCast(gateway_access.min_body_size))); + try std.testing.expect(!requestBodyExceedsLimit(max_request_size)); + try std.testing.expect(requestBodyExceedsLimit(max_request_size + 1)); +} + test "contentType returns correct MIME type for .js" { try std.testing.expectEqualStrings("application/javascript", contentType("app.js")); }