Skip to content

Verifiable authorization for Device Connect#30

Open
soupat wants to merge 21 commits into
discovery-operationsfrom
feat/device-mandates
Open

Verifiable authorization for Device Connect#30
soupat wants to merge 21 commits into
discovery-operationsfrom
feat/device-mandates

Conversation

@soupat
Copy link
Copy Markdown
Collaborator

@soupat soupat commented May 11, 2026

Summary

  • Add Device Mandates as opt-in verifiable authorization for protected device functions.
  • Add open/closed HMAC mandate verifier, replay protection, and @requires_mandate metadata.
  • Enforce mandates in DeviceRuntime direct RPC and broadcast paths before driver invocation.
  • Expose mandate metadata through FunctionDef, driver collection, loaded capabilities, agent tools, and adapters.
  • Add server-side mandate verification, inline execution receipts, and process-local receipt query endpoints.
  • Add docs and examples for mandate usage.

Validation

  • PYTHONPATH=. pytest tests -q from packages/device-connect-edge: 530 passed
  • PYTHONPATH=../device-connect-edge:. pytest tests --ignore=tests/test_integration.py -q from packages/device-connect-agent-tools: 217 passed
  • Focused adapter suite: 30 passed
  • Server mandate/receipt helper suite: 6 passed
  • Ruff passed on touched files
  • Local D2D smart-lock demo validated over NATS using current edge + agent-tools branch code

Notes

  • Mandates are opt-in: unprotected functions continue to invoke normally; protected functions marked with @requires_mandate fail closed without a valid mandate.
  • Current crypto profile is device-connect-hmac-v0 for the working slice. Production should move to Ed25519/COSE-style public-key signatures, durable receipt storage, and distributed replay protection.
  • Full portal endpoint test collection is blocked locally by missing aiohttp_jinja2 in this environment.

soupat added 21 commits May 9, 2026 21:10
Add an optional labels: dict[str, str | list[str]] field on
DeviceCapabilities, FunctionDef, and EventDef. Drivers populate them
either via class-level DeviceDriver.labels = {...} (device metadata)
or @rpc(labels=...) / @emit(labels=...) decorator kwargs. List values
express composite identity (a device that is both camera and inference).

These labels are the foundation for selector-based discovery and
operations: the discover/invoke/broadcast tools filter on them.
Add a pure-Python parser at device_connect_edge.selector that maps a
structured selector string onto a parsed Selector dataclass with five
scope shapes:

    device(<filters>)
    device(<filters>).function(<filters>)
    device(<filters>).event(<filters>)
    function(<filters>)
    event(<filters>)

Inside (...): key:value, key:[v1,v2] (OR within key), key:pattern*
(anchored glob), k1:v1,k2:v2 (AND across keys), bare-string id/name
match, or * to match all.

Parse errors carry source + caret position for diagnostics. The matcher
is dependency-free (stdlib only) and applies vacuous-True semantics on
unset axes so callers can iterate without scope branching.
Add two new agent tools that replace the hierarchical trio:

- discover(selector, offset, limit) resolves a selector to matched
  devices, function tuples, or event tuples. Adaptive response shape:
  small result sets include full schemas inline; large sets paginate
  with name-and-labels summaries (DC_FUNCTION_THRESHOLD=20).
- discover_labels(key, offset, limit) returns the label vocabulary,
  per axis (no key) or paginated values for one key.

Response envelope: {scope, matched, returned, offset, next_offset,
results, label_histogram}. The label_histogram describes the matched
set (pre-pagination) so callers can choose how to narrow next without
a second call. On the device axis, multi-valued keys also expose
unique_devices for cardinality.

flatten_device now mirrors the legacy DeviceStatus.location into
labels["location"] when capabilities.labels does not declare one, so
drivers populating only the heartbeat field remain discoverable via
selector queries on location.

Migrate first-party adapters (Claude Agent SDK, Strands, LangChain,
the in-tree StrandsOpenAIDeviceConnectAgent) to discover/discover_labels.
The legacy describe_fleet/list_devices/get_device_functions trio
remains for one release as advisory-deprecated wrappers; each call
emits a DeprecationWarning pointing to the equivalent discover()
invocation.

