-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathipc.py
More file actions
137 lines (107 loc) · 3.88 KB
/
ipc.py
File metadata and controls
137 lines (107 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""IPC client to communicate with the Gateway."""
from __future__ import print_function
import functools
import json
import jsonschema
import os
import threading
import time
import websocket
from .constants import MessageType
# Port number substituted into the ws://127.0.0.1:<port>/ URL used to reach
# the gateway.
_IPC_PORT = 9500

# Real (symlink-resolved) path of the 'schema' directory located next to
# this module.
_SCHEMA_DIR = os.path.realpath(os.path.join(
    os.path.dirname(__file__),
    'schema',
))

# Rebind print for this module so that every call flushes its output
# immediately.
print = functools.partial(print, flush=True)
class Resolver(jsonschema.RefResolver):
    """
    Resolver for $ref members in schemas.

    Remote references are not fetched over the network. The last path
    segment of the referenced URI is used as a file name and looked up in
    the 'messages' sub-directory of the local schema directory.
    """

    def __init__(self):
        """Initialize the resolver with an empty base URI and no referrer."""
        super().__init__(
            base_uri='',
            referrer=None,
            cache_remote=True,
        )

    def resolve_remote(self, uri):
        """
        Resolve a remote URI. We only look locally.

        uri -- the URI to resolve

        Returns the parsed JSON document, or None (after printing a
        diagnostic) when no matching local file exists.
        """
        name = uri.split('/')[-1]
        local = os.path.join(_SCHEMA_DIR, 'messages', name)

        if not os.path.exists(local):
            print('Unable to find referenced schema:', name)
            return None

        # JSON text is UTF-8; pin the encoding instead of relying on the
        # platform's locale default.
        with open(local, 'rt', encoding='utf-8') as f:
            return json.load(f)
class IpcClient:
    """IPC client which can communicate between the Gateway and an add-on."""

    def __init__(self, plugin_id, on_message, verbose=False):
        """
        Initialize the object.

        Loads the message schema, opens a WebSocket to the gateway on a
        daemon thread, and blocks until the registration response has been
        processed.

        plugin_id -- ID of this plugin
        on_message -- message handler
        verbose -- whether or not to enable verbose logging
        """
        schema_path = os.path.join(_SCHEMA_DIR, 'schema.json')

        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(schema_path, 'rt', encoding='utf-8') as f:
            schema = json.load(f)

        self.plugin_id = plugin_id
        self.verbose = verbose
        self.owner_message_handler = on_message
        self.validator = jsonschema.Draft7Validator(
            schema=schema,
            resolver=Resolver()
        )
        self.registered = False

        # Populated by on_message() from the registration response.
        self.gateway_version = None
        self.user_profile = None
        self.preferences = None

        self.ws = websocket.WebSocketApp(
            'ws://127.0.0.1:{}/'.format(_IPC_PORT),
            on_open=self.on_open,
            on_message=self.on_message,
        )

        self.thread = threading.Thread(target=self.ws.run_forever)
        self.thread.daemon = True
        self.thread.start()

        # Poll until on_message() (running on the WebSocket thread) flags
        # the registration as complete.
        # NOTE(review): this loop has no timeout, so it never returns if the
        # registration response is never received -- confirm this is the
        # intended behaviour.
        while not self.registered:
            time.sleep(0.01)

    def on_open(self, _):
        """
        Event handler for WebSocket opening.

        Sends the plugin registration request carrying this plugin's ID. A
        failure to send is logged and otherwise ignored.
        """
        if self.verbose:
            print('IpcClient: Connected to server, registering...')

        request = {
            'messageType': MessageType.PLUGIN_REGISTER_REQUEST,
            'data': {
                'pluginId': self.plugin_id,
            }
        }

        try:
            self.ws.send(json.dumps(request))
        except websocket.WebSocketException as e:
            print('IpcClient: Failed to send message: {}'.format(e))

    def on_message(self, _, message):
        """
        Event handler for WebSocket messages.

        The registration response is consumed here; every other valid
        message is forwarded to the owner's message handler. Malformed or
        schema-invalid messages are logged and dropped.

        message -- the received message
        """
        # Presence of this file switches on dumping of every raw message.
        if os.path.exists('/boot/firmware/developer.txt'):
            print('gateway_addon: on_message: \n' + str(message))

        # Parse separately from the rest: if decoding fails there is no
        # parsed object yet, so only the raw message can be reported.
        try:
            resp = json.loads(message)
        except ValueError:
            print('IpcClient: Unexpected registration reply from gateway: {}'
                  .format(message))
            return

        try:
            self.validator.validate({'message': resp})

            if resp['messageType'] == MessageType.PLUGIN_REGISTER_RESPONSE:
                if self.verbose:
                    print('IpcClient: Registered with PluginServer')

                data = resp['data']
                self.gateway_version = data['gatewayVersion']
                self.user_profile = data['userProfile']
                self.preferences = data['preferences']

                # Set last: the constructor is polling this flag and must
                # only proceed once the attributes above are in place.
                self.registered = True
            else:
                self.owner_message_handler(resp)
        except ValueError:
            print('IpcClient: Unexpected registration reply from gateway: {}'
                  .format(resp))
        except jsonschema.exceptions.ValidationError:
            print('Invalid message received:', resp)

    def close(self):
        """Close the WebSocket."""
        self.ws.close()