Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/user/include_jinja_list.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
| canonical_interface_name_list | netutils.interface.canonical_interface_name_list |
| interface_range_compress | netutils.interface.interface_range_compress |
| interface_range_expansion | netutils.interface.interface_range_expansion |
| slice_interface_range | netutils.interface.slice_interface_range |
| sort_interface_list | netutils.interface.sort_interface_list |
| split_interface | netutils.interface.split_interface |
| cidr_to_netmask | netutils.ip.cidr_to_netmask |
Expand Down
81 changes: 81 additions & 0 deletions netutils/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,87 @@ def sort_interface_list(interfaces: t.List[str]) -> t.List[str]:
return list(_iter_tree(root, []))


def slice_interface_range(interface_names: t.List[str], *cut_points: str) -> t.Tuple[t.Iterator[str], ...]:
"""Slice a sorted list of interface names based on cut points.

The interface names will be cut based on the values in the cut_points list. Each cut
point defines the start of its respective slice. A slice runs from its cut point
up to (but not including) the next cut point. The final slice contains all
remaining interface names from the last cut point to the end of the sorted list. This
Comment on lines +466 to +467
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me, but in my mind should also be extended to start. Having to always define the first element (Ethernet1 in tests below) seems off to me.

I could understand either both start and end needing to be explicit or neither being explicit, but start being explicit and end being implicit is a bit confusing to me.

method always returns the same number of iterators as cut points. However, if an
iterator of interface names runs out of names it will begin yielding None.

This method can be used to help finding available interfaces for provisioning systems.
If connectivity for systems follows a pattern, then interface name slices can be used
to help provide selections for provisioning. For example, if a device has two upstream
connections and the first is always provisioned between Ethernet[1-24] and the second
between Ethernet[25-48] then two slices are needed. The input would be the total
list of available interfaces on the upstream device (those interfaces that are not
already connected to something) and the output would be two iterables within the
given ranges. Once the ranges are exhausted, the slices start yielding None.

Args:
interfaces: A list of interface names to be sorted and sliced.
*cut_points: Interface names that define the start of each slice. The number
of iterables returned equals the number of cut points.

Returns:
A tuple of :class:`InterfaceRangeSlicer.SliceIterator` instances, one per cut point.
Each iterator yields the next interface name in its slice yielding `None` once exhausted.

Raises:
ValueError: If no cut points are provided.

Examples:
>>> from netutils.interface import slice_interface_range
>>> slices = slice_interface_range(["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4"], "Ethernet1", "Ethernet3")
>>> next(slices[0])
'Ethernet1'
>>> next(slices[0])
'Ethernet2'
>>> next(slices[0]) is None
True
>>> next(slices[1])
'Ethernet3'
>>> next(slices[1])
'Ethernet4'
>>> next(slices[1]) is None
True
"""
if not cut_points:
raise ValueError("At least one cut point must be provided.")

def interface_name_yielder(interface_names: t.List[str]):
_index = 0
while True:
if _index < len(interface_names):
yield interface_names[_index]
_index += 1
else:
yield None

interface_names = sort_interface_list(interface_names)
sorted_cut_points = sort_interface_list(cut_points) # type: ignore
slices = []
start_index = None
for i, interface in enumerate(interface_names):
if interface == sorted_cut_points[0] or _split_interface_tuple(sorted_cut_points[0]) < _split_interface_tuple(
interface
):
if start_index is not None:
slices.append(interface_name_yielder(interface_names[start_index:i]))
start_index = i
sorted_cut_points.pop(0)

if len(sorted_cut_points) == 0:
slices.append(interface_name_yielder(interface_names[start_index:]))
break

for _ in range(len(sorted_cut_points)):
slices.append(interface_name_yielder([]))
return tuple(slices)


INTERFACE_LIST_ORDERING_OPTIONS = {"alphabetical": sort_interface_list}


Expand Down
1 change: 1 addition & 0 deletions netutils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"canonical_interface_name_list": "interface.canonical_interface_name_list",
"abbreviated_interface_name": "interface.abbreviated_interface_name",
"abbreviated_interface_name_list": "interface.abbreviated_interface_name_list",
"slice_interface_range": "interface.slice_interface_range",
"sort_interface_list": "interface.sort_interface_list",
"ip_to_hex": "ip.ip_to_hex",
"ip_addition": "ip.ip_addition",
Expand Down
67 changes: 67 additions & 0 deletions tests/unit/test_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,70 @@ def test_abbreviated_interface_name_order_failure():
with pytest.raises(ValueError, match=r"weight is not one of the supported orderings"):
data = {"interfaces": "SuperFastEth 1/0/1", "order": "weight"}
interface.abbreviated_interface_name_list(**data)


SLICE_INTERFACE_RANGE = [
{
"sent": {
"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4"],
"cut_points": ["Ethernet1", "Ethernet3"],
},
"received": [["Ethernet1", "Ethernet2"], ["Ethernet3", "Ethernet4"]],
},
{
"sent": {
"interfaces": ["Ethernet2", "Ethernet3", "Ethernet4", "Ethernet6", "Ethernet7", "Ethernet8"],
"cut_points": ["Ethernet1", "Ethernet5"],
},
"received": [["Ethernet2", "Ethernet3", "Ethernet4"], ["Ethernet6", "Ethernet7", "Ethernet8"]],
},
{
"sent": {"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3"], "cut_points": ["Ethernet1"]},
"received": [["Ethernet1", "Ethernet2", "Ethernet3"]],
},
{
"sent": {"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3"], "cut_points": ["Ethernet2"]},
"received": [["Ethernet2", "Ethernet3"]],
},
{
"sent": {
"interfaces": ["Ethernet1", "Ethernet2", "Ethernet3", "Ethernet4", "Ethernet5", "Ethernet6"],
"cut_points": ["Ethernet1", "Ethernet2", "Ethernet4"],
},
"received": [["Ethernet1"], ["Ethernet2", "Ethernet3"], ["Ethernet4", "Ethernet5", "Ethernet6"]],
},
{
"sent": {
"interfaces": ["Ethernet4", "Ethernet1", "Ethernet3", "Ethernet2"],
"cut_points": ["Ethernet1", "Ethernet2"],
},
"received": [["Ethernet1"], ["Ethernet2", "Ethernet3", "Ethernet4"]],
},
{
"sent": {"interfaces": ["Ethernet1"], "cut_points": ["Ethernet1"]},
"received": [["Ethernet1"]],
},
]


@pytest.mark.parametrize("data", SLICE_INTERFACE_RANGE)
def test_slice_interface_range(data):
slices = interface.slice_interface_range(data["sent"]["interfaces"], *data["sent"]["cut_points"])
assert len(slices) == len(data["received"])
for slice_iter, expected in zip(slices, data["received"]):
for expected_name in expected:
assert next(slice_iter) == expected_name
assert next(slice_iter) is None


def test_slice_interface_range_none_repeats():
slices = interface.slice_interface_range(["Ethernet1"], "Ethernet1")
assert next(slices[0]) == "Ethernet1"
assert next(slices[0]) is None
assert next(slices[0]) is None
assert next(slices[0]) is None


def test_slice_interface_range_no_cut_points():
with pytest.raises(ValueError, match="At least one cut point"):
interface.slice_interface_range(["Ethernet1", "Ethernet2"])
Loading