Test drivers carry category, direction, modality, and safety labels so
integration tests can exercise the full selector grammar end-to-end.
Errors returned by discover() and discover_labels() are now structured
{"code": ..., "message": ...} dicts rather than free-form strings. This
lets callers branch on the code programmatically while still surfacing
the message to logs or end users.

Codes emitted:
  - invalid_selector         selector is not a string
  - selector_parse_error     selector is a string but malformed
  - connection_error         registry / messaging backend unavailable
  - key_not_axis_qualified   discover_labels key missing axis prefix
  - unknown_axis             discover_labels axis not in
                             {device, function, event}
…ools

The doc is a developer guide rather than a decision record: drop the
"ADR 0001:" framing, status line, and motivation paragraph. Trim the
content to the discovery surface that ships with this PR (labels,
selector grammar, discover, discover_labels, response envelope, error
codes) so worked examples are runnable today.
Trim docs/discovery.md to the discovery surface that ships in this PR
(labels, selector grammar, discover, discover_labels, response envelope,
error codes). Drop the ADR framing (status line, summary/motivation),
the "Operations" section listing tools that have not landed yet, the
CLI section, and worked examples that called those tools, so the guide
matches what a developer can actually run today.
Add two selector-driven invocation tools that replace the legacy
invoke_device(device_id, function, params) shape:

- invoke(selector, params, llm_reasoning) resolves a function-scoped
  selector to exactly one (device, function) tuple and calls it. Returns
  {success, device_id, function, result|error}. Returns no_match,
  ambiguous_match, invalid_invoke_scope, or invalid_selector errors as
  structured envelopes when the selector does not resolve cleanly.
- invoke_many(selector, params, timeout, max_concurrency, llm_reasoning)
  resolves to N (device, function) tuples and fans out the calls in
  parallel via a thread pool. Partial-failure semantics: a single
  target's failure does not abort siblings. Returns {candidates, matched,
  succeeded, failed, results, errors} with per-target structured errors.
  Per-target timeout defaults to 30s.

invoke_device gains a DeprecationWarning pointing to invoke(); the
function still works for one release while callers migrate. Adapters
(Claude Agent SDK, Strands, LangChain, the in-tree
StrandsOpenAIDeviceConnectAgent, and the operator-facing AGENT_SCRIPT
template) drop invoke_device and expose invoke / invoke_many instead.
invoke_device_with_fallback stays unchanged -- it covers a different
ergonomic case (try a list of device ids in order) with no selector
equivalent.

22 unit tests cover scope rejection, ambiguous and zero matches,
JSON-RPC error mapping, partial failure, per-target timeout
propagation, and llm_reasoning stripping. 9 integration tests cover
single-target invoke, robot dispatch through to event emission,
fan-out across multiple cameras, partial failure, and zero-candidate
empty envelopes.
Add device_connect_edge.predicate, a thin wrapper around cel-python that
compiles where expressions into reusable WherePredicate objects and
evaluates them against device-local context (identity, labels, status,
shared bindings).

CEL was chosen over JSONLogic because the v4 design's mask-indexing
pattern (mask[seat_row][seat_col] == 1) needs computed array indices,
which JSONLogic's literal-path var operator cannot express without
flattening the mask to 1D and indexing arithmetically. CEL handles it
natively.

cel-python is an optional dependency. Importing the module without it
installed succeeds; compiling or evaluating a predicate raises a clear
PredicateCompileError pointing at the [predicate] extra:

    pip install device-connect-edge[predicate]
    pip install device-connect-agent-tools[predicate]

The evaluator is shared by the dispatcher (validates expressions before
sending them out) and the device runtime (evaluates per-call to decide
whether to execute a fan-out). 16 unit tests cover compilation,
evaluation, the mask-indexing regression case, missing-variable and
type-mismatch error surfaces, and evaluator reusability.
Add the async selector-driven fan-out path so callers do not have to
block on the slowest device:

