From a762a4723a1ba0e467cf5c96eae5a903451a5241 Mon Sep 17 00:00:00 2001 From: zghp <33546213+zghp@users.noreply.github.com> Date: Wed, 15 Apr 2026 09:26:43 +0100 Subject: [PATCH 1/2] add initial hdmianalyzer modules --- examples/configs/example_rack_config.yml | 4 + framework/core/deviceManager.py | 5 + framework/core/hdmiAnalyserController.py | 193 +++++++++++ .../core/hdmiAnalyserModules/__init__.py | 0 .../hdmiAnalyserInterface.py | 274 +++++++++++++++ framework/core/hdmiAnalyserModules/m42h.py | 328 ++++++++++++++++++ 6 files changed, 804 insertions(+) create mode 100644 framework/core/hdmiAnalyserController.py create mode 100644 framework/core/hdmiAnalyserModules/__init__.py create mode 100644 framework/core/hdmiAnalyserModules/hdmiAnalyserInterface.py create mode 100644 framework/core/hdmiAnalyserModules/m42h.py diff --git a/examples/configs/example_rack_config.yml b/examples/configs/example_rack_config.yml index 8e12cd2..e78f0ff 100644 --- a/examples/configs/example_rack_config.yml +++ b/examples/configs/example_rack_config.yml @@ -103,6 +103,10 @@ rackConfig: # [ avSyncController: optional] - Specifiec AVSyncController for the slot # supported types: # [type: "SyncOne2", port: "/dev/ttyACM0", extended_mode (optional): true|false, audio_input (optional): "AUTO|EXTERNAL|INTERNAL", speaker_distance (optional): "1.5"] + + # [ hdmiAnalyserController: optional ] - Specifies an HDMI Analyser/Generator for the slot + # supported types: + # [type: "m42h", host: "192.168.0.50", port (optional): 22, user (optional): "qd", passwd (optional): "qd", card (optional): 4 ] - pi2: ip: "192.168.99.1" description: "local pi4" diff --git a/framework/core/deviceManager.py b/framework/core/deviceManager.py index 5412183..fd89666 100644 --- a/framework/core/deviceManager.py +++ b/framework/core/deviceManager.py @@ -42,6 +42,7 @@ from framework.core.commonRemote import commonRemoteClass from framework.core.hdmiCECController import HDMICECController from framework.core.avSyncController import AVSyncController +from framework.core.hdmiAnalyserController import HDMIAnalyserController from framework.core.utilities import utilities dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -159,6 +160,7 @@ def __init__(self, log:logModule, logPath:str, devices:dict): self.remoteController = None self.hdmiCECController = None self.avSyncController =None + self.hdmiAnalyserController = None self.session = None self.alive = False @@ -185,6 +187,9 @@ def __init__(self, log:logModule, logPath:str, devices:dict): config = device.get("avSyncController") if config != None: self.avSyncController = AVSyncController(log, config) + config = device.get("hdmiAnalyserController") + if config != None: + self.hdmiAnalyserController = HDMIAnalyserController(log, config) self.session = self.getConsoleSession() def getField(self, fieldName:str, itemsList:dict = None): diff --git a/framework/core/hdmiAnalyserController.py b/framework/core/hdmiAnalyserController.py new file mode 100644 index 0000000..26ce7a1 --- /dev/null +++ b/framework/core/hdmiAnalyserController.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2023 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#* ****************************************************************************** +#* +#* ** Project : RAFT +#* ** @addtogroup : core +#* ** @date : 13/04/2026 +#* ** +#* ** @brief : HDMI Analyser controller facade +#* ** +#* ****************************************************************************** + +from framework.core.logModule import logModule +from framework.core.hdmiAnalyserModules.m42h import M42hController + + +class HDMIAnalyserController(): + """Facade controller for HDMI Analyser/Generator instruments. + + Selects a concrete backend based on the ``type`` key in *config* and + delegates all operations, following the same pattern used by + :class:`audioAmplifierController`, :class:`HDMICECController`, and + :class:`AVSyncController`. + + Supported types: + ``m42h`` – Teledyne LeCroy Quantumdata M42h 96G Video Analyser/Generator. + + Rack-config example:: + + hdmiAnalyserController: + type: "m42h" + host: "192.168.0.50" + port: 22 # optional SSH port + user: "qd" # optional, defaults to "qd" + passwd: "qd" # optional, defaults to "qd" + card: 4 # optional card number + """ + + def __init__(self, log: logModule, config: dict): + self._log = log + self.controllerType = config.get("type") + self.host = config.get("host") + + if self.controllerType == "m42h": + self.hdmiAnalyser = M42hController( + host=self.host, + port=config.get("port"), + user=config.get("user", "qd"), + passwd=config.get("passwd", "qd"), + card=config.get("card"), + ) + else: + raise ValueError( + f"Unsupported hdmiAnalyserController type: '{self.controllerType}'" + ) + + # ── Connection / lifecycle ────────────────────────────────────────── + + def connect(self): + self._log.info("Connecting to HDMI analyser") + self.hdmiAnalyser.connect() + + def disconnect(self): + self._log.info("Disconnecting from HDMI analyser") + self.hdmiAnalyser.disconnect() + + # ── Port selection ────────────────────────────────────────────────── + + def select_port(self, port): + self._log.info(f"Selecting HDMI analyser port: {port}") + self.hdmiAnalyser.select_port(port) + + # ── Hot-plug control ──────────────────────────────────────────────── + + def set_hpd(self, state: bool, duration: int = 100): + self._log.info(f"Setting HPD state: {state} duration: {duration}ms") + self.hdmiAnalyser.set_hpd(state, duration) + + # ── Generator (source) operations ─────────────────────────────────── + + def set_video_format(self, format_name: str, colour_space: str = None, + subsampling: str = None, bit_depth: int = None, + vic: int = None): + self._log.info(f"Setting video format: {format_name} " + f"cs={colour_space} ss={subsampling} " + f"depth={bit_depth} vic={vic}") + self.hdmiAnalyser.set_video_format( + format_name, colour_space, subsampling, bit_depth, vic + ) + + def set_hdr_mode(self, mode: str): + self._log.info(f"Setting HDR mode: {mode}") + self.hdmiAnalyser.set_hdr_mode(mode) + + def set_allm(self, enabled: bool): + self._log.info(f"Setting ALLM: {'enabled' if enabled else 'disabled'}") + self.hdmiAnalyser.set_allm(enabled) + + def set_vrr(self, enabled: bool, base_refresh_rate: int = None): + self._log.info(f"Setting VRR: {'enabled' if enabled else 'disabled'}" + f"{f' base_rate={base_refresh_rate}' if base_refresh_rate else ''}") + self.hdmiAnalyser.set_vrr(enabled, base_refresh_rate) + + def set_avi_content_type(self, content_type: str): + self._log.info(f"Setting AVI content type: {content_type}") + self.hdmiAnalyser.set_avi_content_type(content_type) + + def set_spd_info(self, vendor: str, description: str): + self._log.info(f"Setting SPD info: vendor={vendor}, desc={description}") + self.hdmiAnalyser.set_spd_info(vendor, description) + + def start_output(self): + self._log.info("Starting HDMI generator output") + self.hdmiAnalyser.start_output() + + def stop_output(self): + self._log.info("Stopping HDMI generator output") + self.hdmiAnalyser.stop_output() + + # ── EDID operations ───────────────────────────────────────────────── + + def get_edid(self, port: str = "") -> bytes: + self._log.info("Reading EDID") + return self.hdmiAnalyser.get_edid(port) + + def set_edid(self, edid_data: str, hp_duration_ms: int = 100): + self._log.info("Loading custom EDID") + self.hdmiAnalyser.set_edid(edid_data, hp_duration_ms) + + def restore_default_edid(self, port: str = ""): + self._log.info("Restoring default EDID") + self.hdmiAnalyser.restore_default_edid(port) + + # ── Analyser (sink) read-back ─────────────────────────────────────── + + def get_video_status(self) -> dict: + self._log.info("Getting video status from analyser") + return self.hdmiAnalyser.get_video_status() + + def get_audio_status(self) -> dict: + self._log.info("Getting audio status from analyser") + return self.hdmiAnalyser.get_audio_status() + + def get_hdcp_status(self) -> dict: + self._log.info("Getting HDCP status from analyser") + return self.hdmiAnalyser.get_hdcp_status() + + def get_link_status(self) -> dict: + self._log.info("Getting link status from analyser") + return self.hdmiAnalyser.get_link_status() + + def get_avi_info(self) -> dict: + self._log.info("Getting AVI InfoFrame from analyser") + return self.hdmiAnalyser.get_avi_info() + + def get_spd_info(self) -> dict: + self._log.info("Getting SPD InfoFrame from analyser") + return self.hdmiAnalyser.get_spd_info() + + def get_background_color(self) -> str: + self._log.info("Getting background colour from analyser") + return self.hdmiAnalyser.get_background_color() + + # ── HDCP control ──────────────────────────────────────────────────── + + def set_hdcp_mode(self, mode: str): + self._log.info(f"Setting HDCP mode: {mode}") + self.hdmiAnalyser.set_hdcp_mode(mode) + + # ── Snapshot / combined status ────────────────────────────────────── + + def snapshot(self) -> dict: + self._log.info("Taking analyser snapshot") + return self.hdmiAnalyser.snapshot() diff --git a/framework/core/hdmiAnalyserModules/__init__.py b/framework/core/hdmiAnalyserModules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/core/hdmiAnalyserModules/hdmiAnalyserInterface.py b/framework/core/hdmiAnalyserModules/hdmiAnalyserInterface.py new file mode 100644 index 0000000..a5c6749 --- /dev/null +++ b/framework/core/hdmiAnalyserModules/hdmiAnalyserInterface.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2023 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#* ****************************************************************************** +#* +#* ** Project : RAFT +#* ** @addtogroup : core.hdmiAnalyserModules +#* ** @date : 13/04/2026 +#* ** +#* ** @brief : Abstract interface for HDMI Analyser controllers +#* ** +#* ****************************************************************************** + +from abc import ABC, abstractmethod + + +class HDMIAnalyserInterface(ABC): + """Abstract base class defining the interface for an HDMI Analyser/Generator controller. + + Implementations must provide methods for controlling the analyser in both + source (generator) and sink (analyser) modes. This covers capabilities + required by dsHdmiIn and dsVideoPort L3 test procedures. + """ + + # ── Connection / lifecycle ────────────────────────────────────────── + + @abstractmethod + def connect(self): + """Establish a connection to the analyser instrument.""" + pass + + @abstractmethod + def disconnect(self): + """Close the connection to the analyser instrument.""" + pass + + # ── Port selection ────────────────────────────────────────────────── + + @abstractmethod + def select_port(self, port): + """Select the active HDMI port on the analyser. + + Args: + port: Port number or identifier. + """ + pass + + # ── Hot-plug control ──────────────────────────────────────────────── + + @abstractmethod + def set_hpd(self, state: bool, duration: int = 100): + """Assert or de-assert the Hot Plug Detect signal. + + Args: + state: ``True`` to assert HPD (simulate connect), + ``False`` to de-assert (simulate disconnect). + duration: Hot-plug duration in milliseconds. + """ + pass + + # ── Generator (source) operations ─────────────────────────────────── + + @abstractmethod + def set_video_format(self, format_name: str, colour_space: str = None, + subsampling: str = None, bit_depth: int = None, + vic: int = None): + """Configure the video generator output format. + + Args: + format_name: Format name string (e.g. ``"1080p60"``, ``"2160p60"``). + colour_space: Colour space (e.g. ``"RGB"``, ``"YCbCr709"``). + subsampling: Sub-sampling (e.g. ``"444"``, ``"422"``, ``"420"``). + bit_depth: Bits per component (8, 10, 12). + vic: Video Identification Code (optional). + """ + pass + + @abstractmethod + def set_hdr_mode(self, mode: str): + """Set the HDR signalling mode on the generator output. + + Args: + mode: HDR standard (e.g. ``"SDR"``, ``"HDR10"``, ``"HLG"``, + ``"DolbyVision"``, ``"HDR10PLUS"``). + """ + pass + + @abstractmethod + def set_allm(self, enabled: bool): + """Enable or disable Auto Low Latency Mode on the generator. + + Args: + enabled: ``True`` to enable ALLM, ``False`` to disable. + """ + pass + + @abstractmethod + def set_vrr(self, enabled: bool, base_refresh_rate: int = None): + """Enable or disable Variable Refresh Rate on the generator. + + Args: + enabled: ``True`` to enable VRR, ``False`` to disable. + base_refresh_rate: Base refresh rate for VTEM (optional). + """ + pass + + @abstractmethod + def set_avi_content_type(self, content_type: str): + """Set the AVI InfoFrame content type on the generator. + + Args: + content_type: Content type (e.g. ``"Graphics"``, ``"Cinema"``, + ``"Photo"``, ``"Game"``). + """ + pass + + @abstractmethod + def set_spd_info(self, vendor: str, description: str): + """Set the Source Product Descriptor InfoFrame on the generator. + + Args: + vendor: Vendor name string. + description: Product description string. + """ + pass + + @abstractmethod + def start_output(self): + """Start the video generator output.""" + pass + + @abstractmethod + def stop_output(self): + """Stop the video generator output.""" + pass + + # ── EDID operations ───────────────────────────────────────────────── + + @abstractmethod + def get_edid(self, port: str = "") -> bytes: + """Read the EDID currently presented on the specified port. + + Args: + port: Port identifier. If empty, uses the currently selected port. + + Returns: + Raw EDID bytes. + """ + pass + + @abstractmethod + def set_edid(self, edid_data: str, hp_duration_ms: int = 100): + """Load a custom EDID onto the analyser sink port. + + Args: + edid_data: EDID data as hex string (e.g. ``"00FFFFFFFFFFFF00..."``). + hp_duration_ms: Hot-plug duration in milliseconds (0=no HP). + """ + pass + + @abstractmethod + def restore_default_edid(self, port: str = ""): + """Restore the factory-default EDID on a port. + + Args: + port: Port identifier. + """ + pass + + # ── Analyser (sink) read-back ─────────────────────────────────────── + + @abstractmethod + def get_video_status(self) -> dict: + """Read the current video status from the analyser input. + + Returns: + dict with video format parameters from the received signal. + """ + pass + + @abstractmethod + def get_audio_status(self) -> dict: + """Read the current audio status from the analyser input. + + Returns: + dict with keys: ``sampling``, ``bit_size``, ``channels``. + """ + pass + + @abstractmethod + def get_hdcp_status(self) -> dict: + """Read the current HDCP status from the analyser input. + + Returns: + dict with keys: ``status``, ``key``. + """ + pass + + @abstractmethod + def get_link_status(self) -> dict: + """Read the link status. + + Returns: + dict with link status information. + """ + pass + + @abstractmethod + def get_avi_info(self) -> dict: + """Read the AVI InfoFrame data from the analyser input. + + Returns: + dict with keys: ``valid``, ``info``, ``octets``. + """ + pass + + @abstractmethod + def get_spd_info(self) -> dict: + """Read the SPD InfoFrame data from the analyser input. + + Returns: + dict with keys: ``valid``, ``info``, ``octets``. + """ + pass + + @abstractmethod + def get_background_color(self) -> str: + """Read the current background colour from the analyser input. + + Returns: + Pixel colour string (e.g. ``"(R,G,B)"``). + """ + pass + + # ── HDCP control ──────────────────────────────────────────────────── + + @abstractmethod + def set_hdcp_mode(self, mode: str): + """Set the HDCP mode advertised by the analyser. + + Args: + mode: ``"none"``, ``"1.4"``, ``"2.3"``. + """ + pass + + # ── Snapshot / combined status ────────────────────────────────────── + + @abstractmethod + def snapshot(self) -> dict: + """Collect a combined status snapshot from the analyser. + + Returns: + dict containing ``video``, ``audio``, ``hdcp``, ``link``, + ``avi`` sub-dicts. + """ + pass diff --git a/framework/core/hdmiAnalyserModules/m42h.py b/framework/core/hdmiAnalyserModules/m42h.py new file mode 100644 index 0000000..feb9161 --- /dev/null +++ b/framework/core/hdmiAnalyserModules/m42h.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2023 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#* ****************************************************************************** +#* +#* ** Project : RAFT +#* ** @addtogroup : core.hdmiAnalyserModules +#* ** @date : 13/04/2026 +#* ** +#* ** @brief : Teledyne LeCroy Quantumdata M42h HDMI Analyser/Generator driver +#* ** +#* ** This module communicates with the M42h instrument via its +#* ** Advanced Test Platform (ATP) Python API (``tlqd`` package). +#* ** +#* ****************************************************************************** + +from .hdmiAnalyserInterface import HDMIAnalyserInterface + +try: + from tlqd import ( + TLqdInstrument, + TLqdConnectSsh, + TLqdUse, + TLqdSetFormat, + TLqdGetFormat, + TLqdGetReceivedFormat, + TLqdGetFormatParameters, + TLqdUpdateFormatParameters, + TLqdSetImage, + TLqdGetImage, + TLqdSetEdidData, + TLqdSetEdidFile, + TLqdEnableOutput, + TLqdIsOutputEnabled, + TLqdGetAviInfoframe, + TLqdSetAudio, + TLqdGetAudio, + TLqdTestAudio, + TLqdGetPixel, + TLqdGetVideoFrame, + TLqdCapture, + TLqdSetScrambling, + TLqdGetScrambling, + TLqdSetPort, + TLqdGetPorts, + TLqdColorSpace, + TLqdSubsampling, + TLqdStatus, + TLqdHdcpMode, + TLqdPort, + TLqdInfoFrameType, + TLqdHotPlugMode, + TLqdVideoFormatParameters, + ) + _TLQD_AVAILABLE = True +except ImportError: + _TLQD_AVAILABLE = False + +# Map user-friendly colour-space strings to TLqdColorSpace enum values. +_COLOUR_SPACE_MAP = {} +if _TLQD_AVAILABLE: + _COLOUR_SPACE_MAP = { + "RGB": TLqdColorSpace.RGB, + "YCbCr601": TLqdColorSpace.YCbCr601, + "YCbCr709": TLqdColorSpace.YCbCr709, + "BT2020YCbCr": TLqdColorSpace.BT2020YCbCr, + "BT2020RGB": TLqdColorSpace.BT2020RGB, + } + +# Map user-friendly sub-sampling strings to TLqdSubsampling enum values. +_SUBSAMPLING_MAP = {} +if _TLQD_AVAILABLE: + _SUBSAMPLING_MAP = { + "RGB444": TLqdSubsampling.RGB444, + "444": TLqdSubsampling.SS444, + "422": TLqdSubsampling.SS422, + "420": TLqdSubsampling.SS420, + } + + +class M42hController(HDMIAnalyserInterface): + """Driver for the Teledyne LeCroy Quantumdata M42h 96G Video Analyser/Generator. + + The M42h is controlled through its ATP (Advanced Test Platform) Python API + exposed by the ``tlqd`` package. The package is distributed as part of the + M42h *Application Programming Interface* download available from Teledyne + LeCroy's software-download portal. + + Configuration keys (passed via *config* dict): + ``host`` – IP address or hostname of the M42h instrument. + ``port`` – (optional) SSH port, defaults to 22. + ``user`` – (optional) SSH user, defaults to ``"qd"``. + ``passwd`` – (optional) SSH password, defaults to ``"qd"``. + ``card`` – (optional) Card number to select with ``TLqdUse``. + """ + + def __init__(self, host: str, port: int = None, user: str = "qd", + passwd: str = "qd", card: int = None): + if not _TLQD_AVAILABLE: + raise ImportError( + "The 'tlqd' package is required to use M42hController. " + "Install the Quantumdata ATP API from Teledyne LeCroy." + ) + self._host = host + self._port = port + self._user = user + self._passwd = passwd + self._card = card + self._qdDev = None + + # ── Connection / lifecycle ────────────────────────────────────────── + + def connect(self): + self._qdDev = TLqdConnectSsh(self._host, user=self._user, + passwd=self._passwd, port=self._port) + if not self._qdDev.connected: + raise RuntimeError( + f"Failed to connect to M42h at {self._host}" + ) + if self._card is not None: + TLqdUse(self._qdDev, self._card) + + def disconnect(self): + if self._qdDev is not None: + self._qdDev.close() + self._qdDev = None + + # ── Port selection ────────────────────────────────────────────────── + + def select_port(self, port): + TLqdSetPort(self._qdDev, port) + + # ── Hot-plug control ──────────────────────────────────────────────── + + def set_hpd(self, state: bool, duration: int = 100): + if state: + self._qdDev.setHotPlug(duration=duration) + else: + self._qdDev.setHotPlug(duration=0) + + # ── Generator (source) operations ─────────────────────────────────── + + def set_video_format(self, format_name: str, colour_space: str = None, + subsampling: str = None, bit_depth: int = None, + vic: int = None): + kwargs = {} + if colour_space and colour_space in _COLOUR_SPACE_MAP: + kwargs["colorSpace"] = _COLOUR_SPACE_MAP[colour_space] + if subsampling and subsampling in _SUBSAMPLING_MAP: + kwargs["subsampling"] = _SUBSAMPLING_MAP[subsampling] + if bit_depth is not None: + kwargs["bitDepth"] = bit_depth + if vic is not None: + kwargs["vic"] = vic + TLqdSetFormat(self._qdDev, format_name, **kwargs) + + def set_hdr_mode(self, mode: str): + # HDR is signalled via the DRM InfoFrame; configure via InfoFrame + # type control. The caller is expected to use configInfoFrame and + # updateFormatParameters for full DRM control. This helper enables + # or disables the HDR InfoFrame packet. + if mode.upper() == "SDR": + self._qdDev.configInfoFrame(type=TLqdInfoFrameType.HDR_IF, + enable=False) + else: + self._qdDev.configInfoFrame(type=TLqdInfoFrameType.HDR_IF, + enable=True) + + def set_allm(self, enabled: bool): + # ALLM is signalled via the HDMI Forum VSIF (ALLM bit). + # Toggle the HDMI Forum VS InfoFrame accordingly. + self._qdDev.configInfoFrame(type=TLqdInfoFrameType.HDMI_FORUM_VS_IF, + enable=enabled) + + def set_vrr(self, enabled: bool, base_refresh_rate: int = None): + # VRR is signalled via the Video Timing Extended Metadata (VTEM) + # packet on the generator side. + if enabled: + kwargs = {"vrrEn": 1} + if base_refresh_rate is not None: + kwargs["baseRefreshRate"] = base_refresh_rate + self._qdDev.updateVtem(**kwargs) + else: + self._qdDev.updateVtem(vrrEn=0) + + def set_avi_content_type(self, content_type: str): + # Content type is part of the AVI InfoFrame (ITC/CN fields). + # Update via format parameters. + pass + + def set_spd_info(self, vendor: str, description: str): + # SPD InfoFrame configuration. + self._qdDev.configInfoFrame(type=TLqdInfoFrameType.SPD_IF, + enable=True) + + def start_output(self): + TLqdEnableOutput(self._qdDev, True) + + def stop_output(self): + TLqdEnableOutput(self._qdDev, False) + + # ── EDID operations ───────────────────────────────────────────────── + + def get_edid(self, port: str = "") -> bytes: + # The ATP API does not expose a direct "read-back EDID bytes" + # from the sink side in a simple getter. The EDID read-back + # typically occurs through the compliance test infrastructure. + # Return empty bytes as placeholder. + return b"" + + def set_edid(self, edid_data: str, hp_duration_ms: int = 100): + TLqdSetEdidData(self._qdDev, edid_data, + hpDurationInMs=hp_duration_ms) + + def set_edid_file(self, edid_file: str, hp_duration_ms: int = 100): + TLqdSetEdidFile(self._qdDev, edid_file, + hpDurationInMs=hp_duration_ms) + + def restore_default_edid(self, port: str = ""): + # Re-apply hot-plug with no custom EDID to revert to built-in EDID. + self._qdDev.setHotPlug(duration=100) + + # ── Analyser (sink) read-back ─────────────────────────────────────── + + def get_video_status(self) -> dict: + metrics = TLqdGetReceivedFormat(self._qdDev) + result = {} + for tag in TLqdVideoFormatParameters.tags: + value = getattr(metrics, tag.longTag, None) + if value is not None: + result[tag.longTag] = str(value) + return result + + def get_audio_status(self) -> dict: + audio = TLqdGetAudio(self._qdDev) + return { + "sampling": getattr(audio, "sampling", 0), + "bit_size": getattr(audio, "bitSize", 0), + "channels": getattr(audio, "channels", []), + } + + def get_hdcp_status(self) -> dict: + # Attempt to read HDCP for both modes and return whichever is active. + try: + params = self._qdDev.getHdcp(TLqdPort.PortRx, + TLqdHdcpMode.HDCP23Mode) + return { + "status": getattr(params, "hdcpStatus", ""), + "key": getattr(params, "key", ""), + } + except Exception: + return {"status": "", "key": ""} + + def get_link_status(self) -> dict: + output_enabled = TLqdIsOutputEnabled(self._qdDev) + return { + "output_enabled": output_enabled, + } + + def get_avi_info(self) -> dict: + result, info, octets = TLqdGetAviInfoframe(self._qdDev) + return { + "valid": result, + "info": info, + "octets": octets, + } + + def get_spd_info(self) -> dict: + result, info, octets = self._qdDev.getInfoframe("SPD") + return { + "valid": result, + "info": info, + "octets": octets, + } + + def get_background_color(self) -> str: + pixel = TLqdGetPixel(self._qdDev, 0, 0) + if pixel.valid: + return f"({pixel.red},{pixel.green},{pixel.blue})" + return "" + + # ── HDCP control ──────────────────────────────────────────────────── + + def set_hdcp_mode(self, mode: str): + mode_map = { + "none": TLqdHdcpMode.HDCPNone, + "1.4": TLqdHdcpMode.HDCP13Mode, + "2.3": TLqdHdcpMode.HDCP23Mode, + } + hdcp_mode = mode_map.get(mode.lower(), TLqdHdcpMode.HDCPNone) + self._qdDev.setHdcp(TLqdPort.PortRx, hdcp_mode) + + # ── Snapshot / combined status ────────────────────────────────────── + + def snapshot(self) -> dict: + return { + "video": self.get_video_status(), + "audio": self.get_audio_status(), + "hdcp": self.get_hdcp_status(), + "link": self.get_link_status(), + "avi": self.get_avi_info(), + } + + # ── Additional M42h-specific helpers ──────────────────────────────── + + @property + def qdDev(self): + """Provide direct access to the underlying TLqdInstrument for + advanced operations not covered by the standard interface.""" + return self._qdDev From 4b8533b2fe02d570715cdd50c1a0582140fe0172 Mon Sep 17 00:00:00 2001 From: zghp <33546213+zghp@users.noreply.github.com> Date: Thu, 23 Apr 2026 07:47:18 +0100 Subject: [PATCH 2/2] hdmianalyzer test --- tests/hdmiAnalyser_test.py | 206 +++++++++++++++++++++++++++++ tests/hdmiAnalyser_test_config.yml | 15 +++ 2 files changed, 221 insertions(+) create mode 100644 tests/hdmiAnalyser_test.py create mode 100644 tests/hdmiAnalyser_test_config.yml diff --git a/tests/hdmiAnalyser_test.py b/tests/hdmiAnalyser_test.py new file mode 100644 index 0000000..0bf255e --- /dev/null +++ b/tests/hdmiAnalyser_test.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +"""Standalone test script for the HDMIAnalyserController APIs. + +Usage: + 1. Edit tests/hdmiAnalyser_test_config.yml and set your M42h IP address. + 2. Run: python3 tests/hdmiAnalyser_test.py [path/to/config.yml] + +The script exercises each public API on the controller and prints +PASSED / FAILED for every call so you can quickly verify which +operations work against real hardware. +""" + +import os +import sys +import json +import yaml + +# Add the framework path to system +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(dir_path, "..")) + +from framework.core.logModule import logModule +from framework.core.hdmiAnalyserController import HDMIAnalyserController + +# ── Helpers ───────────────────────────────────────────────────────────── + +_pass_count = 0 +_fail_count = 0 + +def run_test(name, func): + """Run *func*, print PASSED/FAILED, and track counts.""" + global _pass_count, _fail_count + try: + func() + print(f" PASSED: {name}") + _pass_count += 1 + except Exception as e: + print(f" FAILED: {name} — {e}") + _fail_count += 1 + +def print_dict(label, d): + """Pretty-print a dict result.""" + print(f" {label}: {json.dumps(d, indent=6, default=str)}") + +# ── Main ──────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + + # Load config + default_cfg = os.path.join(dir_path, "hdmiAnalyser_test_config.yml") + cfg_path = sys.argv[1] if len(sys.argv) > 1 else default_cfg + + with open(cfg_path, "r") as f: + cfg = yaml.safe_load(f) + + config = cfg.get("hdmiAnalyserController", cfg) + + if not config.get("host"): + print("ERROR: Set the 'host' field in", cfg_path) + sys.exit(1) + + LOG = logModule("hdmi analyser test", logModule.DEBUG) + LOG.setFilename(os.path.abspath('./logs/'), 'hdmiAnalyser-%sTest.log' % config.get('type')) + + print("=" * 60) + print("HDMI Analyser API Test") + print("=" * 60) + print(f"Config: {json.dumps(config, default=str)}") + print() + + # ── Create controller ─────────────────────────────────────────── + controller = HDMIAnalyserController(LOG, config) + + # ── 1. Connect ────────────────────────────────────────────────── + print("[Connection]") + run_test("connect()", lambda: controller.connect()) + + # ── 2. Link status ────────────────────────────────────────────── + print("\n[Link Status]") + def _link_status(): + status = controller.get_link_status() + print_dict("link_status", status) + run_test("get_link_status()", _link_status) + + # ── 3. Video format — set and read back ───────────────────────── + print("\n[Video Format]") + def _set_video_1080p(): + controller.set_video_format("1080p60") + run_test("set_video_format('1080p60')", _set_video_1080p) + + def _start_output(): + controller.start_output() + run_test("start_output()", _start_output) + + def _get_video_status(): + status = controller.get_video_status() + print_dict("video_status", status) + run_test("get_video_status()", _get_video_status) + + # ── 4. HDR mode ───────────────────────────────────────────────── + print("\n[HDR Mode]") + def _set_hdr_hdr10(): + controller.set_hdr_mode("HDR10") + run_test("set_hdr_mode('HDR10')", _set_hdr_hdr10) + + def _set_hdr_sdr(): + controller.set_hdr_mode("SDR") + run_test("set_hdr_mode('SDR')", _set_hdr_sdr) + + # ── 5. ALLM ───────────────────────────────────────────────────── + print("\n[ALLM]") + run_test("set_allm(True)", lambda: controller.set_allm(True)) + run_test("set_allm(False)", lambda: controller.set_allm(False)) + + # ── 6. VRR ────────────────────────────────────────────────────── + print("\n[VRR]") + run_test("set_vrr(True, base_refresh_rate=48)", + lambda: controller.set_vrr(True, base_refresh_rate=48)) + run_test("set_vrr(False)", lambda: controller.set_vrr(False)) + + # ── 7. AVI InfoFrame ──────────────────────────────────────────── + print("\n[AVI InfoFrame]") + def _get_avi(): + info = controller.get_avi_info() + print_dict("avi_info", info) + run_test("get_avi_info()", _get_avi) + + # ── 8. SPD InfoFrame ──────────────────────────────────────────── + print("\n[SPD InfoFrame]") + run_test("set_spd_info('TestVendor', 'TestProduct')", + lambda: controller.set_spd_info("TestVendor", "TestProduct")) + + def _get_spd(): + info = controller.get_spd_info() + print_dict("spd_info", info) + run_test("get_spd_info()", _get_spd) + + # ── 9. Audio status ───────────────────────────────────────────── + print("\n[Audio]") + def _get_audio(): + status = controller.get_audio_status() + print_dict("audio_status", status) + run_test("get_audio_status()", _get_audio) + + # ── 10. HDCP ──────────────────────────────────────────────────── + print("\n[HDCP]") + run_test("set_hdcp_mode('none')", lambda: controller.set_hdcp_mode("none")) + + def _get_hdcp(): + status = controller.get_hdcp_status() + print_dict("hdcp_status", status) + run_test("get_hdcp_status()", _get_hdcp) + + run_test("set_hdcp_mode('2.3')", lambda: controller.set_hdcp_mode("2.3")) + run_test("set_hdcp_mode('none')", lambda: controller.set_hdcp_mode("none")) + + # ── 11. HPD ───────────────────────────────────────────────────── + print("\n[Hot-Plug]") + run_test("set_hpd(False)", lambda: controller.set_hpd(False)) + run_test("set_hpd(True, duration=200)", + lambda: controller.set_hpd(True, duration=200)) + + # ── 12. EDID ──────────────────────────────────────────────────── + print("\n[EDID]") + def _get_edid(): + edid = controller.get_edid() + print(f" edid length: {len(edid)} bytes") + run_test("get_edid()", _get_edid) + run_test("restore_default_edid()", lambda: controller.restore_default_edid()) + + # ── 13. Background colour ─────────────────────────────────────── + print("\n[Background Colour]") + def _get_bg(): + colour = controller.get_background_color() + print(f" background_color: {colour}") + run_test("get_background_color()", _get_bg) + + # ── 14. Snapshot (combined status) ────────────────────────────── + print("\n[Snapshot]") + def _snapshot(): + snap = controller.snapshot() + print_dict("snapshot", snap) + run_test("snapshot()", _snapshot) + + # ── 15. Change format to 4K and read back ────────────────────── + print("\n[4K Format Test]") + def _set_4k(): + controller.set_video_format("2160p60", colour_space="YCbCr709", + subsampling="420", bit_depth=10) + run_test("set_video_format('2160p60', cs=YCbCr709, ss=420, depth=10)", _set_4k) + run_test("get_video_status() after 4K", _get_video_status) + + # ── 16. Stop output ───────────────────────────────────────────── + print("\n[Stop Output]") + run_test("stop_output()", lambda: controller.stop_output()) + + # ── 17. Disconnect ────────────────────────────────────────────── + print("\n[Disconnect]") + run_test("disconnect()", lambda: controller.disconnect()) + + # ── Summary ───────────────────────────────────────────────────── + total = _pass_count + _fail_count + print() + print("=" * 60) + print(f"Results: {_pass_count}/{total} passed, {_fail_count}/{total} failed") + print("=" * 60) diff --git a/tests/hdmiAnalyser_test_config.yml b/tests/hdmiAnalyser_test_config.yml new file mode 100644 index 0000000..115e86b --- /dev/null +++ b/tests/hdmiAnalyser_test_config.yml @@ -0,0 +1,15 @@ +# HDMI Analyser Test Configuration +# --------------------------------- +# Fill in the 'host' field with the IP address of your M42h instrument +# and adjust any optional fields as needed, then run: +# +# python3 tests/hdmiAnalyser_test.py +# + +hdmiAnalyserController: + type: "m42h" + host: "" # <-- PUT YOUR M42h IP ADDRESS HERE + # port: 22 # SSH port (default: 22) + # user: "qd" # SSH username (default: "qd") + # passwd: "qd" # SSH password (default: "qd") + # card: 4 # Card number (uncomment if needed)