Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 62 additions & 46 deletions PyStageLinQ/PyStageLinQ.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import platform
import psutil
import ipaddress
import os
from typing import Callable

from . import Device
Expand Down Expand Up @@ -52,42 +53,53 @@ def get_interface_from_ip(self, ip):
ip_list = ip
else:
raise TypeError
logger.info(
f"Found {len(psutil.net_if_stats().items())} total network interfaces, listing IPv4 interfaces:"
)

for interface in psutil.net_if_stats().items():
for interface_info in psutil.net_if_addrs()[interface[0]]:
# Only look for IPV4 binds
if socket.AF_INET == interface_info.family and (
interface_info.address in ip_list or ip_list[0] == "any"
):
self.target_interfaces.append(
PyStageLinQ_interface_info(
interface[0],
len(self.target_interfaces),
int(ipaddress.IPv4Address(interface_info.address)),
interface_info.address,
int(ipaddress.IPv4Address(interface_info.netmask)),
interface[1],
0,
if socket.AF_INET == interface_info.family:
logger.info(f" - {interface[0]}: {interface_info.address}")
if interface_info.address in ip_list or ip_list[0] == "any":
self.target_interfaces.append(
PyStageLinQ_interface_info(
interface[0],
len(self.target_interfaces),
int(ipaddress.IPv4Address(interface_info.address)),
interface_info.address,
int(ipaddress.IPv4Address(interface_info.netmask)),
interface[1],
0,
)
)
)

logger.info(
f"{len(self.target_interfaces)} interfaces matched with requested interfaces and will be used by PyStageLinQ:"
)
for iface in self.target_interfaces:
logger.info(f" - {iface.name}: {iface.addr_str}")

def send_discovery_frame(self, discovery_frame):
for interface in self.target_interfaces:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as discovery_socket:
discovery_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
discovery_socket.bind((interface.addr_str, 0))
try:
try:
with socket.socket(
socket.AF_INET, socket.SOCK_DGRAM
) as discovery_socket:
discovery_socket.setsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST, 1
)
discovery_socket.bind((interface.addr_str, 0))
discovery_socket.sendto(
discovery_frame,
("255.255.255.255", self.discovery_port),
)
self.target_interfaces[interface.id].n_disc_msg_send += 1
except PermissionError:
logger.warning(
f"Cannot send message on interface {interface.name}, "
f"this error could be due to that there is no network interface set up with this IP range"
)
raise PermissionError
except Exception as e:
logger.debug(
f"Cannot send discovery on interface {interface.name} ({interface.addr_str}): {e}"
)

def determine_interface_of_remote_ip(self, ip):
for interface in self.target_interfaces:
Expand All @@ -99,13 +111,13 @@ def determine_interface_of_remote_ip(self, ip):
return None

def send_desc_on_all_if(self):
# Check if at least one working interface has sent 3+ discovery messages
any_working = False
for interface in self.target_interfaces:
# Wait until a few discovery frames have been sent to make sure the other devices have seen us. If they have
# not and we are asking for services it will be denied.
if interface.n_disc_msg_send < 3:
return False

return True
# Skip interfaces that have failed (never sent any messages after multiple attempts)
if interface.n_disc_msg_send >= 3:
any_working = True
return any_working


class PyStageLinQ:
Expand Down Expand Up @@ -181,17 +193,13 @@ def start(self):

def _stop(self):
logger.info(f"Stop requested, trying graceful shutdown")
try:
discovery = StageLinQDiscovery()
discovery_info = self.discovery_info
discovery_info.ConnectionType = ConnectionTypes.EXIT
discovery_frame = discovery.encode_frame(discovery_info)
discovery = StageLinQDiscovery()
discovery_info = self.discovery_info
discovery_info.ConnectionType = ConnectionTypes.EXIT
discovery_frame = discovery.encode_frame(discovery_info)

self.network_interface.send_discovery_frame(discovery_frame)
logger.info(f"Gracefully shutdown complete")
except Exception as e:
logger.debug('Could not send "EXIT" discovery frame during shutdown')
raise e
self.network_interface.send_discovery_frame(discovery_frame)
logger.info(f"Gracefully shutdown complete")

def _announce_self(self):
discovery = StageLinQDiscovery()
Expand All @@ -214,16 +222,18 @@ async def _discover_stagelinq_device(self, host_ip, timeout=10):
# Create socket
discover_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

bind_ip = "" if os.name == "posix" else host_ip

