-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathobserver.py
More file actions
117 lines (95 loc) · 3.55 KB
/
observer.py
File metadata and controls
117 lines (95 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import threading
import rospy
class Observer(object):
def __init__(self, name, loop_rate_hz=1):
self._name = name
self._rate = rospy.Rate(loop_rate_hz)
self._lock = threading.Lock()
self._thread = threading.Thread(
target=self._run)
self._thread.daemon = True
self._stop_event = threading.Event()
def __del__(self):
if Observer:
print("{} stopped".format(self._name))
# Every derived class needs to override this
def generate_output():
return None
def _run(self):
while not rospy.is_shutdown() and not self._stopped():
msg = self.generate_output()
print(msg)
# perform_output() should be implemented by OutputInterface's sub-class
self._perform_output(msg)
self._rate.sleep()
def start(self, func):
print("starting {}...".format(self._name))
self._perform_output = func
self._thread.start()
def stop(self):
self._lock.acquire()
self._stop_event.set()
self._lock.release()
def _stopped(self):
self._lock.acquire()
isSet = self._stop_event.isSet()
self._lock.release()
return isSet
class ServiceObserver(Observer):
def __init__(self, name, service_name=None, service_type=None, loop_rate_hz=1):
super(ServiceObserver, self).__init__(name, loop_rate_hz)
self.name = service_name
self.type = service_type
self.client = None
try:
self.start_service()
except:
print("{} service not started".format(self.name))
super.__del__()
def start_service(self):
try:
rospy.wait_for_service(self.name, timeout=1.0)
self.client = rospy.ServiceProxy(self.name, self.type)
print("Service '" + self.name +
"' added of type" + str(self.type))
except rospy.ServiceException as exc:
print("Service {} is not running: ".format(self.name) + str(exc))
def generate_output(self):
try:
resp = self.client.call()
except rospy.ServiceException as exc:
print("Service {} did not process request: ".format(
self.name) + str(exc))
msg = self.msg_from_response(resp)
return msg
# Every derived class needs to override this
def msg_from_response(self, response):
None
class TopicObserver(Observer):
def __init__(self, name, loop_rate_hz, topics):
super(TopicObserver, self).__init__(name=name, loop_rate_hz=loop_rate_hz)
self._topics = topics
self._id = ""
self._num_topics = len(topics)
def is_close(self, a, b, rel_tol=1e-09, abs_tol=0.0):
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
# Every derived class needs to override this
def calculate_attr(self, msgs):
# do calculations
return None
def generate_output(self):
msgs = []
received_all = True
for topic, topic_type in self._topics:
try:
# Add message synchronization here -- message filter
# Consider "asynchronous" reception as well?
msgs.append(rospy.wait_for_message(topic, topic_type))
except rospy.ROSException as exc:
print("Topic {} is not found: ".format(topic) + str(exc))
received_all = False
break
msg = None
if received_all:
msg = self.calculate_attr(msgs)
return msg