Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sigmf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# SPDX-License-Identifier: LGPL-3.0-or-later

# version of this python module
__version__ = "1.7.0"
__version__ = "1.7.1"
# matching version of the SigMF specification
__specification__ = "1.2.6"

Expand Down
4 changes: 2 additions & 2 deletions sigmf/sigmffile.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,9 @@ def add_capture(self, start_index, metadata=None):
new_capture[self.START_INDEX_KEY] = start_index
# merge if capture exists
merged = False
for existing_capture in self._metadata[self.CAPTURE_KEY]:
for idx, existing_capture in enumerate(self._metadata[self.CAPTURE_KEY]):
if existing_capture[self.START_INDEX_KEY] == start_index:
existing_capture = dict_merge(existing_capture, new_capture)
self._metadata[self.CAPTURE_KEY][idx] = dict_merge(existing_capture, new_capture)
merged = True
if not merged:
capture_list += [new_capture]
Expand Down
154 changes: 93 additions & 61 deletions tests/test_sigmffile.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ def test_equality(self):
class TestAnnotationHandling(unittest.TestCase):
def test_get_annotations_with_index(self):
"""Test that only annotations containing index are returned from get_annotations()"""
smf = SigMFFile(copy.deepcopy(TEST_METADATA))
smf.add_annotation(start_index=1)
smf.add_annotation(start_index=4, length=4)
annotations_idx10 = smf.get_annotations(index=10)
meta = SigMFFile(copy.deepcopy(TEST_METADATA))
meta.add_annotation(start_index=1)
meta.add_annotation(start_index=4, length=4)
annotations_idx10 = meta.get_annotations(index=10)
self.assertListEqual(
annotations_idx10,
[
Expand All @@ -96,26 +96,26 @@ def test_get_annotations_with_index(self):
],
)

def test__count_samples_from_annotation(self):
def test_sample_count_from_annotations(self):
"""Make sure sample count from annotations use correct end index"""
smf = SigMFFile(copy.deepcopy(TEST_METADATA))
smf.add_annotation(start_index=0, length=32)
smf.add_annotation(start_index=4, length=4)
sample_count = smf._count_samples()
meta = SigMFFile(copy.deepcopy(TEST_METADATA))
meta.add_annotation(start_index=0, length=32)
meta.add_annotation(start_index=4, length=4)
sample_count = meta._count_samples()
self.assertEqual(sample_count, 32)

def test_set_data_file_without_annotations(self):
"""
Make sure setting data_file with no annotations registered does not
raise any errors
"""
smf = SigMFFile(copy.deepcopy(TEST_METADATA))
smf._metadata[SigMFFile.ANNOTATION_KEY].clear()
meta = SigMFFile(copy.deepcopy(TEST_METADATA))
meta._metadata[SigMFFile.ANNOTATION_KEY].clear()
with tempfile.TemporaryDirectory() as tmpdir:
temp_path_data = Path(tmpdir) / "datafile"
TEST_FLOAT32_DATA.tofile(temp_path_data)
smf.set_data_file(temp_path_data)
samples = smf.read_samples()
meta.set_data_file(temp_path_data)
samples = meta.read_samples()
self.assertTrue(len(samples) == 16)

