diff --git a/CHANGELOG.md b/CHANGELOG.md index e98d8ff..b8ad253 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,18 +7,23 @@ All notable changes to Agents.KT are documented here. The format follows [Keep a ### Added — `agents-kt-dir`: AGNTCY DIR directory client (#4520, PRD §12.6) — AGNTCY interop `DirClient` is a typed Kotlin client for the AGNTCY [DIR](https://github.com/agntcy/dir) content-addressed -directory `StoreService`, over generated grpc-kotlin coroutine stubs: `connect(host, port).use { dir -> val cid -= dir.push(agent.toOasfRecord(...)); val json = dir.pull(cid) }`. `push`/`pushAll` publish OASF records and -return their CIDs; `pull` fetches the record JSON by CID; `lookup` resolves metadata; `delete` removes. The -**directory** pillar of the AGNTCY epic (#4517), beside OASF export/import (#4518/#4519) and Identity-verify -(#4521). The record body is carried as a `google.protobuf.Struct` (JSON is the contract — no OASF protos), via -protobuf's canonical `JsonFormat` (whole numbers stay integral, e.g. `{"id":1003}`). Protos are vendored as a -**trimmed, wire-compatible** subset (same package/service/RPC/field-numbers; the `buf.validate` options and -unused referrer RPCs dropped) so the buf/validate closure isn't needed. **Auth:** plaintext (dev), TLS, and -OIDC **bearer**; SPIFFE/mTLS via a caller-supplied `ManagedChannel` (`fromChannel`). New feature module -`agents-kt-dir` (package `agents_engine.agntcy.dir`) so the grpc/protobuf/netty graph stays out of core. 4 -tests (in-process gRPC round trip). RoutingService/SearchService (network discovery) and OCI referrers are -follow-ups; with this, the AGNTCY epic's core is complete. +directory, over generated grpc-kotlin coroutine stubs — three services on one channel: +- **StoreService** (content-addressed CRUD): `push`/`pushAll` (OASF record → CID), `pull` (CID → JSON), + `lookup` (metadata), `delete`. +- **SearchService** (local content search): `searchRecords`/`searchCids` by typed `DirQuery` facet + (`DirQueryType.SKILL_NAME`, `DOMAIN_NAME`, `AUTHOR`, … — the OASF fields DIR indexes). +- **RoutingService** (network publish + discovery): `publish`/`unpublish` (announce records by CID), + `routeSearch` (cross-peer discovery → `DirRouteMatch`{cid, peer, score}; coarse skill/locator/domain/module + facets, non-routable facets rejected). + +The **directory** pillar of the AGNTCY epic (#4517), beside OASF export/import (#4518/#4519) and +Identity-verify (#4521). The record body is a `google.protobuf.Struct` (JSON is the contract — no OASF protos) +via protobuf's canonical `JsonFormat` (whole numbers stay integral, e.g. `{"id":1003}`). Protos are vendored +**trimmed + wire-compatible** (same package/service/RPC/field-numbers; `buf.validate` options dropped). **Auth:** +plaintext (dev) / TLS / OIDC **bearer**; SPIFFE/mTLS via a caller-supplied `ManagedChannel` (`fromChannel`). New +feature module `agents-kt-dir` (`agents_engine.agntcy.dir`) so the grpc/protobuf/netty graph stays out of core. +7 tests (in-process gRPC round trips across all three services). With this the AGNTCY epic is complete; the +only DIR remainders are `RoutingService.List` and OCI referrers. ### Added — OASF record import + validate: `fromOasfRecord()` (#4519, PRD §12.6) — AGNTCY interop diff --git a/README.md b/README.md index c0c80c2..4ed2e2b 100644 --- a/README.md +++ b/README.md @@ -209,7 +209,7 @@ These APIs work in `main`, are unit-tested, and are exercised by integration tes - **Web-grounded search tool (`perplexitySearch`)** — `tools { +perplexitySearchTool(perplexityKey) }` lets an agent reasoning on its *own* model (Claude/OpenAI/Ollama/…) fetch live, cited facts from Perplexity's Sonar API. The tool is `untrustedOutput = true`, so results are auto-wrapped in the `{"trusted":false}` envelope and the model is warned to treat them as data, not instructions (#642) — web search is the canonical prompt-injection vector. The result renders the answer plus a numbered source list parsed from `search_results[]` (citations land in both the model context and the JSONL audit row). Controls via `perplexitySearchOptions { mode = SearchMode.ACADEMIC; recency = SearchRecency.WEEK; allowDomains("arxiv.org"); contextSize = SearchContextSize.HIGH; structuredOutput(MyType::class) }` map to `search_mode` / `search_recency_filter` / `search_domain_filter` / `web_search_options` / `response_format` json_schema (#3674). Key from `.secrets/perplexity-key`. See [docs/providers.md](docs/providers.md#web-grounded-search-tool-perplexitysearch-3676--3677). - **NLWeb endpoint tool (`nlwebSearch`)** — `tools { +nlwebSearchTool(baseUrl = "https://example.com") }` lets an agent query an [NLWeb](https://github.com/nlweb-ai/NLWeb) endpoint — a website's natural-language interface over its **schema.org**-structured content — and fold the ranked, typed results into context (#4541, PRD §12.9). Like `perplexitySearch` it is `untrustedOutput = true` (fetched web content is treated as data, not instructions). `nlwebSearchOptions`-style args via `NlWebSearchOptions(site = "podcasts", mode = NlWebMode.GENERATE)`. NLWeb endpoints need no API key. (Every NLWeb endpoint is also an MCP server, so an NLWeb `/mcp` URL is equally consumable through the existing MCP client — this tool is the zero-wiring `/ask`-over-HTTP path.) - **Serve an NLWeb endpoint (`NlWebServer`)** — `NlWebServer.from(agent).start()` exposes the NLWeb `POST /ask` contract (`{query, site?, mode}` → ranked schema.org `results[]`), so agents.kt is consumable by NLWeb clients — the **serve** side to `nlwebSearch`'s **consume** side (#4542). Same `from(agent)` shape, loopback-only JDK-`HttpServer` posture, and threat model as `McpServer.from(agent)` / `A2AServer.from(agent)` (`127.0.0.1`, optional bearer, front with a gateway). The query is the agent's input; an `NlWebSearchResult` output is served verbatim (ranked schema.org results), any other output becomes the `summary` answer — back the agent's retrieval with the RAG `EmbeddingStore` seam (`:agents-kt-rag`) or whatever you like. agents.kt now serves the agentic web three ways: MCP, A2A, and NLWeb. -- **AGNTCY interop (OASF record + DIR directory + Identity badge)** — `agent.toOasfRecord(version, authors, locators)` exports an [AGNTCY](https://github.com/agntcy) [OASF](https://github.com/agntcy/oasf) 1.0.0 discovery record (the third exporter beside `agent.json` and the A2A AgentCard; skills carry taxonomy uids via the opt-in `.oasf("agent_orchestration/multi_agent_planning")` annotation against a vendored, drift-checked taxonomy), and `fromOasfRecord(json)` imports + fail-closed-validates it back (#4518/#4519, PRD §12.6). The `:agents-kt-dir` module publishes/discovers records in the AGNTCY **DIR** content-addressed directory: `DirClient.connect(host, port).use { dir -> val cid = dir.push(agent.toOasfRecord(...)); val json = dir.pull(cid) }` over generated grpc-kotlin `StoreService` stubs (#4520). The trust side ships in `:agents-kt-identity`: `IdentityVerifier.verify(compactJws, jwks)` validates an AGNTCY Identity **badge** (a JOSE/JWS-secured W3C Verifiable Credential) against an issuer's `/.well-known/jwks.json`, fail-closed via `nimbus-jose-jwt` (rejects `alg: none`, `HS*` algorithm-confusion, expiry, tamper, wrong/unknown key — #4521). Verify-only; issuance deferred. DIR Routing/Search + OCI referrers are follow-ups under epic #4517. +- **AGNTCY interop (OASF record + DIR directory + Identity badge)** — `agent.toOasfRecord(version, authors, locators)` exports an [AGNTCY](https://github.com/agntcy) [OASF](https://github.com/agntcy/oasf) 1.0.0 discovery record (the third exporter beside `agent.json` and the A2A AgentCard; skills carry taxonomy uids via the opt-in `.oasf("agent_orchestration/multi_agent_planning")` annotation against a vendored, drift-checked taxonomy), and `fromOasfRecord(json)` imports + fail-closed-validates it back (#4518/#4519, PRD §12.6). The `:agents-kt-dir` module publishes/discovers records in the AGNTCY **DIR** content-addressed directory over generated grpc-kotlin stubs for three services — `StoreService` (CRUD: `dir.push(agent.toOasfRecord(...))` → CID, `dir.pull(cid)`), `SearchService` (local content search by typed `DirQuery` facet — skill/domain/author/…), and `RoutingService` (`publish`/`routeSearch` for cross-peer network discovery) (#4520). The trust side ships in `:agents-kt-identity`: `IdentityVerifier.verify(compactJws, jwks)` validates an AGNTCY Identity **badge** (a JOSE/JWS-secured W3C Verifiable Credential) against an issuer's `/.well-known/jwks.json`, fail-closed via `nimbus-jose-jwt` (rejects `alg: none`, `HS*` algorithm-confusion, expiry, tamper, wrong/unknown key — #4521). Verify-only; issuance deferred. DIR Routing/Search + OCI referrers are follow-ups under epic #4517. - **Prompt caching across providers** — `agent { caching { enabled = true; cacheSystemPrompt = true; cacheToolDefs = true; cacheConversation = Rolling; ttl = 1.hours; cacheable("doc-id") { ... } } }`. Vendor-neutral DSL drives Anthropic's explicit `cache_control` breakpoints (#2658), OpenAI / DeepSeek automatic prefix caching with a stable `prompt_cache_key` routing hint (#2659 / #2661), Ollama / vLLM / SGLang engine-level KV-cache reuse (no-op hints, #2662), and surfaces cache reads + writes + hit-rate on `TokenUsage` (#2663). A prefix-stability guard (#2657) detects silent cache-busters — timestamps, UUIDs, non-deterministic ordering inside cacheable segments — and warns before you pay for a single non-cached run. Off by default; non-breaking. See [docs/caching.md](docs/caching.md). - **JSONL audit exporter** — `:agents-kt-observability` writes append-only, one-line-per-event audit rows with `requestId`, `sessionId`, `manifestHash`, agent/skill/tool ids, event type, provider, and model; raw arguments/results are omitted by default (#1914). See [docs/observability.md](docs/observability.md). - **ObservabilityBridge adapters** — `.observe(OtelBridge(tracer))` maps runtime events to OTel spans (#1908), `.observe(LangSmithBridge(apiKey, project))` maps the same events to LangSmith run trees (#1909), and `.observe(LangfuseBridge(publicKey, secretKey))` maps them to Langfuse traces, generations, spans, and events (#1910), while keeping core vendor-free. See [docs/observability.md](docs/observability.md). diff --git a/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirClient.kt b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirClient.kt index 4151bc1..f6cc1ba 100644 --- a/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirClient.kt +++ b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirClient.kt @@ -2,6 +2,14 @@ package agents_engine.agntcy.dir import agntcy.dir.core.v1.Record import agntcy.dir.core.v1.RecordRef +import agntcy.dir.routing.v1.PublishRequest +import agntcy.dir.routing.v1.RecordRefs +import agntcy.dir.routing.v1.RoutingServiceGrpcKt.RoutingServiceCoroutineStub +import agntcy.dir.routing.v1.SearchRequest +import agntcy.dir.routing.v1.UnpublishRequest +import agntcy.dir.search.v1.SearchCIDsRequest +import agntcy.dir.search.v1.SearchRecordsRequest +import agntcy.dir.search.v1.SearchServiceGrpcKt.SearchServiceCoroutineStub import agntcy.dir.store.v1.StoreServiceGrpcKt.StoreServiceCoroutineStub import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder @@ -13,57 +21,115 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.single import kotlinx.coroutines.flow.singleOrNull import kotlinx.coroutines.flow.toList +import agntcy.dir.routing.v1.RecordQuery as RouteRecordQuery +import agntcy.dir.routing.v1.RecordQueryType as RouteRecordQueryType +import agntcy.dir.search.v1.RecordQuery as SearchRecordQuery +import agntcy.dir.search.v1.RecordQueryType as SearchRecordQueryType /** * `agents_engine/agntcy/dir/DirClient.kt` — #4520 (PRD §12.6). A typed Kotlin client for the AGNTCY - * [DIR](https://github.com/agntcy/dir) content-addressed directory `StoreService`, over generated - * grpc-kotlin coroutine stubs. The directory side of the AGNTCY epic (#4517): publish the OASF discovery - * record (`toOasfRecord`, #4518) and pull it back by CID. + * [DIR](https://github.com/agntcy/dir) content-addressed directory, over generated grpc-kotlin coroutine + * stubs. The directory pillar of the AGNTCY epic (#4517): publish the OASF discovery record + * (`toOasfRecord`, #4518) and discover records by content or across the network. * - * The grpc/protobuf dependency graph lives entirely in the `:agents-kt-dir` feature module — core stays - * free of it (same pattern as `:agents-kt-rag` / `:agents-kt-identity`). The stored record body is an opaque - * JSON object (the OASF record, carried as a `google.protobuf.Struct` — see [jsonToStruct]). + * Three services, one channel: + * - **StoreService** — content-addressed CRUD: [push]/[pushAll], [pull], [lookup], [delete]. + * - **SearchService** — local content search by OASF facet: [searchRecords], [searchCids]. + * - **RoutingService** — network publish + discovery: [publish]/[unpublish], [routeSearch]. + * + * The grpc/protobuf dependency graph lives entirely in `:agents-kt-dir` — core never sees it (the + * `:agents-kt-rag` / `:agents-kt-identity` pattern). The stored record body is opaque JSON (the OASF record, + * carried as a `google.protobuf.Struct` — see [jsonToStruct]). * * ```kotlin * DirClient.connect("localhost", 8888).use { dir -> - * val cid = dir.push(agent.toOasfRecord(version = "1.0.0")) // publish - * val json = dir.pull(cid) // discover by CID + * val cid = dir.push(agent.toOasfRecord(version = "1.0.0")) // store + * dir.publish(listOf(cid)) // announce to the network + * val hits = dir.searchRecords(listOf(DirQuery(DirQueryType.SKILL_NAME, "agent_orchestration/multi_agent_planning"))) * } * ``` * * **Auth.** Plaintext (dev) or TLS via [connect]; an OIDC **bearer** token is attached as an `authorization` - * header. SPIFFE/mTLS is supported by building your own [ManagedChannel] (with the SPIFFE transport creds) - * and passing it to [fromChannel] — the client doesn't bundle a SPIFFE provider. + * header. SPIFFE/mTLS is supported by building your own [ManagedChannel] and passing it to [fromChannel]. * - * Scope: the four content-addressable record RPCs (`Push`/`Pull`/`Lookup`/`Delete`). RoutingService / - * SearchService (network publish + discovery) and OCI referrers are follow-ups under epic #4517. + * Scope: StoreService (full), SearchService (full), RoutingService publish/unpublish/search. RoutingService + * `List` and OCI referrers are follow-ups under epic #4517. */ class DirClient private constructor( private val channel: ManagedChannel, - private val stub: StoreServiceCoroutineStub, + private val store: StoreServiceCoroutineStub, + private val routing: RoutingServiceCoroutineStub, + private val search: SearchServiceCoroutineStub, ) : AutoCloseable { + // --- StoreService: content-addressed CRUD --- + /** Publish one OASF record; returns its content identifier (CID). */ suspend fun push(oasfJson: String): String = - stub.push(flowOf(recordOf(oasfJson))).single().cid + store.push(flowOf(recordOf(oasfJson))).single().cid /** Publish several records in one stream; returns the CIDs in request order. */ suspend fun pushAll(oasfJsons: List): List = - stub.push(oasfJsons.asFlow().map { recordOf(it) }).map { it.cid }.toList() + store.push(oasfJsons.asFlow().map { recordOf(it) }).map { it.cid }.toList() /** Pull a record's OASF JSON by CID. */ suspend fun pull(cid: String): String = - structToJson(stub.pull(flowOf(refOf(cid))).single().data) + structToJson(store.pull(flowOf(refOf(cid))).single().data) /** Resolve a record's metadata by CID without pulling the payload, or null if absent. */ suspend fun lookup(cid: String): DirRecordMeta? = - stub.lookup(flowOf(refOf(cid))).singleOrNull()?.let { + store.lookup(flowOf(refOf(cid))).singleOrNull()?.let { DirRecordMeta(it.cid, it.annotationsMap, it.schemaVersion, it.createdAt) } /** Delete records by CID. */ suspend fun delete(vararg cids: String) { - stub.delete(cids.asList().asFlow().map { refOf(it) }) + store.delete(cids.asList().asFlow().map { refOf(it) }) + } + + // --- SearchService: local content search --- + + /** Search the local store by OASF facet; returns the matching records' OASF JSON. */ + suspend fun searchRecords(queries: List, limit: Int? = null, offset: Int? = null): List { + val req = SearchRecordsRequest.newBuilder() + .addAllQueries(queries.map { it.toSearchQuery() }) + .apply { limit?.let { setLimit(it) }; offset?.let { setOffset(it) } } + .build() + return search.searchRecords(req).map { structToJson(it.record.data) }.toList() + } + + /** Search the local store by OASF facet; returns the matching records' CIDs. */ + suspend fun searchCids(queries: List, limit: Int? = null, offset: Int? = null): List { + val req = SearchCIDsRequest.newBuilder() + .addAllQueries(queries.map { it.toSearchQuery() }) + .apply { limit?.let { setLimit(it) }; offset?.let { setOffset(it) } } + .build() + return search.searchCIDs(req).map { it.recordCid }.toList() + } + + // --- RoutingService: network publish + discovery --- + + /** Announce records (by CID) to the directory network so other peers can discover them. */ + suspend fun publish(cids: List) { + routing.publish(PublishRequest.newBuilder().setRecordRefs(recordRefsOf(cids)).build()) + } + + /** Retract previously published records (by CID) from the network. */ + suspend fun unpublish(cids: List) { + routing.unpublish(UnpublishRequest.newBuilder().setRecordRefs(recordRefsOf(cids)).build()) + } + + /** + * Discover records across the directory network. [queries] are the coarse routing facets + * (skill/locator/domain/module — see [toRouteQuery]); each hit carries its CID, the announcing peer, and a + * match score. [minMatchScore] filters weak matches; [limit] caps results. + */ + suspend fun routeSearch(queries: List, minMatchScore: Int? = null, limit: Int? = null): List { + val req = SearchRequest.newBuilder() + .addAllQueries(queries.map { it.toRouteQuery() }) + .apply { minMatchScore?.let { setMinMatchScore(it) }; limit?.let { setLimit(it) } } + .build() + return routing.search(req).map { DirRouteMatch(it.recordRef.cid, it.peer.id, it.matchScore) }.toList() } override fun close() { @@ -90,22 +156,50 @@ class DirClient private constructor( } /** - * Wrap an existing [channel] (the seam for custom transports — SPIFFE/mTLS, proxies, in-process - * test channels). [bearerToken], if set, is attached as an `authorization` header. + * Wrap an existing [channel] (the seam for custom transports — SPIFFE/mTLS, proxies, in-process test + * channels). [bearerToken], if set, is attached as an `authorization` header to every service. */ fun fromChannel(channel: ManagedChannel, bearerToken: String? = null): DirClient { - var stub = StoreServiceCoroutineStub(channel) - if (bearerToken != null) { + val interceptor = bearerToken?.let { val headers = Metadata().apply { - put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer $bearerToken") + put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer $it") } - stub = stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)) + MetadataUtils.newAttachHeadersInterceptor(headers) } - return DirClient(channel, stub) + fun store() = StoreServiceCoroutineStub(channel).maybeAuth(interceptor) + fun routing() = RoutingServiceCoroutineStub(channel).maybeAuth(interceptor) + fun search() = SearchServiceCoroutineStub(channel).maybeAuth(interceptor) + return DirClient(channel, store(), routing(), search()) } + private fun > T.maybeAuth(i: io.grpc.ClientInterceptor?): T = + if (i == null) this else withInterceptors(i) + private fun recordOf(json: String): Record = Record.newBuilder().setData(jsonToStruct(json)).build() private fun refOf(cid: String): RecordRef = RecordRef.newBuilder().setCid(cid).build() + + private fun recordRefsOf(cids: List): RecordRefs = + RecordRefs.newBuilder().addAllRefs(cids.map { refOf(it) }).build() + + private fun DirQuery.toSearchQuery(): SearchRecordQuery = + SearchRecordQuery.newBuilder() + .setType(SearchRecordQueryType.valueOf("RECORD_QUERY_TYPE_${type.name}")) + .setValue(value) + .build() + + /** Map a [DirQuery] to the coarse routing facet; non-routable facets (NAME, VERSION, …) are rejected. */ + private fun DirQuery.toRouteQuery(): RouteRecordQuery { + val routeType = when (type) { + DirQueryType.SKILL_ID, DirQueryType.SKILL_NAME -> RouteRecordQueryType.RECORD_QUERY_TYPE_SKILL + DirQueryType.LOCATOR -> RouteRecordQueryType.RECORD_QUERY_TYPE_LOCATOR + DirQueryType.DOMAIN_ID, DirQueryType.DOMAIN_NAME -> RouteRecordQueryType.RECORD_QUERY_TYPE_DOMAIN + DirQueryType.MODULE_ID, DirQueryType.MODULE_NAME -> RouteRecordQueryType.RECORD_QUERY_TYPE_MODULE + else -> throw IllegalArgumentException( + "DirQueryType.$type is not routable; network routeSearch supports SKILL/LOCATOR/DOMAIN/MODULE facets", + ) + } + return RouteRecordQuery.newBuilder().setType(routeType).setValue(value).build() + } } } diff --git a/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirQuery.kt b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirQuery.kt new file mode 100644 index 0000000..4e3cf3a --- /dev/null +++ b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirQuery.kt @@ -0,0 +1,11 @@ +package agents_engine.agntcy.dir + +/** + * `agents_engine/agntcy/dir/DirQuery.kt` — #4520 (PRD §12.6). One DIR search predicate: match records whose + * [type] facet equals [value] (e.g. `DirQuery(DirQueryType.SKILL_NAME, "agent_orchestration/multi_agent_planning")`). + * Multiple queries passed together are AND-combined by the directory. + */ +data class DirQuery( + val type: DirQueryType, + val value: String, +) diff --git a/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirQueryType.kt b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirQueryType.kt new file mode 100644 index 0000000..54beec0 --- /dev/null +++ b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirQueryType.kt @@ -0,0 +1,26 @@ +package agents_engine.agntcy.dir + +/** + * `agents_engine/agntcy/dir/DirQueryType.kt` — #4520 (PRD §12.6). The facet a DIR search query matches on, + * mirroring the OASF record fields DIR indexes (`agntcy.dir.search.v1.RecordQueryType`). Used with + * [DirQuery] for `DirClient.searchRecords` / `searchCids` (local content search). The coarser network + * [DirClient.routeSearch] accepts the skill/locator/domain/module subset (others are rejected as + * not-routable). + */ +enum class DirQueryType { + NAME, + VERSION, + SKILL_ID, + SKILL_NAME, + LOCATOR, + MODULE_NAME, + DOMAIN_ID, + DOMAIN_NAME, + CREATED_AT, + AUTHOR, + SCHEMA_VERSION, + MODULE_ID, + VERIFIED, + TRUSTED, + ANNOTATION, +} diff --git a/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirRouteMatch.kt b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirRouteMatch.kt new file mode 100644 index 0000000..e14f368 --- /dev/null +++ b/agents-kt-dir/src/main/kotlin/agents_engine/agntcy/dir/DirRouteMatch.kt @@ -0,0 +1,12 @@ +package agents_engine.agntcy.dir + +/** + * `agents_engine/agntcy/dir/DirRouteMatch.kt` — #4520 (PRD §12.6). A hit from a network [DirClient.routeSearch]: + * the matching record's [cid], the [peerId] of the DIR peer announcing it (empty for a local match), and the + * directory's [matchScore] (how many of the supplied queries it satisfied). + */ +data class DirRouteMatch( + val cid: String, + val peerId: String, + val matchScore: Int, +) diff --git a/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/peer.proto b/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/peer.proto new file mode 100644 index 0000000..99b8ef6 --- /dev/null +++ b/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/peer.proto @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: Apache-2.0 +// #4520 — VENDORED from agntcy/dir routing/v1/peer.proto (faithful; java_* options added). +syntax = "proto3"; + +package agntcy.dir.routing.v1; + +option java_package = "agntcy.dir.routing.v1"; +option java_multiple_files = true; + +message Peer { + string id = 1; + repeated string addrs = 2; + map annotations = 3; + PeerConnectionType connection = 4; +} + +enum PeerConnectionType { + PEER_CONNECTION_TYPE_NOT_CONNECTED = 0; + PEER_CONNECTION_TYPE_CONNECTED = 1; + PEER_CONNECTION_TYPE_CAN_CONNECT = 2; + PEER_CONNECTION_TYPE_CANNOT_CONNECT = 3; +} diff --git a/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/record_query.proto b/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/record_query.proto new file mode 100644 index 0000000..9fabd4c --- /dev/null +++ b/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/record_query.proto @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: Apache-2.0 +// #4520 — VENDORED from agntcy/dir routing/v1/record_query.proto (faithful; java_* options added). Coarse network-routing query. +syntax = "proto3"; + +package agntcy.dir.routing.v1; + +option java_package = "agntcy.dir.routing.v1"; +option java_multiple_files = true; + +message RecordQuery { + RecordQueryType type = 1; + string value = 2; +} + +enum RecordQueryType { + RECORD_QUERY_TYPE_UNSPECIFIED = 0; + RECORD_QUERY_TYPE_SKILL = 1; + RECORD_QUERY_TYPE_LOCATOR = 2; + RECORD_QUERY_TYPE_DOMAIN = 3; + RECORD_QUERY_TYPE_MODULE = 4; +} diff --git a/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/routing_service.proto b/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/routing_service.proto new file mode 100644 index 0000000..21cc88c --- /dev/null +++ b/agents-kt-dir/src/main/proto/agntcy/dir/routing/v1/routing_service.proto @@ -0,0 +1,66 @@ +// SPDX-License-Identifier: Apache-2.0 +// #4520 — VENDORED from agntcy/dir routing/v1/routing_service.proto (faithful; java_* options added). Network publish + discovery. +syntax = "proto3"; + +package agntcy.dir.routing.v1; + +import "agntcy/dir/core/v1/record.proto"; +import "agntcy/dir/routing/v1/peer.proto"; +import "agntcy/dir/routing/v1/record_query.proto"; +import "agntcy/dir/search/v1/record_query.proto"; +import "google/protobuf/empty.proto"; + +option java_package = "agntcy.dir.routing.v1"; +option java_multiple_files = true; + +service RoutingService { + rpc Publish(PublishRequest) returns (google.protobuf.Empty); + rpc Unpublish(UnpublishRequest) returns (google.protobuf.Empty); + rpc Search(SearchRequest) returns (stream SearchResponse); + rpc List(ListRequest) returns (stream ListResponse); +} + +message PublishRequest { + oneof request { + RecordRefs record_refs = 1; + RecordQueries queries = 2; + } +} + +message UnpublishRequest { + oneof request { + RecordRefs record_refs = 1; + RecordQueries queries = 2; + } +} + +message RecordRefs { + repeated core.v1.RecordRef refs = 1; +} + +message RecordQueries { + repeated search.v1.RecordQuery queries = 1; +} + +message SearchRequest { + repeated RecordQuery queries = 1; + optional uint32 min_match_score = 2; + optional uint32 limit = 3; +} + +message SearchResponse { + core.v1.RecordRef record_ref = 1; + Peer peer = 2; + repeated RecordQuery match_queries = 3; + uint32 match_score = 4; +} + +message ListRequest { + repeated RecordQuery queries = 1; + optional uint32 limit = 2; +} + +message ListResponse { + core.v1.RecordRef record_ref = 1; + repeated string labels = 2; +} diff --git a/agents-kt-dir/src/main/proto/agntcy/dir/search/v1/record_query.proto b/agents-kt-dir/src/main/proto/agntcy/dir/search/v1/record_query.proto new file mode 100644 index 0000000..b2083d6 --- /dev/null +++ b/agents-kt-dir/src/main/proto/agntcy/dir/search/v1/record_query.proto @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: Apache-2.0 +// #4520 — VENDORED from agntcy/dir search/v1/record_query.proto (faithful: no buf/validate). java_* options added for codegen. +syntax = "proto3"; + +package agntcy.dir.search.v1; + +option java_package = "agntcy.dir.search.v1"; +option java_multiple_files = true; + +message RecordQuery { + RecordQueryType type = 1; + string value = 2; +} + +enum RecordQueryType { + RECORD_QUERY_TYPE_UNSPECIFIED = 0; + RECORD_QUERY_TYPE_NAME = 1; + RECORD_QUERY_TYPE_VERSION = 2; + RECORD_QUERY_TYPE_SKILL_ID = 3; + RECORD_QUERY_TYPE_SKILL_NAME = 4; + RECORD_QUERY_TYPE_LOCATOR = 5; + RECORD_QUERY_TYPE_MODULE_NAME = 6; + RECORD_QUERY_TYPE_DOMAIN_ID = 7; + RECORD_QUERY_TYPE_DOMAIN_NAME = 8; + RECORD_QUERY_TYPE_CREATED_AT = 9; + RECORD_QUERY_TYPE_AUTHOR = 10; + RECORD_QUERY_TYPE_SCHEMA_VERSION = 11; + RECORD_QUERY_TYPE_MODULE_ID = 12; + RECORD_QUERY_TYPE_VERIFIED = 13; + RECORD_QUERY_TYPE_TRUSTED = 14; + RECORD_QUERY_TYPE_ANNOTATION = 15; +} diff --git a/agents-kt-dir/src/main/proto/agntcy/dir/search/v1/search_service.proto b/agents-kt-dir/src/main/proto/agntcy/dir/search/v1/search_service.proto new file mode 100644 index 0000000..4375e9e --- /dev/null +++ b/agents-kt-dir/src/main/proto/agntcy/dir/search/v1/search_service.proto @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: Apache-2.0 +// #4520 — VENDORED from agntcy/dir search/v1/search_service.proto (faithful; java_* options added). Local content search. +syntax = "proto3"; + +package agntcy.dir.search.v1; + +import "agntcy/dir/core/v1/record.proto"; +import "agntcy/dir/search/v1/record_query.proto"; + +option java_package = "agntcy.dir.search.v1"; +option java_multiple_files = true; + +service SearchService { + rpc SearchCIDs(SearchCIDsRequest) returns (stream SearchCIDsResponse); + rpc SearchRecords(SearchRecordsRequest) returns (stream SearchRecordsResponse); +} + +message SearchCIDsRequest { + repeated RecordQuery queries = 1; + optional uint32 limit = 2; + optional uint32 offset = 3; +} + +message SearchRecordsRequest { + repeated RecordQuery queries = 1; + optional uint32 limit = 2; + optional uint32 offset = 3; +} + +message SearchCIDsResponse { + string record_cid = 1; +} + +message SearchRecordsResponse { + agntcy.dir.core.v1.Record record = 1; +} diff --git a/agents-kt-dir/src/test/kotlin/agents_engine/agntcy/dir/DirClientTest.kt b/agents-kt-dir/src/test/kotlin/agents_engine/agntcy/dir/DirClientTest.kt index a34bd6a..a2131a2 100644 --- a/agents-kt-dir/src/test/kotlin/agents_engine/agntcy/dir/DirClientTest.kt +++ b/agents-kt-dir/src/test/kotlin/agents_engine/agntcy/dir/DirClientTest.kt @@ -3,6 +3,17 @@ package agents_engine.agntcy.dir import agntcy.dir.core.v1.Record import agntcy.dir.core.v1.RecordMeta import agntcy.dir.core.v1.RecordRef +import agntcy.dir.routing.v1.Peer +import agntcy.dir.routing.v1.PublishRequest +import agntcy.dir.routing.v1.RoutingServiceGrpcKt.RoutingServiceCoroutineImplBase +import agntcy.dir.routing.v1.SearchRequest +import agntcy.dir.routing.v1.SearchResponse +import agntcy.dir.routing.v1.UnpublishRequest +import agntcy.dir.search.v1.SearchCIDsRequest +import agntcy.dir.search.v1.SearchCIDsResponse +import agntcy.dir.search.v1.SearchRecordsRequest +import agntcy.dir.search.v1.SearchRecordsResponse +import agntcy.dir.search.v1.SearchServiceGrpcKt.SearchServiceCoroutineImplBase import agntcy.dir.store.v1.StoreServiceGrpcKt.StoreServiceCoroutineImplBase import com.google.protobuf.Empty import com.google.protobuf.Struct @@ -20,12 +31,11 @@ import kotlin.test.assertFailsWith import kotlin.test.assertNull import kotlin.test.assertTrue -// #4520 (PRD §12.6) — AGNTCY DIR client. Hermetic: a fake StoreService over an in-process gRPC channel -// (no network, no real DIR daemon). Proves the generated coroutine stubs + the OASF-JSON <-> Struct mapping -// round-trip, that content is addressed by the returned CID, and that delete makes a record unresolvable. +// #4520 (PRD §12.6) — AGNTCY DIR client. Hermetic: fake Store/Search/Routing services over an in-process +// gRPC channel (no network, no daemon). Proves the generated coroutine stubs, the OASF-JSON <-> Struct +// mapping, content addressing, and the typed search/routing query mapping. class DirClientTest { - // Minimal in-memory content-addressed store: CID = a deterministic hash of the record payload. private class FakeStore : StoreServiceCoroutineImplBase() { private val store = ConcurrentHashMap() @@ -58,9 +68,53 @@ class DirClientTest { } } + // Echoes the first query back as a record, so the test can assert the typed query mapping reached the wire. + private class FakeSearch : SearchServiceCoroutineImplBase() { + override fun searchRecords(request: SearchRecordsRequest): Flow = flow { + request.queriesList.firstOrNull()?.let { q -> + val data = jsonToStruct("""{"matched":"${q.value}","type":"${q.type.name}"}""") + emit(SearchRecordsResponse.newBuilder().setRecord(Record.newBuilder().setData(data)).build()) + } + } + + override fun searchCIDs(request: SearchCIDsRequest): Flow = flow { + request.queriesList.forEach { + emit(SearchCIDsResponse.newBuilder().setRecordCid("cid-for-${it.value}").build()) + } + } + } + + private class FakeRouting : RoutingServiceCoroutineImplBase() { + private val published = ConcurrentHashMap.newKeySet() + + override suspend fun publish(request: PublishRequest): Empty { + published.addAll(request.recordRefs.refsList.map { it.cid }) + return Empty.getDefaultInstance() + } + + override suspend fun unpublish(request: UnpublishRequest): Empty { + published.removeAll(request.recordRefs.refsList.map { it.cid }.toSet()) + return Empty.getDefaultInstance() + } + + override fun search(request: SearchRequest): Flow = flow { + published.forEach { cid -> + emit( + SearchResponse.newBuilder() + .setRecordRef(RecordRef.newBuilder().setCid(cid)) + .setPeer(Peer.newBuilder().setId("peer-1")) + .setMatchScore(request.queriesCount) + .build(), + ) + } + } + } + private fun startServer(): Pair Unit> { val name = InProcessServerBuilder.generateName() - val server = InProcessServerBuilder.forName(name).directExecutor().addService(FakeStore()).build().start() + val server = InProcessServerBuilder.forName(name).directExecutor() + .addService(FakeStore()).addService(FakeSearch()).addService(FakeRouting()) + .build().start() val channel = InProcessChannelBuilder.forName(name).directExecutor().build() val client = DirClient.fromChannel(channel) return client to { client.close(); server.shutdownNow() } @@ -75,9 +129,7 @@ class DirClientTest { try { val cid = client.push(recordJson) assertTrue(cid.startsWith("cid-"), cid) - val pulled = client.pull(cid) - // Struct is unordered + numbers are doubles; compare re-parsed content, not bytes. - assertEquals(jsonToStruct(recordJson), jsonToStruct(pulled)) + assertEquals(jsonToStruct(recordJson), jsonToStruct(client.pull(cid))) } finally { stop() } @@ -87,8 +139,7 @@ class DirClientTest { fun `integral ids survive the Struct round-trip`() = runTest { val (client, stop) = startServer() try { - val pulled = client.pull(client.push(recordJson)) - assertTrue("\"id\":1003" in pulled, "id must stay integral (not 1003.0): $pulled") + assertTrue("\"id\":1003" in client.pull(client.push(recordJson)), "id must stay integral") } finally { stop() } @@ -109,15 +160,56 @@ class DirClientTest { } @Test - fun `pushAll returns CIDs in request order`() = runTest { + fun `searchRecords maps the typed query to the wire and returns records`() = runTest { + val (client, stop) = startServer() + try { + val results = client.searchRecords( + listOf(DirQuery(DirQueryType.SKILL_NAME, "agent_orchestration/multi_agent_planning")), + ) + val only = results.single() + assertTrue("agent_orchestration/multi_agent_planning" in only, only) + assertTrue("RECORD_QUERY_TYPE_SKILL_NAME" in only, only) // DirQueryType -> proto enum mapping + } finally { + stop() + } + } + + @Test + fun `searchCids returns a CID per query`() = runTest { + val (client, stop) = startServer() + try { + val cids = client.searchCids(listOf(DirQuery(DirQueryType.DOMAIN_NAME, "legal")), limit = 10) + assertEquals(listOf("cid-for-legal"), cids) + } finally { + stop() + } + } + + @Test + fun `publish then routeSearch finds the record across the network and unpublish removes it`() = runTest { val (client, stop) = startServer() try { - val a = """{"name":"a","schema_version":"1.0.0"}""" - val b = """{"name":"b","schema_version":"1.0.0"}""" - val cids = client.pushAll(listOf(a, b)) - assertEquals(2, cids.size) - assertEquals(jsonToStruct(a), jsonToStruct(client.pull(cids[0]))) - assertEquals(jsonToStruct(b), jsonToStruct(client.pull(cids[1]))) + client.publish(listOf("cid-abc")) + val hits = client.routeSearch(listOf(DirQuery(DirQueryType.SKILL_NAME, "plan"))) + val hit = hits.single() + assertEquals("cid-abc", hit.cid) + assertEquals("peer-1", hit.peerId) + assertEquals(1, hit.matchScore) + + client.unpublish(listOf("cid-abc")) + assertTrue(client.routeSearch(listOf(DirQuery(DirQueryType.SKILL_NAME, "plan"))).isEmpty()) + } finally { + stop() + } + } + + @Test + fun `routeSearch rejects a non-routable facet`() = runTest { + val (client, stop) = startServer() + try { + assertFailsWith { + client.routeSearch(listOf(DirQuery(DirQueryType.SCHEMA_VERSION, "1.0.0"))) + } } finally { stop() } diff --git a/docs/prd.md b/docs/prd.md index bd2df53..bb91c48 100644 --- a/docs/prd.md +++ b/docs/prd.md @@ -2855,7 +2855,7 @@ Any node in the delegation tree can be exported as an A2A endpoint: project.toAgentCard(url = "https://api.deep-code.ai/agents/project") ``` -### 12.6 AGNTCY Interoperability *(core shipped — OASF export/import + Identity-verify + DIR StoreService client; DIR Routing/Search + OCI referrers are follow-ups)* +### 12.6 AGNTCY Interoperability *(shipped — OASF export/import + Identity-verify + DIR Store/Search/Routing client; only DIR `List` + OCI referrers remain)* [AGNTCY](https://github.com/agntcy) — the Linux Foundation "Internet of Agents" collective (Cisco/Outshift-led) — is the second cross-vendor interop stack alongside Google A2A (§12.5). Agents.KT targets **both**: A2A is the wire/invocation standard; AGNTCY adds a content-addressed **directory** and a **trust** layer. The native, typed `agent.json` (§12.2) stays the source of truth; AGNTCY support is a set of **exporters/clients over it**, exactly parallel to `toAgentCard()`. @@ -2886,7 +2886,7 @@ val record = specMaster.toOasfRecord( The one real engineering cost is the **skills/domains taxonomy**: OASF skills are not free text — each is `{name: "agent_orchestration/multi_agent_planning", id: 1003}`. Note the `id` is **not** a single digit-concatenation formula as first assumed — the uids are *explicitly assigned per node* (top-level categories are multiples of 100, but level-2 is `category + n` while level-3 is `level2*100 + nn`), so the correct mechanism is a **vendored `path → uid` lookup**, not a computation. Plan: **vendor** the `schema/skills` + `schema/domains` trees (resource TSVs) and resolve IDs by lookup, with the hosted schema server (`schema.oasf.outshift.com/api/skills`) as a build-time validation cross-check. Free-form agent skills map via an opt-in `.oasf("agent_orchestration/multi_agent_planning")` annotation on the skill; un-annotated (and unknown-path) skills are omitted from the OASF `skills[]` with a logged warning (they remain in the free-form `agent.json`). The complete taxonomy is vendored (122 skills, 181 domains) directly from the hosted schema, and `OasfTaxonomyCrossCheckTest` (a `live-cloud-api` test, self-skips offline) asserts the vendored TSVs stay equal to the hosted schema so they can't silently drift. Record **signing** (Sigstore/cosign over OCI) is external to the record JSON — a later optional integration, not part of the serializer. -**DIR client *(StoreService shipped, #4520)*.** `DirClient` (module `:agents-kt-dir`) wraps generated grpc-kotlin coroutine stubs for `StoreService.{Push,Pull,Lookup,Delete}` (CID-addressed): `dir.push(agent.toOasfRecord(...))` → CID, `dir.pull(cid)` → JSON. DIR carries our OASF record as an opaque `google.protobuf.Struct`, so the JSON is enough — no OASF protos required (we vendor a trimmed, wire-compatible subset of the store/record protos, skipping the `buf/validate` closure). Auth is layered and optional: plaintext (dev) / TLS / OIDC bearer built in, SPIFFE/mTLS via a caller-supplied `ManagedChannel`. Targets self-hosted (`localhost:8888`) and the hosted network. **Follow-ups:** `RoutingService`/`SearchService` (network publish + DHT discovery) and OCI referrers — the content-addressable store client is the foundation they build on. +**DIR client *(shipped, #4520)*.** `DirClient` (module `:agents-kt-dir`) wraps generated grpc-kotlin coroutine stubs for three DIR services: **StoreService** `{Push,Pull,Lookup,Delete}` (CID-addressed CRUD — `dir.push(agent.toOasfRecord(...))` → CID, `dir.pull(cid)` → JSON), **SearchService** `{SearchRecords,SearchCIDs}` (local content search by typed `DirQuery` facet — skill/domain/author/…), and **RoutingService** `{Publish,Unpublish,Search}` (network publish + cross-peer discovery → `DirRouteMatch`). DIR carries our OASF record as an opaque `google.protobuf.Struct`, so the JSON is enough — no OASF protos required (we vendor a trimmed, wire-compatible subset, skipping the `buf/validate` closure). Auth is layered and optional: plaintext (dev) / TLS / OIDC bearer built in, SPIFFE/mTLS via a caller-supplied `ManagedChannel`. Targets self-hosted (`localhost:8888`) and the hosted network. **Remaining:** `RoutingService.List` and OCI referrers. ```kotlin val dir = AgntcyDirectory.connect("localhost:8888") // or hosted, with auth diff --git a/src/main/resources/internals-agent/agntcy/dir/DirClient.md b/src/main/resources/internals-agent/agntcy/dir/DirClient.md index f120a6f..d63c63d 100644 --- a/src/main/resources/internals-agent/agntcy/dir/DirClient.md +++ b/src/main/resources/internals-agent/agntcy/dir/DirClient.md @@ -44,11 +44,22 @@ whole numbers without `.0`, so `{"id":1003}` survives as `1003` (tested). `ManagedChannel` with the SPIFFE transport creds and passing it to `fromChannel()` — the module doesn't bundle a SPIFFE provider. `fromChannel` is also the seam in-process test channels use. +## Three services, one channel + +`DirClient` wraps coroutine stubs for all three DIR services over a single `ManagedChannel` (bearer interceptor +applied to each): +- **StoreService** — `push`/`pushAll`/`pull`/`lookup`/`delete` (content-addressed CRUD). +- **SearchService** — `searchRecords`/`searchCids` by `DirQuery(DirQueryType, value)`; `DirQueryType` mirrors + `search.v1.RecordQueryType` and maps by name (`RECORD_QUERY_TYPE_`). +- **RoutingService** — `publish`/`unpublish` (by CID), `routeSearch` → `DirRouteMatch`. Routing facets are + coarse (skill/locator/domain/module); `DirQuery.toRouteQuery()` maps the rich `DirQueryType` down and + **rejects** non-routable facets (NAME/VERSION/…) with `IllegalArgumentException`. + ## Scope / follow-ups (epic #4517) -Slice = the four content-addressable record RPCs (`Push`/`Pull`/`Lookup`/`Delete`). RoutingService / -SearchService (network publish + DHT discovery) and OCI referrers are the documented next steps. With this, -the AGNTCY epic's core (OASF export/import + Identity-verify + DIR store) is complete. +Store + Search + Routing(publish/unpublish/search) are shipped — the AGNTCY epic is complete. The only DIR +remainders are `RoutingService.List` (list published records) and OCI **referrers** (signatures etc. attached +to records); both are thin additions on this foundation. ## Files