From 210c9cc9dd0d80bd97813c92108bf009722879cc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 16 May 2026 06:36:37 +0000 Subject: [PATCH 1/2] Initial plan From 045b07a3d25f2c7828f4600467b802728612f908 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 16 May 2026 06:49:13 +0000 Subject: [PATCH 2/2] refactor: reuse shared datetime and coercion helpers Agent-Logs-Url: https://github.com/DarinShapiro/ThreadObservabilityPOC/sessions/3adbc643-580b-4a6f-a2de-d8057fbadff4 Co-authored-by: DarinShapiro <23219821+DarinShapiro@users.noreply.github.com> --- .../src/thread_observability/api/http_api.py | 33 +++-- .../src/thread_observability/api/mcp_tools.py | 123 ++++++++++-------- .../src/thread_observability/api/triage.py | 6 +- .../pipeline/device_discovery.py | 71 +++++----- .../thread_observability/pipeline/nodes.py | 6 +- .../pipeline/otbr_adapter.py | 10 +- .../pipeline/otbr_diagnostics.py | 36 ++--- .../pipeline/otbr_parser.py | 12 +- .../pipeline/otbr_rest.py | 116 ++++++++++------- .../thread_observability/pipeline/reasoner.py | 17 +-- .../thread_observability/pipeline/timeline.py | 5 +- .../services/assessment/scheduler.py | 60 ++++----- .../storage/sqlite_store.py | 56 ++++---- .../app/tests/test_utils_helpers.py | 41 ++++++ 14 files changed, 330 insertions(+), 262 deletions(-) create mode 100644 addons/thread-observability/app/tests/test_utils_helpers.py diff --git a/addons/thread-observability/app/src/thread_observability/api/http_api.py b/addons/thread-observability/app/src/thread_observability/api/http_api.py index 9e44fd5..60a5749 100644 --- a/addons/thread-observability/app/src/thread_observability/api/http_api.py +++ b/addons/thread-observability/app/src/thread_observability/api/http_api.py @@ -316,7 +316,7 @@ def _record_chat_turn_telemetry( try: get_store().record_chat_turn_stat( conversation_id=conversation_id, - recorded_at=_utc_now(), + recorded_at=utc_now_iso(), backend=backend, agent_id=agent_id, model_name=model_name, @@ -442,10 +442,6 @@ def _read_addon_version() -> str: _HA_INTEGRATIONS_URL = "/config/integrations/dashboard" -def _utc_now() -> str: - return utc_now_iso() - - def _tail_log(n: int = 80) -> list[str]: if not LOG_PATH.exists(): return [] @@ -855,14 +851,15 @@ def chat_transcript(conversation_id: str) -> dict[str, object]: @app.get("/health") def health() -> dict[str, str]: - return {"status": "ok", "service": "core", "checked_at": _utc_now()} + return {"status": "ok", "service": "core", "checked_at": utc_now_iso()} @app.get("/v1/health/snapshot") def health_snapshot() -> dict[str, object]: try: return build_health_snapshot() except Exception as exc: # noqa: BLE001 - return {"error": str(exc), "computed_at": _utc_now()} + log.exception("health_snapshot failed") + return {"error": "health snapshot unavailable", "computed_at": utc_now_iso()} @app.get("/v1/issues/active") def list_active_issues() -> dict[str, object]: @@ -878,20 +875,32 @@ def list_active_issues() -> dict[str, object]: "issues": [], "status": "placeholder", "note": ISSUES_PAUSED_NOTE, - "computed_at": _utc_now(), + "computed_at": utc_now_iso(), } try: issues = get_store().list_active_issues() - return {"count": len(issues), "issues": issues, "computed_at": _utc_now()} + return {"count": len(issues), "issues": issues, "computed_at": utc_now_iso()} except Exception as exc: # noqa: BLE001 - return {"count": 0, "issues": [], "error": str(exc), "computed_at": _utc_now()} + log.exception("list_active_issues failed") + return { + "count": 0, + "issues": [], + "error": "active issues unavailable", + "computed_at": utc_now_iso(), + } @app.get("/v1/topology") def topology_snapshot(include_phantoms: bool = False) -> dict[str, object]: try: return topology_mod.build_topology(include_phantoms=include_phantoms) except Exception as exc: # noqa: BLE001 - return {"nodes": [], "links": [], "error": str(exc), "computed_at": _utc_now()} + log.exception("topology_snapshot failed") + return { + "nodes": [], + "links": [], + "error": "topology unavailable", + "computed_at": utc_now_iso(), + } @app.get("/v1/topology/history") def topology_history(limit: int = 20) -> dict[str, object]: @@ -1217,7 +1226,7 @@ async def dev_status(include_phantoms: bool = False) -> dict[str, object]: ) return { "addon_version": ADDON_VERSION, - "checked_at": _utc_now(), + "checked_at": utc_now_iso(), "supervisor": sup, "health": health, "issues": list_active_issues(), diff --git a/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py b/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py index d7c0a74..7c4a6cb 100644 --- a/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py +++ b/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py @@ -72,10 +72,6 @@ def _read_addon_version() -> str: ADDON_VERSION = _read_addon_version() -def _utc_now() -> str: - return utc_now_iso() - - def _tail_log(n: int = LOG_TAIL_LINES) -> list[str]: """Return up to n lines from the tail of the add-on log file.""" candidates = [ @@ -973,6 +969,8 @@ def _meta(name: str) -> dict[str, Any]: cache_age_s = round(max(0.0, now_ts - float(finished_at)), 3) def _iso(v: Any) -> str | None: + # Intentionally broader than ``to_iso_utc`` because runner state may + # contain persisted epoch values or already-normalized ISO strings. if isinstance(v, (int, float)): return datetime.fromtimestamp(v, tz=UTC).isoformat() if isinstance(v, str): @@ -987,7 +985,7 @@ def _iso(v: Any) -> str | None: return { "tool": name, - "as_of": _utc_now(), + "as_of": utc_now_iso(), "data_source": "persisted_state", "cache_age_s": cache_age_s, "stale_after_s": stale_after_s, @@ -1003,6 +1001,17 @@ def _iso(v: Any) -> str | None: } +def _redact_external_errors(value: Any) -> Any: + if isinstance(value, dict): + return { + key: ("Internal error" if key == "error" and isinstance(item, str) else _redact_external_errors(item)) + for key, item in value.items() + } + if isinstance(value, list): + return [_redact_external_errors(item) for item in value] + return value + + async def _dispatch_and_wrap( name: str, arguments: dict[str, Any] ) -> dict[str, Any]: @@ -1029,7 +1038,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] include_phantoms=include_phantoms, ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "list_active_issues": # Mirrors /v1/issues/active. Issue detection is paused # pending redesign (#5); return an explicit placeholder so AI @@ -1046,18 +1055,18 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] issues = get_store().list_active_issues() return {"count": len(issues), "issues": issues} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_health_snapshot": try: return _build_health_snapshot() except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "close_issue": try: ok = get_store().close_issue(int(arguments["id"])) return {"closed": ok, "id": int(arguments["id"])} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_recent_logs": n = min(int(arguments.get("lines", 100)), LOG_TAIL_LINES) lines = _tail_log(n) @@ -1068,7 +1077,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] try: return await supervisor_client.get_addon_info() except Exception as exc: # noqa: BLE001 - return {"error": str(exc), "hint": "Supervisor unreachable; running outside HA?"} + return {"error": "Internal error", "hint": "Supervisor unreachable; running outside HA?"} if name == "ha_get_addon_logs": n = max(1, min(int(arguments.get("lines", 200)), 1000)) slug = arguments.get("slug") or None @@ -1082,58 +1091,58 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] ) return {"lines": lines, "count": len(lines), "source": source, "slug": slug} except Exception as exc: # noqa: BLE001 - return {"error": str(exc), "slug": slug} + return {"error": "Internal error", "slug": slug} if name == "ha_get_supervisor_logs": n = max(1, min(int(arguments.get("lines", 200)), 1000)) try: lines = await supervisor_client.get_supervisor_logs(n) return {"lines": lines, "count": len(lines), "source": "supervisor:/supervisor/logs"} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ha_restart_addon": try: res = await supervisor_client.restart_addon() - return {"action": "restart", "result": res, "requested_at": _utc_now()} + return {"action": "restart", "result": res, "requested_at": utc_now_iso()} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ha_rebuild_addon": try: res = await supervisor_client.rebuild_addon() - return {"action": "rebuild", "result": res, "requested_at": _utc_now()} + return {"action": "rebuild", "result": res, "requested_at": utc_now_iso()} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ha_check_for_update": try: return await supervisor_client.check_for_update() except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ha_update_addon": dry_run = bool(arguments.get("dry_run", False)) try: res = await supervisor_client.update_addon(dry_run=dry_run) - return {"result": res, "requested_at": _utc_now()} + return {"result": res, "requested_at": utc_now_iso()} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ha_set_auto_update": enabled = bool(arguments.get("enabled", False)) try: res = await supervisor_client.set_auto_update(enabled) return {"action": "set_auto_update", "enabled": enabled, "result": res} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ha_reinstall_addon": try: res = await supervisor_client.reinstall_addon("thread-observability") - return {"action": "reinstall", "result": res, "requested_at": _utc_now()} + return {"action": "reinstall", "result": res, "requested_at": utc_now_iso()} except Exception as exc: # noqa: BLE001 # Connection reset mid-uninstall is the expected success path. return {"action": "reinstall", "note": "connection terminated (expected)", - "error": str(exc)} + "error": "Internal error"} if name == "list_thread_datasets": try: return await supervisor_client.list_thread_datasets() except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} # ---- Storage / config tools (Phase 1) --------------------------------- if name == "get_storage_stats": @@ -1142,15 +1151,15 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] try: ts_health = await ts_store.timeseries_health() except Exception as exc: # noqa: BLE001 - ts_health = {"backend": "unknown", "error": str(exc)} + ts_health = {"backend": "unknown", "error": "Internal error"} return {"sqlite": stats, "timeseries": ts_health} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_chat_stats": try: return get_store().get_chat_turn_stats(since=arguments.get("since")) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "query_history": try: from ..pipeline import timeline as timeline_mod @@ -1168,7 +1177,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] limit=int(arguments.get("limit", 500)), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_topology_history_entry": try: sid = arguments.get("snapshot_id") @@ -1180,7 +1189,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] ) return snap or {"snapshot": None} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "list_topology_history": try: snaps = get_store().list_topology_snapshots( @@ -1190,7 +1199,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] ) return {"snapshots": snaps, "count": len(snaps)} except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "diff_topology_history": try: from ..pipeline import topology_snapshot as ts_mod @@ -1207,14 +1216,14 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] snapshot_id_b=int(b), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "list_playbooks": try: from ..pipeline import playbooks as pb_mod return pb_mod.list_playbooks() except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "lookup_playbook": try: from ..pipeline import playbooks as pb_mod @@ -1225,7 +1234,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] query=arguments.get("query"), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "analyze_node": try: from ..pipeline import analyze_node as an_mod @@ -1240,7 +1249,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] baseline_days=int(arguments.get("baseline_days", 7)), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_config": try: cfg = get_config() @@ -1255,12 +1264,12 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] payload["ai"]["api_key"] = "***" return payload except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_timeseries_health": try: return await ts_store.timeseries_health() except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} # ---- OTBR ingestion tools (Phase 2.5) --------------------------------- if name == "list_otbr_candidates": @@ -1268,7 +1277,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] cands = await otbr_adapter.list_candidates() return {"count": len(cands), "candidates": cands} except Exception as exc: # noqa: BLE001 - return {"error": str(exc), "candidates": []} + return {"error": "Internal error", "candidates": []} if name == "set_otbr_slug": try: slug = str(arguments.get("slug", "")).strip() @@ -1276,18 +1285,18 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] return {"error": "slug required"} return otbr_adapter.set_slug(slug) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "ingest_now": try: slug = arguments.get("slug") return await otbr_adapter.ingest_once(slug=slug) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_ingest_state": try: return otbr_adapter.get_state() except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} # ---- Node metadata tools ---------------------------------- if name == "list_all_nodes": @@ -1308,13 +1317,13 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] ] return {"nodes": nodes, "count": len(nodes)} except Exception as exc: # noqa: BLE001 - return {"error": str(exc), "nodes": []} + return {"error": "Internal error", "nodes": []} if name == "sync_ha_devices": try: from ..pipeline import device_discovery return await device_discovery.discover_and_sync() except Exception as exc: # noqa: BLE001 - return {"error": str(exc), "matched": 0, "updated": 0} + return {"error": "Internal error", "matched": 0, "updated": 0} # ---- Phase 3 triage tools ----------------------------------------- if name == "start_triage": @@ -1322,19 +1331,19 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] from .http_api import ADDON_VERSION return await triage_mod.start_triage(addon_version=ADDON_VERSION) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_environment": try: from .http_api import ADDON_VERSION return await triage_mod.get_environment(addon_version=ADDON_VERSION) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_pipeline_health": try: limit = int(arguments.get("limit", 20)) return triage_mod.get_pipeline_health(limit=limit) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} # ---- Phase 4 counter time-series tools ---------------------------- if name == "get_counter_series": @@ -1347,7 +1356,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] resolution=str(arguments.get("resolution") or "raw"), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "compare_node_counters": try: return counter_series_mod.compare_node_counters( @@ -1359,7 +1368,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] resolution=str(arguments.get("resolution") or "raw"), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_signal_series": try: return signal_series_mod.get_signal_series( @@ -1369,7 +1378,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] resolution=str(arguments.get("resolution") or "raw"), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_node_link_signal_history": try: return link_signal_history_mod.get_node_link_signal_history( @@ -1381,7 +1390,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] limit=int(arguments.get("limit") or 5000), ) except Exception as exc: # noqa: BLE001 - return {"error": str(exc)} + return {"error": "Internal error"} if name == "get_assessment_state": from ..services.assessment.scheduler import ( @@ -1411,7 +1420,7 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] ) return {"recorded": True, "feedback": rec} except (LookupError, ValueError) as exc: - return {"recorded": False, "error": str(exc)} + return {"recorded": False, "error": "Internal error"} if name == "get_assessment_quality": from ..services.assessment import feedback as feedback_mod @@ -1473,7 +1482,7 @@ async def _handle_mcp_jsonrpc(body: dict[str, Any]) -> tuple[dict[str, Any], int except KeyError: return _jsonrpc_error(req_id, -32602, f"Unknown resource: {resource_name}"), 200 except FileNotFoundError as exc: - return _jsonrpc_error(req_id, -32603, str(exc)), 200 + return _jsonrpc_error(req_id, -32603, "Internal error"), 200 return _jsonrpc_ok( req_id, { @@ -1522,7 +1531,7 @@ def root() -> dict[str, str]: @app.get("/health") def health() -> dict[str, str]: - return {"status": "ok", "service": "mcp", "checked_at": _utc_now()} + return {"status": "ok", "service": "mcp", "checked_at": utc_now_iso()} @app.get("/mcp/tools") def list_tools_rest() -> dict[str, object]: @@ -1539,8 +1548,8 @@ def read_resource_rest(resource_name: str) -> dict[str, object]: except KeyError as exc: raise HTTPException(status_code=404, detail=f"Unknown resource: {resource_name}") from exc except FileNotFoundError as exc: - raise HTTPException(status_code=500, detail=str(exc)) from exc - return {"resource": resource, "contents": contents, "read_at": _utc_now()} + raise HTTPException(status_code=500, detail="Internal error") from exc + return {"resource": resource, "contents": contents, "read_at": utc_now_iso()} @app.get("/mcp/sse") async def open_mcp_sse(request: Request) -> StreamingResponse: @@ -1555,7 +1564,7 @@ async def event_stream() -> Any: try: payload = await asyncio.wait_for(queue.get(), timeout=15.0) except TimeoutError: - yield _encode_sse("ping", {"ts": _utc_now()}) + yield _encode_sse("ping", {"ts": utc_now_iso()}) continue yield _encode_sse("message", payload) finally: @@ -1598,7 +1607,11 @@ async def call_tool_rest(tool_name: str, request: ToolCallRequest) -> dict[str, if tool_name not in _TOOL_MAP: raise HTTPException(status_code=404, detail=f"Unknown tool: {tool_name}") result = await _dispatch_and_wrap(tool_name, request.arguments) - return {"tool": tool_name, "result": result, "called_at": _utc_now()} + return { + "tool": tool_name, + "result": _redact_external_errors(result), + "called_at": utc_now_iso(), + } # ── MCP JSON-RPC 2.0 endpoint (VS Code MCP client) ─────────────────────── diff --git a/addons/thread-observability/app/src/thread_observability/api/triage.py b/addons/thread-observability/app/src/thread_observability/api/triage.py index 3f12031..f218163 100644 --- a/addons/thread-observability/app/src/thread_observability/api/triage.py +++ b/addons/thread-observability/app/src/thread_observability/api/triage.py @@ -24,10 +24,6 @@ from . import supervisor_client -def _utc_now_iso() -> str: - return utc_now_iso() - - # --------------------------------------------------------------------------- # Environment bundle # --------------------------------------------------------------------------- @@ -256,7 +252,7 @@ async def start_triage(*, addon_version: str | None = None) -> dict[str, Any]: issues = get_store().list_active_issues() recommended = _build_recommendations(issues, health, environment) return { - "as_of": _utc_now_iso(), + "as_of": utc_now_iso(), "environment": environment, "health": health, "active_issues_count": len(issues), diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py b/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py index f0b9f5d..4aa751e 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py @@ -239,15 +239,6 @@ def _ext_address_to_eui64(raw: Any) -> str | None: return _hardware_address_to_eui64(raw) -def _field(struct: dict[str, Any], int_key: int, *str_keys: str) -> Any: - """Defensively read a struct field by Matter integer id or named alias.""" - return first_present_field(struct, *str_keys, int_key=int_key) - - -def _coerce_int(v: Any) -> int | None: - return coerce_int(v) - - def _decode_neighbor_table(raw: Any) -> list[dict[str, Any]]: """Decode a Matter NeighborTable attribute (cluster 53 attr 7). @@ -267,29 +258,43 @@ def _decode_neighbor_table(raw: Any) -> list[dict[str, Any]]: for entry in raw: if not isinstance(entry, dict): continue - eui = _ext_address_to_eui64(_field(entry, 0, "extAddress", "ExtAddress")) + eui = _ext_address_to_eui64( + first_present_field(entry, "extAddress", "ExtAddress", int_key=0) + ) if not eui: continue - is_child_raw = _field(entry, 13, "isChild", "IsChild") - rx_on_raw = _field(entry, 10, "rxOnWhenIdle", "RxOnWhenIdle") - ftd_raw = _field(entry, 11, "fullThreadDevice", "FullThreadDevice") - fnd_raw = _field(entry, 12, "fullNetworkData", "FullNetworkData") + is_child_raw = first_present_field(entry, "isChild", "IsChild", int_key=13) + rx_on_raw = first_present_field(entry, "rxOnWhenIdle", "RxOnWhenIdle", int_key=10) + ftd_raw = first_present_field(entry, "fullThreadDevice", "FullThreadDevice", int_key=11) + fnd_raw = first_present_field(entry, "fullNetworkData", "FullNetworkData", int_key=12) out.append({ "neighbor_eui64": eui, - "rssi_avg": _coerce_int(_field(entry, 6, "averageRssi", "AverageRssi")), - "rssi_last": _coerce_int(_field(entry, 7, "lastRssi", "LastRssi")), - "lqi_in": _coerce_int(_field(entry, 5, "lqi", "LQI")), + "rssi_avg": coerce_int( + first_present_field(entry, "averageRssi", "AverageRssi", int_key=6) + ), + "rssi_last": coerce_int( + first_present_field(entry, "lastRssi", "LastRssi", int_key=7) + ), + "lqi_in": coerce_int(first_present_field(entry, "lqi", "LQI", int_key=5)), "lqi_out": None, "is_child": to_tristate_int(is_child_raw), - "age_seconds": _coerce_int(_field(entry, 1, "age", "Age")), - "frame_error_rate": _coerce_int(_field(entry, 8, "frameErrorRate", "FrameErrorRate")), - "message_error_rate": _coerce_int(_field(entry, 9, "messageErrorRate", "MessageErrorRate")), + "age_seconds": coerce_int(first_present_field(entry, "age", "Age", int_key=1)), + "frame_error_rate": coerce_int( + first_present_field(entry, "frameErrorRate", "FrameErrorRate", int_key=8) + ), + "message_error_rate": coerce_int( + first_present_field(entry, "messageErrorRate", "MessageErrorRate", int_key=9) + ), "path_cost": None, "rx_on_when_idle": to_tristate_int(rx_on_raw), "full_thread_device": to_tristate_int(ftd_raw), "full_network_data": to_tristate_int(fnd_raw), - "link_frame_counter": _coerce_int(_field(entry, 3, "linkFrameCounter", "LinkFrameCounter")), - "mle_frame_counter": _coerce_int(_field(entry, 4, "mleFrameCounter", "MleFrameCounter")), + "link_frame_counter": coerce_int( + first_present_field(entry, "linkFrameCounter", "LinkFrameCounter", int_key=3) + ), + "mle_frame_counter": coerce_int( + first_present_field(entry, "mleFrameCounter", "MleFrameCounter", int_key=4) + ), }) return out @@ -312,24 +317,28 @@ def _decode_route_table(raw: Any) -> list[dict[str, Any]]: for entry in raw: if not isinstance(entry, dict): continue - eui = _ext_address_to_eui64(_field(entry, 0, "extAddress", "ExtAddress")) + eui = _ext_address_to_eui64( + first_present_field(entry, "extAddress", "ExtAddress", int_key=0) + ) if not eui: continue - alloc_raw = _field(entry, 8, "allocated", "Allocated") - est_raw = _field(entry, 9, "linkEstablished", "LinkEstablished") + alloc_raw = first_present_field(entry, "allocated", "Allocated", int_key=8) + est_raw = first_present_field(entry, "linkEstablished", "LinkEstablished", int_key=9) out.append({ "neighbor_eui64": eui, "rssi_avg": None, "rssi_last": None, - "lqi_in": _coerce_int(_field(entry, 5, "lqiIn", "LQIIn")), - "lqi_out": _coerce_int(_field(entry, 6, "lqiOut", "LQIOut")), + "lqi_in": coerce_int(first_present_field(entry, "lqiIn", "LQIIn", int_key=5)), + "lqi_out": coerce_int(first_present_field(entry, "lqiOut", "LQIOut", int_key=6)), "is_child": None, - "age_seconds": _coerce_int(_field(entry, 7, "age", "Age")), + "age_seconds": coerce_int(first_present_field(entry, "age", "Age", int_key=7)), "frame_error_rate": None, "message_error_rate": None, - "path_cost": _coerce_int(_field(entry, 4, "pathCost", "PathCost")), - "router_id": _coerce_int(_field(entry, 2, "routerId", "RouterId")), - "next_hop_router_id": _coerce_int(_field(entry, 3, "nextHop", "NextHop")), + "path_cost": coerce_int(first_present_field(entry, "pathCost", "PathCost", int_key=4)), + "router_id": coerce_int(first_present_field(entry, "routerId", "RouterId", int_key=2)), + "next_hop_router_id": coerce_int( + first_present_field(entry, "nextHop", "NextHop", int_key=3) + ), "allocated": to_tristate_int(alloc_raw), "link_established": to_tristate_int(est_raw), }) diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/nodes.py b/addons/thread-observability/app/src/thread_observability/pipeline/nodes.py index 4d2cc6c..4326651 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/nodes.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/nodes.py @@ -11,11 +11,7 @@ from typing import Any from ..storage.sqlite_store import SQLiteStore, get_store -from ..utils.datetime import parse_iso_datetime, utc_now_iso - - -def _utc_now() -> str: - return utc_now_iso() +from ..utils.datetime import parse_iso_datetime def _physical_identity_key(node: dict[str, Any]) -> tuple[int, int, str] | None: diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_adapter.py b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_adapter.py index 4a15be3..630aab9 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_adapter.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_adapter.py @@ -17,13 +17,13 @@ import hashlib import logging import os -from datetime import UTC, datetime from typing import Any import httpx from . import otbr_parser from ..storage.sqlite_store import SQLiteStore, get_store +from ..utils.datetime import utc_now_iso log = logging.getLogger(__name__) @@ -38,10 +38,6 @@ _STATE_KEY_PREFIX = "otbr:" -def _now() -> str: - return datetime.now(tz=UTC).isoformat() - - def _token_or_raise() -> str: tok = os.getenv(SUPERVISOR_TOKEN_ENV) if not tok: @@ -97,7 +93,7 @@ def set_slug(slug: str, store: SQLiteStore | None = None) -> dict[str, Any]: # Reset cursor when switching add-ons. state["last_line_hash"] = None state["last_event_ts"] = None - state["last_run_at"] = _now() + state["last_run_at"] = utc_now_iso() state["last_error"] = None _write_state(s, state) return state @@ -219,7 +215,7 @@ async def ingest_once( state = _read_state(s) target = slug or state.get("slug") summary: dict[str, Any] = { - "ran_at": _now(), + "ran_at": utc_now_iso(), "slug": target, "lines_seen": 0, "lines_new": 0, diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_diagnostics.py b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_diagnostics.py index bcc62c7..266498e 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_diagnostics.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_diagnostics.py @@ -33,10 +33,6 @@ log = logging.getLogger(__name__) -def _coerce_int(v: Any) -> int | None: - return coerce_int(v, allow_strings=True) - - def _extract_mac_counters(payload: dict[str, Any]) -> dict[str, int | None]: """Pull MAC counter fields out of a ``/diagnostics`` response. @@ -56,23 +52,29 @@ def _extract_mac_counters(payload: dict[str, Any]) -> dict[str, int | None]: "rx_total", "rx_err", "rx_dup", )} return { - "tx_total": _coerce_int( - mc.get("IfOutUcastPkts") or mc.get("tx_total") or mc.get("TxTotal") + "tx_total": coerce_int( + mc.get("IfOutUcastPkts") or mc.get("tx_total") or mc.get("TxTotal"), + allow_strings=True, ), - "tx_retry": _coerce_int( - mc.get("IfOutRetries") or mc.get("tx_retry") or mc.get("TxRetry") + "tx_retry": coerce_int( + mc.get("IfOutRetries") or mc.get("tx_retry") or mc.get("TxRetry"), + allow_strings=True, ), - "tx_err": _coerce_int( - mc.get("IfOutErrors") or mc.get("tx_err") or mc.get("TxErrAbort") + "tx_err": coerce_int( + mc.get("IfOutErrors") or mc.get("tx_err") or mc.get("TxErrAbort"), + allow_strings=True, ), - "rx_total": _coerce_int( - mc.get("IfInUcastPkts") or mc.get("rx_total") or mc.get("RxTotal") + "rx_total": coerce_int( + mc.get("IfInUcastPkts") or mc.get("rx_total") or mc.get("RxTotal"), + allow_strings=True, ), - "rx_err": _coerce_int( - mc.get("IfInErrors") or mc.get("rx_err") or mc.get("RxErrNoFrame") + "rx_err": coerce_int( + mc.get("IfInErrors") or mc.get("rx_err") or mc.get("RxErrNoFrame"), + allow_strings=True, ), - "rx_dup": _coerce_int( - mc.get("IfInDup") or mc.get("rx_dup") or mc.get("RxDuplicated") + "rx_dup": coerce_int( + mc.get("IfInDup") or mc.get("rx_dup") or mc.get("RxDuplicated"), + allow_strings=True, ), } @@ -115,7 +117,7 @@ async def poll_otbr_diagnostics( or router.get("rloc16") or router.get("RLOC16") ) - rloc16 = _coerce_int(rloc16_raw) + rloc16 = coerce_int(rloc16_raw, allow_strings=True) if not eui or rloc16 is None: continue summary["routers_polled"] += 1 diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_parser.py b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_parser.py index d8a93b9..79bb535 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_parser.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_parser.py @@ -21,7 +21,7 @@ from datetime import UTC, datetime from typing import Any -from ..utils.datetime import parse_iso_datetime +from ..utils.datetime import parse_iso_datetime, utc_now, utc_now_iso _EUI = r"[0-9a-fA-F]{16}" @@ -108,10 +108,6 @@ def to_storage_kwargs(self) -> dict[str, Any]: } -def _now_iso() -> str: - return datetime.now(tz=UTC).isoformat() - - def _extract_ts(line: str) -> tuple[str, str]: """Return (iso_ts, remainder). Falls back to now() if no leading ts.""" for pat in _TS_PATTERNS: @@ -122,7 +118,7 @@ def _extract_ts(line: str) -> tuple[str, str]: try: # Handle time-only format (HH:MM:SS from OTBR daemon logs) if len(raw) <= 8: # Just HH:MM:SS - today = datetime.now(tz=UTC).date() + today = utc_now().date() ms = m.group("ms") if "ms" in m.groupdict() else None ms_str = f".{ms}" if ms else "" normalised = f"{today}T{raw}{ms_str}" @@ -143,8 +139,8 @@ def _extract_ts(line: str) -> tuple[str, str]: raise ValueError("invalid timestamp") return dt.astimezone(UTC).isoformat(), line[m.end():] except ValueError: - return _now_iso(), line[m.end():] - return _now_iso(), line + return utc_now_iso(), line[m.end():] + return utc_now_iso(), line def parse_line(line: str) -> ParsedEvent | None: diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_rest.py b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_rest.py index 3d8b365..2f8e847 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/otbr_rest.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/otbr_rest.py @@ -330,13 +330,8 @@ async def fetch_otbr_diagnostics( return None -def _otbr_field(entry: dict[str, Any], *keys: str) -> Any: - """Return the first non-None value among ``keys`` (case variants).""" - return first_present_field(entry, *keys) - - def _otbr_eui_from(entry: dict[str, Any]) -> str | None: - raw = _otbr_field(entry, "ExtAddress", "extAddress", "ext_address") + raw = first_present_field(entry, "ExtAddress", "extAddress", "ext_address") if raw is None: return None try: @@ -348,14 +343,6 @@ def _otbr_eui_from(entry: dict[str, Any]) -> str | None: return None -def _otbr_coerce_int(v: Any) -> int | None: - return coerce_int(v, allow_strings=True) - - -def _otbr_tri(v: Any) -> int | None: - return to_tristate_int(v) - - def _decode_otbr_neighbors(raw: list[dict[str, Any]] | None) -> list[dict[str, Any]]: """Map OTBR REST ``/node/neighbors`` JSON to ``links`` row dicts. @@ -374,30 +361,51 @@ def _decode_otbr_neighbors(raw: list[dict[str, Any]] | None) -> list[dict[str, A continue out.append({ "neighbor_eui64": eui, - "rssi_avg": _otbr_coerce_int(_otbr_field(entry, "AverageRssi", "averageRssi")), - "rssi_last": _otbr_coerce_int(_otbr_field(entry, "LastRssi", "lastRssi")), - "lqi_in": _otbr_coerce_int(_otbr_field(entry, "LinkQualityIn", "linkQualityIn")), - "lqi_out": _otbr_coerce_int(_otbr_field(entry, "LinkQualityOut", "linkQualityOut")), - "is_child": _otbr_tri(_otbr_field(entry, "IsChild", "isChild")), - "age_seconds": _otbr_coerce_int(_otbr_field(entry, "Age", "age")), - "frame_error_rate": _otbr_coerce_int( - _otbr_field(entry, "FrameErrorRate", "frameErrorRate") + "rssi_avg": coerce_int( + first_present_field(entry, "AverageRssi", "averageRssi"), + allow_strings=True, + ), + "rssi_last": coerce_int( + first_present_field(entry, "LastRssi", "lastRssi"), + allow_strings=True, + ), + "lqi_in": coerce_int( + first_present_field(entry, "LinkQualityIn", "linkQualityIn"), + allow_strings=True, ), - "message_error_rate": _otbr_coerce_int( - _otbr_field(entry, "MessageErrorRate", "messageErrorRate") + "lqi_out": coerce_int( + first_present_field(entry, "LinkQualityOut", "linkQualityOut"), + allow_strings=True, ), - "link_frame_counter": _otbr_coerce_int( - _otbr_field(entry, "LinkFrameCounter", "linkFrameCounter") + "is_child": to_tristate_int(first_present_field(entry, "IsChild", "isChild")), + "age_seconds": coerce_int( + first_present_field(entry, "Age", "age"), + allow_strings=True, ), - "mle_frame_counter": _otbr_coerce_int( - _otbr_field(entry, "MleFrameCounter", "mleFrameCounter") + "frame_error_rate": coerce_int( + first_present_field(entry, "FrameErrorRate", "frameErrorRate"), + allow_strings=True, ), - "rx_on_when_idle": _otbr_tri(_otbr_field(entry, "RxOnWhenIdle", "rxOnWhenIdle")), - "full_thread_device": _otbr_tri( - _otbr_field(entry, "FullThreadDevice", "fullThreadDevice") + "message_error_rate": coerce_int( + first_present_field(entry, "MessageErrorRate", "messageErrorRate"), + allow_strings=True, ), - "full_network_data": _otbr_tri( - _otbr_field(entry, "FullNetworkData", "fullNetworkData") + "link_frame_counter": coerce_int( + first_present_field(entry, "LinkFrameCounter", "linkFrameCounter"), + allow_strings=True, + ), + "mle_frame_counter": coerce_int( + first_present_field(entry, "MleFrameCounter", "mleFrameCounter"), + allow_strings=True, + ), + "rx_on_when_idle": to_tristate_int( + first_present_field(entry, "RxOnWhenIdle", "rxOnWhenIdle") + ), + "full_thread_device": to_tristate_int( + first_present_field(entry, "FullThreadDevice", "fullThreadDevice") + ), + "full_network_data": to_tristate_int( + first_present_field(entry, "FullNetworkData", "fullNetworkData") ), }) return out @@ -422,24 +430,42 @@ def _decode_otbr_routers(raw: list[dict[str, Any]] | None) -> list[dict[str, Any "neighbor_eui64": eui, "rssi_avg": None, "rssi_last": None, - "lqi_in": _otbr_coerce_int( - _otbr_field(entry, "LinkQualityIn", "LqiIn", "lqiIn", "linkQualityIn") + "lqi_in": coerce_int( + first_present_field( + entry, "LinkQualityIn", "LqiIn", "lqiIn", "linkQualityIn" + ), + allow_strings=True, ), - "lqi_out": _otbr_coerce_int( - _otbr_field(entry, "LinkQualityOut", "LqiOut", "lqiOut", "linkQualityOut") + "lqi_out": coerce_int( + first_present_field( + entry, "LinkQualityOut", "LqiOut", "lqiOut", "linkQualityOut" + ), + allow_strings=True, ), "is_child": None, - "age_seconds": _otbr_coerce_int(_otbr_field(entry, "Age", "age")), + "age_seconds": coerce_int( + first_present_field(entry, "Age", "age"), + allow_strings=True, + ), "frame_error_rate": None, "message_error_rate": None, - "path_cost": _otbr_coerce_int(_otbr_field(entry, "PathCost", "pathCost")), - "router_id": _otbr_coerce_int(_otbr_field(entry, "RouterId", "routerId")), - "next_hop_router_id": _otbr_coerce_int( - _otbr_field(entry, "NextHopRouterId", "NextHop", "nextHop", "nextHopRouterId") + "path_cost": coerce_int( + first_present_field(entry, "PathCost", "pathCost"), + allow_strings=True, + ), + "router_id": coerce_int( + first_present_field(entry, "RouterId", "routerId"), + allow_strings=True, + ), + "next_hop_router_id": coerce_int( + first_present_field( + entry, "NextHopRouterId", "NextHop", "nextHop", "nextHopRouterId" + ), + allow_strings=True, ), - "allocated": _otbr_tri(_otbr_field(entry, "Allocated", "allocated")), - "link_established": _otbr_tri( - _otbr_field(entry, "LinkEstablished", "linkEstablished") + "allocated": to_tristate_int(first_present_field(entry, "Allocated", "allocated")), + "link_established": to_tristate_int( + first_present_field(entry, "LinkEstablished", "linkEstablished") ), }) return out diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py b/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py index f2785e0..9e2af25 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py @@ -26,6 +26,7 @@ import json from ..storage.sqlite_store import SQLiteStore, get_store +from ..utils.datetime import to_iso_utc PARENT_CHURN_WINDOW_MIN = 30 PARENT_CHURN_THRESHOLD = 3 @@ -86,10 +87,6 @@ ) -def _iso(dt: datetime) -> str: - return dt.isoformat() - - def run_reasoner( *, now: datetime | None = None, @@ -128,11 +125,11 @@ def run_reasoner( skipped: list[int] = [] # ---- gather raw inputs in one lock ---- - churn_window = _iso(now_dt - timedelta(minutes=PARENT_CHURN_WINDOW_MIN)) - attach_window = _iso(now_dt - timedelta(minutes=ATTACH_FAIL_WINDOW_MIN)) - offline_cutoff = _iso(now_dt - timedelta(minutes=OFFLINE_THRESHOLD_MIN)) - re_attach_window = _iso(now_dt - timedelta(minutes=RE_ATTACH_STORM_WINDOW_MIN)) - mesh_disagree_cutoff = _iso( + churn_window = to_iso_utc(now_dt - timedelta(minutes=PARENT_CHURN_WINDOW_MIN)) + attach_window = to_iso_utc(now_dt - timedelta(minutes=ATTACH_FAIL_WINDOW_MIN)) + offline_cutoff = to_iso_utc(now_dt - timedelta(minutes=OFFLINE_THRESHOLD_MIN)) + re_attach_window = to_iso_utc(now_dt - timedelta(minutes=RE_ATTACH_STORM_WINDOW_MIN)) + mesh_disagree_cutoff = to_iso_utc( now_dt - timedelta(minutes=MESH_DISAGREEMENT_MAX_AGE_MIN) ) @@ -510,7 +507,7 @@ def _emit( pass return { - "ran_at": _iso(now_dt), + "ran_at": to_iso_utc(now_dt), "opened": opened, "still_open": skipped, "closed": closed, diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/timeline.py b/addons/thread-observability/app/src/thread_observability/pipeline/timeline.py index c4e425f..eef7e53 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/timeline.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/timeline.py @@ -20,7 +20,8 @@ from typing import Any, Iterable -from ..storage.sqlite_store import SQLiteStore, _utc_now +from ..storage.sqlite_store import SQLiteStore +from ..utils.datetime import utc_now_iso # Canonical sources callers can ask for; "all" means union of everything. @@ -56,7 +57,7 @@ def query_timeline( tables are read at all. ``limit`` caps the final merged list. """ limit = max(1, min(int(limit), 5000)) - upper = until or _utc_now() + upper = until or utc_now_iso() kind_set = set(kinds) if kinds else None source_set = set(sources) if sources else None diff --git a/addons/thread-observability/app/src/thread_observability/services/assessment/scheduler.py b/addons/thread-observability/app/src/thread_observability/services/assessment/scheduler.py index 6a1e9fa..52bb7d7 100644 --- a/addons/thread-observability/app/src/thread_observability/services/assessment/scheduler.py +++ b/addons/thread-observability/app/src/thread_observability/services/assessment/scheduler.py @@ -124,18 +124,6 @@ class SchedulerDecision: budget_exhausted: bool = False -def _utc_now() -> datetime: - return utc_now() - - -def _iso(dt: datetime) -> str: - return to_iso_utc(dt) - - -def _parse(ts: str | None) -> datetime | None: - return parse_iso_datetime(ts) - - class AssessmentScheduler: """Pure logic + persistence wrapper. No I/O beyond the store.""" @@ -155,7 +143,7 @@ def snapshot(self, *, now: datetime | None = None) -> ScheduleSnapshot: row = self._store.get_assessment_schedule() if row is None: row = self._initialize(now=now) - self._roll_budget_if_needed(row, now=now or _utc_now()) + self._roll_budget_if_needed(row, now=now or utc_now()) return self._row_to_snapshot(row) def decide( @@ -169,7 +157,7 @@ def decide( ``force=True`` honors the daily budget but ignores cadence. If the feature is disabled, returns ``should_run=False`` regardless. """ - now = now or _utc_now() + now = now or utc_now() row = self._store.get_assessment_schedule() or self._initialize(now=now) self._roll_budget_if_needed(row, now=now) @@ -200,7 +188,7 @@ def decide( next_run_at=row.get("next_assessment_at"), ) - next_at = _parse(row.get("next_assessment_at")) + next_at = parse_iso_datetime(row.get("next_assessment_at")) if next_at is None or now >= next_at: return SchedulerDecision( should_run=True, @@ -222,7 +210,7 @@ def record_assessment( now: datetime | None = None, ) -> ScheduleSnapshot: """Update the state machine after an assessment ran.""" - now = now or _utc_now() + now = now or utc_now() row = self._store.get_assessment_schedule() or self._initialize(now=now) self._roll_budget_if_needed(row, now=now) @@ -250,16 +238,16 @@ def record_assessment( next_assessment_at = now + timedelta(seconds=current_interval) budget_used = int(row.get("budget_calls_used") or 0) + 1 - state_since = row.get("state_since") or _iso(now) + state_since = row.get("state_since") or to_iso_utc(now) if state != row.get("state"): - state_since = _iso(now) + state_since = to_iso_utc(now) updated = self._store.upsert_assessment_schedule( { "state": state, "state_since": state_since, - "last_assessment_at": _iso(now), - "next_assessment_at": _iso(next_assessment_at), + "last_assessment_at": to_iso_utc(now), + "next_assessment_at": to_iso_utc(next_assessment_at), "consecutive_ok": consecutive_ok, "consecutive_concern": consecutive_concern, "current_interval_seconds": current_interval, @@ -272,15 +260,15 @@ def record_assessment( def note_user_engaged(self, *, now: datetime | None = None) -> ScheduleSnapshot: """User opened the chat drawer / started triage — bump to engaged.""" - now = now or _utc_now() + now = now or utc_now() row = self._store.get_assessment_schedule() or self._initialize(now=now) interval = self.config.engaged_interval_minutes * 60 updated = self._store.upsert_assessment_schedule( { "state": "engaged", - "state_since": _iso(now), + "state_since": to_iso_utc(now), "current_interval_seconds": interval, - "next_assessment_at": _iso(now + timedelta(seconds=interval)), + "next_assessment_at": to_iso_utc(now + timedelta(seconds=interval)), "reason": "user_engaged", "budget_calls_used": row.get("budget_calls_used", 0), "budget_window_start_at": row.get("budget_window_start_at"), @@ -291,16 +279,16 @@ def note_user_engaged(self, *, now: datetime | None = None) -> ScheduleSnapshot: def set_enabled(self, enabled: bool, *, now: datetime | None = None) -> ScheduleSnapshot: """Runtime enable/disable (e.g., from the switch entity).""" self.config = replace(self.config, enabled=enabled) - now = now or _utc_now() + now = now or utc_now() row = self._store.get_assessment_schedule() or self._initialize(now=now) if enabled: interval = self.config.probation_interval_minutes * 60 updated = self._store.upsert_assessment_schedule( { "state": "probation", - "state_since": _iso(now), + "state_since": to_iso_utc(now), "current_interval_seconds": interval, - "next_assessment_at": _iso(now + timedelta(seconds=interval)), + "next_assessment_at": to_iso_utc(now + timedelta(seconds=interval)), "consecutive_ok": 0, "consecutive_concern": 0, "reason": "enabled_by_user", @@ -310,7 +298,7 @@ def set_enabled(self, enabled: bool, *, now: datetime | None = None) -> Schedule updated = self._store.upsert_assessment_schedule( { "state": "disabled", - "state_since": _iso(now), + "state_since": to_iso_utc(now), "next_assessment_at": None, "reason": "disabled_by_user", } @@ -320,39 +308,39 @@ def set_enabled(self, enabled: bool, *, now: datetime | None = None) -> Schedule # ----- internals -------------------------------------------------- def _initialize(self, *, now: datetime | None = None) -> dict[str, Any]: - now = now or _utc_now() + now = now or utc_now() interval = self.config.probation_interval_minutes * 60 state: SchedulerState = "probation" if self.config.enabled else "disabled" return self._store.upsert_assessment_schedule( { "state": state, - "state_since": _iso(now), + "state_since": to_iso_utc(now), "current_interval_seconds": interval, - "next_assessment_at": _iso(now + timedelta(seconds=interval)) + "next_assessment_at": to_iso_utc(now + timedelta(seconds=interval)) if self.config.enabled else None, "consecutive_ok": 0, "consecutive_concern": 0, "budget_calls_used": 0, - "budget_window_start_at": _iso(now), + "budget_window_start_at": to_iso_utc(now), "reason": "initial", } ) def _roll_budget_if_needed(self, row: dict[str, Any], *, now: datetime) -> None: """Reset budget at UTC-midnight rollover.""" - window_start = _parse(row.get("budget_window_start_at")) or now + window_start = parse_iso_datetime(row.get("budget_window_start_at")) or now if window_start.date() != now.date(): self._store.upsert_assessment_schedule( { "budget_calls_used": 0, - "budget_window_start_at": _iso( + "budget_window_start_at": to_iso_utc( now.replace(hour=0, minute=0, second=0, microsecond=0) ), } ) row["budget_calls_used"] = 0 - row["budget_window_start_at"] = _iso( + row["budget_window_start_at"] = to_iso_utc( now.replace(hour=0, minute=0, second=0, microsecond=0) ) @@ -399,7 +387,7 @@ def _on_concern( def _row_to_snapshot(self, row: dict[str, Any]) -> ScheduleSnapshot: return ScheduleSnapshot( state=row.get("state", "probation"), - state_since=row.get("state_since") or _iso(_utc_now()), + state_since=row.get("state_since") or to_iso_utc(utc_now()), last_assessment_at=row.get("last_assessment_at"), next_assessment_at=row.get("next_assessment_at"), current_interval_seconds=int(row.get("current_interval_seconds") or 900), @@ -407,7 +395,7 @@ def _row_to_snapshot(self, row: dict[str, Any]) -> ScheduleSnapshot: consecutive_concern=int(row.get("consecutive_concern") or 0), budget_calls_used=int(row.get("budget_calls_used") or 0), budget_window_start_at=row.get("budget_window_start_at") - or _iso(_utc_now()), + or to_iso_utc(utc_now()), daily_budget_calls=self.config.daily_budget_calls, reason=row.get("reason"), enabled=self.config.enabled and row.get("state") != "disabled", diff --git a/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py b/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py index fa2e489..eb862f7 100644 --- a/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py +++ b/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py @@ -681,10 +681,6 @@ ] -def _utc_now() -> str: - return utc_now_iso() - - class SQLiteStore: """Thin wrapper around the on-disk SQLite database.""" @@ -718,7 +714,7 @@ def _migrate(self) -> None: self._conn.executescript(sql) self._conn.execute( "INSERT OR REPLACE INTO schema_version(version, applied_at) VALUES (?, ?)", - (idx, _utc_now()), + (idx, utc_now_iso()), ) @property @@ -751,7 +747,7 @@ def insert_event( lqi: int | None = None, payload: dict[str, Any] | None = None, ) -> int: - ts = ts or _utc_now() + ts = ts or utc_now_iso() payload_json = json.dumps(payload) if payload is not None else None with self._tx() as conn: cur = conn.execute( @@ -898,7 +894,7 @@ def upsert_node_metadata( product_id: int | None = None, serial_number: str | None = None, ) -> None: - now = _utc_now() + now = utc_now_iso() # Keep legacy `area` column populated with the resolved name so older # readers continue to work. legacy_area = area if area is not None else area_name @@ -1116,7 +1112,7 @@ def set_node_diagnostics( rx_err_no_frame_count, rx_err_sec_count, rx_err_fcs_count, network_name, extended_pan_id, - _utc_now(), eui64, + utc_now_iso(), eui64, ), ) return cur.rowcount > 0 @@ -1189,7 +1185,7 @@ def insert_otbr_diagnostic( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( - target_eui64, target_rloc16, _utc_now(), partition_id, + target_eui64, target_rloc16, utc_now_iso(), partition_id, mac_tx_total, mac_tx_retry, mac_tx_err, mac_rx_total, mac_rx_err, mac_rx_dup, json.dumps(mle_counters) if mle_counters is not None else None, @@ -1242,7 +1238,7 @@ def insert_observer_event( ``start``, ``stop``, ``restart``, ``outage``. ``started_at`` defaults to ``now()``. Returns the new row id. """ - ts = started_at or _utc_now() + ts = started_at or utc_now_iso() with self._tx() as conn: cur = conn.execute( """ @@ -1261,7 +1257,7 @@ def close_observer_event( self, event_id: int, *, ended_at: str | None = None ) -> bool: """Stamp ``ended_at`` on an open event window.""" - ts = ended_at or _utc_now() + ts = ended_at or utc_now_iso() with self._tx() as conn: cur = conn.execute( "UPDATE observer_events SET ended_at = ?" @@ -1293,7 +1289,7 @@ def list_observer_events_in_window( reasoner to find blackouts that overlap an issue's trigger window. """ - upper = until or _utc_now() + upper = until or utc_now_iso() with self._lock: rows = self._conn.execute( """ @@ -1334,7 +1330,7 @@ def insert_topology_snapshot( normalized snapshot content so the capture stage can skip writing rows when nothing has changed. """ - ts = captured_at or _utc_now() + ts = captured_at or utc_now_iso() # Pull denormalized summary columns from the snapshot dict so # ``list_topology_snapshots`` doesn't have to parse JSON. node_count = int(snapshot.get("node_count") or len(snapshot.get("nodes") or [])) @@ -1474,11 +1470,13 @@ def record_pipeline_tick(self, tick: dict[str, Any]) -> int: finished_at = tick.get("finished_at") def _iso(v: Any) -> str: + # Intentionally broader than ``to_iso_utc``: pipeline state can + # carry epoch seconds, ISO strings, or missing timestamps. if isinstance(v, (int, float)): return datetime.fromtimestamp(v, tz=UTC).isoformat() if isinstance(v, str): return v - return _utc_now() + return utc_now_iso() try: db_size_bytes = int(self.db_path.stat().st_size) @@ -1546,7 +1544,7 @@ def record_counter_sample( cleaned = {k: v for k, v in counters.items() if v is not None} if not cleaned: return False - ts = observed_at or _utc_now() + ts = observed_at or utc_now_iso() payload = json.dumps(cleaned, separators=(",", ":"), sort_keys=True) with self._tx() as conn: cur = conn.execute( @@ -1763,7 +1761,7 @@ def _j(v: Any) -> str | None: network_name, channel, channel_mask, mesh_local_prefix, _j(on_mesh_prefixes), _j(external_routes), _j(services), _j(br_servers), - active_timestamp, _utc_now(), + active_timestamp, utc_now_iso(), ), ) @@ -1816,7 +1814,7 @@ def bump_last_referenced(self, euis: Iterable[str]) -> int: Returns the number of rows that were actually touched. """ - ts = _utc_now() + ts = utc_now_iso() n = 0 with self._tx() as conn: for eui in euis: @@ -1843,7 +1841,7 @@ def apply_availability( Returns ``{applied, skipped}``. """ - ts = _utc_now() + ts = utc_now_iso() applied = 0 skipped = 0 with self._tx() as conn: @@ -1946,7 +1944,7 @@ def recompute_node_statuses( Returns ``{state: count}`` summary plus ``changed`` (number of rows whose status flipped this call). """ - now = _utc_now() + now = utc_now_iso() offline_cutoff = ( datetime.now(tz=UTC) - timedelta(seconds=offline_seconds) ).isoformat() @@ -2160,7 +2158,7 @@ def replace_links_for_reporter( race — a second concurrent sweep on the same reporter would otherwise destroy the evidence before we read it. """ - now = observed_at or _utc_now() + now = observed_at or utc_now_iso() inserted = 0 with self._tx() as conn: # Snapshot the previous neighbour set + frame counters BEFORE @@ -2657,7 +2655,7 @@ def open_issue( ``eui64`` already exists, that issue's id is returned and the evidence is merged via REPLACE (last-write-wins). """ - now = _utc_now() + now = utc_now_iso() evidence_json = json.dumps(evidence) if evidence is not None else None with self._tx() as conn: if dedupe: @@ -2687,7 +2685,7 @@ def close_issue(self, issue_id: int) -> bool: with self._tx() as conn: cur = conn.execute( "UPDATE issues SET closed_at = ? WHERE id = ? AND closed_at IS NULL", - (_utc_now(), issue_id), + (utc_now_iso(), issue_id), ) return cur.rowcount > 0 @@ -2875,7 +2873,7 @@ def list_issues_in_window( unified timeline to synthesize open/close events for a node or for the whole mesh. """ - upper = until or _utc_now() + upper = until or utc_now_iso() clauses = ["opened_at <= ?", "(closed_at IS NULL OR closed_at >= ?)"] params: list[Any] = [upper, since] if eui64: @@ -2916,7 +2914,7 @@ def upsert_assessment_schedule(self, fields: dict[str, Any]) -> dict[str, Any]: ``fields`` is merged into the existing row (if any). ``updated_at`` is always set to now. Returns the resulting row. """ - now = _utc_now() + now = utc_now_iso() merged = { "state": "probation", "state_since": now, @@ -2980,7 +2978,7 @@ def upsert_assessment_finding( bump ``last_seen_at`` + ``seen_count`` and take the max confidence. Otherwise insert a new row. """ - now = _utc_now() + now = utc_now_iso() ev_json = json.dumps(evidence or []) with self._tx() as conn: existing = conn.execute( @@ -3082,7 +3080,7 @@ def clear_assessment_findings_by_key( cleared_by: str = "assessment", ) -> int: """Mark all open rows with this key as cleared. Returns count affected.""" - now = _utc_now() + now = utc_now_iso() with self._tx() as conn: cur = conn.execute( "UPDATE assessment_findings" @@ -3129,7 +3127,7 @@ def record_assessment_run( cleared_count: int = 0, model_name: str | None = None, ) -> dict[str, Any]: - assessed_at = _utc_now() + assessed_at = utc_now_iso() with self._tx() as conn: cur = conn.execute( """ @@ -3179,7 +3177,7 @@ def list_assessment_runs( def is_finding_key_suppressed(self, finding_key: str, *, at: str | None = None) -> bool: """Return True if any dismissed row with the same key has suppress_until > now.""" - ts = at or _utc_now() + ts = at or utc_now_iso() with self._lock: row = self._conn.execute( "SELECT 1 FROM assessment_findings" @@ -3198,7 +3196,7 @@ def record_assessment_feedback( finding_type: str | None = None, notes: str | None = None, ) -> dict[str, Any]: - now = _utc_now() + now = utc_now_iso() with self._tx() as conn: conn.execute( """ diff --git a/addons/thread-observability/app/tests/test_utils_helpers.py b/addons/thread-observability/app/tests/test_utils_helpers.py new file mode 100644 index 0000000..97c9dd1 --- /dev/null +++ b/addons/thread-observability/app/tests/test_utils_helpers.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +from thread_observability.utils.coercion import ( + coerce_int, + first_present_field, + to_tristate_int, +) +from thread_observability.utils.datetime import parse_iso_datetime, to_iso_utc + + +def test_parse_iso_datetime_normalizes_z_and_naive_values() -> None: + assert parse_iso_datetime("2026-05-12T10:00:00Z") == datetime(2026, 5, 12, 10, 0, 0, tzinfo=UTC) + assert parse_iso_datetime("2026-05-12T10:00:00") == datetime(2026, 5, 12, 10, 0, 0, tzinfo=UTC) + + +def test_parse_iso_datetime_converts_offsets_to_utc() -> None: + assert parse_iso_datetime("2026-05-12T12:30:00+02:00") == datetime(2026, 5, 12, 10, 30, 0, tzinfo=UTC) + + +def test_to_iso_utc_normalizes_timezone() -> None: + value = datetime(2026, 5, 12, 12, 30, 0, tzinfo=UTC) + timedelta(hours=2) + assert to_iso_utc(value) == "2026-05-12T14:30:00+00:00" + + +def test_coerce_int_supports_hex_and_blank_string_handling() -> None: + assert coerce_int("0x10", allow_strings=True) == 16 + assert coerce_int(" ", allow_strings=True) is None + assert coerce_int("10") is None + + +def test_first_present_field_checks_integer_key_before_aliases() -> None: + payload = {"0": "by-id", "ExtAddress": "by-name"} + assert first_present_field(payload, "ExtAddress", int_key=0) == "by-id" + + +def test_to_tristate_int_coerces_truthy_and_falsey_values() -> None: + assert to_tristate_int(True) == 1 + assert to_tristate_int(0) == 0 + assert to_tristate_int(None) is None