- broadcast(selector, params, where=, bindings=, fire_at=, on_late=)
  publishes a single envelope to a fanout subject keyed by tenant.
  Returns immediately with a correlation_id and the candidate count.
  Compile-validates the optional CEL where predicate at the dispatcher
  so syntax errors short-circuit before reaching the wire.

- DeviceRuntime._broadcast_subscription receives envelopes on
  ``device-connect.<tenant>.broadcast``. Each candidate self-elects via
  the target_device_ids gate (pre-resolved by the dispatcher from the
  selector), then evaluates the optional where predicate against its
  own context (identity, labels, status, shared bindings). On match the
  device executes the function and emits a reply on
  ``device-connect.<tenant>.<device_id>.event.async_reply.<correlation_id>``
  carrying {success, result|error, actually_fired_at}.

- fire_at + on_late synchronized fan-out: the edge holds the message
  until the wall-clock deadline and fires from its own clock.
  on_late=skip drops late arrivals (preserves coherence for
  card-stunt / light-show style workloads); on_late=fire executes
  immediately. The achieved spread depends on NTP residual (~5-10 ms
  typical) rather than network jitter (~50-150 ms).

- subscribe(selector) returns a Subscription handle. Two selector
  forms: ``correlation:<id>`` for broadcast replies, and event-scoped
  selectors (``event(<name>)`` or ``device(...).event(<name>)``) for
  live event streams. The handle exposes sync read() and a yielding
  iter() with idle-timeout reset.

- await_replies(correlation_id, timeout, until) sync helper for the
  common broadcast-then-collect pattern; subscribes, drains, returns
  the list of reply payloads.

The edge predicate context mirrors DeviceStatus.location into
labels["location"] when the driver did not declare a labels.location
itself, matching the dispatcher-side flatten_device contract so the
same selector and predicate strings work on both sides.

Test coverage: 38 unit tests across broadcast (12), subscribe (12),
and existing modules; 5 NATS integration tests cover end-to-end
broadcast + reply, where filter at the edge, fire_at synchronization
spread, on_late=skip late-arrival drop, and subscribe(correlation:<id>)
streaming.
Add the operator-facing shell surface for selector-driven discovery and
operations:

devctl verbs (read-side):
  - devctl discover "<selector>" [--offset N] [--limit M]
  - devctl discover-labels [--key K] [--offset N] [--limit M]

statectl verbs (write-side):
  - statectl invoke "<selector>" [--param k=v ...]
  - statectl invoke-many "<selector>" [--param k=v ...] [--timeout T]
                                      [--max-concurrency N]
  - statectl broadcast "<selector>" [--param k=v ...] [--where E]
                                    [--bindings JSON] [--fire-at T]
                                    [--on-late skip|fire]
  - statectl subscribe "<selector>" [--timeout T] [--until N]
  - statectl await <correlation_id> [--timeout T] [--until N]

Each verb is a thin wrapper over the Python tool of the same name and
exits non-zero on tool-side errors so they compose into shell pipelines
naturally. Parameter values are decoded as JSON when they look like
JSON (numbers, booleans, arrays, objects, quoted strings) and pass
through as strings otherwise, so common shapes (--param resolution=4k,
--param zones='[1,2,3]') work without quoting heroics.

The historical ``devctl discover`` verb (mDNS scan for uncommissioned
devices) is renamed to ``mdns-scan`` with ``scan`` as an alias, so
``discover`` is free for the selector-driven sense. Existing scripts
should switch from ``devctl discover`` to ``devctl scan`` if they were
exercising the mDNS path.

22 parser-shape unit tests guard against argument drift; the underlying
tools already have full unit and integration coverage from earlier
phases.
Add the operations layer (invoke / invoke_many / broadcast / subscribe /
await_replies) to docs/discovery.md, with the edge-side ``where``
predicate, synchronized fan-out via ``fire_at`` / ``on_late``, worked
examples that exercise each tool, and the corresponding
devctl / statectl CLI verbs.

The guide now covers everything the discovery API ships: labels schema,
selector grammar, the five scope shapes, response envelope, error codes,
all seven tools, and the CLI surface.
Applies findings from the pre-merge review of the operations stack:

