Verifiable authorization for Device Connect#30
Open
soupat wants to merge 21 commits into
Open
Conversation
Add an optional labels: dict[str, str | list[str]] field on
DeviceCapabilities, FunctionDef, and EventDef. Drivers populate them
either via class-level DeviceDriver.labels = {...} (device metadata)
or @rpc(labels=...) / @emit(labels=...) decorator kwargs. List values
express composite identity (a device that is both camera and inference).
These labels are the foundation for selector-based discovery and
operations: the discover/invoke/broadcast tools filter on them.
Add a pure-Python parser at device_connect_edge.selector that maps a
structured selector string onto a parsed Selector dataclass with five
scope shapes:
device(<filters>)
device(<filters>).function(<filters>)
device(<filters>).event(<filters>)
function(<filters>)
event(<filters>)
Inside (...): key:value, key:[v1,v2] (OR within key), key:pattern*
(anchored glob), k1:v1,k2:v2 (AND across keys), bare-string id/name
match, or * to match all.
Parse errors carry source + caret position for diagnostics. The matcher
is dependency-free (stdlib only) and applies vacuous-True semantics on
unset axes so callers can iterate without scope branching.
Add two new agent tools that replace the hierarchical trio:
- discover(selector, offset, limit) resolves a selector to matched
devices, function tuples, or event tuples. Adaptive response shape:
small result sets include full schemas inline; large sets paginate
with name-and-labels summaries (DC_FUNCTION_THRESHOLD=20).
- discover_labels(key, offset, limit) returns the label vocabulary,
per axis (no key) or paginated values for one key.
Response envelope: {scope, matched, returned, offset, next_offset,
results, label_histogram}. The label_histogram describes the matched
set (pre-pagination) so callers can choose how to narrow next without
a second call. On the device axis, multi-valued keys also expose
unique_devices for cardinality.
flatten_device now mirrors the legacy DeviceStatus.location into
labels["location"] when capabilities.labels does not declare one, so
drivers populating only the heartbeat field remain discoverable via
selector queries on location.
Migrate first-party adapters (Claude Agent SDK, Strands, LangChain,
the in-tree StrandsOpenAIDeviceConnectAgent) to discover/discover_labels.
The legacy describe_fleet/list_devices/get_device_functions trio
remains for one release as advisory-deprecated wrappers; each call
emits a DeprecationWarning pointing to the equivalent discover()
invocation.
Test drivers carry category, direction, modality, and safety labels so
integration tests can exercise the full selector grammar end-to-end.
Errors returned by discover() and discover_labels() are now structured
{"code": ..., "message": ...} dicts rather than free-form strings. This
lets callers branch on the code programmatically while still surfacing
the message to logs or end users.
Codes emitted:
- invalid_selector selector is not a string
- selector_parse_error selector is a string but malformed
- connection_error registry / messaging backend unavailable
- key_not_axis_qualified discover_labels key missing axis prefix
- unknown_axis discover_labels axis not in
{device, function, event}
…ools The doc is a developer guide rather than a decision record: drop the "ADR 0001:" framing, status line, and motivation paragraph. Trim the content to the discovery surface that ships with this PR (labels, selector grammar, discover, discover_labels, response envelope, error codes) so worked examples are runnable today.
Trim docs/discovery.md to the discovery surface that ships in this PR (labels, selector grammar, discover, discover_labels, response envelope, error codes). Drop the ADR framing (status line, summary/motivation), the "Operations" section listing tools that have not landed yet, the CLI section, and worked examples that called those tools, so the guide matches what a developer can actually run today.
Add two selector-driven invocation tools that replace the legacy
invoke_device(device_id, function, params) shape:
- invoke(selector, params, llm_reasoning) resolves a function-scoped
selector to exactly one (device, function) tuple and calls it. Returns
{success, device_id, function, result|error}. Returns no_match,
ambiguous_match, invalid_invoke_scope, or invalid_selector errors as
structured envelopes when the selector does not resolve cleanly.
- invoke_many(selector, params, timeout, max_concurrency, llm_reasoning)
resolves to N (device, function) tuples and fans out the calls in
parallel via a thread pool. Partial-failure semantics: a single
target's failure does not abort siblings. Returns {candidates, matched,
succeeded, failed, results, errors} with per-target structured errors.
Per-target timeout defaults to 30s.
invoke_device gains a DeprecationWarning pointing to invoke(); the
function still works for one release while callers migrate. Adapters
(Claude Agent SDK, Strands, LangChain, the in-tree
StrandsOpenAIDeviceConnectAgent, and the operator-facing AGENT_SCRIPT
template) drop invoke_device and expose invoke / invoke_many instead.
invoke_device_with_fallback stays unchanged -- it covers a different
ergonomic case (try a list of device ids in order) with no selector
equivalent.
22 unit tests cover scope rejection, ambiguous and zero matches,
JSON-RPC error mapping, partial failure, per-target timeout
propagation, and llm_reasoning stripping. 9 integration tests cover
single-target invoke, robot dispatch through to event emission,
fan-out across multiple cameras, partial failure, and zero-candidate
empty envelopes.
Add device_connect_edge.predicate, a thin wrapper around cel-python that
compiles where expressions into reusable WherePredicate objects and
evaluates them against device-local context (identity, labels, status,
shared bindings).
CEL was chosen over JSONLogic because the v4 design's mask-indexing
pattern (mask[seat_row][seat_col] == 1) needs computed array indices,
which JSONLogic's literal-path var operator cannot express without
flattening the mask to 1D and indexing arithmetically. CEL handles it
natively.
cel-python is an optional dependency. Importing the module without it
installed succeeds; compiling or evaluating a predicate raises a clear
PredicateCompileError pointing at the [predicate] extra:
pip install device-connect-edge[predicate]
pip install device-connect-agent-tools[predicate]
The evaluator is shared by the dispatcher (validates expressions before
sending them out) and the device runtime (evaluates per-call to decide
whether to execute a fan-out). 16 unit tests cover compilation,
evaluation, the mask-indexing regression case, missing-variable and
type-mismatch error surfaces, and evaluator reusability.
Add the async selector-driven fan-out path so callers do not have to
block on the slowest device:
- broadcast(selector, params, where=, bindings=, fire_at=, on_late=)
publishes a single envelope to a fanout subject keyed by tenant.
Returns immediately with a correlation_id and the candidate count.
Compile-validates the optional CEL where predicate at the dispatcher
so syntax errors short-circuit before reaching the wire.
- DeviceRuntime._broadcast_subscription receives envelopes on
``device-connect.<tenant>.broadcast``. Each candidate self-elects via
the target_device_ids gate (pre-resolved by the dispatcher from the
selector), then evaluates the optional where predicate against its
own context (identity, labels, status, shared bindings). On match the
device executes the function and emits a reply on
``device-connect.<tenant>.<device_id>.event.async_reply.<correlation_id>``
carrying {success, result|error, actually_fired_at}.
- fire_at + on_late synchronized fan-out: the edge holds the message
until the wall-clock deadline and fires from its own clock.
on_late=skip drops late arrivals (preserves coherence for
card-stunt / light-show style workloads); on_late=fire executes
immediately. The achieved spread depends on NTP residual (~5-10 ms
typical) rather than network jitter (~50-150 ms).
- subscribe(selector) returns a Subscription handle. Two selector
forms: ``correlation:<id>`` for broadcast replies, and event-scoped
selectors (``event(<name>)`` or ``device(...).event(<name>)``) for
live event streams. The handle exposes sync read() and a yielding
iter() with idle-timeout reset.
- await_replies(correlation_id, timeout, until) sync helper for the
common broadcast-then-collect pattern; subscribes, drains, returns
the list of reply payloads.
The edge predicate context mirrors DeviceStatus.location into
labels["location"] when the driver did not declare a labels.location
itself, matching the dispatcher-side flatten_device contract so the
same selector and predicate strings work on both sides.
Test coverage: 38 unit tests across broadcast (12), subscribe (12),
and existing modules; 5 NATS integration tests cover end-to-end
broadcast + reply, where filter at the edge, fire_at synchronization
spread, on_late=skip late-arrival drop, and subscribe(correlation:<id>)
streaming.
Add the operator-facing shell surface for selector-driven discovery and
operations:
devctl verbs (read-side):
- devctl discover "<selector>" [--offset N] [--limit M]
- devctl discover-labels [--key K] [--offset N] [--limit M]
statectl verbs (write-side):
- statectl invoke "<selector>" [--param k=v ...]
- statectl invoke-many "<selector>" [--param k=v ...] [--timeout T]
[--max-concurrency N]
- statectl broadcast "<selector>" [--param k=v ...] [--where E]
[--bindings JSON] [--fire-at T]
[--on-late skip|fire]
- statectl subscribe "<selector>" [--timeout T] [--until N]
- statectl await <correlation_id> [--timeout T] [--until N]
Each verb is a thin wrapper over the Python tool of the same name and
exits non-zero on tool-side errors so they compose into shell pipelines
naturally. Parameter values are decoded as JSON when they look like
JSON (numbers, booleans, arrays, objects, quoted strings) and pass
through as strings otherwise, so common shapes (--param resolution=4k,
--param zones='[1,2,3]') work without quoting heroics.
The historical ``devctl discover`` verb (mDNS scan for uncommissioned
devices) is renamed to ``mdns-scan`` with ``scan`` as an alias, so
``discover`` is free for the selector-driven sense. Existing scripts
should switch from ``devctl discover`` to ``devctl scan`` if they were
exercising the mDNS path.
22 parser-shape unit tests guard against argument drift; the underlying
tools already have full unit and integration coverage from earlier
phases.
Add the operations layer (invoke / invoke_many / broadcast / subscribe / await_replies) to docs/discovery.md, with the edge-side ``where`` predicate, synchronized fan-out via ``fire_at`` / ``on_late``, worked examples that exercise each tool, and the corresponding devctl / statectl CLI verbs. The guide now covers everything the discovery API ships: labels schema, selector grammar, the five scope shapes, response envelope, error codes, all seven tools, and the CLI surface.
Applies findings from the pre-merge review of the operations stack:
Edge runtime (device.py):
- Hand the broadcast envelope off to a tracked task so the subscription
callback returns immediately. A long fire_at hold or slow driver
function no longer blocks subsequent broadcasts from being received.
- Extract _handle_broadcast_envelope and _evaluate_where so the where
self-election step is isolated, unit-testable, and the callback body
stays flat.
- Splice device_id into the predicate's identity context so the natural
``identity.device_id == "..."`` form works (DeviceIdentity itself
does not carry device_id; that lives on the runtime).
Wire format (tools.py + device.py):
- Rename the broadcast envelope's ``target_device_ids`` field to
``targets`` before any edge ships. Shorter, less prescriptive, and
matches the dispatcher-side ``candidates`` naming.
Subscription handle (tools.py):
- Fix a race in Subscription.read(): truncate by the snapshot length
captured BEFORE iteration, not by clearing post-iteration. A message
appended by the messaging callback during draining now survives to
the next read instead of being silently dropped.
- Add __iter__ so ``for msg in sub:`` works with a sensible 30s idle
timeout, matching the standard Python iteration protocol.
CLI (statectl/operations_cli.py):
- statectl subscribe now catches KeyboardInterrupt cleanly (exit 130),
distinguishes "got messages" (exit 0) from "idle timeout with no
messages" (exit 4), so shell pipelines can branch on either outcome.
- statectl invoke-many exits 3 when any target failed (alongside the
existing 1 for top-level errors), so partial failure is visible to
callers without parsing JSON.
ASCII compliance (predicate.py, tools.py):
- Drop a banned-vocabulary token from a docstring.
- Replace an em-dash in invoke_device's docstring with ASCII text.
New tests:
- Unit: __iter__ protocol + race-safety guard for Subscription.read.
- Integration: broadcast where=identity.device_id in bindings.allow
(exercises the new identity context + bindings path),
await_replies(until=) early-return timing, ``for msg in sub:``
iteration end-to-end, and subscribe(event(...)) live-event capture.
…ters Phases 4-5 added broadcast() and await_replies() to the agent-tools surface but the adapter migration in feat(invoke) only carried invoke / invoke_many across. The flashlight-auditorium demo needs the LLM to issue selector-driven broadcasts with where + bindings + fire_at, so broadcast and await_replies both need to be Strands/LangChain/Claude tools as well. Tool descriptions for the Claude adapter spell out the broadcast + await_replies pairing (caller fires broadcast, then awaits replies by correlation_id) so agents discover the workflow from the tool docs. subscribe() is intentionally NOT exposed via the adapters: it returns a Subscription handle that does not serialise cleanly as a tool result and is more natural to call from operator code or the CLI than from an LLM. Agents needing the same shape use broadcast + await_replies.
The broadcast handler built the where-predicate context from
``caps.identity`` -- but DeviceCapabilities does not carry an
``identity`` field; that lives on the driver as a separate
DeviceIdentity model. The ``getattr(caps, "identity", None)`` fallback
masked the bug: identity_dict was always just ``{"device_id": ...}``
with none of the driver's extra fields (seat_row, seat_col, x-mhp slot
metadata, ...) reaching the predicate.
Symptom: a where predicate like
``bindings.mask[identity.seat_row][identity.seat_col] == 1`` failed at
every candidate (CEL surfaces undefined field access as CELEvalError,
fail-closed fires, nobody self-elects).
Fix: read identity from ``self._driver.identity`` and splice in
``device_id`` from the runtime. Backwards-compatible with drivers that
don't expose an identity property (driver_identity is None -> only
device_id is present, same as before for those drivers).
Surfaced while building the flashlight-auditorium demo, where each
phone exposes its seat coordinates as extra fields on DeviceIdentity
and the spell-CMU broadcast indexes a 2D mask by those coordinates.
936d343 to
4dac2af
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Validation
Notes