def test_set_data_file_with_annotations(self):
Expand All @@ -124,15 +124,15 @@ def test_set_data_file_with_annotations(self):
count from data_file and issue a warning if annotations have end
indices bigger than file end index
"""
smf = SigMFFile(copy.deepcopy(TEST_METADATA))
smf.add_annotation(start_index=0, length=32)
meta = SigMFFile(copy.deepcopy(TEST_METADATA))
meta.add_annotation(start_index=0, length=32)
with tempfile.TemporaryDirectory() as tmpdir:
temp_path_data = Path(tmpdir) / "datafile"
TEST_FLOAT32_DATA.tofile(temp_path_data)
with self.assertWarns(Warning):
# Issues warning since file ends before the final annotatio
smf.set_data_file(temp_path_data)
samples = smf.read_samples()
meta.set_data_file(temp_path_data)
samples = meta.read_samples()
self.assertTrue(len(samples) == 16)


Expand Down Expand Up @@ -222,9 +222,9 @@ def test_key_validity():

def test_ordered_metadata():
"""check to make sure the metadata is sorted as expected"""
sigf = SigMFFile()
meta = SigMFFile()
top_sort_order = ["global", "captures", "annotations"]
for kdx, key in enumerate(sigf.ordered_metadata()):
for kdx, key in enumerate(meta.ordered_metadata()):
assert kdx == top_sort_order.index(key)


Expand All @@ -249,7 +249,7 @@ def prepare(self, data: list, meta: dict, dtype: type, autoscale: bool = True) -
meta = sigmf.fromfile(self.temp_path_meta, skip_checksum=True, autoscale=autoscale)
return meta

def test_000(self) -> None:
def test_compliant_two_capture_recording(self) -> None:
"""compliant two-capture recording"""
meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False)
self.assertEqual(256, meta._count_samples())
Expand All @@ -260,7 +260,7 @@ def test_000(self) -> None:
self.assertTrue(np.array_equal(np.array([]), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1)))

def test_001(self) -> None:
def test_two_capture_with_header_trailing_bytes(self) -> None:
"""two capture recording with header_bytes and trailing_bytes set"""
meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8, autoscale=False)
self.assertEqual(192, meta._count_samples())
Expand All @@ -270,7 +270,7 @@ def test_001(self) -> None:
self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1)))

def test_002(self) -> None:
def test_two_capture_with_multiple_header_bytes(self) -> None:
"""two capture recording with multiple header_bytes set"""
meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8, autoscale=False)
self.assertEqual(192, meta._count_samples())
Expand All @@ -280,7 +280,7 @@ def test_002(self) -> None:
self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1)))

def test_003(self) -> None:
def test_three_capture_with_multiple_header_bytes(self) -> None:
"""three capture recording with multiple header_bytes set"""
meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8, autoscale=False)
self.assertEqual(192, meta._count_samples())
Expand All @@ -292,8 +292,8 @@ def test_003(self) -> None:
self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1)))
self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2)))

def test_004(self) -> None:
"""two channel version of 000"""
def test_two_channel_capture_recording(self) -> None:
"""two channel version of compliant capture recording"""
meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)
self.assertEqual(96, meta._count_samples())
self.assertFalse(meta._is_conforming_dataset())
Expand All @@ -302,20 +302,20 @@ def test_004(self) -> None:
self.assertTrue(np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1)))

def test_slicing_ru8(self) -> None:
def test_slice_real_uint8(self) -> None:
"""slice real uint8"""
meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False)
self.assertTrue(np.array_equal(meta[:], TEST_U8_DATA0))
self.assertTrue(np.array_equal(meta[6], TEST_U8_DATA0[6]))
self.assertTrue(np.array_equal(meta[1:-1], TEST_U8_DATA0[1:-1]))

def test_slicing_rf32(self) -> None:
def test_slice_real_float32(self) -> None:
"""slice real float32"""
meta = self.prepare(TEST_FLOAT32_DATA, TEST_METADATA, np.float32)
self.assertTrue(np.array_equal(meta[:], TEST_FLOAT32_DATA))
self.assertTrue(np.array_equal(meta[9], TEST_FLOAT32_DATA[9]))

def test_slicing_multiple_channels(self) -> None:
def test_slice_multiple_channels(self) -> None:
"""slice multiple channels"""

meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)
Expand All @@ -325,7 +325,7 @@ def test_slicing_multiple_channels(self) -> None:
self.assertTrue(np.array_equal(meta[0], channelized[0]))
self.assertTrue(np.array_equal(meta[1, :], channelized[1]))

def test_boundaries(self) -> None:
def test_capture_byte_boundaries(self) -> None:
"""capture byte boundaries from pairs & archives"""
# get a meta pair and archive
meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8)
Expand All @@ -336,6 +336,41 @@ def test_boundaries(self) -> None:
self.assertEqual(meta.get_capture_byte_boundaries(bdx), arc.get_capture_byte_boundaries(bdx))
self.assertTrue(np.array_equal(meta.read_samples_in_capture(bdx), arc.read_samples_in_capture(bdx)))

def test_add_capture(self):
"""test basic capture addition"""
meta = SigMFFile()
meta.add_capture(start_index=0, metadata={})

def test_add_capture_metadata_merge(self):
"""test that adding capture with existing start_index properly merges metadata"""
meta = SigMFFile()

# add initial capture with some metadata
initial_meta = {"core:frequency": 915e6, "core:sample_rate": 1e6}
meta.add_capture(start_index=0, metadata=initial_meta)

# add capture with same start_index but additional metadata
additional_meta = {"core:datetime": "2026-03-17T10:00:00Z", "custom:gain": 30}
meta.add_capture(start_index=0, metadata=additional_meta)

# verify metadata was merged properly
captures = meta.get_captures()
self.assertEqual(len(captures), 1, "should have exactly one capture")

merged_capture = captures[0]
# original metadata should be preserved
self.assertEqual(merged_capture["core:frequency"], 915e6)
self.assertEqual(merged_capture["core:sample_rate"], 1e6)
# new metadata should be added
self.assertEqual(merged_capture["core:datetime"], "2026-03-17T10:00:00Z")
self.assertEqual(merged_capture["custom:gain"], 30)

def test_add_multiple_captures_and_annotations(self):
"""test adding multiple captures with annotations"""
meta = SigMFFile()
for idx in range(3):
simulate_capture(meta, idx, 1024)


def simulate_capture(sigmf_md, n, capture_len):
start_index = capture_len * n
Expand All @@ -352,38 +387,35 @@ def simulate_capture(sigmf_md, n, capture_len):
sigmf_md.add_annotation(start_index=start_index, length=capture_len, metadata=annotation_md)


def test_default_constructor():
SigMFFile()


def test_set_non_required_global_field():
sigf = SigMFFile()
sigf.set_global_field("this_is:not_in_the_schema", None)


def test_add_capture():
sigf = SigMFFile()
sigf.add_capture(start_index=0, metadata={})


def test_add_annotation():
sigf = SigMFFile()
sigf.add_capture(start_index=0)
meta = {"latitude": 40.0, "longitude": -105.0}
sigf.add_annotation(start_index=0, length=128, metadata=meta)


def test_fromarchive(test_sigmffile):
with tempfile.NamedTemporaryFile(suffix=".sigmf") as temp_file:
archive_path = test_sigmffile.archive(name=temp_file.name, overwrite=True)
result = sigmf.fromarchive(archive_path=archive_path)
assert result._metadata == test_sigmffile._metadata == TEST_METADATA


def test_add_multiple_captures_and_annotations():
sigf = SigMFFile()
for idx in range(3):
simulate_capture(sigf, idx, 1024)
class TestBasicFunctionality(unittest.TestCase):
"""test basic SigMFFile functionality"""

def test_default_constructor(self):
"""test default constructor"""
SigMFFile()

def test_set_non_required_global_field(self):
"""test setting field not in schema"""
meta = SigMFFile()
meta.set_global_field("this_is:not_in_the_schema", None)

def test_add_annotation(self):
"""test basic annotation addition"""
meta = SigMFFile()
meta.add_capture(start_index=0)
annot = {"latitude": 40.0, "longitude": -105.0}
meta.add_annotation(start_index=0, length=128, metadata=annot)

def test_load_from_archive(self):
"""test loading from archive"""
with tempfile.NamedTemporaryFile(suffix=".sigmf") as temp_file:
# create temporary data file
with tempfile.NamedTemporaryFile(suffix=".sigmf-data", delete=False) as data_file:
TEST_FLOAT32_DATA.tofile(data_file.name)
meta = SigMFFile(TEST_METADATA, data_file=data_file.name)
archive_path = meta.archive(name=temp_file.name, overwrite=True)
loopback = sigmf.fromarchive(archive_path=archive_path)
self.assertEqual(loopback._metadata, meta._metadata)


class TestOverwrite(unittest.TestCase):
Expand Down
Loading