-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeflowControl.py
More file actions
59 lines (46 loc) · 1.51 KB
/
pipeflowControl.py
File metadata and controls
59 lines (46 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import json
import subprocess
from pipeflowData import *
class PipeflowControl:
def __init__(self, data):
self.data = data
self.log = data.log
# Command helpers
def set_metadata(self, args):
subprocess.Popen(["pw-metadata"] + args)
def set_param(self, args):
subprocess.run(["pw-cli", "set-param"] + args, stdout=subprocess.DEVNULL)
def pw_link(self, args):
subprocess.Popen(["pw-link"] + args)
# Control functions
def update_meta(self, meta, is_sink):
if is_sink:
data = json.dumps({
"name": meta.default_sink
})
self.set_metadata(["0", "default.audio.sink", data])
self.set_metadata(["0", "default.configured.audio.sink", data])
else:
data = json.dumps({
"name": meta.default_source
})
self.set_metadata(["0", "default.audio.source", data])
self.set_metadata(["0", "default.configured.audio.sink", data])
def update_mute(self, node):
data = json.dumps({
"mute": node.mute
})
self.set_param([str(node.node_id), "Props", data])
def update_volume(self, node):
vols = [ c.chn_volume for c in node.channels if not c.placeholder ]
#print("Set volumes ", vols)
data = json.dumps({
"channelVolumes": vols
})
self.set_param([str(node.node_id), "Props", data])
def add_link(self, inport, outport):
self.pw_link([str(outport.port_id), str(inport.port_id)])
self.log.debug("ADDED link %d -> %d", inport.port_label, outport.port_label)
def remove_link(self, link):
self.pw_link(["-d", str(link.link_id)])
self.log.debug("REMOVED link %d", link.link_id)