-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmonitor.py
More file actions
executable file
·93 lines (74 loc) · 2.75 KB
/
monitor.py
File metadata and controls
executable file
·93 lines (74 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python
import importlib
import time
import inspect
import pkgutil
import rospy
import rosgraph_monitor.observers
from controller_manager_msgs.srv import *
def iter_namespace(ns_pkg):
return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".")
class ModuleManager(object):
def __init__(self):
self._modules = {}
self._observers = {}
rospy.Service('/load_observer', LoadController, self.handle_load)
rospy.Service('/unload_observer', UnloadController, self.handle_unload)
rospy.Service('/active_observers',
ListControllerTypes, self.handle_active)
rospy.Service('/list_observers',
ListControllerTypes, self.handle_types)
def handle_load(self, req):
started = self.start_observer(req.name)
return LoadControllerResponse(started)
def handle_unload(self, req):
stopped = self.stop_observer(req.name)
return UnloadControllerResponse(stopped)
def handle_active(self, req):
names = self._observers.keys()
return ListControllerTypesResponse(names, [])
def handle_types(self, req):
names = self._modules.keys()
return ListControllerTypesResponse(names, [])
def load_observers(self):
available_plugins = {
name: importlib.import_module(name)
for finder, name, ispkg
in iter_namespace(rosgraph_monitor.observers)
}
self._modules = self._get_leaf_nodes(
rosgraph_monitor.observer.Observer)
def start_observer(self, name):
started = True
try:
module = self._modules[name]
self._observers[name] = getattr(module, name)(name)
self._observers[name].start()
except Exception as exc:
print("Could not start {}".format(name))
started = False
return started
def stop_observer(self, name):
stopped = True
try:
self._observers[name].stop()
del self._observers[name]
except Exception as exc:
print("Could not stop {}".format(name))
stopped = False
return stopped
def _get_leaf_nodes(self, root):
leafs = {}
self._collect_leaf_nodes(root, leafs)
return leafs
def _collect_leaf_nodes(self, node, leafs):
if node is not None: # change this to see if it is class
if len(node.__subclasses__()) == 0:
leafs[node.__name__] = inspect.getmodule(node)
for n in node.__subclasses__():
self._collect_leaf_nodes(n, leafs)
if __name__ == "__main__":
rospy.init_node('graph_monitor')
manager = ModuleManager()
manager.load_observers()
rospy.spin()