diff --git a/dlclivegui/cameras/backends/basler_backend.py b/dlclivegui/cameras/backends/basler_backend.py index 8e7b0e1..30a5ea0 100644 --- a/dlclivegui/cameras/backends/basler_backend.py +++ b/dlclivegui/cameras/backends/basler_backend.py @@ -9,10 +9,21 @@ import numpy as np +from ...config import CameraTriggerSettings from ..base import CameraBackend, SupportLevel, register_backend LOG = logging.getLogger(__name__) + +# NOTE @C-Achard: This could be added in settings eventually +# Forces pypylon to create N emulation virtual cameras, +# mostly for testing. This should not be enabled for release. +ENABLE_PYLON_EMU = True +if ENABLE_PYLON_EMU: + import os + + os.environ["PYLON_CAMEMU"] = "4" + try: # pragma: no cover - optional dependency from pypylon import pylon except Exception: # pragma: no cover - optional dependency @@ -25,6 +36,10 @@ class BaslerCameraBackend(CameraBackend): OPTIONS_KEY: ClassVar[str] = "basler" + # Keep RetrieveResult calls short enough that controller shutdown can stop + # worker threads promptly while waiting for external hardware triggers. + _MAX_HARDWARE_TRIGGER_RETRIEVE_TIMEOUT_MS: ClassVar[int] = 1000 + def __init__(self, settings): super().__init__(settings) @@ -33,6 +48,37 @@ def __init__(self, settings): # Optional fast-start hint for probe workers # (may skip StartGrabbing and converter setup for faster capability probing; not suitable for normal capture) self._fast_start: bool = bool(self.ns.get("fast_start", False)) + self._retrieve_timeout_ms: int = 100 # default; may be overridden by trigger settings + + # ---- Trigger settings ---- + raw_trigger = self.ns.get("trigger", self._props.get("trigger")) + raw_trigger_strict = isinstance(raw_trigger, dict) and bool(raw_trigger.get("strict", False)) + + try: + self._trigger = CameraTriggerSettings.from_any(raw_trigger) + except Exception as exc: + if raw_trigger_strict: + raise ValueError(f"Strict mode failure - Invalid Basler trigger configuration: {exc}") from exc + + LOG.warning( + "Invalid Basler trigger config; falling back to trigger role=off: %s. " + "Enable strict mode to force this to raise.", + exc, + ) + self._trigger = CameraTriggerSettings() + + trigger_timeout = self._positive_float(self._trigger_attr(self._trigger, "timeout", None)) + if trigger_timeout is not None: + # pypylon RetrieveResult timeout is milliseconds. + self._retrieve_timeout_ms = max(1, int(float(trigger_timeout) * 1000.0)) + else: + self._retrieve_timeout_ms = 100 + + if self.waits_for_hardware_trigger: + self._retrieve_timeout_ms = min( + self._retrieve_timeout_ms, + self._MAX_HARDWARE_TRIGGER_RETRIEVE_TIMEOUT_MS, + ) # Stable identity (serial-based). Prefer new namespace; fall back to legacy keys read-only. self._device_id: str | None = None @@ -95,6 +141,7 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: "set_gain": SupportLevel.SUPPORTED, "device_discovery": SupportLevel.BEST_EFFORT, "stable_identity": SupportLevel.SUPPORTED, + "hardware_trigger": SupportLevel.BEST_EFFORT, } ) return caps @@ -314,6 +361,26 @@ def _positive_float(value) -> float | None: except Exception: return None + def trigger_once(self) -> None: + if self._camera is None: + raise RuntimeError("Basler camera not opened") + + # pypylon commonly exposes ExecuteSoftwareTrigger on InstantCamera. + method = getattr(self._camera, "ExecuteSoftwareTrigger", None) + if method is not None: + method() + return + + command = self._feature("TriggerSoftware") + if command is not None: + try: + command.Execute() + return + except Exception as exc: + raise RuntimeError(f"Failed to execute Basler software trigger: {exc}") from exc + + raise RuntimeError("Basler software trigger command is not available") + def open(self) -> None: if pylon is None: raise RuntimeError("pypylon is required for the Basler backend but is not installed") @@ -374,6 +441,19 @@ def open(self) -> None: except Exception: LOG.debug("Frame rate not writable or not supported", exc_info=True) + # ---------------------------- + # Trigger configuration + # ---------------------------- + self._debug_trigger_nodes(context="before configuration") + self._configure_trigger() + self._debug_trigger_nodes(context="after configuration") + + try: + ns = self._ensure_mutable_ns() + ns["trigger_actual"] = self._trigger_to_dict(self._trigger) + except Exception: + pass + # ---------------------------- # Read back actual values (telemetry for GUI / probe) # ---------------------------- @@ -443,6 +523,7 @@ def open(self) -> None: getattr(self.settings, "exposure", None), getattr(self.settings, "gain", None), ) + # ---------------------------- # Persist stable identity into namespace (migration-safe) # ---------------------------- @@ -464,8 +545,13 @@ def read(self) -> tuple[np.ndarray, float]: if self._converter is None: raise RuntimeError("Basler camera opened in fast-start probe mode; cannot read frames") try: - grab_result = self._camera.RetrieveResult(100, pylon.TimeoutHandling_ThrowException) + grab_result = self._camera.RetrieveResult( + int(getattr(self, "_retrieve_timeout_ms", 100)), + pylon.TimeoutHandling_ThrowException, + ) except Exception as exc: + if self.waits_for_hardware_trigger: + raise TimeoutError(f"Basler timeout while waiting for hardware trigger: {exc}") from exc raise RuntimeError("Failed to retrieve image from Basler camera.") from exc if not grab_result.GrabSucceeded(): grab_result.Release() @@ -494,7 +580,13 @@ def close(self) -> None: self._camera.StopGrabbing() except Exception: pass + if self._camera.IsOpen(): + try: + self._restore_trigger_idle() + except Exception: + pass + self._camera.Close() self._camera = None self._converter = None @@ -571,6 +663,365 @@ def _snap_to_node(value: int, node) -> int: return int(v) + @property + def waits_for_hardware_trigger(self) -> bool: + role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() + return role in {"external", "follower"} + + @staticmethod + def _trigger_attr(trigger, name: str, default=None): + if isinstance(trigger, dict): + return trigger.get(name, default) + return getattr(trigger, name, default) + + @staticmethod + def _trigger_to_dict(trigger) -> dict: + if trigger is None: + return {} + if isinstance(trigger, dict): + return dict(trigger) + if hasattr(trigger, "model_dump"): + try: + return trigger.model_dump(exclude_none=True) + except Exception: + pass + return {} + + def _feature(self, name: str): + if self._camera is None: + return None + try: + return getattr(self._camera, name) + except Exception: + return None + + @staticmethod + def _feature_value(feature, default=None): + if feature is None: + return default + try: + return feature.GetValue() + except Exception: + return default + + @staticmethod + def _feature_symbolics(feature) -> list[str]: + if feature is None: + return [] + + for method_name in ("GetSymbolics", "GetEntries"): + try: + method = getattr(feature, method_name, None) + if method is None: + continue + + values = method() + out = [] + + for value in values: + try: + if hasattr(value, "GetSymbolic"): + out.append(str(value.GetSymbolic())) + else: + out.append(str(value)) + except Exception: + continue + + return [v for v in out if v] + except Exception: + continue + + return [] + + def _set_enum_feature(self, name: str, value: str, *, strict: bool = False) -> bool: + feature = self._feature(name) + + if feature is None: + if strict: + raise RuntimeError(f"Basler feature '{name}' is not available") + LOG.debug("Basler feature '%s' is not available; skipping", name) + return False + + symbolics = self._feature_symbolics(feature) + if symbolics and value not in symbolics: + if strict: + raise RuntimeError(f"Basler feature '{name}' does not support '{value}'. Available: {symbolics}") + LOG.warning("Basler feature '%s' does not support '%s'. Available: %s", name, value, symbolics) + return False + + try: + feature.SetValue(value) + return True + except Exception as exc: + if strict: + raise RuntimeError(f"Failed to set Basler feature '{name}' to '{value}': {exc}") from exc + LOG.warning("Failed to set Basler feature '%s' to '%s': %s", name, value, exc) + return False + + def _set_numeric_feature(self, name: str, value, *, strict: bool = False) -> bool: + feature = self._feature(name) + + if feature is None: + if strict: + raise RuntimeError(f"Basler feature '{name}' is not available") + LOG.debug("Basler feature '%s' is not available; skipping", name) + return False + + try: + feature.SetValue(value) + return True + except Exception as exc: + if strict: + raise RuntimeError(f"Failed to set Basler feature '{name}' to '{value}': {exc}") from exc + LOG.warning("Failed to set Basler feature '%s' to '%s': %s", name, value, exc) + return False + + def _debug_trigger_nodes(self, *, context: str = "") -> None: + names = ( + "TriggerSelector", + "TriggerMode", + "TriggerSource", + "TriggerActivation", + "TriggerDelay", + "TriggerDelayAbs", + "AcquisitionMode", + "LineSelector", + "LineMode", + "LineSource", + "LineInverter", + ) + + label = f"Basler trigger debug {context}".strip() + + for name in names: + feature = self._feature(name) + if feature is None: + continue + + value = self._feature_value(feature, None) + symbolics = self._feature_symbolics(feature) + + extras = [] + if symbolics: + extras.append(f"symbolics={symbolics}") + + for method_name in ("IsReadable", "IsWritable"): + try: + method = getattr(feature, method_name, None) + if method is not None: + extras.append(f"{method_name}={method()}") + except Exception: + pass + + LOG.debug("%s: %s=%r %s", label, name, value, " ".join(extras)) + + def _resolve_trigger_source(self, requested: str, *, strict: bool) -> tuple[str, bool]: + requested = str(requested or "auto").strip() + feature = self._feature("TriggerSource") + available = self._feature_symbolics(feature) + + if not available: + if strict: + raise RuntimeError("Basler feature 'TriggerSource' is not available or has no symbolics") + LOG.warning("Basler feature 'TriggerSource' is not available; disabling trigger input.") + return requested, False + + if requested in available: + return requested, True + + if requested.lower() == "auto": + for candidate in ("Line1", "Line2", "Line3", "Line4", "Line0", "Software", "Action1"): + if candidate in available: + LOG.info("Basler TriggerSource auto-selected '%s'. Available: %s", candidate, available) + return candidate, True + + LOG.warning("Could not auto-select a Basler TriggerSource. Available: %s", available) + return requested, False + + if strict: + raise RuntimeError(f"Basler feature 'TriggerSource' does not support '{requested}'. Available: {available}") + + LOG.warning("Basler TriggerSource '%s' is not available. Available: %s", requested, available) + return requested, False + + def _configure_trigger(self) -> None: + cfg = getattr(self, "_trigger", CameraTriggerSettings()) + self._trigger = cfg + role = str(self._trigger_attr(cfg, "role", "off") or "off").strip().lower() + strict = bool(self._trigger_attr(cfg, "strict", False)) + + if role in {"off", "disabled"}: + self._configure_trigger_off(strict=strict) + return + + if role in {"external", "follower"}: + self._configure_trigger_input(cfg, strict=strict) + return + + if role == "software": + self._configure_trigger_software(cfg, strict=strict) + return + + if role == "master": + self._configure_trigger_master(cfg, strict=strict) + return + + if strict: + raise RuntimeError(f"Unsupported Basler trigger role: {role!r}") + + LOG.warning("Unsupported Basler trigger role '%s'; disabling trigger.", role) + self._configure_trigger_off(strict=False) + + def _configure_trigger_off(self, *, strict: bool = False) -> None: + # Select FrameStart first when possible so TriggerMode=Off applies to + # the frame-start trigger path. + self._set_enum_feature("TriggerSelector", "FrameStart", strict=False) + self._set_enum_feature("TriggerMode", "Off", strict=strict) + + def _configure_trigger_input(self, cfg, *, strict: bool = False) -> None: + role = str(self._trigger_attr(cfg, "role", "external") or "external").strip().lower() + selector = str(self._trigger_attr(cfg, "selector", "FrameStart") or "FrameStart") + activation = str(self._trigger_attr(cfg, "activation", "RisingEdge") or "RisingEdge") + source = str(self._trigger_attr(cfg, "source", "auto") or "auto").strip() + delay = self._trigger_attr(cfg, "delay", None) + + # Disable trigger while changing trigger-related parameters. + self._set_enum_feature("TriggerMode", "Off", strict=False) + + selector_ok = self._set_enum_feature("TriggerSelector", selector, strict=strict) + + resolved_source, source_supported = self._resolve_trigger_source(source, strict=strict) + source_ok = False + if source_supported: + source_ok = self._set_enum_feature("TriggerSource", resolved_source, strict=strict) + + activation_ok = self._set_enum_feature("TriggerActivation", activation, strict=False) + + if delay is not None: + delay_value = float(delay) + if not self._set_numeric_feature("TriggerDelay", delay_value, strict=False): + self._set_numeric_feature("TriggerDelayAbs", delay_value, strict=False) + + self._set_enum_feature("AcquisitionMode", "Continuous", strict=False) + + if not selector_ok: + LOG.warning("Could not apply Basler TriggerSelector=%s; disabling trigger.", selector) + self._configure_trigger_off(strict=False) + self._trigger = CameraTriggerSettings() + return + + if not source_ok: + LOG.warning( + "Could not apply Basler TriggerSource=%s resolved=%s; disabling trigger.", + source, + resolved_source, + ) + self._configure_trigger_off(strict=False) + self._trigger = CameraTriggerSettings() + return + + if not self._set_enum_feature("TriggerMode", "On", strict=strict): + LOG.warning("Could not enable Basler TriggerMode=On; disabling trigger.") + self._configure_trigger_off(strict=False) + self._trigger = CameraTriggerSettings() + return + + LOG.info( + "Basler trigger input configured: role=%s selector=%s source=%s activation=%s " + "selector_ok=%s source_ok=%s activation_ok=%s", + role, + selector, + resolved_source, + activation, + selector_ok, + source_ok, + activation_ok, + ) + + def _configure_trigger_software(self, cfg, *, strict: bool = False) -> None: + selector = str(self._trigger_attr(cfg, "selector", "FrameStart") or "FrameStart") + delay = self._trigger_attr(cfg, "delay", None) + + self._set_enum_feature("TriggerMode", "Off", strict=False) + + selector_ok = self._set_enum_feature("TriggerSelector", selector, strict=strict) + source_ok = self._set_enum_feature("TriggerSource", "Software", strict=strict) + + if delay is not None: + delay_value = float(delay) + if not self._set_numeric_feature("TriggerDelay", delay_value, strict=False): + self._set_numeric_feature("TriggerDelayAbs", delay_value, strict=False) + + self._set_enum_feature("AcquisitionMode", "Continuous", strict=False) + + if not selector_ok or not source_ok: + LOG.warning( + "Could not configure Basler software trigger selector_ok=%s source_ok=%s; disabling trigger.", + selector_ok, + source_ok, + ) + self._configure_trigger_off(strict=False) + self._trigger = CameraTriggerSettings() + return + + if not self._set_enum_feature("TriggerMode", "On", strict=strict): + LOG.warning("Could not enable Basler software TriggerMode=On; disabling trigger.") + self._configure_trigger_off(strict=False) + self._trigger = CameraTriggerSettings() + return + + LOG.info("Basler software trigger configured: selector=%s source=Software", selector) + + def _configure_trigger_master(self, cfg, *, strict: bool = False) -> None: + output_line = str(self._trigger_attr(cfg, "output_line", "Line2") or "Line2") + output_source = str(self._trigger_attr(cfg, "output_source", "ExposureActive") or "ExposureActive") + + # Master camera should acquire freely. + self._configure_trigger_off(strict=False) + + selected = self._set_enum_feature("LineSelector", output_line, strict=strict) + if not selected: + msg = f"Could not select Basler output line '{output_line}'" + if strict: + raise RuntimeError(msg) + LOG.warning("%s; skipping master output configuration.", msg) + return + + mode_ok = self._set_enum_feature("LineMode", "Output", strict=strict) + source_ok = self._set_enum_feature("LineSource", output_source, strict=strict) + + if mode_ok and source_ok: + LOG.info( + "Basler trigger master configured via Line*: output_line=%s output_source=%s", + output_line, + output_source, + ) + return + + msg = ( + "Could not configure Basler trigger master output completely " + f"(LineMode ok={mode_ok}, LineSource ok={source_ok})." + ) + + if strict: + raise RuntimeError(msg) + + LOG.warning(msg) + + def _restore_trigger_idle(self) -> None: + role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() + + try: + if role in {"external", "follower", "software"}: + self._set_enum_feature("TriggerMode", "Off", strict=False) + + elif role == "master": + self._set_enum_feature("LineSource", "Off", strict=False) + self._set_enum_feature("LineMode", "Input", strict=False) + + except Exception: + LOG.debug("Best-effort Basler trigger restore failed", exc_info=True) + def _configure_resolution(self) -> None: """ Apply width/height only if explicitly requested (GUI or override). diff --git a/dlclivegui/gui/camera_config/camera_config_dialog.py b/dlclivegui/gui/camera_config/camera_config_dialog.py index 444f273..d8defb4 100644 --- a/dlclivegui/gui/camera_config/camera_config_dialog.py +++ b/dlclivegui/gui/camera_config/camera_config_dialog.py @@ -820,16 +820,16 @@ def _on_active_camera_selected(self, row: int) -> None: def _ensure_default_trigger_config(self, cam: CameraSettings) -> None: backend = (cam.backend or "").lower() - if backend != "gentl": + if backend not in {"gentl", "basler"}: return if not isinstance(cam.properties, dict): cam.properties = {} - ns = cam.properties.setdefault("gentl", {}) + ns = cam.properties.setdefault(backend, {}) if not isinstance(ns, dict): ns = {} - cam.properties["gentl"] = ns + cam.properties[backend] = ns ns.setdefault("trigger", CameraTriggerSettings().model_dump(exclude_none=True)) diff --git a/dlclivegui/gui/camera_config/trigger_config_dialog.py b/dlclivegui/gui/camera_config/trigger_config_dialog.py index c6f7814..bda9cae 100644 --- a/dlclivegui/gui/camera_config/trigger_config_dialog.py +++ b/dlclivegui/gui/camera_config/trigger_config_dialog.py @@ -1,6 +1,8 @@ # dlclivegui/gui/camera_config/trigger_config_dialog.py from __future__ import annotations +from dataclasses import dataclass + from PySide6.QtWidgets import ( QCheckBox, QComboBox, @@ -31,15 +33,92 @@ def _backend_namespace(cam: CameraSettings) -> dict: return ns +@dataclass(frozen=True) +class TriggerUiProfile: + supports_input: bool = True + supports_master: bool = False + supports_software: bool = False + + show_strobe_fields: bool = False + show_line_output_fields: bool = False + + source_suggestions: tuple[str, ...] = ("auto",) + default_source: str = "auto" + + default_output_line: str = "Line2" + default_output_source: str = "ExposureActive" + + help_text: str = "" + + +def trigger_ui_profile_for_backend(backend: str) -> TriggerUiProfile: + """Return GUI-only trigger presentation profile for a backend. + + This intentionally does not perform backend/runtime validation. + Backends still own actual GenICam/pypylon/Harvesters configuration. + """ + backend = (backend or "").lower() + + if backend == "gentl": + return TriggerUiProfile( + supports_input=True, + supports_master=True, + supports_software=False, + show_strobe_fields=True, + show_line_output_fields=True, + source_suggestions=("auto", "Line0", "Line1", "Line2", "Any", "Software"), + default_source="auto", + default_output_line="Line2", + default_output_source="ExposureActive", + help_text=( + "GenTL trigger support is best-effort and depends on the camera's GenICam nodes. " + "Some cameras expose generic Line* output nodes; TIS/DMK 37U cameras may expose Strobe* nodes." + ), + ) + + if backend == "basler": + return TriggerUiProfile( + supports_input=True, + supports_master=True, + supports_software=False, # enable later when controller supports trigger_once() + show_strobe_fields=False, + show_line_output_fields=True, + source_suggestions=("auto", "Line1", "Line2", "Line3", "Line4", "Software"), + default_source="auto", + default_output_line="Line2", + default_output_source="ExposureActive", + help_text=( + "Basler trigger support uses pylon camera features when available. " + "The available trigger sources and output lines depend on the camera model." + ), + ) + + return TriggerUiProfile( + supports_input=False, + supports_master=False, + supports_software=False, + show_strobe_fields=False, + show_line_output_fields=False, + source_suggestions=("auto",), + help_text="This backend does not expose trigger configuration.", + ) + + class TriggerConfigDialog(QDialog): - """Small dialog for editing per-camera hardware trigger settings.""" + """Dialog for editing per-camera trigger settings. + + The dialog is backend-aware only for presentation. + Actual trigger configuration remains backend-owned. + """ def __init__(self, cam: CameraSettings, parent: QWidget | None = None): super().__init__(parent) self.setWindowTitle("Configure trigger mode") - self.setMinimumWidth(420) + self.setMinimumWidth(460) self._cam = cam.model_copy(deep=True) + self._backend = (self._cam.backend or "").lower() + self._profile = trigger_ui_profile_for_backend(self._backend) ns = _backend_namespace(self._cam) try: @@ -58,71 +137,106 @@ def camera_settings(self) -> CameraSettings: def _setup_ui(self) -> None: root = QVBoxLayout(self) - info = QLabel( - "Configure hardware trigger settings for this camera.\n" - "Follower/external mode arms the camera and waits for electrical pulses on TRIGGER_IN.\n" - "Master mode enables STROBE_OUT pulses. For TIS/DMK 37U cameras this uses Strobe settings; " - "Line output settings are kept as a generic fallback.\n" + info_text = ( + "Configure per-camera trigger settings.\n" + "External/follower mode arms the camera and waits for trigger pulses on a selected input source.\n" + "Master mode configures an output signal if the backend/camera exposes compatible output-line features.\n" + "Some fields are backend- or camera-model-specific and may be ignored unless strict mode is enabled.\n" "In strict mode, unsupported trigger nodes fail camera open." ) - info.setWordWrap(True) - root.addWidget(info) + if self._profile.help_text: + info_text += f"\n\n{self._profile.help_text}" - group = QGroupBox("Hardware Trigger") - form = QFormLayout(group) + self.info_label = QLabel(info_text) + self.info_label.setWordWrap(True) + root.addWidget(self.info_label) + group_title = f"Trigger Settings ({self._backend or 'unknown'})" + group = QGroupBox(group_title) + self.form = QFormLayout(group) + + # ---------------------------- + # Role + # ---------------------------- self.role_combo = QComboBox() self.role_combo.addItem("Off / Free-run", "off") - self.role_combo.addItem("External trigger", "external") - self.role_combo.addItem("Follower", "follower") - self.role_combo.addItem("Master", "master") - form.addRow("Role:", self.role_combo) + if self._profile.supports_input: + self.role_combo.addItem("External trigger", "external") + self.role_combo.addItem("Follower", "follower") + + if self._profile.supports_master: + self.role_combo.addItem("Master output", "master") + + if self._profile.supports_software: + self.role_combo.addItem("Software trigger", "software") + + self.form.addRow("Role:", self.role_combo) + + # ---------------------------- + # Input trigger fields + # ---------------------------- self.selector_edit = QLineEdit() self.selector_edit.setPlaceholderText("FrameStart") - form.addRow("Trigger selector:", self.selector_edit) - - self.source_edit = QLineEdit() - self.source_edit.setPlaceholderText("auto, Line0, Software, ...") - form.addRow("Trigger source:", self.source_edit) + self.selector_edit.setToolTip("TriggerSelector value. Most area-scan cameras use FrameStart.") + self.form.addRow("Trigger selector:", self.selector_edit) + + self.source_combo = QComboBox() + self.source_combo.setEditable(True) + for value in self._profile.source_suggestions: + self.source_combo.addItem(value, value) + self.source_combo.setToolTip( + "TriggerSource value. Suggestions are backend defaults only; " + "the backend validates the actual camera-supported values when opening." + ) + if self.source_combo.lineEdit() is not None: + self.source_combo.lineEdit().setPlaceholderText("auto, Line1, Software, ...") + self.form.addRow("Trigger source:", self.source_combo) self.activation_combo = QComboBox() for value in ("RisingEdge", "FallingEdge", "AnyEdge", "LevelHigh", "LevelLow"): self.activation_combo.addItem(value, value) - form.addRow("Activation:", self.activation_combo) + self.activation_combo.setToolTip( + "TriggerActivation value. Some software/internal trigger sources may ignore this." + ) + self.form.addRow("Activation:", self.activation_combo) + # ---------------------------- + # Generic output line fields + # ---------------------------- self.output_line_edit = QLineEdit() - self.output_line_edit.setPlaceholderText("Line2") + self.output_line_edit.setPlaceholderText(self._profile.default_output_line) self.output_line_edit.setToolTip( - "Generic Line* output selector for cameras exposing LineSelector/LineSource. " - "Ignored by TIS/DMK 37U strobe-based output." + "Generic LineSelector value for cameras exposing LineSelector/LineSource. " + "Ignored if the backend/camera does not support generic line output." ) - form.addRow("Output line:", self.output_line_edit) + self.form.addRow("Output line:", self.output_line_edit) self.output_source_edit = QLineEdit() - self.output_source_edit.setPlaceholderText("ExposureActive") + self.output_source_edit.setPlaceholderText(self._profile.default_output_source) self.output_source_edit.setToolTip( - "Generic LineSource value for cameras exposing LineSource. " - "For TIS/DMK 37U cameras, use Strobe operation instead." + "Generic LineSource value for cameras exposing LineSource, e.g. ExposureActive." ) - form.addRow("Output source:", self.output_source_edit) + self.form.addRow("Output source:", self.output_source_edit) + # ---------------------------- + # Strobe fields, mainly useful for specific GenTL/TIS devices + # ---------------------------- self.strobe_polarity_combo = QComboBox() self.strobe_polarity_combo.addItem("Active high", "ActiveHigh") self.strobe_polarity_combo.addItem("Active low", "ActiveLow") self.strobe_polarity_combo.setToolTip( - "Polarity of STROBE_OUT. If the follower does not trigger, also try changing the follower activation edge." + "Strobe output polarity. Only used by backends/cameras exposing compatible Strobe* nodes." ) - form.addRow("Strobe polarity:", self.strobe_polarity_combo) + self.form.addRow("Strobe polarity:", self.strobe_polarity_combo) self.strobe_operation_combo = QComboBox() self.strobe_operation_combo.addItem("Exposure duration", "Exposure") self.strobe_operation_combo.addItem("Fixed duration", "FixedDuration") self.strobe_operation_combo.setToolTip( - "Exposure: strobe pulse length follows exposure time. " - "FixedDuration: strobe pulse length is set by Strobe duration." + "Strobe operation. Only used by backends/cameras exposing compatible Strobe* nodes." ) - form.addRow("Strobe operation:", self.strobe_operation_combo) + self.form.addRow("Strobe operation:", self.strobe_operation_combo) self.strobe_duration_spin = QSpinBox() self.strobe_duration_spin.setRange(0, 32767) @@ -130,33 +244,34 @@ def _setup_ui(self) -> None: self.strobe_duration_spin.setSuffix(" µs") self.strobe_duration_spin.setSpecialValueText("Default") self.strobe_duration_spin.setToolTip( - "Used only when Strobe operation is FixedDuration. 0 means backend/device default." + "Used only when strobe operation is FixedDuration. 0 means backend/device default." ) - form.addRow("Strobe duration:", self.strobe_duration_spin) + self.form.addRow("Strobe duration:", self.strobe_duration_spin) self.strobe_delay_spin = QSpinBox() self.strobe_delay_spin.setRange(0, 32767) self.strobe_delay_spin.setSingleStep(100) self.strobe_delay_spin.setSuffix(" µs") self.strobe_delay_spin.setSpecialValueText("Default") - self.strobe_delay_spin.setToolTip( - "Delay between start of exposure and STROBE_OUT pulse. 0 means no delay/device default." - ) - form.addRow("Strobe delay:", self.strobe_delay_spin) + self.strobe_delay_spin.setToolTip("Delay before strobe output. 0 means no explicit delay/device default.") + self.form.addRow("Strobe delay:", self.strobe_delay_spin) + # ---------------------------- + # Common options + # ---------------------------- self.timeout_spin = QDoubleSpinBox() self.timeout_spin.setRange(0.0, 3600.0) self.timeout_spin.setDecimals(3) self.timeout_spin.setSingleStep(0.1) self.timeout_spin.setSpecialValueText("Default") self.timeout_spin.setToolTip( - "Fetch poll timeout in seconds. The backend may cap individual fetches to keep preview shutdown responsive." + "Read/fetch timeout in seconds. The backend may cap individual waits to keep preview shutdown responsive." ) - form.addRow("Read timeout:", self.timeout_spin) + self.form.addRow("Read timeout:", self.timeout_spin) self.strict_checkbox = QCheckBox("Strict mode") - self.strict_checkbox.setToolTip("If enabled, missing/unsupported GenICam trigger nodes fail camera open.") - form.addRow(self.strict_checkbox) + self.strict_checkbox.setToolTip("If enabled, missing/unsupported trigger features fail camera open.") + self.form.addRow(self.strict_checkbox) root.addWidget(group) @@ -168,20 +283,86 @@ def _setup_ui(self) -> None: self.role_combo.currentIndexChanged.connect(self._sync_role_ui) self.strobe_operation_combo.currentIndexChanged.connect(self._sync_role_ui) + # Hide backend-irrelevant rows immediately. + self._apply_profile_visibility() + + # ------------------------------------------------------------------ + # UI helpers + # ------------------------------------------------------------------ + + def _set_form_row_visible(self, widget: QWidget, visible: bool) -> None: + """Hide/show a QFormLayout field and its label.""" + widget.setVisible(visible) + try: + label = self.form.labelForField(widget) + if label is not None: + label.setVisible(visible) + except Exception: + pass + + def _set_combo_text(self, combo: QComboBox, text: str) -> None: + text = str(text or "") + idx = combo.findText(text) + if idx >= 0: + combo.setCurrentIndex(idx) + else: + combo.setCurrentText(text) + + def _combo_text(self, combo: QComboBox, fallback: str) -> str: + text = str(combo.currentText() or "").strip() + return text or fallback + + def _apply_profile_visibility(self) -> None: + """Apply static backend-profile visibility. + + Role-specific enablement is handled separately by _sync_role_ui(). + """ + # Input trigger fields are only meaningful for input/software roles. + self._set_form_row_visible(self.selector_edit, self._profile.supports_input or self._profile.supports_software) + self._set_form_row_visible(self.source_combo, self._profile.supports_input or self._profile.supports_software) + self._set_form_row_visible( + self.activation_combo, + self._profile.supports_input, + ) + + # Output fields depend on backend presentation profile. + self._set_form_row_visible(self.output_line_edit, self._profile.show_line_output_fields) + self._set_form_row_visible(self.output_source_edit, self._profile.show_line_output_fields) + + # Strobe fields should not appear for Basler. + self._set_form_row_visible(self.strobe_polarity_combo, self._profile.show_strobe_fields) + self._set_form_row_visible(self.strobe_operation_combo, self._profile.show_strobe_fields) + self._set_form_row_visible(self.strobe_duration_spin, self._profile.show_strobe_fields) + self._set_form_row_visible(self.strobe_delay_spin, self._profile.show_strobe_fields) + + # ------------------------------------------------------------------ + # Model <-> UI + # ------------------------------------------------------------------ + def _load_from_trigger(self, trigger: CameraTriggerSettings) -> None: role = str(getattr(trigger, "role", "off") or "off").lower() idx = self.role_combo.findData(role) self.role_combo.setCurrentIndex(idx if idx >= 0 else 0) self.selector_edit.setText(str(getattr(trigger, "selector", "FrameStart") or "FrameStart")) - self.source_edit.setText(str(getattr(trigger, "source", "auto") or "auto")) + + source = str(getattr(trigger, "source", self._profile.default_source) or self._profile.default_source) + self._set_combo_text(self.source_combo, source) activation = str(getattr(trigger, "activation", "RisingEdge") or "RisingEdge") idx = self.activation_combo.findData(activation) self.activation_combo.setCurrentIndex(idx if idx >= 0 else 0) - self.output_line_edit.setText(str(getattr(trigger, "output_line", "Line2") or "Line2")) - self.output_source_edit.setText(str(getattr(trigger, "output_source", "ExposureActive") or "ExposureActive")) + output_line = str( + getattr(trigger, "output_line", self._profile.default_output_line) or self._profile.default_output_line + ) + self.output_line_edit.setText(output_line) + + output_source = str( + getattr(trigger, "output_source", self._profile.default_output_source) + or self._profile.default_output_source + ) + self.output_source_edit.setText(output_source) strobe_polarity = str(getattr(trigger, "strobe_polarity", "ActiveHigh") or "ActiveHigh") idx = self.strobe_polarity_combo.findData(strobe_polarity) @@ -205,29 +386,34 @@ def _load_from_trigger(self, trigger: CameraTriggerSettings) -> None: def _sync_role_ui(self) -> None: role = str(self.role_combo.currentData() or "off") - input_enabled = role in {"external", "follower"} + input_enabled = role in {"external", "follower", "software"} + hw_input_enabled = role in {"external", "follower"} + output_enabled = role == "master" + # Input fields. self.selector_edit.setEnabled(input_enabled) - self.source_edit.setEnabled(input_enabled) - self.activation_combo.setEnabled(input_enabled) + self.source_combo.setEnabled(input_enabled) + self.activation_combo.setEnabled(hw_input_enabled) - output_enabled = role == "master" - # Generic Line* fallback fields. - self.output_line_edit.setEnabled(output_enabled) - self.output_source_edit.setEnabled(output_enabled) + # Generic Line* output fields. + line_output_active = output_enabled and self._profile.show_line_output_fields + self.output_line_edit.setEnabled(line_output_active) + self.output_source_edit.setEnabled(line_output_active) - # TIS/DMK 37U Strobe* fields. - self.strobe_polarity_combo.setEnabled(output_enabled) - self.strobe_operation_combo.setEnabled(output_enabled) + # Strobe fields. + strobe_active = output_enabled and self._profile.show_strobe_fields + self.strobe_polarity_combo.setEnabled(strobe_active) + self.strobe_operation_combo.setEnabled(strobe_active) fixed_duration = ( - output_enabled and str(self.strobe_operation_combo.currentData() or "Exposure") == "FixedDuration" + strobe_active and str(self.strobe_operation_combo.currentData() or "Exposure") == "FixedDuration" ) self.strobe_duration_spin.setEnabled(fixed_duration) - self.strobe_delay_spin.setEnabled(output_enabled) + self.strobe_delay_spin.setEnabled(strobe_active) - # Timeout is mostly useful for external/follower, but harmless for any role. - self.timeout_spin.setEnabled(role in {"external", "follower"}) + # Timeout is useful for trigger-waiting modes. Keep it available for + # software too if software support is later enabled. + self.timeout_spin.setEnabled(role in {"external", "follower", "software"}) def _accept(self) -> None: role = str(self.role_combo.currentData() or "off") @@ -235,34 +421,37 @@ def _accept(self) -> None: payload = { "role": role, "selector": self.selector_edit.text().strip() or "FrameStart", - "source": self.source_edit.text().strip() or "auto", + "source": self._combo_text(self.source_combo, self._profile.default_source), "activation": str(self.activation_combo.currentData() or "RisingEdge"), - # Generic/SFNC Line* fallback output settings. - "output_line": self.output_line_edit.text().strip() or "Line2", - "output_source": self.output_source_edit.text().strip() or "ExposureActive", - # Strobe output settings used by TIS/DMK 37U cameras. - "strobe_polarity": str(self.strobe_polarity_combo.currentData() or "ActiveHigh"), - "strobe_operation": str(self.strobe_operation_combo.currentData() or "Exposure"), + "output_line": self.output_line_edit.text().strip() or self._profile.default_output_line, + "output_source": self.output_source_edit.text().strip() or self._profile.default_output_source, "strict": bool(self.strict_checkbox.isChecked()), } timeout = float(self.timeout_spin.value()) - if role in {"external", "follower"} and timeout > 0: + if role in {"external", "follower", "software"} and timeout > 0: payload["timeout"] = timeout elif role == "off": - payload["timeout"] = None # ensure timeout is cleared when disabling trigger - strobe_duration = int(self.strobe_duration_spin.value()) - if role == "master" and strobe_duration > 0: - payload["strobe_duration"] = strobe_duration + payload["timeout"] = None + + # Only include strobe-specific settings for profiles that expose them. + # This avoids cluttering Basler trigger configs with TIS-specific fields. + if self._profile.show_strobe_fields: + payload["strobe_polarity"] = str(self.strobe_polarity_combo.currentData() or "ActiveHigh") + payload["strobe_operation"] = str(self.strobe_operation_combo.currentData() or "Exposure") + + strobe_duration = int(self.strobe_duration_spin.value()) + if role == "master" and strobe_duration > 0: + payload["strobe_duration"] = strobe_duration - strobe_delay = int(self.strobe_delay_spin.value()) - if role == "master" and strobe_delay > 0: - payload["strobe_delay"] = strobe_delay + strobe_delay = int(self.strobe_delay_spin.value()) + if role == "master" and strobe_delay > 0: + payload["strobe_delay"] = strobe_delay try: trigger = CameraTriggerSettings.from_any(payload) - except Exception as e: - QMessageBox.critical(self, "Error", f"Failed to apply trigger settings: {e}") + except Exception as exc: + QMessageBox.critical(self, "Error", f"Failed to apply trigger settings: {exc}") return ns = _backend_namespace(self._cam) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 6fddba6..37e4bf4 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -9,10 +9,6 @@ import time from pathlib import Path -# NOTE @C-Achard: his could be added in settings eventually -# Forces pypylon to create 2 emulation virtual cameras, -# mostly for testing. This shold not be enabled for release. -# os.environ["PYLON_CAMEMU"] = "2" import cv2 import numpy as np from PySide6.QtCore import QRect, QSettings, Qt, QTimer, QUrl diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index c88fbda..4d8bf9c 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -4,6 +4,7 @@ import copy import logging +import time from dataclasses import dataclass from functools import partial from threading import Event, Lock @@ -56,6 +57,10 @@ def __init__(self, camera_id: str, settings: CameraSettings): self._retry_delay = 0.1 self._trigger_timeout_delay = 0.05 + self._trigger_wait_log_interval = 2.0 + self._last_trigger_wait_log = 0.0 + self._trigger_wait_suppressed_count = 0 + # Performance logs self._timing = WorkerTimingStats( camera_id, logger=LOGGER, log_interval=1.0, enabled=SINGLE_CAMERA_WORKER_DO_LOG_TIMING @@ -126,11 +131,7 @@ def run(self) -> None: # "no trigger pulse arrived during this poll interval". # This is expected and should not count as a camera failure. if bool(getattr(self._backend, "waits_for_hardware_trigger", False)): - LOGGER.debug( - "[Worker %s] waiting for hardware trigger: %s", - self._camera_id, - exc, - ) + self._log_trigger_wait_throttled(exc) consecutive_errors = 0 if self._stop_event.wait(self._trigger_timeout_delay): @@ -169,6 +170,36 @@ def run(self) -> None: def stop(self) -> None: self._stop_event.set() + def _log_trigger_wait_throttled(self, exc: BaseException) -> None: + """Log hardware-trigger wait timeouts at a controlled rate. + + In trigger-waiting modes, read timeouts are expected polling misses. + Without throttling, the log can be flooded at ~10-20 messages/sec/camera. + """ + now = time.monotonic() + + if now - self._last_trigger_wait_log < self._trigger_wait_log_interval: + self._trigger_wait_suppressed_count += 1 + return + + suppressed = self._trigger_wait_suppressed_count + self._trigger_wait_suppressed_count = 0 + self._last_trigger_wait_log = now + + if suppressed: + LOGGER.debug( + "[Worker %s] waiting for hardware trigger: %s (suppressed %d repeated timeout logs)", + self._camera_id, + exc, + suppressed, + ) + else: + LOGGER.debug( + "[Worker %s] waiting for hardware trigger: %s", + self._camera_id, + exc, + ) + def get_display_id(settings: CameraSettings) -> str: return f"{settings.backend}:{settings.index}" @@ -516,7 +547,6 @@ def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float display_ids=dict(self._display_ids), ) - if frame_data is not None: with timing.measure("Multi.emit.frame_ready"): self.frame_ready.emit(frame_data) diff --git a/tests/cameras/backends/conftest.py b/tests/cameras/backends/conftest.py index f0a0b6b..dfec64f 100644 --- a/tests/cameras/backends/conftest.py +++ b/tests/cameras/backends/conftest.py @@ -381,32 +381,115 @@ def _make(buffers): # ----------------------------------------------------------------------------- +class FakePylonTimeoutException(RuntimeError): + pass + + class FakePylon: - """Minimal fake for 'from pypylon import pylon' usage in basler_backend.""" + """Fake for 'from pypylon import pylon' used by BaslerCameraBackend.""" - # Constants used by Basler backend GrabStrategy_LatestImageOnly = 1 TimeoutHandling_ThrowException = 1 - PixelType_BGR8packed = 0x02180014 # arbitrary token + PixelType_BGR8packed = 0x02180014 OutputBitAlignment_MsbAligned = 1 + class _EnumEntry: + def __init__(self, symbolic: str): + self._symbolic = symbolic + + def GetSymbolic(self): + return self._symbolic + class _Feature: - def __init__(self, value=0): + def __init__( + self, + value=0, + *, + symbolics: list[str] | None = None, + minimum=None, + maximum=None, + increment=1, + writable=True, + readable=True, + ): self._value = value + self._symbolics = list(symbolics or []) + self._min = minimum + self._max = maximum + self._inc = increment + self._writable = writable + self._readable = readable + self.set_calls: list[object] = [] def SetValue(self, v): + if not self._writable: + raise RuntimeError("feature is not writable") + if self._symbolics and v not in self._symbolics: + raise RuntimeError(f"unsupported symbolic {v!r}; available={self._symbolics}") self._value = v + self.set_calls.append(v) def GetValue(self): + if not self._readable: + raise RuntimeError("feature is not readable") return self._value + def GetSymbolics(self): + return list(self._symbolics) + + def GetEntries(self): + return [FakePylon._EnumEntry(s) for s in self._symbolics] + + def IsWritable(self): + return bool(self._writable) + + def IsReadable(self): + return bool(self._readable) + + def GetMin(self): + if self._min is None: + raise RuntimeError("no min") + return self._min + + def GetMax(self): + if self._max is None: + raise RuntimeError("no max") + return self._max + + def GetInc(self): + return self._inc + class _DeviceInfo: - def __init__(self, serial: str): + def __init__( + self, + serial: str, + *, + vendor: str = "Basler", + model: str = "FakeBasler", + friendly: str | None = None, + full_name: str | None = None, + ): self._serial = serial + self._vendor = vendor + self._model = model + self._friendly = friendly or f"{vendor} {model} ({serial})" + self._full_name = full_name or f"FakeFullName-{serial}" def GetSerialNumber(self): return self._serial + def GetVendorName(self): + return self._vendor + + def GetModelName(self): + return self._model + + def GetFriendlyName(self): + return self._friendly + + def GetFullName(self): + return self._full_name + class _Device: def __init__(self, info): self.info = info @@ -433,12 +516,13 @@ class _GrabResult: def __init__(self, ok=True, array=None): self._ok = ok self._array = array + self.released = False def GrabSucceeded(self): return bool(self._ok) def Release(self): - return None + self.released = True class InstantCamera: def __init__(self, device): @@ -446,36 +530,106 @@ def __init__(self, device): self._open = False self._grabbing = False - # Feature nodes the backend uses + self.retrieve_calls: list[int] = [] + self.start_calls = 0 + self.stop_calls = 0 + self.close_calls = 0 + self.software_trigger_calls = 0 + self._software_trigger_pending = 0 + + # General camera controls. + self.ExposureAuto = FakePylon._Feature("Off", symbolics=["Off", "Once", "Continuous"]) self.ExposureTime = FakePylon._Feature(1000.0) + self.GainAuto = FakePylon._Feature("Off", symbolics=["Off", "Once", "Continuous"]) self.Gain = FakePylon._Feature(0.0) - self.Width = FakePylon._Feature(1920) - self.Height = FakePylon._Feature(1080) + + self.Width = FakePylon._Feature(1920, minimum=64, maximum=4096, increment=2) + self.Height = FakePylon._Feature(1080, minimum=64, maximum=4096, increment=2) self.AcquisitionFrameRateEnable = FakePylon._Feature(False) self.AcquisitionFrameRate = FakePylon._Feature(30.0) + self.MaxNumBuffer = FakePylon._Feature(10) + + # Basler/pypylon trigger features. + self.AcquisitionMode = FakePylon._Feature("Continuous", symbolics=["Continuous", "SingleFrame"]) + self.TriggerSelector = FakePylon._Feature("FrameStart", symbolics=["FrameStart"]) + self.TriggerMode = FakePylon._Feature("Off", symbolics=["Off", "On"]) + self.TriggerSource = FakePylon._Feature( + "Software", + symbolics=[ + "Software", + "Line1", + "Line2", + "Line3", + "PeriodicSignal1", + "Action1", + ], + ) + self.TriggerActivation = FakePylon._Feature( + "RisingEdge", + symbolics=["RisingEdge", "FallingEdge", "AnyEdge", "LevelHigh", "LevelLow"], + ) + self.TriggerDelay = FakePylon._Feature(0.0) + + # Generic output line features. + self.LineSelector = FakePylon._Feature("Line1", symbolics=["Line1", "Line2", "Line3"]) + self.LineMode = FakePylon._Feature("Input", symbolics=["Input", "Output"]) + self.LineSource = FakePylon._Feature( + "Off", + symbolics=["Off", "ExposureActive", "AcquisitionActive"], + ) + self.LineInverter = FakePylon._Feature(False) + + # Test knobs. + self.allow_hardware_trigger_frame = False + self.force_failed_grab = False + def Open(self): self._open = True def Close(self): + self.close_calls += 1 self._open = False def IsOpen(self): return bool(self._open) def StartGrabbing(self, *_args, **_kwargs): + self.start_calls += 1 self._grabbing = True def StopGrabbing(self): + self.stop_calls += 1 self._grabbing = False def IsGrabbing(self): return bool(self._grabbing) - def RetrieveResult(self, *_args, **_kwargs): - # Always succeed with a small dummy image (BGR) - import numpy as np + def ExecuteSoftwareTrigger(self): + self.software_trigger_calls += 1 + self._software_trigger_pending += 1 + + def RetrieveResult(self, timeout_ms, *_args, **_kwargs): + self.retrieve_calls.append(int(timeout_ms)) + + if not self._grabbing: + raise FakePylonTimeoutException("Grab timed out: acquisition not started") + + if self.force_failed_grab: + return FakePylon._GrabResult(ok=False, array=None) + + trigger_on = self.TriggerMode.GetValue() == "On" + source = self.TriggerSource.GetValue() + + if trigger_on: + if source == "Software": + if self._software_trigger_pending <= 0: + raise FakePylonTimeoutException("Grab timed out: waiting for software trigger") + self._software_trigger_pending -= 1 + else: + if not self.allow_hardware_trigger_frame: + raise FakePylonTimeoutException("Grab timed out: waiting for hardware trigger") frame = np.zeros((10, 10, 3), dtype=np.uint8) return FakePylon._GrabResult(ok=True, array=frame) @@ -498,25 +652,61 @@ def Convert(self, grab_result): @pytest.fixture() def fake_pylon_module(): - """ - Returns the FakePylon 'module' and resets singleton devices for isolation. - """ - # reset singleton factory so devices list resets per test + """Returns fake pylon module and resets fake device inventory.""" FakePylon.TlFactory._instance = None + factory = FakePylon.TlFactory.GetInstance() + factory._devices = [ + FakePylon._DeviceInfo("FAKE-BASLER-0"), + FakePylon._DeviceInfo("FAKE-BASLER-1"), + ] return FakePylon @pytest.fixture() def patch_basler_sdk(monkeypatch, fake_pylon_module): - """ - Patch Basler backend to behave as if pypylon is installed, using FakePylon. - """ + """Patch Basler backend to use FakePylon.""" import dlclivegui.cameras.backends.basler_backend as bb monkeypatch.setattr(bb, "pylon", fake_pylon_module, raising=False) return fake_pylon_module +@pytest.fixture() +def basler_settings_factory(): + from dlclivegui.config import CameraSettings + + def _make( + *, + index=0, + name="BaslerTestCam", + width=0, + height=0, + fps=0.0, + exposure=0, + gain=0.0, + enabled=True, + properties=None, + ): + props = properties if isinstance(properties, dict) else {} + props.setdefault("basler", {}) + props["basler"] = dict(props["basler"]) + + return CameraSettings( + name=name, + index=index, + backend="basler", + width=width, + height=height, + fps=fps, + exposure=exposure, + gain=gain, + enabled=enabled, + properties=props, + ) + + return _make + + # ----------------------------------------------------------------------------- # Fake GenTL / harvesters SDK (SDK-free) + fixtures for strict lifecycle tests # ----------------------------------------------------------------------------- diff --git a/tests/cameras/backends/test_basler_backend.py b/tests/cameras/backends/test_basler_backend.py new file mode 100644 index 0000000..64496b9 --- /dev/null +++ b/tests/cameras/backends/test_basler_backend.py @@ -0,0 +1,464 @@ +from __future__ import annotations + +import numpy as np +import pytest + +# --------------------------------------------------------------------- +# Core lifecycle +# --------------------------------------------------------------------- + + +def test_basler_open_starts_grabbing_and_read_returns_frame(patch_basler_sdk, basler_settings_factory): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory() + be = bb.BaslerCameraBackend(settings) + + be.open() + + assert be._camera is not None + assert be._camera.IsOpen() + assert be._camera.IsGrabbing() + assert be._converter is not None + + frame, ts = be.read() + assert isinstance(ts, float) + assert isinstance(frame, np.ndarray) + assert frame.shape == (10, 10, 3) + + be.close() + assert be._camera is None + assert be._converter is None + + +def test_basler_fast_start_does_not_start_grabbing_and_read_raises( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory(properties={"basler": {"fast_start": True}}) + be = bb.BaslerCameraBackend(settings) + + be.open() + + assert be._camera is not None + assert be._camera.IsOpen() + assert not be._camera.IsGrabbing() + assert be._converter is None + + with pytest.raises(RuntimeError, match="fast-start"): + be.read() + + be.close() + + +def test_basler_close_is_idempotent(patch_basler_sdk, basler_settings_factory): + import dlclivegui.cameras.backends.basler_backend as bb + + be = bb.BaslerCameraBackend(basler_settings_factory()) + be.open() + be.close() + be.close() + + +def test_basler_stop_before_open_and_after_close_is_safe(patch_basler_sdk, basler_settings_factory): + import dlclivegui.cameras.backends.basler_backend as bb + + be = bb.BaslerCameraBackend(basler_settings_factory()) + + be.stop() + + be.open() + be.stop() + + assert be._camera is not None + assert not be._camera.IsGrabbing() + + be.close() + be.stop() + + +def test_basler_read_before_open_raises_runtimeerror(patch_basler_sdk, basler_settings_factory): + import dlclivegui.cameras.backends.basler_backend as bb + + be = bb.BaslerCameraBackend(basler_settings_factory()) + + with pytest.raises(RuntimeError, match="not opened"): + be.read() + + +# --------------------------------------------------------------------- +# Discovery / identity / rebind +# --------------------------------------------------------------------- + + +def test_basler_discover_devices_returns_serial_identity_and_label( + patch_basler_sdk, +): + import dlclivegui.cameras.backends.basler_backend as bb + + cams = bb.BaslerCameraBackend.discover_devices(max_devices=10) + + assert len(cams) == 2 + assert cams[0].device_id == "FAKE-BASLER-0" + assert "Basler" in cams[0].label + assert "FAKE-BASLER-0" in cams[0].label + assert cams[0].path + + +def test_basler_quick_ping_true_for_existing_false_for_missing(patch_basler_sdk): + import dlclivegui.cameras.backends.basler_backend as bb + + assert bb.BaslerCameraBackend.quick_ping(0) is True + assert bb.BaslerCameraBackend.quick_ping(1) is True + assert bb.BaslerCameraBackend.quick_ping(2) is False + + +def test_basler_rebind_settings_uses_serial_device_id( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + index=0, + properties={"basler": {"device_id": "FAKE-BASLER-1"}}, + ) + + out = bb.BaslerCameraBackend.rebind_settings(settings) + + assert int(out.index) == 1 + ns = out.properties["basler"] + assert ns["device_id"] == "FAKE-BASLER-1" + assert ns["device_name"] + + +def test_basler_open_selects_device_id_and_persists_identity( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + index=0, + properties={"basler": {"device_id": "FAKE-BASLER-1"}}, + ) + + be = bb.BaslerCameraBackend(settings) + be.open() + + ns = settings.properties["basler"] + assert ns["device_id"] == "FAKE-BASLER-1" + assert ns["device_name"] + + be.close() + + +def test_basler_open_index_out_of_range_raises(patch_basler_sdk, basler_settings_factory): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory(index=99) + be = bb.BaslerCameraBackend(settings) + + with pytest.raises(RuntimeError, match="out of range"): + be.open() + + +# --------------------------------------------------------------------- +# Camera controls +# --------------------------------------------------------------------- + + +def test_basler_resolution_auto_does_not_modify_dimensions( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory(width=0, height=0) + be = bb.BaslerCameraBackend(settings) + + be.open() + + assert be._camera.Width.GetValue() == 1920 + assert be._camera.Height.GetValue() == 1080 + assert be.actual_resolution == (1920, 1080) + + be.close() + + +def test_basler_resolution_request_snaps_to_increment( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory(width=641, height=481) + be = bb.BaslerCameraBackend(settings) + + be.open() + + assert be._camera.Width.GetValue() == 640 + assert be._camera.Height.GetValue() == 480 + assert be.actual_resolution == (640, 480) + + be.close() + + +def test_basler_exposure_gain_fps_are_applied_when_nonzero( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory(exposure=20000, gain=2.5, fps=50.0) + be = bb.BaslerCameraBackend(settings) + + be.open() + + assert be._camera.ExposureAuto.GetValue() == "Off" + assert be._camera.ExposureTime.GetValue() == pytest.approx(20000.0) + assert be._camera.GainAuto.GetValue() == "Off" + assert be._camera.Gain.GetValue() == pytest.approx(2.5) + assert be._camera.AcquisitionFrameRateEnable.GetValue() is True + assert be._camera.AcquisitionFrameRate.GetValue() == pytest.approx(50.0) + + be.close() + + +# --------------------------------------------------------------------- +# Basler trigger behavior +# --------------------------------------------------------------------- + + +def test_basler_static_capabilities_advertises_hardware_trigger_best_effort( + patch_basler_sdk, +): + import dlclivegui.cameras.backends.basler_backend as bb + from dlclivegui.cameras.base import SupportLevel + + caps = bb.BaslerCameraBackend.static_capabilities() + assert caps["hardware_trigger"] == SupportLevel.BEST_EFFORT + + +def test_basler_default_trigger_is_off_and_free_runs( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory() + be = bb.BaslerCameraBackend(settings) + + be.open() + + assert be._camera.TriggerMode.GetValue() == "Off" + assert be.waits_for_hardware_trigger is False + + frame, _ = be.read() + assert frame.shape == (10, 10, 3) + + be.close() + + +def test_basler_follower_auto_selects_line1_and_times_out_waiting_for_trigger( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + properties={ + "basler": { + "trigger": { + "role": "follower", + "selector": "FrameStart", + "source": "auto", + "activation": "RisingEdge", + "timeout": 5.0, + "strict": False, + } + } + } + ) + + be = bb.BaslerCameraBackend(settings) + + # Timeout is configured in seconds but pypylon RetrieveResult uses ms; + # hardware-trigger waits should be capped for responsive shutdown. + assert be._retrieve_timeout_ms == 1000 + + be.open() + + assert be.waits_for_hardware_trigger is True + assert be._camera.TriggerSelector.GetValue() == "FrameStart" + assert be._camera.TriggerSource.GetValue() == "Line1" + assert be._camera.TriggerActivation.GetValue() == "RisingEdge" + assert be._camera.TriggerMode.GetValue() == "On" + + with pytest.raises(TimeoutError, match="waiting for hardware trigger"): + be.read() + + assert be._camera.retrieve_calls[-1] == 1000 + + be.close() + + +def test_basler_follower_strict_invalid_source_raises( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + properties={ + "basler": { + "trigger": { + "role": "follower", + "selector": "FrameStart", + "source": "NotARealSource", + "activation": "RisingEdge", + "strict": True, + } + } + } + ) + + be = bb.BaslerCameraBackend(settings) + + with pytest.raises(RuntimeError, match="TriggerSource"): + be.open() + + +def test_basler_follower_non_strict_invalid_source_disables_trigger( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + properties={ + "basler": { + "trigger": { + "role": "follower", + "source": "NotARealSource", + "strict": False, + } + } + } + ) + + be = bb.BaslerCameraBackend(settings) + be.open() + + assert be._camera.TriggerMode.GetValue() == "Off" + assert be.waits_for_hardware_trigger is False + + frame, _ = be.read() + assert frame.shape == (10, 10, 3) + + be.close() + + +def test_basler_master_configures_generic_line_output_and_restores_on_close( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + properties={ + "basler": { + "trigger": { + "role": "master", + "output_line": "Line2", + "output_source": "ExposureActive", + "strict": False, + } + } + } + ) + + be = bb.BaslerCameraBackend(settings) + be.open() + + cam = be._camera + assert cam.LineSelector.GetValue() == "Line2" + assert cam.LineMode.GetValue() == "Output" + assert cam.LineSource.GetValue() == "ExposureActive" + assert be.waits_for_hardware_trigger is False + + be.close() + + # Local reference remains valid after backend clears self._camera. + assert cam.LineSource.GetValue() == "Off" + assert cam.LineMode.GetValue() == "Input" + + +@pytest.mark.xfail(reason="Software trigger support is not implemented yet.") +def test_basler_software_trigger_requires_trigger_once_before_read( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + properties={ + "basler": { + "trigger": { + "role": "software", + "selector": "FrameStart", + "strict": False, + } + } + } + ) + + be = bb.BaslerCameraBackend(settings) + be.open() + + assert be._camera.TriggerMode.GetValue() == "On" + assert be._camera.TriggerSource.GetValue() == "Software" + assert be.waits_for_hardware_trigger is False + + # No software trigger has been fired yet. + with pytest.raises(RuntimeError, match="Failed to retrieve image"): + be.read() + + be.trigger_once() + assert be._camera.software_trigger_calls == 1 + + frame, _ = be.read() + assert frame.shape == (10, 10, 3) + + be.close() + + +def test_basler_close_turns_input_trigger_off( + patch_basler_sdk, + basler_settings_factory, +): + import dlclivegui.cameras.backends.basler_backend as bb + + settings = basler_settings_factory( + properties={ + "basler": { + "trigger": { + "role": "external", + "source": "Line1", + "activation": "RisingEdge", + } + } + } + ) + + be = bb.BaslerCameraBackend(settings) + be.open() + + cam = be._camera + assert cam.TriggerMode.GetValue() == "On" + + be.close() + + assert cam.TriggerMode.GetValue() == "Off"