diff --git a/README.md b/README.md index ad7ed46..c25a813 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,21 @@ Map events = client.listIntentEvents(intentId, null, RequestOpti --- +## Agent Mesh - Monitor and Govern + +Agent Mesh gives every agent real-time health monitoring, policy enforcement, and a kill switch - all from a single dashboard. + +```java +client.getMesh().startHeartbeat(); +client.getMesh().reportMetric(true, 230.0, 0.02); // success, latency in ms, cost in USD +``` + +Set action policies (allowlist/denylist intent types) and cost policies (intents/day, $/day limits) per agent via dashboard or API. The Mesh module is available in this SDK through `client.getMesh()`; the [Python SDK](https://github.com/AxmeAI/axme-sdk-python) is available as well. [Full overview](https://github.com/AxmeAI/axme#agent-mesh---see-and-control-your-agents). + +Open the live dashboard at [mesh.axme.ai](https://mesh.axme.ai) or run `axme mesh dashboard` from the CLI. + +--- + ## Examples See [`examples/BasicSubmit.java`](examples/BasicSubmit.java). More: [axme-examples](https://github.com/AxmeAI/axme-examples) diff --git a/src/main/java/dev/axme/sdk/AxmeClient.java b/src/main/java/dev/axme/sdk/AxmeClient.java index 1585201..38c9a44 100644 --- a/src/main/java/dev/axme/sdk/AxmeClient.java +++ b/src/main/java/dev/axme/sdk/AxmeClient.java @@ -22,6 +22,7 @@ public final class AxmeClient { private final String actorToken; private final HttpClient httpClient; private final ObjectMapper objectMapper = new ObjectMapper(); + private volatile MeshClient mesh; public AxmeClient(AxmeClientConfig config) { this(config, HttpClient.newHttpClient()); @@ -34,6 +35,25 @@ public AxmeClient(AxmeClientConfig config, HttpClient httpClient) { this.httpClient = httpClient; } + /** + * Returns the Agent Mesh sub-client (lazy-initialized, thread-safe). 
+ * + * @return the shared {@link MeshClient} instance + */ + public MeshClient getMesh() { + MeshClient result = mesh; /* one read of the volatile field; sufficient when the sub-client already exists */ + if (result == null) { + synchronized (this) { /* re-read under this client's monitor so only one MeshClient is ever created */ + result = mesh; + if (result == null) { + result = new MeshClient(this); + mesh = result; + } + } + } + return result; + } + public Map registerNick(Map payload, RequestOptions options) throws IOException, InterruptedException { return requestJson("POST", "/v1/users/register-nick", Map.of(), payload, normalizeOptions(options)); @@ -830,7 +850,7 @@ private static boolean isTerminalIntentEvent(Map event) { return eventType instanceof String && TERMINAL_EVENT_TYPES.contains(eventType); } - private Map requestJson( + Map requestJson( /* widened from private to package-private: MeshClient in this package calls it */ String method, String path, Map query, diff --git a/src/main/java/dev/axme/sdk/MeshClient.java b/src/main/java/dev/axme/sdk/MeshClient.java new file mode 100644 index 0000000..840345d --- /dev/null +++ b/src/main/java/dev/axme/sdk/MeshClient.java @@ -0,0 +1,282 @@ +package dev.axme.sdk; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Agent Mesh operations - heartbeat, health monitoring, metrics reporting, + * agent management, and event listing. + * + *

Obtain an instance via {@link AxmeClient#getMesh()}. + */ +public final class MeshClient { + private final AxmeClient client; + private volatile Thread heartbeatThread; + private volatile boolean heartbeatStopRequested; + private final ConcurrentHashMap> metricsBuffer = new ConcurrentHashMap<>(); + private final AtomicInteger intentsTotal = new AtomicInteger(0); + private final AtomicInteger intentsSucceeded = new AtomicInteger(0); + private final AtomicInteger intentsFailed = new AtomicInteger(0); + + MeshClient(AxmeClient client) { + this.client = client; + } + + // -- Heartbeat --------------------------------------------------------------- + + /** + * Send a single heartbeat to the mesh. Optionally include metrics. + * + * @param metrics optional metrics map to include in the heartbeat + * @param traceId optional trace-id header value + * @return the gateway response body + */ + public Map heartbeat(Map metrics, String traceId) + throws IOException, InterruptedException { + Map body = new LinkedHashMap<>(); + if (metrics != null && !metrics.isEmpty()) { + body.put("metrics", metrics); + } + RequestOptions opts = traceId != null ? new RequestOptions(null, traceId) : RequestOptions.none(); + return client.requestJson( + "POST", + "/v1/mesh/heartbeat", + Map.of(), + body.isEmpty() ? null : body, + opts); + } + + /** + * Start a background daemon thread that sends heartbeats at regular intervals. + * + *

If a heartbeat thread is already running, this method is a no-op. + * + * @param intervalSeconds seconds between heartbeats (default 30) + * @param includeMetrics whether to flush and include buffered metrics with each heartbeat + */ + public synchronized void startHeartbeat(double intervalSeconds, boolean includeMetrics) { + if (heartbeatThread != null && heartbeatThread.isAlive()) { + return; // already running + } + heartbeatStopRequested = false; + + heartbeatThread = new Thread(() -> { + while (!heartbeatStopRequested) { + try { + Thread.sleep((long) (intervalSeconds * 1000)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + if (heartbeatStopRequested) { + return; + } + try { + Map metrics = includeMetrics ? flushMetrics() : null; + heartbeat(metrics, null); + } catch (Exception e) { + // Heartbeat failures are non-fatal + } + } + }, "axme-mesh-heartbeat"); + heartbeatThread.setDaemon(true); + heartbeatThread.start(); + } + + /** + * Start heartbeat with default settings (30s interval, include metrics). + */ + public void startHeartbeat() { + startHeartbeat(30.0, true); + } + + /** + * Stop the background heartbeat thread. Blocks up to 5 seconds for the + * thread to terminate. + */ + public synchronized void stopHeartbeat() { + heartbeatStopRequested = true; + Thread thread = heartbeatThread; + if (thread != null) { + thread.interrupt(); + try { + thread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + heartbeatThread = null; + } + } + + // -- Metrics ----------------------------------------------------------------- + + /** + * Buffer a metric observation. Flushed with next heartbeat. 
+ * + * @param success whether the intent succeeded + * @param latencyMs latency in milliseconds (may be null) + * @param costUsd cost in USD (may be null) + */ + public void reportMetric(boolean success, Double latencyMs, Double costUsd) { + int total = intentsTotal.incrementAndGet(); + if (success) { + intentsSucceeded.incrementAndGet(); + } else { + intentsFailed.incrementAndGet(); + } + if (latencyMs != null) { + metricsBuffer.compute("avg_latency_ms", (key, ref) -> { + if (ref == null) { + ref = new AtomicReference<>(0.0); + } + double prevAvg = ref.get(); + ref.set(prevAvg + (latencyMs - prevAvg) / total); + return ref; + }); + } + if (costUsd != null) { + metricsBuffer.compute("cost_usd", (key, ref) -> { + if (ref == null) { + ref = new AtomicReference<>(0.0); + } + ref.set(ref.get() + costUsd); + return ref; + }); + } + } + + /** + * Flush the buffered metrics and return them, or null if nothing was buffered. + */ + Map flushMetrics() { + int total = intentsTotal.getAndSet(0); + int succeeded = intentsSucceeded.getAndSet(0); + int failed = intentsFailed.getAndSet(0); + if (total == 0 && metricsBuffer.isEmpty()) { + return null; + } + Map result = new LinkedHashMap<>(); + if (total > 0) { + result.put("intents_total", total); + } + if (succeeded > 0) { + result.put("intents_succeeded", succeeded); + } + if (failed > 0) { + result.put("intents_failed", failed); + } + AtomicReference avgRef = metricsBuffer.remove("avg_latency_ms"); + if (avgRef != null) { + result.put("avg_latency_ms", avgRef.get()); + } + AtomicReference costRef = metricsBuffer.remove("cost_usd"); + if (costRef != null) { + result.put("cost_usd", costRef.get()); + } + return result.isEmpty() ? null : result; + } + + // -- Agent management -------------------------------------------------------- + + /** + * List all agents in the workspace with health status. + * + * @param limit maximum number of agents to return + * @param health optional health filter (e.g. 
"healthy", "degraded", "dead") + * @return gateway response body + */ + public Map listAgents(int limit, String health) + throws IOException, InterruptedException { + Map params = new LinkedHashMap<>(); + params.put("limit", String.valueOf(limit)); + if (health != null && !health.trim().isEmpty()) { + params.put("health", health); + } + return client.requestJson("GET", "/v1/mesh/agents", params, null, RequestOptions.none()); + } + + /** + * List agents with default limit of 100 and no health filter. + */ + public Map listAgents() throws IOException, InterruptedException { + return listAgents(100, null); + } + + /** + * Get single agent detail with metrics and events. + * + * @param addressId the agent address ID + * @return gateway response body + */ + public Map getAgent(String addressId) + throws IOException, InterruptedException { + return client.requestJson( + "GET", + "/v1/mesh/agents/" + addressId, + Map.of(), + null, + RequestOptions.none()); + } + + /** + * Kill an agent - block all intents to and from it. + * + * @param addressId the agent address ID to kill + * @return gateway response body + */ + public Map kill(String addressId) + throws IOException, InterruptedException { + return client.requestJson( + "POST", + "/v1/mesh/agents/" + addressId + "/kill", + Map.of(), + null, + RequestOptions.none()); + } + + /** + * Resume a killed agent. + * + * @param addressId the agent address ID to resume + * @return gateway response body + */ + public Map resume(String addressId) + throws IOException, InterruptedException { + return client.requestJson( + "POST", + "/v1/mesh/agents/" + addressId + "/resume", + Map.of(), + null, + RequestOptions.none()); + } + + // -- Events ------------------------------------------------------------------ + + /** + * List recent mesh events (kills, resumes, health changes). 
+ * + * @param limit maximum number of events to return + * @param eventType optional filter by event type + * @return gateway response body + */ + public Map listEvents(int limit, String eventType) + throws IOException, InterruptedException { + Map params = new LinkedHashMap<>(); + params.put("limit", String.valueOf(limit)); /* limit is always sent, as a string */ + if (eventType != null && !eventType.trim().isEmpty()) { /* null or whitespace-only filters are not sent */ + params.put("event_type", eventType); + } + return client.requestJson("GET", "/v1/mesh/events", params, null, RequestOptions.none()); + } + + /** + * List events with default limit of 50 and no type filter. + */ + public Map listEvents() throws IOException, InterruptedException { + return listEvents(50, null); + } +} diff --git a/src/test/java/dev/axme/sdk/MeshClientTest.java b/src/test/java/dev/axme/sdk/MeshClientTest.java new file mode 100644 index 0000000..939079f --- /dev/null +++ b/src/test/java/dev/axme/sdk/MeshClientTest.java @@ -0,0 +1,315 @@ +package dev.axme.sdk; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class MeshClientTest { + private final ObjectMapper objectMapper = new ObjectMapper(); + private MockWebServer server; + private AxmeClient client; + private MeshClient mesh; + + @BeforeEach + void setUp() throws Exception { /* a fresh mock server and client are created before every test */ + server = new MockWebServer(); + server.start(); + client = new AxmeClient(new AxmeClientConfig(server.url("/").toString(), "test-key")); /* client targets the mock server's root URL with API key "test-key" */ + mesh = client.getMesh(); + } 
+ + @AfterEach + void tearDown() throws Exception { + mesh.stopHeartbeat(); + server.shutdown(); + } + + // -- getMesh lazy init ------------------------------------------------------- + + @Test + void getMeshReturnsSameInstance() { + MeshClient a = client.getMesh(); + MeshClient b = client.getMesh(); + assertTrue(a == b, "getMesh() must return the same instance"); + } + + // -- heartbeat --------------------------------------------------------------- + + @Test + void heartbeatPostsWithoutMetrics() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}")); + + Map response = mesh.heartbeat(null, null); + + RecordedRequest req = server.takeRequest(); + assertEquals("POST", req.getMethod()); + assertEquals("/v1/mesh/heartbeat", req.getPath()); + assertEquals("test-key", req.getHeader("x-api-key")); + assertTrue((Boolean) response.get("ok")); + // No body when metrics is null + String body = req.getBody().readUtf8(); + assertTrue(body.isEmpty(), "Expected empty body when no metrics, got: " + body); + } + + @Test + void heartbeatPostsWithMetrics() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}")); + + Map metrics = Map.of("intents_total", 5, "avg_latency_ms", 120.5); + mesh.heartbeat(metrics, "trace-abc"); + + RecordedRequest req = server.takeRequest(); + assertEquals("POST", req.getMethod()); + assertEquals("/v1/mesh/heartbeat", req.getPath()); + assertEquals("trace-abc", req.getHeader("X-Trace-Id")); + Map body = + objectMapper.readValue(req.getBody().readUtf8(), new TypeReference>() {}); + assertNotNull(body.get("metrics"), "Heartbeat body must contain metrics"); + } + + // -- reportMetric + flushMetrics --------------------------------------------- + + @Test + void reportMetricBuffersAndFlushes() { + mesh.reportMetric(true, 100.0, 0.01); + mesh.reportMetric(false, 200.0, 0.02); + mesh.reportMetric(true, null, null); + + Map flushed = mesh.flushMetrics(); + + 
assertNotNull(flushed); + assertEquals(3, ((Number) flushed.get("intents_total")).intValue()); + assertEquals(2, ((Number) flushed.get("intents_succeeded")).intValue()); + assertEquals(1, ((Number) flushed.get("intents_failed")).intValue()); + assertNotNull(flushed.get("avg_latency_ms")); + double costUsd = ((Number) flushed.get("cost_usd")).doubleValue(); + assertEquals(0.03, costUsd, 0.001); + + // Second flush should be null (buffer was drained) + assertNull(mesh.flushMetrics()); + } + + @Test + void flushMetricsReturnsNullWhenEmpty() { + assertNull(mesh.flushMetrics()); + } + + // -- startHeartbeat / stopHeartbeat ------------------------------------------ + + @Test + void startHeartbeatSendsPeriodicRequests() throws Exception { + // Enqueue enough responses for a few heartbeats + for (int i = 0; i < 5; i++) { + server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}")); + } + + mesh.startHeartbeat(0.1, false); + Thread.sleep(350); // Wait for a few intervals + mesh.stopHeartbeat(); + + // Should have received at least 2 heartbeat requests + assertTrue(server.getRequestCount() >= 2, + "Expected at least 2 heartbeats, got " + server.getRequestCount()); + RecordedRequest req = server.takeRequest(); + assertEquals("POST", req.getMethod()); + assertEquals("/v1/mesh/heartbeat", req.getPath()); + } + + @Test + void startHeartbeatIsIdempotent() throws Exception { + for (int i = 0; i < 10; i++) { + server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}")); + } + mesh.startHeartbeat(0.1, false); + mesh.startHeartbeat(0.1, false); // second call is a no-op + Thread.sleep(250); + mesh.stopHeartbeat(); + // Just checking it doesn't throw or start duplicate threads + assertTrue(server.getRequestCount() >= 1); + } + + @Test + void stopHeartbeatIsIdempotent() { + // stop without start should not throw + mesh.stopHeartbeat(); + mesh.stopHeartbeat(); + } + + // -- listAgents 
-------------------------------------------------------------- + + @Test + void listAgentsGetsWithLimitAndHealth() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"agents\":[{\"address_id\":\"agent-1\"}]}")); + + Map response = mesh.listAgents(50, "healthy"); + + RecordedRequest req = server.takeRequest(); + assertEquals("GET", req.getMethod()); + assertEquals("/v1/mesh/agents?limit=50&health=healthy", req.getPath()); + assertTrue((Boolean) response.get("ok")); + } + + @Test + void listAgentsGetsWithDefaultParams() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"agents\":[]}")); + + mesh.listAgents(); + + RecordedRequest req = server.takeRequest(); + assertEquals("GET", req.getMethod()); + assertEquals("/v1/mesh/agents?limit=100", req.getPath()); + } + + @Test + void listAgentsOmitsBlankHealth() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"agents\":[]}")); + + mesh.listAgents(25, null); + + RecordedRequest req = server.takeRequest(); + assertEquals("/v1/mesh/agents?limit=25", req.getPath()); + } + + // -- getAgent ---------------------------------------------------------------- + + @Test + void getAgentGetsById() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"address_id\":\"agent-1\",\"health\":\"healthy\"}")); + + Map response = mesh.getAgent("agent-1"); + + RecordedRequest req = server.takeRequest(); + assertEquals("GET", req.getMethod()); + assertEquals("/v1/mesh/agents/agent-1", req.getPath()); + assertEquals("healthy", response.get("health")); + } + + // -- kill -------------------------------------------------------------------- + + @Test + void killPostsToCorrectPath() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"status\":\"killed\"}")); + + Map response = mesh.kill("agent-bad"); + 
+ RecordedRequest req = server.takeRequest(); + assertEquals("POST", req.getMethod()); + assertEquals("/v1/mesh/agents/agent-bad/kill", req.getPath()); + assertEquals("killed", response.get("status")); + } + + // -- resume ------------------------------------------------------------------ + + @Test + void resumePostsToCorrectPath() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"status\":\"alive\"}")); + + Map response = mesh.resume("agent-bad"); + + RecordedRequest req = server.takeRequest(); + assertEquals("POST", req.getMethod()); + assertEquals("/v1/mesh/agents/agent-bad/resume", req.getPath()); + assertEquals("alive", response.get("status")); + } + + // -- listEvents -------------------------------------------------------------- + + @Test + void listEventsGetsWithLimitAndType() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"events\":[{\"event_type\":\"agent.killed\"}]}")); + + Map response = mesh.listEvents(10, "agent.killed"); + + RecordedRequest req = server.takeRequest(); + assertEquals("GET", req.getMethod()); + assertEquals("/v1/mesh/events?limit=10&event_type=agent.killed", req.getPath()); + assertTrue((Boolean) response.get("ok")); + } + + @Test + void listEventsGetsWithDefaults() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"events\":[]}")); + + mesh.listEvents(); + + RecordedRequest req = server.takeRequest(); + assertEquals("GET", req.getMethod()); + assertEquals("/v1/mesh/events?limit=50", req.getPath()); + } + + @Test + void listEventsOmitsBlankEventType() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200) + .setBody("{\"ok\":true,\"events\":[]}")); + + mesh.listEvents(20, null); + + RecordedRequest req = server.takeRequest(); + assertEquals("/v1/mesh/events?limit=20", req.getPath()); + } + + // -- HTTP error propagation 
-------------------------------------------------- + + @Test + void meshMethodsThrowOnHttpError() { + server.enqueue(new MockResponse().setResponseCode(403).setBody("{\"error\":\"forbidden\"}")); + + try { + mesh.listAgents(); + } catch (Exception e) { + assertTrue(e instanceof AxmeHttpException); + assertEquals(403, ((AxmeHttpException) e).getStatusCode()); + } + } + + // -- heartbeat with flushed metrics ------------------------------------------ + + @Test + void heartbeatIncludesBufferedMetricsWhenPresent() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}")); + + mesh.reportMetric(true, 50.0, 0.005); + Map flushed = mesh.flushMetrics(); + mesh.heartbeat(flushed, null); + + RecordedRequest req = server.takeRequest(); + Map body = + objectMapper.readValue(req.getBody().readUtf8(), new TypeReference>() {}); + @SuppressWarnings("unchecked") + Map metrics = (Map) body.get("metrics"); + assertNotNull(metrics); + assertEquals(1, ((Number) metrics.get("intents_total")).intValue()); + assertEquals(1, ((Number) metrics.get("intents_succeeded")).intValue()); + } + + // -- auth header forwarding -------------------------------------------------- + + @Test + void meshRequestsIncludeApiKeyHeader() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}")); + + mesh.getAgent("a1"); + + RecordedRequest req = server.takeRequest(); + assertEquals("test-key", req.getHeader("x-api-key")); + } +}