From c3ccd56937d4c4e4c9bfadefa77cb1a375eed781 Mon Sep 17 00:00:00 2001 From: Andrew Bates <358901+abates@users.noreply.github.com> Date: Wed, 11 Feb 2026 14:54:40 -0500 Subject: [PATCH 1/2] feat: Provides a system to determine available interfaces in a range --- docs/user/include_jinja_list.md | 1 + netutils/interface.py | 80 +++++++++++++++++++++++++++++++++ netutils/utils.py | 1 + tests/unit/test_interface.py | 59 ++++++++++++++++++++++++ 4 files changed, 141 insertions(+) diff --git a/docs/user/include_jinja_list.md b/docs/user/include_jinja_list.md index 9841a339..dbd02e2b 100644 --- a/docs/user/include_jinja_list.md +++ b/docs/user/include_jinja_list.md @@ -28,6 +28,7 @@ | canonical_interface_name_list | netutils.interface.canonical_interface_name_list | | interface_range_compress | netutils.interface.interface_range_compress | | interface_range_expansion | netutils.interface.interface_range_expansion | +| slice_interface_range | netutils.interface.slice_interface_range | | sort_interface_list | netutils.interface.sort_interface_list | | split_interface | netutils.interface.split_interface | | cidr_to_netmask | netutils.ip.cidr_to_netmask | diff --git a/netutils/interface.py b/netutils/interface.py index 9cc1e22a..1c665923 100644 --- a/netutils/interface.py +++ b/netutils/interface.py @@ -458,6 +458,86 @@ def sort_interface_list(interfaces: t.List[str]) -> t.List[str]: return list(_iter_tree(root, [])) +def slice_interface_range( + interface_names: t.List[str], *cut_points: str) -> t.Tuple[t.Iterator[str], ...]: + """Slice a sorted list of interface names based on cut points. + + The interface names will be cut based on the values in the cut_points list. Each cut + point defines the start of its respective slice. A slice runs from its cut point + up to (but not including) the next cut point. The final slice contains all + remaining interface names from the last cut point to the end of the sorted list. This + method always returns the same number of iterators as cut points. However, if an + iterator of interface names runs out of names it will begin yielding None. + + This method can be used to help finding available interfaces for provisioning systems. + If connectivity for systems follows a pattern, then interface name slices can be used + to help provide selections for provisioning. For example, if a device has two upstream + connections and the first is always provisioned between Ethernet[1-24] and the second + between Ethernet[25-48] then two slices are needed. The input would be the total + list of available interfaces on the upstream device (those interfaces that are not + already connected to something) and the output would be two iterables within the + given ranges. Once the ranges are exhausted, the slices start yielding None. + + Args: + interfaces: A list of interface names to be sorted and sliced. + *cut_points: Interface names that define the start of each slice. The number + of iterables returned equals the number of cut points. + + Returns: + A tuple of :class:`InterfaceRangeSlicer.SliceIterator` instances, one per cut point. + Each iterator yields the next interface name in its slice yielding `None` once exhausted. + + Raises: + ValueError: If no cut points are provided. + + Examples: + >>> from netutils.interface import slice_interface_range + >>> slices = slice_interface_range(["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4"], "Ethernet1", "Ethernet3") + >>> next(slices[0]) + 'Ethernet1' + >>> next(slices[0]) + 'Ethernet2' + >>> next(slices[0]) is None + True + >>> next(slices[1]) + 'Ethernet3' + >>> next(slices[1]) + 'Ethernet4' + >>> next(slices[1]) is None + True + """ + if not cut_points: + raise ValueError("At least one cut point must be provided.") + + def interface_name_yielder(interface_names: t.List[str]): + _index = 0 + while True: + if _index < len(interface_names): + yield interface_names[_index] + _index += 1 + else: + yield None + + interface_names = sort_interface_list(interface_names) + sorted_cut_points = sort_interface_list(cut_points) # type: ignore + slices = [] + start_index = None + for i, interface in enumerate(interface_names): + if interface == sorted_cut_points[0] or _split_interface_tuple(sorted_cut_points[0]) < _split_interface_tuple(interface): + if start_index is not None: + slices.append(interface_name_yielder(interface_names[start_index:i])) + start_index = i + sorted_cut_points.pop(0) + + if len(sorted_cut_points) == 0: + slices.append(interface_name_yielder(interface_names[start_index:])) + break + + for _ in range(len(sorted_cut_points)): + slices.append(interface_name_yielder([])) + return tuple(slices) + + INTERFACE_LIST_ORDERING_OPTIONS = {"alphabetical": sort_interface_list} diff --git a/netutils/utils.py b/netutils/utils.py index cda16016..75190e86 100644 --- a/netutils/utils.py +++ b/netutils/utils.py @@ -28,6 +28,7 @@ "canonical_interface_name_list": "interface.canonical_interface_name_list", "abbreviated_interface_name": "interface.abbreviated_interface_name", "abbreviated_interface_name_list": "interface.abbreviated_interface_name_list", + "slice_interface_range": "interface.slice_interface_range", "sort_interface_list": "interface.sort_interface_list", "ip_to_hex": "ip.ip_to_hex", "ip_addition": "ip.ip_addition", diff --git a/tests/unit/test_interface.py b/tests/unit/test_interface.py index ab661778..72a3a18e 100644 --- a/tests/unit/test_interface.py +++ b/tests/unit/test_interface.py @@ -638,3 +638,62 @@ def test_abbreviated_interface_name_order_failure(): with pytest.raises(ValueError, match=r"weight is not one of the supported orderings"): data = {"interfaces": "SuperFastEth 1/0/1", "order": "weight"} interface.abbreviated_interface_name_list(**data) + + +SLICE_INTERFACE_RANGE = [ + { + "sent": {"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4"], "cut_points": ["Ethernet1", "Ethernet3"]}, + "received": [["Ethernet1", "Ethernet2"], ["Ethernet3", "Ethernet4"]], + }, + { + "sent": {"interfaces": ["Ethernet2", "Ethernet3", "Ethernet4", "Ethernet6", "Ethernet7", "Ethernet8"], "cut_points": ["Ethernet1", "Ethernet5"]}, + "received": [["Ethernet2", "Ethernet3", "Ethernet4"], ["Ethernet6", "Ethernet7", "Ethernet8"]], + }, + { + "sent": {"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3"], "cut_points": ["Ethernet1"]}, + "received": [["Ethernet1", "Ethernet2", "Ethernet3"]], + }, + { + "sent": {"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3"], "cut_points": ["Ethernet2"]}, + "received": [["Ethernet2", "Ethernet3"]], + }, + { + "sent": { + "interfaces": ["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4", "Ethernet5", "Ethernet6"], + "cut_points": ["Ethernet1", "Ethernet2", "Ethernet4"], + }, + "received": [["Ethernet1"], ["Ethernet2", "Ethernet3"], ["Ethernet4", "Ethernet5", "Ethernet6"]], + }, + { + "sent": {"interfaces": ["Ethernet4", "Ethernet1", "Ethernet3", "Ethernet2"], "cut_points": ["Ethernet1", "Ethernet2"]}, + "received": [["Ethernet1"], ["Ethernet2", "Ethernet3", "Ethernet4"]], + }, + { + "sent": {"interfaces": ["Ethernet1"], "cut_points": ["Ethernet1"]}, + "received": [["Ethernet1"]], + }, +] + + +@pytest.mark.parametrize("data", SLICE_INTERFACE_RANGE) +def test_slice_interface_range(data): + slices = interface.slice_interface_range(data["sent"]["interfaces"], *data["sent"]["cut_points"]) + assert len(slices) == len(data["received"]) + for slice_iter, expected in zip(slices, data["received"]): + for expected_name in expected: + assert next(slice_iter) == expected_name + assert next(slice_iter) is None + + +def test_slice_interface_range_none_repeats(): + slices = interface.slice_interface_range(["Ethernet1"], "Ethernet1") + assert next(slices[0]) == "Ethernet1" + assert next(slices[0]) is None + assert next(slices[0]) is None + assert next(slices[0]) is None + + +def test_slice_interface_range_no_cut_points(): + with pytest.raises(ValueError, match="At least one cut point"): + interface.slice_interface_range(["Ethernet1", "Ethernet2"]) + From b89416052a0620e439612ad182c67eb760f33b0f Mon Sep 17 00:00:00 2001 From: Andrew Bates <358901+abates@users.noreply.github.com> Date: Thu, 12 Feb 2026 08:07:59 -0500 Subject: [PATCH 2/2] style:Corrected linting errors --- netutils/interface.py | 9 +++++---- tests/unit/test_interface.py | 16 ++++++++++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/netutils/interface.py b/netutils/interface.py index 1c665923..9e008654 100644 --- a/netutils/interface.py +++ b/netutils/interface.py @@ -458,8 +458,7 @@ def sort_interface_list(interfaces: t.List[str]) -> t.List[str]: return list(_iter_tree(root, [])) -def slice_interface_range( - interface_names: t.List[str], *cut_points: str) -> t.Tuple[t.Iterator[str], ...]: +def slice_interface_range(interface_names: t.List[str], *cut_points: str) -> t.Tuple[t.Iterator[str], ...]: """Slice a sorted list of interface names based on cut points. The interface names will be cut based on the values in the cut_points list. Each cut @@ -519,11 +518,13 @@ def interface_name_yielder(interface_names: t.List[str]): yield None interface_names = sort_interface_list(interface_names) - sorted_cut_points = sort_interface_list(cut_points) # type: ignore + sorted_cut_points = sort_interface_list(cut_points) # type: ignore slices = [] start_index = None for i, interface in enumerate(interface_names): - if interface == sorted_cut_points[0] or _split_interface_tuple(sorted_cut_points[0]) < _split_interface_tuple(interface): + if interface == sorted_cut_points[0] or _split_interface_tuple(sorted_cut_points[0]) < _split_interface_tuple( + interface + ): if start_index is not None: slices.append(interface_name_yielder(interface_names[start_index:i])) start_index = i diff --git a/tests/unit/test_interface.py b/tests/unit/test_interface.py index 72a3a18e..ce182280 100644 --- a/tests/unit/test_interface.py +++ b/tests/unit/test_interface.py @@ -642,11 +642,17 @@ def test_abbreviated_interface_name_order_failure(): SLICE_INTERFACE_RANGE = [ { - "sent": {"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4"], "cut_points": ["Ethernet1", "Ethernet3"]}, + "sent": { + "interfaces": ["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4"], + "cut_points": ["Ethernet1", "Ethernet3"], + }, "received": [["Ethernet1", "Ethernet2"], ["Ethernet3", "Ethernet4"]], }, { - "sent": {"interfaces": ["Ethernet2", "Ethernet3", "Ethernet4", "Ethernet6", "Ethernet7", "Ethernet8"], "cut_points": ["Ethernet1", "Ethernet5"]}, + "sent": { + "interfaces": ["Ethernet2", "Ethernet3", "Ethernet4", "Ethernet6", "Ethernet7", "Ethernet8"], + "cut_points": ["Ethernet1", "Ethernet5"], + }, "received": [["Ethernet2", "Ethernet3", "Ethernet4"], ["Ethernet6", "Ethernet7", "Ethernet8"]], }, { @@ -665,7 +671,10 @@ def test_abbreviated_interface_name_order_failure(): "received": [["Ethernet1"], ["Ethernet2", "Ethernet3"], ["Ethernet4", "Ethernet5", "Ethernet6"]], }, { - "sent": {"interfaces": ["Ethernet4", "Ethernet1", "Ethernet3", "Ethernet2"], "cut_points": ["Ethernet1", "Ethernet2"]}, + "sent": { + "interfaces": ["Ethernet4", "Ethernet1", "Ethernet3", "Ethernet2"], + "cut_points": ["Ethernet1", "Ethernet2"], + }, "received": [["Ethernet1"], ["Ethernet2", "Ethernet3", "Ethernet4"]], }, { @@ -696,4 +705,3 @@ def test_slice_interface_range_none_repeats(): def test_slice_interface_range_no_cut_points(): with pytest.raises(ValueError, match="At least one cut point"): interface.slice_interface_range(["Ethernet1", "Ethernet2"]) -