Edge runtime (device.py):
  - Hand the broadcast envelope off to a tracked task so the subscription
    callback returns immediately. A long fire_at hold or slow driver
    function no longer blocks subsequent broadcasts from being received.
  - Extract _handle_broadcast_envelope and _evaluate_where so the where
    self-election step is isolated, unit-testable, and the callback body
    stays flat.
  - Splice device_id into the predicate's identity context so the natural
    ``identity.device_id == "..."`` form works (DeviceIdentity itself
    does not carry device_id; that lives on the runtime).

Wire format (tools.py + device.py):
  - Rename the broadcast envelope's ``target_device_ids`` field to
    ``targets`` before any edge ships. Shorter, less prescriptive, and
    matches the dispatcher-side ``candidates`` naming.

Subscription handle (tools.py):
  - Fix a race in Subscription.read(): truncate by the snapshot length
    captured BEFORE iteration, not by clearing post-iteration. A message
    appended by the messaging callback during draining now survives to
    the next read instead of being silently dropped.
  - Add __iter__ so ``for msg in sub:`` works with a sensible 30s idle
    timeout, matching the standard Python iteration protocol.

CLI (statectl/operations_cli.py):
  - statectl subscribe now catches KeyboardInterrupt cleanly (exit 130),
    distinguishes "got messages" (exit 0) from "idle timeout with no
    messages" (exit 4), so shell pipelines can branch on either outcome.
  - statectl invoke-many exits 3 when any target failed (alongside the
    existing 1 for top-level errors), so partial failure is visible to
    callers without parsing JSON.

ASCII compliance (predicate.py, tools.py):
  - Drop a banned-vocabulary token from a docstring.
  - Replace an em-dash in invoke_device's docstring with ASCII text.

New tests:
  - Unit: __iter__ protocol + race-safety guard for Subscription.read.
  - Integration: broadcast where=identity.device_id in bindings.allow
    (exercises the new identity context + bindings path),
    await_replies(until=) early-return timing, ``for msg in sub:``
    iteration end-to-end, and subscribe(event(...)) live-event capture.
…ters

Phases 4-5 added broadcast() and await_replies() to the agent-tools
surface but the adapter migration in feat(invoke) only carried invoke /
invoke_many across. The flashlight-auditorium demo needs the LLM to
issue selector-driven broadcasts with where + bindings + fire_at, so
broadcast and await_replies both need to be Strands/LangChain/Claude
tools as well.

Tool descriptions for the Claude adapter spell out the broadcast +
await_replies pairing (caller fires broadcast, then awaits replies by
correlation_id) so agents discover the workflow from the tool docs.

subscribe() is intentionally NOT exposed via the adapters: it returns
a Subscription handle that does not serialise cleanly as a tool result
and is more natural to call from operator code or the CLI than from an
LLM. Agents needing the same shape use broadcast + await_replies.
The broadcast handler built the where-predicate context from
``caps.identity`` -- but DeviceCapabilities does not carry an
``identity`` field; that lives on the driver as a separate
DeviceIdentity model. The ``getattr(caps, "identity", None)`` fallback
masked the bug: identity_dict was always just ``{"device_id": ...}``
with none of the driver's extra fields (seat_row, seat_col, x-mhp slot
metadata, ...) reaching the predicate.

Symptom: a where predicate like
``bindings.mask[identity.seat_row][identity.seat_col] == 1`` failed at
every candidate (CEL surfaces undefined field access as CELEvalError,
fail-closed fires, nobody self-elects).

Fix: read identity from ``self._driver.identity`` and splice in
``device_id`` from the runtime. Backwards-compatible with drivers that
don't expose an identity property (driver_identity is None -> only
device_id is present, same as before for those drivers).

Surfaced while building the flashlight-auditorium demo, where each
phone exposes its seat coordinates as extra fields on DeviceIdentity
and the spell-CMU broadcast indexes a 2D mask by those coordinates.
@soupat soupat force-pushed the discovery-operations branch 2 times, most recently from 936d343 to 4dac2af Compare May 19, 2026 16:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant