diff --git a/README.md b/README.md index 201e095..e2457cc 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ sample_rate = meta.sample_rate # get sample rate # read other formats containing RF time series as SigMF meta = sigmf.fromfile("recording.wav") # WAV meta = sigmf.fromfile("recording.cdif") # BLUE / Platinum +meta = sigmf.fromfile("recording.xml") # Signal Hound Spike ``` ### Docs diff --git a/docs/requirements.txt b/docs/requirements.txt index 1016144..cf97c2d 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,2 @@ -# pinned 2025-01-15 -sphinx==8.1.3 -sphinx-rtd-theme==3.0.2 +sphinx>=8.0 +sphinx-rtd-theme>=3.0 diff --git a/docs/source/converters.rst b/docs/source/converters.rst index 71a5e88..43e2770 100644 --- a/docs/source/converters.rst +++ b/docs/source/converters.rst @@ -12,6 +12,7 @@ Conversion is available for: * **BLUE files** - MIDAS Blue and Platinum BLUE RF recordings (usually ``.cdif``) * **WAV files** - Audio recordings (``.wav``) +* **Signal Hound Spike files** - Signal Hound zero-span recordings (``.xml`` + ``.iq``) All converters return a :class:`~sigmf.SigMFFile` object with converted metadata. @@ -29,6 +30,7 @@ formats and reads without writing any output files: # auto-detect and create NCD for any supported format meta = sigmf.fromfile("recording.cdif") # BLUE file meta = sigmf.fromfile("recording.wav") # WAV file + meta = sigmf.fromfile("recording.xml") # Signal Hound Spike file meta = sigmf.fromfile("recording.sigmf") # SigMF archive all_samples = meta.read_samples() @@ -44,6 +46,7 @@ For programmatic access, use the individual converter functions directly: from sigmf.convert.wav import wav_to_sigmf from sigmf.convert.blue import blue_to_sigmf + from sigmf.convert.signalhound import signalhound_to_sigmf # convert WAV to SigMF archive _ = wav_to_sigmf(wav_path="recording.wav", out_path="recording", create_archive=True) @@ -51,6 +54,9 @@ For programmatic access, use the individual converter functions directly: # convert BLUE to SigMF pair and return metadata for new files meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording") + # convert Signal Hound Spike to SigMF pair + meta = signalhound_to_sigmf(signalhound_path="recording.xml", out_path="recording") + Command Line Usage ~~~~~~~~~~~~~~~~~~ @@ -65,8 +71,9 @@ Converters are accessed through a unified command-line interface that automatica # examples sigmf_convert recording.cdif recording.sigmf sigmf_convert recording.wav recording.sigmf + sigmf_convert recording.xml recording.sigmf -The converter uses magic byte detection to automatically identify BLUE and WAV file formats. +The converter uses magic byte detection to automatically identify BLUE, WAV, and Signal Hound Spike file formats. No need to remember format-specific commands! @@ -168,4 +175,40 @@ Examples # access standard SigMF data & metadata all_samples = meta.read_samples() - sample_rate_hz = meta.sample_rate \ No newline at end of file + sample_rate_hz = meta.sample_rate + + +Signal Hound Spike Converter +----------------------------- + +The Signal Hound Spike converter handles recordings from Signal Hound devices. +These recordings consist of two files: an XML metadata file (``.xml``) and a binary IQ data file (``.iq``). +The converter extracts metadata from the XML file and references the IQ data file, storing Signal Hound-specific +fields in the ``spike:`` namespace extension. + +.. autofunction:: sigmf.convert.signalhound.signalhound_to_sigmf + +Examples +~~~~~~~~ + +.. code-block:: python + + from sigmf.convert.signalhound import signalhound_to_sigmf + + # standard conversion (provide path to XML file) + meta = signalhound_to_sigmf(signalhound_path="recording.xml", out_path="recording") + + # create NCD automatically (metadata-only, references original .iq file) + meta = signalhound_to_sigmf(signalhound_path="recording.xml") + + # access standard SigMF data & metadata + all_samples = meta.read_samples() + sample_rate = meta.sample_rate + center_freq = meta.get_captures()[0]["core:frequency"] + + # access Signal Hound-specific metadata in spike: namespace + reference_level_dbm = meta.get_global_field("spike:reference_level_dbm") + scale_factor_mw = meta.get_global_field("spike:scale_factor_mw") + if_bandwidth_hz = meta.get_global_field("spike:if_bandwidth_hz") + iq_filename = meta.get_global_field("spike:iq_filename") # original IQ file name + preview_trace = meta.get_global_field("spike:preview_trace") # max-hold trace diff --git a/docs/source/developers.rst b/docs/source/developers.rst index 268c713..cd9ee1c 100644 --- a/docs/source/developers.rst +++ b/docs/source/developers.rst @@ -60,6 +60,7 @@ To build the docs and host locally: .. code-block:: console $ cd docs + $ pip install -r requirements.txt $ make clean $ make html $ python3 -m http.server --directory build/html/ diff --git a/sigmf/convert/__init__.py b/sigmf/convert/__init__.py index e69de29..1d1654e 100644 --- a/sigmf/convert/__init__.py +++ b/sigmf/convert/__init__.py @@ -0,0 +1,91 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Convert non-SigMF recordings to SigMF format""" + +from pathlib import Path + +from ..error import SigMFConversionError + + +def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes: + """ + Get magic bytes from a file to help identify file type. + + Parameters + ---------- + file_path : Path + Path to the file to read magic bytes from. + count : int, optional + Number of bytes to read. Default is 4. + offset : int, optional + Byte offset to start reading from. Default is 0. + + Returns + ------- + bytes + Magic bytes from the file. + + Raises + ------ + SigMFConversionError + If file cannot be read or is too small. + """ + try: + with open(file_path, "rb") as handle: + handle.seek(offset) + magic_bytes = handle.read(count) + if len(magic_bytes) < count: + raise SigMFConversionError(f"File {file_path} too small to read {count} magic bytes at offset {offset}") + return magic_bytes + except OSError as err: + raise SigMFConversionError(f"Failed to read magic bytes from {file_path}: {err}") from err + + +def detect_converter(file_path: Path): + """ + Detect the appropriate converter for a non-SigMF file. + + Parameters + ---------- + file_path : Path + Path to the file to detect. + + Returns + ------- + str + The converter name: "wav", "blue", or "signalhound" + + Raises + ------ + SigMFConversionError + If the file format is not supported or cannot be detected. + """ + magic_bytes = get_magic_bytes(file_path, count=4, offset=0) + + if magic_bytes == b"RIFF": + return "wav" + + elif magic_bytes == b"BLUE": + return "blue" + + elif magic_bytes == b" + # Check if it's a Signal Hound Spike file + # Skip XML declaration (40 bytes) and check for SignalHoundIQFile root element + expanded_magic_bytes = get_magic_bytes(file_path, count=17, offset=40) + if expanded_magic_bytes == b"SignalHoundIQFile": + return "signalhound" + else: + raise SigMFConversionError( + f"Unsupported XML file format. Root element: {expanded_magic_bytes}. " + f"Expected SignalHoundIQFile for Signal Hound Spike files." + ) + + else: + raise SigMFConversionError( + f"Unsupported file format. Magic bytes: {magic_bytes}. " + f"Supported formats for conversion are WAV, BLUE/Platinum, and Signal Hound Spike." + ) diff --git a/sigmf/convert/__main__.py b/sigmf/convert/__main__.py index de7d6c1..2b215fe 100644 --- a/sigmf/convert/__main__.py +++ b/sigmf/convert/__main__.py @@ -13,8 +13,9 @@ from .. import __version__ as toolversion from ..error import SigMFConversionError -from ..utils import get_magic_bytes +from . import detect_converter from .blue import blue_to_sigmf +from .signalhound import signalhound_to_sigmf from .wav import wav_to_sigmf @@ -60,8 +61,8 @@ def main() -> None: exclusive_group.add_argument( "--ncd", action="store_true", help="Output .sigmf-meta only and process as a Non-Conforming Dataset (NCD)" ) - parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output files") parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") + args = parser.parse_args() level_lut = { @@ -85,33 +86,16 @@ def main() -> None: if output_path.is_dir(): raise SigMFConversionError(f"Output path must be a filename, not a directory: {output_path}") - # detect file type using magic bytes (same logic as fromfile()) - magic_bytes = get_magic_bytes(input_path, count=4, offset=0) - - if magic_bytes == b"RIFF": - # WAV file - _ = wav_to_sigmf( - wav_path=input_path, - out_path=output_path, - create_archive=args.archive, - create_ncd=args.ncd, - overwrite=args.overwrite, - ) - - elif magic_bytes == b"BLUE": - # BLUE file - _ = blue_to_sigmf( - blue_path=input_path, - out_path=output_path, - create_archive=args.archive, - create_ncd=args.ncd, - overwrite=args.overwrite, - ) + # detect file type using magic bytes + converter_type = detect_converter(input_path) - else: - raise SigMFConversionError( - f"Unsupported file format. Magic bytes: {magic_bytes}. " - f"Supported formats for conversion are WAV and BLUE/Platinum." + if converter_type == "wav": + _ = wav_to_sigmf(wav_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd) + elif converter_type == "blue": + _ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd) + elif converter_type == "signalhound": + _ = signalhound_to_sigmf( + signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd ) diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index 46cd6c8..c988542 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -25,7 +25,6 @@ import numpy as np from packaging.version import InvalidVersion, Version -from .. import __version__ as toolversion from ..error import SigMFConversionError from ..sigmffile import SigMFFile, fromfile, get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py new file mode 100644 index 0000000..993ba37 --- /dev/null +++ b/sigmf/convert/signalhound.py @@ -0,0 +1,484 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Signal Hound Converter""" + +import getpass +import io +import logging +import tempfile +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import List, Optional, Tuple + +import numpy as np + +from .. import SigMFFile, fromfile +from ..error import SigMFConversionError +from ..sigmffile import get_sigmf_filenames +from ..utils import SIGMF_DATETIME_ISO8601_FMT + +log = logging.getLogger() + + +def _text_of(root: ET.Element, tag: str) -> Optional[str]: + """Extract and strip text from XML element.""" + elem = root.find(tag) + return elem.text.strip() if (elem is not None and elem.text is not None) else None + + +def _parse_preview_trace(text: Optional[str]) -> List[float]: + """ + Preview trace is a max-hold trace of the signal power across the capture, represented as a comma-separated string of values. + + Example + ------- + >>> trace_str = "-1.0, 0.1, 0.5, 0.3, 0.7" + >>> _parse_preview_trace(trace_str) + [-1.0, 0.1, 0.5, 0.3, 0.7] + """ + if text is None: + return [] + s = text.strip() + if s.endswith(","): + s = s[:-1] + if not s: + return [] + parts = [p.strip() for p in s.split(",") if p.strip() != ""] + vals = [] + for part in parts: + vals.append(float(part)) + return vals + + +def validate_spike(xml_path: Path) -> None: + """ + Validate required Spike XML metadata fields and associated IQ file. + + Parameters + ---------- + xml_path : Path + Path to the Spike XML file. + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid, or IQ file doesn't exist. + """ + tree = ET.parse(xml_path) + root = tree.getroot() + + # validate CenterFrequency + center_freq_raw = _text_of(root, "CenterFrequency") + try: + center_frequency = float(center_freq_raw) + except (TypeError, ValueError) as err: + raise SigMFConversionError(f"Invalid or missing CenterFrequency: {center_freq_raw}") from err + + # validate SampleRate + sample_rate_raw = _text_of(root, "SampleRate") + try: + sample_rate = float(sample_rate_raw) + except (TypeError, ValueError) as err: + raise SigMFConversionError(f"Invalid or missing SampleRate: {sample_rate_raw}") from err + + if sample_rate <= 0: + raise SigMFConversionError(f"Invalid SampleRate: {sample_rate} (must be > 0)") + + # validate DataType + data_type_raw = _text_of(root, "DataType") + if data_type_raw is None: + raise SigMFConversionError("Missing DataType in Spike XML") + + # check datatype mapping - currently only "Complex Short" is supported + if data_type_raw != "Complex Short": + raise SigMFConversionError(f"Unsupported Spike DataType: {data_type_raw}") + + # validate associated IQ file exists + iq_file_path = xml_path.with_suffix(".iq") + if not iq_file_path.exists(): + raise SigMFConversionError(f"Could not find associated IQ file: {iq_file_path}") + + # validate IQ file size is aligned to sample boundary + filesize = iq_file_path.stat().st_size + elem_size = np.dtype(np.int16).itemsize + frame_bytes = 2 * elem_size # I and Q components + if filesize % frame_bytes != 0: + raise SigMFConversionError(f"IQ file size {filesize} not divisible by {frame_bytes}; partial sample present") + + +def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: + """ + Build SigMF metadata components from the Spike XML file. + + Parameters + ---------- + xml_path : Path + Path to the Spike XML file. + + Returns + ------- + tuple of (dict, dict, list, int) + global_info, capture_info, annotations, sample_count + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid. + """ + log.info("converting spike xml metadata to sigmf format") + + xml_path = Path(xml_path) + tree = ET.parse(xml_path) + root = tree.getroot() + + # validate required fields and associated IQ file + validate_spike(xml_path) + + # extract and convert required fields + center_frequency = float(_text_of(root, "CenterFrequency")) + sample_rate = float(_text_of(root, "SampleRate")) + data_type_raw = _text_of(root, "DataType") + + # optional EpochNanos field + epoch_nanos = None + epoch_nanos_raw = _text_of(root, "EpochNanos") + if epoch_nanos_raw: + try: + epoch_nanos = int(epoch_nanos_raw) + except ValueError: + log.warning(f"could not parse EpochNanos: {epoch_nanos_raw}") + + # map datatype + if data_type_raw == "Complex Short": + data_type = "ci16_le" # complex int16 little-endian + else: + raise SigMFConversionError(f"Unsupported Spike DataType: {data_type_raw}") + + # optional fields - only convert if present and valid + reference_level = None + reference_level_raw = _text_of(root, "ReferenceLevel") + if reference_level_raw: + try: + reference_level = float(reference_level_raw) + except ValueError: + log.warning(f"could not parse ReferenceLevel: {reference_level_raw}") + + decimation = None + decimation_raw = _text_of(root, "Decimation") + if decimation_raw: + try: + decimation = int(float(decimation_raw)) + except ValueError: + log.warning(f"could not parse Decimation: {decimation_raw}") + + if_bandwidth = None + if_bandwidth_raw = _text_of(root, "IFBandwidth") + if if_bandwidth_raw: + try: + if_bandwidth = float(if_bandwidth_raw) + except ValueError: + log.warning(f"could not parse IFBandwidth: {if_bandwidth_raw}") + + scale_factor = None + scale_factor_raw = _text_of(root, "ScaleFactor") + if scale_factor_raw: + try: + scale_factor = float(scale_factor_raw) + except ValueError: + log.warning(f"could not parse ScaleFactor: {scale_factor_raw}") + + device_type = _text_of(root, "DeviceType") + serial_number = _text_of(root, "SerialNumber") + iq_file_name = _text_of(root, "IQFileName") + + # parse preview trace if present + preview_trace_raw = _text_of(root, "PreviewTrace") + preview_trace = _parse_preview_trace(preview_trace_raw) if preview_trace_raw else None + + # build hardware description with available information + hw_parts = [] + if device_type: + hw_parts.append(f"{device_type}") + else: + hw_parts.append("Signal Hound Device") + + if serial_number: + hw_parts.append(f"S/N: {serial_number}") + + if decimation: + hw_parts.append(f"decimation: {decimation}") + + hardware_description = ", ".join(hw_parts) if hw_parts else "Signal Hound Device" + + # strip the extension from the original file path + base_file_name = xml_path.with_suffix("") + # build the .iq file path for data file + data_file_path = base_file_name.with_suffix(".iq") + filesize = data_file_path.stat().st_size + + # complex 16-bit integer IQ data > ci16_le in SigMF + elem_size = np.dtype(np.int16).itemsize + frame_bytes = 2 * elem_size # I and Q components + + # calculate sample count using the original IQ data file size + sample_count_calculated = filesize // frame_bytes + log.debug("sample count: %d", sample_count_calculated) + + # convert the datetime object to an ISO 8601 formatted string if EpochNanos is present + iso_8601_string = None + if epoch_nanos is not None: + secs = epoch_nanos // 1_000_000_000 + rem_ns = epoch_nanos % 1_000_000_000 + dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000) + iso_8601_string = dt.strftime(SIGMF_DATETIME_ISO8601_FMT) + + # base global metadata + global_md = { + SigMFFile.AUTHOR_KEY: getpass.getuser(), + SigMFFile.DATATYPE_KEY: data_type, + SigMFFile.HW_KEY: hardware_description, + SigMFFile.NUM_CHANNELS_KEY: 1, + SigMFFile.RECORDER_KEY: "Official SigMF Signal Hound converter", + SigMFFile.SAMPLE_RATE_KEY: sample_rate, + SigMFFile.EXTENSIONS_KEY: [{"name": "spike", "version": "0.0.1", "optional": True}], + } + + # add optional spike-specific fields to global metadata using spike: namespace + # only include fields that aren't already represented in standard SigMF metadata + if reference_level: + global_md["spike:reference_level_dbm"] = reference_level + if scale_factor: + global_md["spike:scale_factor_mw"] = scale_factor # to convert raw to mW + if if_bandwidth: + global_md["spike:if_bandwidth_hz"] = if_bandwidth + if iq_file_name: + global_md["spike:iq_filename"] = iq_file_name # provenance + if preview_trace: + global_md["spike:preview_trace"] = preview_trace # max-hold trace + + # capture info + capture_info = { + SigMFFile.FREQUENCY_KEY: center_frequency, + } + if iso_8601_string: + capture_info[SigMFFile.DATETIME_KEY] = iso_8601_string + + # create annotations array using calculated values + annotations = [] + if if_bandwidth: + upper_frequency_edge = center_frequency + (if_bandwidth / 2.0) + lower_frequency_edge = center_frequency - (if_bandwidth / 2.0) + annotations.append( + { + SigMFFile.START_INDEX_KEY: 0, + SigMFFile.LENGTH_INDEX_KEY: sample_count_calculated, + SigMFFile.FLO_KEY: lower_frequency_edge, + SigMFFile.FHI_KEY: upper_frequency_edge, + SigMFFile.LABEL_KEY: "Spike", + } + ) + + return global_md, capture_info, annotations, sample_count_calculated + + +def convert_iq_data(xml_path: Path, sample_count: int) -> np.ndarray: + """ + Convert IQ data in .iq file to SigMF based on values in Spike XML file. + + Parameters + ---------- + xml_path : Path + Path to the spike XML file. + sample_count : int + Number of samples to read. + + Returns + ------- + numpy.ndarray + Parsed samples. + """ + log.debug("parsing spike file data values") + iq_file_path = xml_path.with_suffix(".iq") + + # calculate element count (I and Q samples) + elem_count = sample_count * 2 # *2 for I and Q samples + + # complex 16-bit integer IQ data > ci16_le in SigMF + elem_size = np.dtype(np.int16).itemsize + + # read raw interleaved int16 IQ + samples = np.fromfile(iq_file_path, dtype=np.int16, offset=0, count=elem_count) + + # trim trailing partial bytes + if samples.nbytes % elem_size != 0: + trim = samples.nbytes % elem_size + log.warning("trimming %d trailing byte(s) to align samples", trim) + samples = samples[: -(trim // elem_size)] + + return samples + + +def signalhound_to_sigmf( + signalhound_path: Path, + out_path: Optional[Path] = None, + create_archive: bool = False, + create_ncd: bool = False, + overwrite: bool = False, +) -> SigMFFile: + """ + Read a signalhound file, optionally write sigmf archive, return associated SigMF object. + + Parameters + ---------- + signalhound_path : Path + Path to the signalhound file. + out_path : Path, optional + Path to the output SigMF metadata file. + create_archive : bool, optional + When True, package output as a .sigmf archive. + create_ncd : bool, optional + When True, create Non-Conforming Dataset + overwrite : bool, optional + If False, raise exception if output files already exist. + + Returns + ------- + SigMFFile + SigMF object, potentially as Non-Conforming Dataset. + + Raises + ------ + SigMFConversionError + If the signalhound file cannot be read. + """ + signalhound_path = Path(signalhound_path) + out_path = None if out_path is None else Path(out_path) + + # auto-enable NCD when no output path is specified + if out_path is None: + create_ncd = True + + # call the SigMF conversion for metadata generation + global_info, capture_info, annotations, sample_count = _build_metadata(signalhound_path) + + # get filenames for metadata, data, and archive based on output path and input file name + if out_path is None: + base_path = signalhound_path + else: + base_path = Path(out_path) + + filenames = get_sigmf_filenames(base_path) + + # create NCD if specified, otherwise create standard SigMF dataset or archive + if create_ncd: + # spike files have no header or trailing bytes + global_info[SigMFFile.DATASET_KEY] = signalhound_path.with_suffix(".iq").name + global_info[SigMFFile.TRAILING_BYTES_KEY] = 0 + capture_info[SigMFFile.HEADER_BYTES_KEY] = 0 + + # build the .iq file path for data file + base_file_name = signalhound_path.with_suffix("") + data_file_path = base_file_name.with_suffix(".iq") + + # create metadata-only SigMF for NCD pointing to original file + meta = SigMFFile(global_info=global_info) + meta.set_data_file(data_file=data_file_path, offset=0) + meta.data_buffer = io.BytesIO() + meta.add_capture(0, metadata=capture_info) + + # add annotations from metadata + for annotation in annotations: + start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) + length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + # pass remaining fields as metadata (excluding standard annotation keys) + annot_metadata = { + k: v for k, v in annotation.items() if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + } + meta.add_annotation(start_idx, length=length, metadata=annot_metadata) + + # write metadata file if output path specified + if out_path is not None: + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + log.info("wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) + + log.debug("created %r", meta) + return meta + + # create archive if specified, otherwise write separate meta and data files + if create_archive: + # use temporary directory for data file when creating archive + with tempfile.TemporaryDirectory() as temp_dir: + data_path = Path(temp_dir) / filenames["data_fn"].name + + # convert iq data and write to temp directory + try: + iq_data = convert_iq_data(signalhound_path, sample_count) + except Exception as e: + raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") from e + + # write converted iq data to temporary file + iq_data.tofile(data_path) + log.debug("wrote converted iq data to %s", data_path) + + meta = SigMFFile(data_file=data_path, global_info=global_info) + meta.add_capture(0, metadata=capture_info) + + # add annotations from metadata + for annotation in annotations: + start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) + length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + annot_metadata = { + k: v + for k, v in annotation.items() + if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + } + meta.add_annotation(start_idx, length=length, metadata=annot_metadata) + + output_dir = filenames["archive_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + meta.tofile(filenames["archive_fn"], toarchive=True, overwrite=overwrite) + log.info("wrote SigMF archive to %s", filenames["archive_fn"]) + # metadata returned should be for this archive + meta = fromfile(filenames["archive_fn"]) + + else: + # write separate meta and data files + # convert iq data for spike file + try: + iq_data = convert_iq_data(signalhound_path, sample_count) + except Exception as e: + raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") from e + + # write data file + output_dir = filenames["data_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + iq_data.tofile(filenames["data_fn"]) + log.debug("wrote SigMF dataset to %s", filenames["data_fn"]) + + # create sigmffile with converted iq data + meta = SigMFFile(data_file=filenames["data_fn"], global_info=global_info) + meta.add_capture(0, metadata=capture_info) + + # add annotations from metadata + for annotation in annotations: + start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) + length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + # pass remaining fields as metadata (excluding standard annotation keys) + annot_metadata = { + k: v for k, v in annotation.items() if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + } + meta.add_annotation(start_idx, length=length, metadata=annot_metadata) + + # write metadata file + meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) + + log.debug("created %r", meta) + return meta diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 5ae4291..8cb09ec 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -30,7 +30,7 @@ SigMFFileError, SigMFFileExistsError, ) -from .utils import dict_merge, get_magic_bytes +from .utils import dict_merge class SigMFMetafile: @@ -1286,7 +1286,7 @@ def fromfile(filename, skip_checksum=False, autoscale=True): * a SigMF Metadata file (.sigmf-meta) * a SigMF Dataset file (.sigmf-data) * a SigMF Collection file (.sigmf-collection) - * a non-SigMF RF recording that can be converted (.wav, .cdif) + * a non-SigMF RF recording that can be converted (.wav, .cdif, .xml, etc.) Parameters ---------- @@ -1352,17 +1352,28 @@ def fromfile(filename, skip_checksum=False, autoscale=True): if not autoscale: # TODO: allow autoscale=False for converters warnings.warn("non-SigMF auto-detection conversion only supports autoscale=True; ignoring autoscale=False") - magic_bytes = get_magic_bytes(file_path, count=4, offset=0) - if magic_bytes == b"RIFF": - from .convert.wav import wav_to_sigmf + # lazy imports to avoid circular dependency + from .convert import detect_converter - return wav_to_sigmf(file_path, create_ncd=True) + try: + converter_type = detect_converter(file_path) + + if converter_type == "wav": + from .convert.wav import wav_to_sigmf + + return wav_to_sigmf(file_path, create_ncd=True) + elif converter_type == "blue": + from .convert.blue import blue_to_sigmf - elif magic_bytes == b"BLUE": - from .convert.blue import blue_to_sigmf + return blue_to_sigmf(file_path, create_ncd=True) + elif converter_type == "signalhound": + from .convert.signalhound import signalhound_to_sigmf - return blue_to_sigmf(file_path, create_ncd=True) + return signalhound_to_sigmf(file_path, create_ncd=True) + except SigMFConversionError: + # unsupported format, fall through to raise SigMFFileError + pass # if file doesn't exist at all or no valid files found, raise original error raise SigMFFileError(f"Cannot read {filename} as SigMF or supported non-SigMF format.") diff --git a/tests/test_convert_blue.py b/tests/test_convert_blue.py index e81f850..65d3dda 100644 --- a/tests/test_convert_blue.py +++ b/tests/test_convert_blue.py @@ -18,8 +18,7 @@ from sigmf.convert.blue import TYPE_MAP, blue_to_sigmf from sigmf.utils import SIGMF_DATETIME_ISO8601_FMT -from .test_convert_wav import _validate_ncd -from .testdata import get_nonsigmf_path +from .testdata import get_nonsigmf_path, validate_ncd class TestBlueConverter(unittest.TestCase): @@ -153,7 +152,7 @@ def test_blue_to_sigmf_ncd(self) -> None: for form, atol in self.format_tolerance: self.write_minimal(form.encode()) meta = blue_to_sigmf(self.blue_path) - _validate_ncd(self, meta, self.blue_path) + validate_ncd(self, meta, self.blue_path) self.check_data_and_metadata(meta, form, atol) def test_pair_overwrite_protection(self) -> None: @@ -241,7 +240,7 @@ def test_create_ncd(self): """test direct NCD conversion""" for blue_path in self.blue_paths: meta = blue_to_sigmf(blue_path=blue_path) - _validate_ncd(self, meta, blue_path) + validate_ncd(self, meta, blue_path) if len(meta): # check sample read consistency np.testing.assert_allclose(meta.read_samples(count=10), meta[0:10], atol=1e-6) @@ -250,4 +249,4 @@ def test_fromfile_ncd(self): """test automatic NCD conversion with fromfile()""" for blue_path in self.blue_paths: meta = sigmf.fromfile(blue_path) - _validate_ncd(self, meta, blue_path) + validate_ncd(self, meta, blue_path) diff --git a/tests/test_convert_signalhound.py b/tests/test_convert_signalhound.py new file mode 100644 index 0000000..4017997 --- /dev/null +++ b/tests/test_convert_signalhound.py @@ -0,0 +1,177 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Tests for Signal Hound Converter""" + +import tempfile +import unittest +import wave +from pathlib import Path + +import numpy as np + +import sigmf +from sigmf.convert.signalhound import signalhound_to_sigmf + +from .testdata import get_nonsigmf_path, validate_ncd + + +class TestSignalHoundConverter(unittest.TestCase): + """Create a realistic Signal Hound XML/IQ file pair and test conversion methods.""" + + def setUp(self) -> None: + """Create temp XML/IQ file pair with tone for testing.""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + self.iq_path = self.tmp_path / "test.iq" + self.xml_path = self.tmp_path / "test.xml" + + # Generate complex IQ test data + self.samp_rate = 48000 + self.center_freq = 915e6 + duration_s = 0.1 + num_samples = int(self.samp_rate * duration_s) + ttt = np.linspace(0, duration_s, num_samples, endpoint=False) + freq = 440 # A4 note + self.iq_data = 0.5 * np.exp(2j * np.pi * freq * ttt) # complex128, normalized to [-0.5, 0.5] + + # Convert complex IQ data to interleaved int16 format (ci16_le - Signal Hound "Complex Short") + scale = 2**15 # int16 range is -32768 to 32767 + ci_real = (self.iq_data.real * scale).astype(np.int16) + ci_imag = (self.iq_data.imag * scale).astype(np.int16) + iq_interleaved = np.empty((len(self.iq_data) * 2,), dtype=np.int16) + iq_interleaved[0::2] = ci_real + iq_interleaved[1::2] = ci_imag + + # Write IQ file as raw interleaved int16 + with open(self.iq_path, "wb") as iq_file: + iq_file.write(iq_interleaved.tobytes()) + + # Write minimal XML metadata file + with open(self.xml_path, "w") as xml_file: + xml_file.write( + f'\n' + f'\n' + f" {self.center_freq}\n" + f" {self.samp_rate}\n" + f" Complex Short\n" + f" {self.iq_path.name}\n" + f"\n" + ) + + def tearDown(self) -> None: + """Clean up temporary directory.""" + self.tmp_dir.cleanup() + + def _verify(self, meta: sigmf.SigMFFile) -> None: + """Verify metadata fields and data integrity.""" + self.assertIsInstance(meta, sigmf.SigMFFile) + self.assertEqual(meta.get_global_field("core:datatype"), "ci16_le") + self.assertEqual(meta.get_global_field("core:sample_rate"), self.samp_rate) + # center frequency is in capture metadata + self.assertEqual(meta.get_captures()[0]["core:frequency"], self.center_freq) + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to int16 quantization + self.assertTrue(np.allclose(self.iq_data, data, atol=1e-4)) + + def test_signalhound_to_sigmf_pair(self): + """Test standard Signal Hound to SigMF conversion with file pairs.""" + sigmf_path = self.tmp_path / "converted" + meta = signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") + self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") + self._verify(meta) + + # test overwrite protection + with self.assertRaises(sigmf.error.SigMFFileError) as context: + signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path, overwrite=False) + self.assertIn("already exists", str(context.exception)) + + # test overwrite works + meta2 = signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path, overwrite=True) + self.assertIsInstance(meta2, sigmf.SigMFFile) + + def test_signalhound_to_sigmf_archive(self): + """Test Signal Hound to SigMF conversion with archive output.""" + sigmf_path = self.tmp_path / "converted_archive" + meta = signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path, create_archive=True) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") + self._verify(meta) + + # test overwrite protection + with self.assertRaises(sigmf.error.SigMFFileError) as context: + signalhound_to_sigmf( + signalhound_path=self.xml_path, out_path=sigmf_path, create_archive=True, overwrite=False + ) + self.assertIn("already exists", str(context.exception)) + + def test_signalhound_to_sigmf_ncd(self): + """Test Signal Hound to SigMF conversion as Non-Conforming Dataset.""" + meta = signalhound_to_sigmf(signalhound_path=self.xml_path, create_ncd=True) + target_path = self.iq_path + validate_ncd(self, meta, target_path) + self._verify(meta) + + +class TestSignalHoundWithNonSigMFRepo(unittest.TestCase): + """Test Signal Hound converter with real example files if available.""" + + def setUp(self) -> None: + """Find a non-SigMF dataset for testing.""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + nonsigmf_path = get_nonsigmf_path(self) + # glob all files in signal hound directory + hound_dir = nonsigmf_path / "signal_hound" + self.hound_paths = [] + self.hound_paths.extend(hound_dir.glob("*.xml")) + if not self.hound_paths: + self.fail(f"No Signal Hound XML files found in {hound_dir}.") + + def tearDown(self) -> None: + """Clean up temporary directory.""" + self.tmp_dir.cleanup() + + def test_sigmf_pair(self): + """test basic signal hound to sigmf conversion with file pairs""" + for hound_path in self.hound_paths: + sigmf_path = self.tmp_path / hound_path.stem + meta = signalhound_to_sigmf(signalhound_path=hound_path, out_path=sigmf_path) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_sigmf_archive(self): + """test signal hound to sigmf conversion with archive output""" + for hound_path in self.hound_paths: + sigmf_path = self.tmp_path / f"{hound_path.stem}_archive" + meta = signalhound_to_sigmf(signalhound_path=hound_path, out_path=sigmf_path, create_archive=True) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_create_ncd(self): + """test direct NCD conversion""" + for hound_path in self.hound_paths: + meta = signalhound_to_sigmf(signalhound_path=hound_path) + target_path = hound_path.with_suffix(".iq") + validate_ncd(self, meta, target_path) + if len(meta): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_fromfile_ncd(self): + """test automatic NCD conversion with fromfile""" + for hound_path in self.hound_paths: + meta = sigmf.fromfile(hound_path) + target_path = hound_path.with_suffix(".iq") + validate_ncd(self, meta, target_path) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index e2861f4..cf28f88 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -16,24 +16,7 @@ import sigmf from sigmf.convert.wav import wav_to_sigmf -from .testdata import get_nonsigmf_path - - -def _validate_ncd(test: unittest.TestCase, meta: sigmf.SigMFFile, target_path: Path): - """non-conforming dataset has a specific structure""" - test.assertEqual(str(meta.data_file), str(target_path), "Auto-detected NCD should point to original file") - test.assertIsInstance(meta, sigmf.SigMFFile) - - global_info = meta.get_global_info() - capture_info = meta.get_captures() - - # validate NCD SigMF spec compliance - test.assertGreater(len(capture_info), 0, "Should have at least one capture") - test.assertIn("core:header_bytes", capture_info[0]) - test.assertGreater(capture_info[0]["core:header_bytes"], 0, "Should have non-zero core:header_bytes field") - test.assertIn("core:trailing_bytes", global_info, "Should have core:trailing_bytes field.") - test.assertIn("core:dataset", global_info, "Should have core:dataset field.") - test.assertNotIn("core:metadata_only", global_info, "Should NOT have core:metadata_only field.") +from .testdata import get_nonsigmf_path, validate_ncd class TestWAVConverter(unittest.TestCase): @@ -63,6 +46,15 @@ def tearDown(self) -> None: """clean up temporary directory""" self.tmp_dir.cleanup() + def _verify(self, meta: sigmf.SigMFFile) -> None: + """Verify metadata fields and data integrity.""" + self.assertIsInstance(meta, sigmf.SigMFFile) + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + def test_wav_to_sigmf_pair(self) -> None: """test standard wav to sigmf conversion with file pairs""" sigmf_path = self.tmp_path / "bar" @@ -70,11 +62,7 @@ def test_wav_to_sigmf_pair(self) -> None: filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") - # verify data - data = meta.read_samples() - self.assertGreater(len(data), 0, "Should read some samples") - # allow numerical differences due to PCM quantization - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self._verify(meta) # test overwrite protection with self.assertRaises(sigmf.error.SigMFFileError) as context: @@ -91,11 +79,7 @@ def test_wav_to_sigmf_archive(self) -> None: meta = wav_to_sigmf(wav_path=self.wav_path, out_path=sigmf_path, create_archive=True) filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") - # verify data - data = meta.read_samples() - self.assertGreater(len(data), 0, "Should read some samples") - # allow numerical differences due to PCM quantization - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self._verify(meta) # test overwrite protection with self.assertRaises(sigmf.error.SigMFFileError) as context: @@ -109,13 +93,8 @@ def test_wav_to_sigmf_archive(self) -> None: def test_wav_to_sigmf_ncd(self) -> None: """test wav to sigmf conversion as Non-Conforming Dataset""" meta = wav_to_sigmf(wav_path=self.wav_path, create_ncd=True) - _validate_ncd(self, meta, self.wav_path) - - # verify data - data = meta.read_samples() - # allow numerical differences due to PCM quantization - self.assertGreater(len(data), 0, "Should read some samples") - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + validate_ncd(self, meta, self.wav_path) + self._verify(meta) # test overwrite protection when creating NCD with output path sigmf_path = self.tmp_path / "ncd_test" @@ -173,7 +152,7 @@ def test_create_ncd(self) -> None: """test direct NCD conversion""" for wav_path in self.wav_paths: meta = wav_to_sigmf(wav_path=wav_path) - _validate_ncd(self, meta, wav_path) + validate_ncd(self, meta, wav_path) # test file read _ = meta.read_samples(count=10) @@ -182,4 +161,4 @@ def test_autodetect_ncd(self) -> None: """test automatic NCD conversion""" for wav_path in self.wav_paths: meta = sigmf.fromfile(wav_path) - _validate_ncd(self, meta, wav_path) + validate_ncd(self, meta, wav_path) diff --git a/tests/testdata.py b/tests/testdata.py index 28e3ac7..2e93b99 100644 --- a/tests/testdata.py +++ b/tests/testdata.py @@ -27,6 +27,25 @@ def get_nonsigmf_path(test: unittest.TestCase) -> Path: return recordings_path +def validate_ncd(test: unittest.TestCase, meta: SigMFFile, target_path: Path): + """Validate that a SigMF object is a properly structured non-conforming dataset (NCD).""" + test.assertEqual(str(meta.data_file), str(target_path), "Auto-detected NCD should point to original file") + test.assertIsInstance(meta, SigMFFile) + + global_info = meta.get_global_info() + capture_info = meta.get_captures() + + # validate NCD SigMF spec compliance + test.assertGreater(len(capture_info), 0, "Should have at least one capture") + test.assertIn("core:header_bytes", capture_info[0]) + if target_path.suffix != ".iq": + # skip for Signal Hound + test.assertGreater(capture_info[0]["core:header_bytes"], 0, "Should have non-zero core:header_bytes field") + test.assertIn("core:trailing_bytes", global_info, "Should have core:trailing_bytes field.") + test.assertIn("core:dataset", global_info, "Should have core:dataset field.") + test.assertNotIn("core:metadata_only", global_info, "Should NOT have core:metadata_only field.") + + TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) TEST_METADATA = { SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}],