-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnm-wifi-ssid.py
More file actions
executable file
·198 lines (163 loc) · 7.81 KB
/
nm-wifi-ssid.py
File metadata and controls
executable file
·198 lines (163 loc) · 7.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/python3
"""Start/stop sytemd user targets according to Network Manager's current WiFi SSID."""
import signal
import collections
import dbus
# Needed for the dbus mainloop
import dbus.mainloop.glib
from gi.repository import GLib
# I could theoretically do the D-Bus stuff myself,
# but this module should make things easier
import NetworkManager
# This module is only useful for running as a systemd unit to update the state of this unit,
# it doesn't help query systemd's unit status
import systemd.daemon
# I couldn't figure out how to make this module interact with the --user manager
# import pystemd
SYSTEMD_UNIT_TYPE = 'target'
SYSTEMD_VPN_UNIT_PREFIX = 'nm-vpn-id@'
SYSTEMD_WIFI_UNIT_PREFIX = 'nm-wifi-ssid@'
systemd_unit_info = collections.namedtuple('systemd_unit', field_names=[
'name', # The primary unit name as string
'description', # The human readable description string
'load', # The load state (i.e. whether the unit file has been loaded successfully)
'active', # The active state (i.e. whether the unit is currently started or not)
'sub', # The sub state (a more fine-grained version of the active state that is specific to the unit type,
# which the active state is not)
'following_unit', # A unit that is being followed in its state by this unit, if there is any, otherwise the empty string.
'unit_object_path', # The unit object path
'job_id', # If there is a job queued for the job unit, the numeric job id, 0 otherwise
'job_type', # The job type as string
'job_object_path' # The job object path
])
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
session_bus = dbus.SessionBus()
systemd1 = session_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
systemd1_manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
def systemd_escape(string_to_escape):
"""
Escape unit names for use with systemd.
Rough attempt to recreate systemd-escape in Python,
but I couldn't make sense of the source code.
So this is based on the test functions, and my own anecdotal usage
"""
# r = unit_name_escape("ab+-c.a/bc@foo.service");
# assert_se(r);
# assert_se(streq(r, "ab\\x2b\\x2dc.a-bc\\x40foo.service"));
assert isinstance(string_to_escape, str)
escaped_string = []
for c in string_to_escape:
if c == '+':
c = '\\x2b'
elif c == '/':
c = '-'
# Must be careful where these are done
elif c == '-':
c = '\\x2d'
elif c == '@':
c = '\\x40'
# Not mentioned in the test cases, but were needed
elif c == ' ':
c = '\\x20'
elif c == '!':
c = '\\x21'
escaped_string.append(c)
return ''.join(escaped_string)
def get_systemd_units(unit_type=None, unit_state=(), startswith=()):
"""Get systemd units with option filtering."""
all_units = (systemd_unit_info(*u) for u in systemd1_manager.ListUnits())
for unit in all_units:
if unit_type and unit.name.rpartition('.')[-1] != unit_type:
continue
if unit_state and not any(s in (unit.load, unit.active, unit.sub) for s in unit_state):
continue
if startswith and not any((unit.name.startswith(prefix) for prefix in startswith)):
continue
yield unit
def bulk_update_systemd_targets(*connections):
"""Stop/start the necessary systemd targets for the given list of SSIDs."""
systemd_unit_names = [u.name for u in get_systemd_units(unit_type=SYSTEMD_UNIT_TYPE,
unit_state=('active',),
startswith=(SYSTEMD_WIFI_UNIT_PREFIX, SYSTEMD_VPN_UNIT_PREFIX))]
# Start any units that might need starting, and keep track of which ones got started
needed_unit_names = []
for conn in connections:
if unit_name := update_target_for_connection(*conn):
needed_unit_names.append(unit_name)
# Stop the running units that should not be running
for unit_name in systemd_unit_names:
if unit_name not in needed_unit_names:
print("Stopping unit", unit_name)
systemd1_manager.StopUnit(unit_name, 'fail')
def update_target_for_connection(conn_type, conn_state, conn_id):
"""
Start/stop the associated systemd unit for the given connection.
NOTE: This doesn't care whether the unit is currently running or not,
since systemd ensures nothing happens if I tell it to go to the state it's already in.
"""
if conn_type in ('wireguard', 'vpn'):
prefix = SYSTEMD_VPN_UNIT_PREFIX
else:
# FIXME: There's a type for this, use an elif for it, then raise NotImplementedError on else
prefix = SYSTEMD_WIFI_UNIT_PREFIX
unit_name = f'{prefix}{systemd_escape(conn_id)}.{SYSTEMD_UNIT_TYPE}'
if conn_state in (NetworkManager.NM_ACTIVE_CONNECTION_STATE_ACTIVATING,
NetworkManager.NM_ACTIVE_CONNECTION_STATE_ACTIVATED):
print("Starting unit", unit_name)
systemd1_manager.StartUnit(unit_name, 'fail')
return unit_name
else:
print("Stopping unit", unit_name)
systemd1_manager.StopUnit(unit_name, 'fail')
return False
def get_current_connections(active_connections=None):
"""
Get the connection state of all currently active WiFi & VPN connections.
Ignores any that it can't find the identifier.
"""
if active_connections is None:
active_connections = NetworkManager.NetworkManager.ActiveConnections
for conn in active_connections:
try:
settings = conn.Connection.GetSettings()
ssid = settings.get('802-11-wireless', {}).get('ssid')
conn_id = settings.get('connection', {}).get('id')
if ssid:
yield (settings.get('connection', {}).get('type'), conn.State, ssid)
elif 'vpn' in settings or 'wireguard' in settings:
# Not a WiFi connection, maybe VPN though
yield (settings.get('connection', {}).get('type'), conn.State, conn_id)
except NetworkManager.ObjectVanished:
# ActiveConnection disappeared while we were processing the info.
# We can't do anything about it, so just ignore it
pass
def on_change(*args, **kwargs):
"""Just update everything when anything changes."""
# assert nm is NetworkManager.NetworkManager
# assert interface == 'org.freedesktop.NetworkManager'
# assert signal == 'PropertiesChanged'
bulk_update_systemd_targets(*get_current_connections())
# org.freedesktop.DBus.Properties.PropertiesChanged
#NetworkManager.NetworkManager.OnPropertiesChanged(on_properties_change)
#NetworkManager.NetworkManager.connect_to_signal("PropertiesChanged", on_properties_change)
# FIXME: What of these do I **actually** care about?
# https://mail.gnome.org/archives/networkmanager-list/2021-July/msg00013.html
# FIXME: Disconnecting from a fortinet VPN is not being noticed here until something else triggers an update.
# I suspect there's a different trigger I should be working with for that.
NetworkManager.NetworkManager.OnStateChanged(on_change)
NetworkManager.NetworkManager.OnDeviceAdded(on_change)
NetworkManager.NetworkManager.OnDeviceRemoved(on_change)
bulk_update_systemd_targets(*get_current_connections())
loop = GLib.MainLoop()
def cleanup(*args, **kwargs):
"""Cleanup our mess on exit."""
bulk_update_systemd_targets() # Easiest way to stop all units we control
loop.quit()
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
systemd.daemon.notify('READY=1')
try:
loop.run()
except KeyboardInterrupt:
systemd.daemon.notify('STOPPING=1')
cleanup()