try:
discover_socket.bind(
(host_ip, self.StageLinQ_discovery_port)
(bind_ip, self.StageLinQ_discovery_port)
) # bind socket to broadcast
except Exception as e:
# Cannot bind to socket, check if IP is correct and link is up
logger.warning(
logger.debug(
f"Cannot bind to IP socket: {host_ip} on port {self.StageLinQ_discovery_port}"
)
raise e
return PyStageLinQError.CANNOTBINDSOCKET
discover_socket.setblocking(False)

loop_timeout = time.time() + timeout
Expand Down Expand Up @@ -361,12 +371,18 @@ async def _periodic_announcement(self):

async def _py_stagelinq_strapper(self):
strapper_tasks = set()
logger.info(f"Starting to look for StageLinQ discovery frames:")

strapper_tasks.add(
asyncio.create_task(self._discover_stagelinq_device("", timeout=2))
logger.info(
f"Looking for discovery frames on {len(self.network_interface.target_interfaces)} IP addresses:"
)

for interface in self.network_interface.target_interfaces:
logger.info(f"{interface.addr_str}")
strapper_tasks.add(
asyncio.create_task(
self._discover_stagelinq_device(interface.addr_str, timeout=2)
)
)

while self.get_loop_condition():
all_tasks_done = True
for task in strapper_tasks.copy():
Expand Down
8 changes: 7 additions & 1 deletion tests/Main.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def state_map_data_print(data):
def main():
logging.basicConfig(level=logging.INFO)
global PrimeGo
ip_choice = 1
ip_choice = 2
match ip_choice:
case 0:
PrimeGo = PyStageLinQ.PyStageLinQ(
Expand All @@ -69,6 +69,12 @@ def main():
name="Jaxcie StageLinQ",
ip=["169.254.13.37", "127.0.0.1"],
)
case 3:
# should fail
PrimeGo = PyStageLinQ.PyStageLinQ(
new_device_found_callback, name="Jaxcie StageLinQ", ip="127.0.0.1"
)

PrimeGo.start_standalone()


Expand Down
23 changes: 19 additions & 4 deletions tests/unit/test_unit_PyStageLinQ.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ async def test_discover_stagelinq_device_bind_error(

dummy_socket.socket.return_value.bind.side_effect = Exception()

with pytest.raises(Exception) as exception:
assert (
await dummy_pystagelinq._discover_stagelinq_device(dummy_ip)

assert exception.type is Exception
== PyStageLinQError.CANNOTBINDSOCKET
)


def test_get_loop_condition(dummy_pystagelinq):
Expand Down Expand Up @@ -176,7 +176,7 @@ async def test_discover_stagelinq_check_initialization(
dummy_socket.AF_INET, dummy_socket.SOCK_DGRAM
)
dummy_socket.socket.return_value.bind.assert_called_once_with(
(dummy_ip, dummy_pystagelinq.StageLinQ_discovery_port)
("", dummy_pystagelinq.StageLinQ_discovery_port)
)
dummy_socket.socket.return_value.setblocking.assert_called_once_with(False)

Expand Down Expand Up @@ -832,6 +832,11 @@ async def test_py_stagelinq_strapper(dummy_pystagelinq, monkeypatch):
monkeypatch.setattr(
dummy_pystagelinq, "_discover_stagelinq_device", discover_device_mock
)
monkeypatch.setattr(
dummy_pystagelinq.network_interface,
"target_interfaces",
[PyStageLinQ.PyStageLinQ.PyStageLinQ_interface_info("", 0, 0, "", 0, "", 0)],
)

await dummy_pystagelinq._py_stagelinq_strapper()

Expand All @@ -850,6 +855,11 @@ async def test_py_stagelinq_strapper_loop_condition_false(
monkeypatch.setattr(
dummy_pystagelinq, "_discover_stagelinq_device", discover_device_mock
)
monkeypatch.setattr(
dummy_pystagelinq.network_interface,
"target_interfaces",
[PyStageLinQ.PyStageLinQ.PyStageLinQ_interface_info("", 0, 0, "", 0, "", 0)],
)

await dummy_pystagelinq._py_stagelinq_strapper()

Expand Down Expand Up @@ -884,6 +894,11 @@ async def test_py_stagelinq_strapper_task_exception(
monkeypatch.setattr(
dummy_pystagelinq, "_discover_stagelinq_device", discover_device_mock
)
monkeypatch.setattr(
dummy_pystagelinq.network_interface,
"target_interfaces",
[PyStageLinQ.PyStageLinQ.PyStageLinQ_interface_info("", 0, 0, "", 0, "", 0)],
)

get_loop_condition_mock.side_effect = [True, False]

Expand Down
3 changes: 1 addition & 2 deletions tests/unit/test_unit_PyStageLinQ_interface_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,7 @@ def test_send_discovery_frame_permission_error(

monkeypatch.setattr(PyStageLinQ.PyStageLinQ, "socket", dummy_socket)

with pytest.raises(PermissionError) as exception:
dummy_PyStageLinQ_network_interface.send_discovery_frame(dummy_discovery_frame)
dummy_PyStageLinQ_network_interface.send_discovery_frame(dummy_discovery_frame)

dummy_socket.socket.assert_called_once_with(
dummy_socket.AF_INET, dummy_socket.SOCK_DGRAM
Expand Down