diff --git a/roboflow/adapters/vision_events_api.py b/roboflow/adapters/vision_events_api.py new file mode 100644 index 00000000..f43ad34a --- /dev/null +++ b/roboflow/adapters/vision_events_api.py @@ -0,0 +1,169 @@ +import json +import os +from typing import Any, Dict, List, Optional + +import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + +_BASE = f"{API_URL}/vision-events" + + +def _auth_headers(api_key: str) -> Dict[str, str]: + return {"Authorization": f"Bearer {api_key}"} + + +def write_event(api_key: str, event: Dict[str, Any]) -> dict: + """Create a single vision event. + + Args: + api_key: Roboflow API key. + event: Event payload dict (eventId, eventType, useCaseId, timestamp, etc.). + + Returns: + Parsed JSON response with ``eventId`` and ``created``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post(_BASE, json=event, headers=_auth_headers(api_key)) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def write_batch(api_key: str, events: List[Dict[str, Any]]) -> dict: + """Create multiple vision events in a single request. + + Args: + api_key: Roboflow API key. + events: List of event payload dicts (max 100 per the server). + + Returns: + Parsed JSON response with ``created`` count and ``eventIds``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post( + f"{_BASE}/batch", + json={"events": events}, + headers=_auth_headers(api_key), + ) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def query(api_key: str, query_params: Dict[str, Any]) -> dict: + """Query vision events with filters and pagination. + + Args: + api_key: Roboflow API key. + query_params: Query payload (useCaseId, eventType, startTime, endTime, + cursor, limit, customMetadataFilters, etc.). + + Returns: + Parsed JSON response with ``events``, ``nextCursor``, ``hasMore``, + and ``lookbackDays``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/query", + json=query_params, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def list_use_cases(api_key: str, status: Optional[str] = None) -> dict: + """List all use cases for a workspace. + + Args: + api_key: Roboflow API key. + status: Optional status filter (default server-side: "active"). + + Returns: + Parsed JSON response with ``useCases`` list and ``lookbackDays``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + params: Dict[str, str] = {} + if status is not None: + params["status"] = status + response = requests.get( + f"{_BASE}/use-cases", + params=params, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_custom_metadata_schema(api_key: str, use_case_id: str) -> dict: + """Get the custom metadata schema for a use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``fields`` mapping field names to their types. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.get( + f"{_BASE}/custom-metadata-schema/{use_case_id}", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def upload_image( + api_key: str, + image_path: str, + name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> dict: + """Upload an image for use in vision events. + + Args: + api_key: Roboflow API key. + image_path: Local filesystem path to the image file. + name: Optional custom image name. + metadata: Optional flat dict of metadata to attach. + + Returns: + Parsed JSON response with ``sourceId`` (and optionally ``url``). + + Raises: + RoboflowError: On non-201 response status codes. + """ + filename = name or os.path.basename(image_path) + with open(image_path, "rb") as f: + fields: Dict[str, Any] = { + "file": (filename, f, "application/octet-stream"), + } + if name is not None: + fields["name"] = name + if metadata is not None: + fields["metadata"] = json.dumps(metadata) + m = MultipartEncoder(fields=fields) + headers = _auth_headers(api_key) + headers["Content-Type"] = m.content_type + response = requests.post(f"{_BASE}/upload", data=m, headers=headers) + + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() diff --git a/roboflow/cli/__init__.py b/roboflow/cli/__init__.py index 3b3d0c42..45d4cce8 100644 --- a/roboflow/cli/__init__.py +++ b/roboflow/cli/__init__.py @@ -185,6 +185,7 @@ def _walk(group: Any, prefix: str = "") -> None: from roboflow.cli.handlers.universe import universe_app # noqa: E402 from roboflow.cli.handlers.version import version_app # noqa: E402 from roboflow.cli.handlers.video import video_app # noqa: E402 +from roboflow.cli.handlers.vision_events import vision_events_app # noqa: E402 from roboflow.cli.handlers.workflow import workflow_app # noqa: E402 from roboflow.cli.handlers.workspace import workspace_app # noqa: E402 @@ -210,6 +211,7 @@ def _walk(group: Any, prefix: str = "") -> None: app.add_typer(universe_app, name="universe") app.add_typer(version_app, name="version") app.add_typer(video_app, name="video") +app.add_typer(vision_events_app, name="vision-events") app.add_typer(workflow_app, name="workflow") app.add_typer(workspace_app, name="workspace") diff --git a/roboflow/cli/handlers/vision_events.py b/roboflow/cli/handlers/vision_events.py new file mode 100644 index 00000000..497936c0 --- /dev/null +++ b/roboflow/cli/handlers/vision_events.py @@ -0,0 +1,308 @@ +"""Vision events commands: write, query, list use cases, and upload images.""" + +from __future__ import annotations + +from typing import Annotated, Optional + +import typer + +from roboflow.cli._compat import SortedGroup, ctx_to_args + +vision_events_app = typer.Typer( + help="Create, query, and manage vision events.", + cls=SortedGroup, + no_args_is_help=True, +) + + +def _resolve(args): # noqa: ANN001 + """Return api_key or call output_error and return None.""" + from roboflow.cli._resolver import resolve_ws_and_key + + resolved = resolve_ws_and_key(args) + if resolved is None: + return None + _ws, api_key = resolved + return api_key + + +# --------------------------------------------------------------------------- +# write +# --------------------------------------------------------------------------- + + +@vision_events_app.command("write") +def write( + ctx: typer.Context, + event: Annotated[str, typer.Argument(help="JSON string of the event payload")], +) -> None: + """Create a single vision event.""" + args = ctx_to_args(ctx, event=event) + _write(args) + + +def _write(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + event = json.loads(args.event) + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON string.") + return + + try: + result = vision_events_api.write_event(api_key, event) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created event {result.get('eventId', '')}") + + +# --------------------------------------------------------------------------- +# write-batch +# --------------------------------------------------------------------------- + + +@vision_events_app.command("write-batch") +def write_batch( + ctx: typer.Context, + events: Annotated[str, typer.Argument(help="JSON string of the events array")], +) -> None: + """Create multiple vision events in a single request.""" + args = ctx_to_args(ctx, events=events) + _write_batch(args) + + +def _write_batch(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + events = json.loads(args.events) + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON array string.") + return + + try: + result = vision_events_api.write_batch(api_key, events) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created {result.get('created', 0)} event(s)") + + +# --------------------------------------------------------------------------- +# query +# --------------------------------------------------------------------------- + + +@vision_events_app.command("query") +def query( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier to query")], + event_type: Annotated[Optional[str], typer.Option("-t", "--event-type", help="Filter by event type")] = None, + start_time: Annotated[Optional[str], typer.Option("--start", help="ISO 8601 start time")] = None, + end_time: Annotated[Optional[str], typer.Option("--end", help="ISO 8601 end time")] = None, + limit: Annotated[Optional[int], typer.Option("-l", "--limit", help="Max events to return")] = None, + cursor: Annotated[Optional[str], typer.Option("--cursor", help="Pagination cursor")] = None, +) -> None: + """Query vision events with filters and pagination.""" + args = ctx_to_args( + ctx, + use_case=use_case, + event_type=event_type, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + ) + _query(args) + + +def _query(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + payload = {"useCaseId": args.use_case} + if args.event_type is not None: + payload["eventType"] = args.event_type + if args.start_time is not None: + payload["startTime"] = args.start_time + if args.end_time is not None: + payload["endTime"] = args.end_time + if args.limit is not None: + payload["limit"] = args.limit + if args.cursor is not None: + payload["cursor"] = args.cursor + + try: + result = vision_events_api.query(api_key, payload) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + events = result.get("events", []) + lines = [f"Found {len(events)} event(s)."] + for evt in events: + lines.append(f" {evt.get('eventId', '')} [{evt.get('eventType', '')}]") + if result.get("nextCursor"): + lines.append(f"\nNext page: --cursor {result['nextCursor']}") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# use-cases +# --------------------------------------------------------------------------- + + +@vision_events_app.command("use-cases") +def use_cases( + ctx: typer.Context, + status: Annotated[Optional[str], typer.Option("-s", "--status", help="Filter by status (active, inactive)")] = None, +) -> None: + """List vision event use cases for the workspace.""" + args = ctx_to_args(ctx, status=status) + _use_cases(args) + + +def _use_cases(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.list_use_cases(api_key, status=args.status) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + items = result.get("useCases") or result.get("solutions", []) + lines = [f"{len(items)} use case(s):"] + for uc in items: + name = uc.get("name", uc.get("id", "")) + if uc.get("eventCount") is not None: + detail = f" ({uc['eventCount']} events)" + elif uc.get("status"): + detail = f" [{uc['status']}]" + else: + detail = "" + lines.append(f" {name}{detail}") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# metadata-schema +# --------------------------------------------------------------------------- + + +@vision_events_app.command("metadata-schema") +def metadata_schema( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Get the custom metadata schema for a use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _metadata_schema(args) + + +def _metadata_schema(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.get_custom_metadata_schema(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + fields = result.get("fields", {}) + lines = [f"{len(fields)} field(s):"] + for name, info in fields.items(): + types = ", ".join(info.get("types", [])) + lines.append(f" {name} ({types})") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# upload-image +# --------------------------------------------------------------------------- + + +@vision_events_app.command("upload-image") +def upload_image( + ctx: typer.Context, + image: Annotated[str, typer.Argument(help="Path to the image file")], + name: Annotated[Optional[str], typer.Option("-n", "--name", help="Custom image name")] = None, + metadata: Annotated[ + Optional[str], + typer.Option("-M", "--metadata", help='JSON string of metadata (e.g. \'{"camera_id":"cam001"}\')'), + ] = None, +) -> None: + """Upload an image for use in vision events.""" + args = ctx_to_args(ctx, image=image, name=name, metadata=metadata) + _upload_image(args) + + +def _upload_image(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + parsed_metadata = json.loads(args.metadata) if args.metadata else None + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid metadata JSON: {exc}", hint="Pass a valid JSON string.") + return + + try: + result = vision_events_api.upload_image( + api_key, + image_path=args.image, + name=args.name, + metadata=parsed_metadata, + ) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Uploaded image: sourceId={result.get('sourceId', '')}") diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 698aa59d..398d56b4 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -13,7 +13,7 @@ from requests.exceptions import HTTPError from tqdm import tqdm -from roboflow.adapters import rfapi +from roboflow.adapters import rfapi, vision_events_api from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError, RoboflowError from roboflow.config import API_URL, APP_URL, CLIP_FEATURIZE_URL, DEMO_KEYS from roboflow.core.project import Project @@ -938,6 +938,242 @@ def get_plan(self): return rfapi.get_plan_info(self.__api_key) + # --- Vision Events --- + + def write_vision_event(self, event: Dict[str, Any]) -> dict: + """Create a single vision event. + + The event dict is passed directly to the server with no client-side + validation, so new event types and fields work without an SDK update. + + Args: + event: Event payload containing at minimum ``eventId``, + ``eventType``, ``useCaseId``, and ``timestamp``. + + Returns: + Dict with ``eventId`` and ``created``. + + Example: + >>> ws = rf.workspace() + >>> ws.write_vision_event({ + ... "eventId": "evt-001", + ... "eventType": "quality_check", + ... "useCaseId": "manufacturing-qa", + ... "timestamp": "2024-01-15T10:30:00.000Z", + ... "eventData": {"result": "pass"}, + ... }) + """ + return vision_events_api.write_event( + api_key=self.__api_key, + event=event, + ) + + def write_vision_events_batch(self, events: List[Dict[str, Any]]) -> dict: + """Create multiple vision events in a single request. + + Args: + events: List of event payload dicts (server enforces max 100). + + Returns: + Dict with ``created`` count and ``eventIds`` list. + + Example: + >>> ws = rf.workspace() + >>> ws.write_vision_events_batch([ + ... {"eventId": "e1", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:00:00Z"}, + ... {"eventId": "e2", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:01:00Z"}, + ... ]) + """ + return vision_events_api.write_batch( + api_key=self.__api_key, + events=events, + ) + + def query_vision_events( + self, + use_case: str, + *, + event_type: Optional[str] = None, + event_types: Optional[List[str]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + **filters: Any, + ) -> dict: + """Query vision events with filters and pagination. + + Common filter kwargs are passed through to the server as-is, + supporting ``deviceId``, ``streamId``, ``workflowId``, + ``customMetadataFilters``, ``eventFieldFilters``, etc. + + Args: + use_case: Use case identifier to query. + event_type: Filter by a single event type. + event_types: Filter by multiple event types. + start_time: ISO 8601 start time filter. + end_time: ISO 8601 end time filter. + limit: Maximum number of events to return. + cursor: Pagination cursor from a previous response. + **filters: Additional filter parameters passed to the API. + + Returns: + Dict with ``events``, ``nextCursor``, ``hasMore``, and ``lookbackDays``. + + Example: + >>> ws = rf.workspace() + >>> page = ws.query_vision_events("manufacturing-qa", event_type="quality_check", limit=50) + >>> for evt in page["events"]: + ... print(evt["eventId"]) + """ + payload: Dict[str, Any] = {"useCaseId": use_case} + if event_type is not None: + payload["eventType"] = event_type + if event_types is not None: + payload["eventTypes"] = event_types + if start_time is not None: + payload["startTime"] = start_time + if end_time is not None: + payload["endTime"] = end_time + if limit is not None: + payload["limit"] = limit + if cursor is not None: + payload["cursor"] = cursor + payload.update(filters) + + return vision_events_api.query( + api_key=self.__api_key, + query_params=payload, + ) + + def query_all_vision_events( + self, + use_case: str, + *, + event_type: Optional[str] = None, + event_types: Optional[List[str]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + **filters: Any, + ) -> Generator[List[dict], None, None]: + """Paginated query across vision events, yielding one page at a time. + + Automatically follows ``nextCursor`` until all matching events have + been returned. + + Args: + use_case: Use case identifier to query. + event_type: Filter by a single event type. + event_types: Filter by multiple event types. + start_time: ISO 8601 start time filter. + end_time: ISO 8601 end time filter. + limit: Maximum events per page. + **filters: Additional filter parameters passed to the API. + + Yields: + A list of event dicts for each page. + + Example: + >>> ws = rf.workspace() + >>> for page in ws.query_all_vision_events("manufacturing-qa"): + ... for evt in page: + ... print(evt["eventId"]) + """ + cursor = None + while True: + response = self.query_vision_events( + use_case, + event_type=event_type, + event_types=event_types, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + **filters, + ) + events = response.get("events", []) + if not events: + break + yield events + cursor = response.get("nextCursor") + if not cursor or not response.get("hasMore", False): + break + + def list_vision_event_use_cases(self, status: Optional[str] = None) -> dict: + """List all vision event use cases for the workspace. + + Args: + status: Optional status filter (e.g. "active", "inactive"). + + Returns: + Dict with ``useCases`` list and ``lookbackDays``. + + Example: + >>> ws = rf.workspace() + >>> result = ws.list_vision_event_use_cases() + >>> for uc in result["useCases"]: + ... print(uc["name"], uc.get("status")) + """ + result = vision_events_api.list_use_cases( + api_key=self.__api_key, + status=status, + ) + if "useCases" not in result and "solutions" in result: + result["useCases"] = result["solutions"] + return result + + def get_vision_event_metadata_schema(self, use_case: str) -> dict: + """Get the custom metadata schema for a vision event use case. + + Returns discovered field names and their types, useful for building + queries with ``customMetadataFilters``. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``fields`` mapping field names to ``{"types": [...]}``. + + Example: + >>> ws = rf.workspace() + >>> schema = ws.get_vision_event_metadata_schema("manufacturing-qa") + >>> for field, info in schema["fields"].items(): + ... print(field, info["types"]) + """ + return vision_events_api.get_custom_metadata_schema( + api_key=self.__api_key, + use_case_id=use_case, + ) + + def upload_vision_event_image( + self, + image_path: str, + name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> dict: + """Upload an image for use in vision events. + + Args: + image_path: Local path to the image file. + name: Optional custom name for the image. + metadata: Optional flat dict of metadata to attach. + + Returns: + Dict with ``sourceId`` for referencing in events. + + Example: + >>> ws = rf.workspace() + >>> result = ws.upload_vision_event_image("photo.jpg") + >>> source_id = result["sourceId"] + """ + return vision_events_api.upload_image( + api_key=self.__api_key, + image_path=image_path, + name=name, + metadata=metadata, + ) + def __str__(self): projects = self.projects() json_value = {"name": self.name, "url": self.url, "projects": projects} diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py new file mode 100644 index 00000000..6982083c --- /dev/null +++ b/tests/test_vision_events.py @@ -0,0 +1,438 @@ +import json +import os +import tempfile +import unittest + +import responses + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + +# The vision events API does not include workspace in the URL. +# Auth is via Bearer token; workspace is derived server-side from the API key. +_BASE = f"{API_URL}/vision-events" + + +class TestVisionEvents(unittest.TestCase): + API_KEY = "test_key" + WORKSPACE = "test-ws" + + def _make_workspace(self): + from roboflow.core.workspace import Workspace + + info = { + "workspace": { + "name": "Test", + "url": self.WORKSPACE, + "projects": [], + "members": [], + } + } + return Workspace(info, api_key=self.API_KEY, default_workspace=self.WORKSPACE, model_format="yolov8") + + def _assert_bearer_auth(self, call_index=0): + auth = responses.calls[call_index].request.headers.get("Authorization") + self.assertEqual(auth, f"Bearer {self.API_KEY}") + + # --- write_vision_event --- + + @responses.activate + def test_write_event(self): + responses.add(responses.POST, _BASE, json={"eventId": "evt-001"}, status=201) + + ws = self._make_workspace() + event = { + "eventId": "evt-001", + "eventType": "quality_check", + "useCaseId": "uc-1", + "timestamp": "2024-01-15T10:00:00Z", + "eventData": {"result": "pass"}, + } + result = ws.write_vision_event(event) + + self.assertEqual(result["eventId"], "evt-001") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["eventId"], "evt-001") + self.assertEqual(sent["eventType"], "quality_check") + self.assertEqual(sent["useCaseId"], "uc-1") + self.assertEqual(sent["eventData"], {"result": "pass"}) + + @responses.activate + def test_write_event_passthrough(self): + """The event dict must be sent to the server unchanged (no filtering or transformation).""" + responses.add(responses.POST, _BASE, json={"eventId": "e1"}, status=201) + + ws = self._make_workspace() + event = { + "eventId": "e1", + "eventType": "safety_alert", + "useCaseId": "warehouse-safety", + "timestamp": "2024-06-01T12:00:00Z", + "deviceId": "cam-5", + "streamId": "stream-a", + "workflowId": "wf-1", + "images": [{"sourceId": "src-1", "label": "frame"}], + "eventData": {"alertType": "fire", "severity": "high"}, + "customMetadata": {"zone": "B3", "temperature": 42.5, "active": True}, + } + ws.write_vision_event(event) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent, event) + + @responses.activate + def test_write_event_error(self): + responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.write_vision_event({"eventId": "x", "eventType": "custom", "useCaseId": "s", "timestamp": "t"}) + + # --- write_vision_events_batch --- + + @responses.activate + def test_write_batch(self): + responses.add(responses.POST, f"{_BASE}/batch", json={"created": 2, "eventIds": ["e1", "e2"]}, status=201) + + ws = self._make_workspace() + events = [ + {"eventId": "e1", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:00:00Z"}, + {"eventId": "e2", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:01:00Z"}, + ] + result = ws.write_vision_events_batch(events) + + self.assertEqual(result["created"], 2) + self.assertEqual(result["eventIds"], ["e1", "e2"]) + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(len(sent["events"]), 2) + + @responses.activate + def test_write_batch_error(self): + responses.add(responses.POST, f"{_BASE}/batch", json={"error": "validation"}, status=400) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.write_vision_events_batch([{"bad": "event"}]) + + # --- query_vision_events --- + + @responses.activate + def test_query_basic(self): + body = { + "events": [{"eventId": "e1"}, {"eventId": "e2"}], + "nextCursor": None, + "hasMore": False, + "lookbackDays": 14, + } + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + result = ws.query_vision_events("my-use-case") + + self.assertEqual(len(result["events"]), 2) + self.assertFalse(result["hasMore"]) + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["useCaseId"], "my-use-case") + + @responses.activate + def test_query_with_filters(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events( + "my-uc", + event_type="quality_check", + start_time="2024-01-01T00:00:00Z", + end_time="2024-02-01T00:00:00Z", + limit=10, + cursor="abc123", + deviceId={"operator": "eq", "value": "cam-01"}, + ) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["useCaseId"], "my-uc") + self.assertEqual(sent["eventType"], "quality_check") + self.assertEqual(sent["startTime"], "2024-01-01T00:00:00Z") + self.assertEqual(sent["endTime"], "2024-02-01T00:00:00Z") + self.assertEqual(sent["limit"], 10) + self.assertEqual(sent["cursor"], "abc123") + self.assertEqual(sent["deviceId"], {"operator": "eq", "value": "cam-01"}) + + @responses.activate + def test_query_with_event_types_plural(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events("uc", event_types=["quality_check", "safety_alert"]) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["eventTypes"], ["quality_check", "safety_alert"]) + self.assertNotIn("eventType", sent) + + @responses.activate + def test_query_omits_none_params(self): + """Optional params that are None must not appear in the payload.""" + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events("uc") + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent, {"useCaseId": "uc"}) + + @responses.activate + def test_query_error(self): + responses.add(responses.POST, f"{_BASE}/query", json={"error": "unauthorized"}, status=401) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.query_vision_events("my-uc") + + # --- query_all_vision_events --- + + @responses.activate + def test_query_all_single_page(self): + body = { + "events": [{"eventId": "e1"}], + "nextCursor": None, + "hasMore": False, + "lookbackDays": 14, + } + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 1) + self.assertEqual(pages[0][0]["eventId"], "e1") + + @responses.activate + def test_query_all_multiple_pages(self): + page1 = {"events": [{"eventId": "e1"}], "nextCursor": "cursor2", "hasMore": True, "lookbackDays": 14} + page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200) + responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 2) + self.assertEqual(pages[0][0]["eventId"], "e1") + self.assertEqual(pages[1][0]["eventId"], "e2") + + # Verify cursor was sent in second request + sent2 = json.loads(responses.calls[1].request.body) + self.assertEqual(sent2["cursor"], "cursor2") + + @responses.activate + def test_query_all_forwards_filters(self): + """Filters must be forwarded to every page request, not just the first.""" + page1 = {"events": [{"eventId": "e1"}], "nextCursor": "c2", "hasMore": True, "lookbackDays": 14} + page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200) + responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200) + + ws = self._make_workspace() + list(ws.query_all_vision_events("uc", event_type="quality_check", limit=1)) + + sent1 = json.loads(responses.calls[0].request.body) + sent2 = json.loads(responses.calls[1].request.body) + + # Both requests should have the filter + self.assertEqual(sent1["eventType"], "quality_check") + self.assertEqual(sent2["eventType"], "quality_check") + # Second request should also have the cursor + self.assertNotIn("cursor", sent1) + self.assertEqual(sent2["cursor"], "c2") + + @responses.activate + def test_query_all_empty(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 0) + + # --- list_vision_event_use_cases --- + + @responses.activate + def test_list_use_cases(self): + body = { + "useCases": [ + {"id": "uc-1", "name": "QA", "status": "active"}, + ], + "lookbackDays": 14, + } + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases() + + self.assertEqual(len(result["useCases"]), 1) + self.assertEqual(result["useCases"][0]["name"], "QA") + self._assert_bearer_auth() + + @responses.activate + def test_list_use_cases_with_status(self): + body = {"useCases": [], "lookbackDays": 14} + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases(status="inactive") + + self.assertEqual(len(result["useCases"]), 0) + # Verify status was sent as query param + self.assertIn("status=inactive", responses.calls[0].request.url) + + @responses.activate + def test_list_use_cases_legacy_solutions_response(self): + responses.add( + responses.GET, + f"{_BASE}/use-cases", + json={"solutions": [{"id": "uc-legacy", "name": "Legacy"}], "lookbackDays": 14}, + status=200, + ) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases() + self.assertEqual(result["useCases"][0]["id"], "uc-legacy") + + @responses.activate + def test_list_use_cases_error(self): + responses.add(responses.GET, f"{_BASE}/use-cases", json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.list_vision_event_use_cases() + + # --- get_vision_event_metadata_schema --- + + @responses.activate + def test_get_metadata_schema(self): + body = { + "useCaseId": "manufacturing-qa", + "fields": { + "temperature": {"types": ["number"]}, + "zone": {"types": ["string"]}, + "active": {"types": ["boolean"]}, + }, + } + responses.add( + responses.GET, + f"{_BASE}/custom-metadata-schema/manufacturing-qa", + json=body, + status=200, + ) + + ws = self._make_workspace() + result = ws.get_vision_event_metadata_schema("manufacturing-qa") + + self.assertEqual(len(result["fields"]), 3) + self.assertEqual(result["fields"]["temperature"]["types"], ["number"]) + self._assert_bearer_auth() + + @responses.activate + def test_get_metadata_schema_error(self): + responses.add( + responses.GET, + f"{_BASE}/custom-metadata-schema/nonexistent", + json={"error": "not found"}, + status=404, + ) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.get_vision_event_metadata_schema("nonexistent") + + # --- upload_vision_event_image --- + + @responses.activate + def test_upload_image(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-123"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"\xff\xd8\xff\xe0fake-jpeg-data") + tmp_path = f.name + + try: + result = ws.upload_vision_event_image(tmp_path) + self.assertEqual(result["sourceId"], "src-123") + self._assert_bearer_auth() + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_uses_basename(self): + """When no name is provided, the multipart filename should be the basename of the path.""" + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-789"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, prefix="myimage_") as f: + f.write(b"\xff\xd8\xff\xe0fake") + tmp_path = f.name + + try: + ws.upload_vision_event_image(tmp_path) + request_body = responses.calls[0].request.body + basename = os.path.basename(tmp_path).encode() + if isinstance(request_body, bytes): + self.assertIn(basename, request_body) + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_with_metadata(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-456"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: + f.write(b"\x89PNGfake-png-data") + tmp_path = f.name + + try: + result = ws.upload_vision_event_image( + tmp_path, + name="custom-name.png", + metadata={"camera_id": "cam-01"}, + ) + self.assertEqual(result["sourceId"], "src-456") + + request_body = responses.calls[0].request.body + # Verify metadata and name were included in the multipart body + if isinstance(request_body, bytes): + self.assertIn(b"cam-01", request_body) + self.assertIn(b"custom-name.png", request_body) + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_error(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"data") + tmp_path = f.name + + try: + with self.assertRaises(RoboflowError): + ws.upload_vision_event_image(tmp_path) + finally: + os.unlink(tmp_path) + + +if __name__ == "__main__": + unittest.main()