This repository was archived by the owner on Nov 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathtest_decision_worker.py
More file actions
149 lines (110 loc) · 3.96 KB
/
test_decision_worker.py
File metadata and controls
149 lines (110 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
Test the decision worker running in its own process.
"""
import multiprocessing as mp
import time
from modules import drone_odometry_local
from modules import object_in_world
from modules import odometry_and_time
from modules.decision import decision
from modules.decision import decision_worker
from utilities.workers import worker_controller
from utilities.workers import queue_proxy_wrapper
# Settings passed positionally, in this order, to decision_worker.decision_worker() by main().
# NOTE(review): the threshold is a squared distance, so 25 presumably means 25 m^2 (5 m) - confirm.
DISTANCE_SQUARED_THRESHOLD = 25 # Distance is in meters
TOLERANCE = 0.1 # meters
CAMERA_FOV_FORWARDS = 90 # degrees
CAMERA_FOV_SIDEWAYS = 120 # degrees
SEARCH_HEIGHT = 10 # meters
# NOTE(review): no unit given; presumably a fraction between 0 and 1 - confirm against the decision module.
SEARCH_OVERLAP = 0.2
SMALL_ADJUSTMENT = 0.5 # meters

# Number of simulated odometry/cluster input pairs per batch; main() sends two batches
# and expects WORK_COUNT * 2 decisions on the output queue.
WORK_COUNT = 5
def simulate_cluster_estimation_worker(
    input_queue: queue_proxy_wrapper.QueueProxyWrapper,
) -> None:
    """
    Stand in for the cluster estimation worker.

    Creates three hard-coded ObjectInWorld instances and puts them on the
    queue as a single list.

    input_queue: Queue wrapper that receives the list of objects.
    """
    # Argument triples handed unchanged to ObjectInWorld.create(), in creation order
    creation_args = (
        (1.0, 2.0, 0.9),
        (4.5, 3.0, 2.0),
        (7.2, 2.9, 4.6),
    )

    detections = []
    for args in creation_args:
        ok, detection = object_in_world.ObjectInWorld.create(*args)
        assert ok
        assert detection is not None
        detections.append(detection)

    # One put of the whole list, not one put per object
    input_queue.queue.put(detections)
def simulate_flight_interface_worker(
    timestamp: float, odometry_queue: queue_proxy_wrapper.QueueProxyWrapper
) -> None:
    """
    Stand in for the flight interface worker.

    Builds one odometry sample from fixed position and orientation values,
    stamps it with the given timestamp, and puts it on the odometry queue.

    timestamp: Value assigned to the sample's timestamp attribute.
    odometry_queue: Queue wrapper that receives the OdometryAndTime object
        (main() creates it with maxsize=1).
    """
    # The same fixed pose is used for every simulated sample
    ok, position = drone_odometry_local.DronePositionLocal.create(0.0, 2.0, -1.0)
    assert ok
    assert position is not None

    ok, orientation = drone_odometry_local.DroneOrientationLocal.create_new(0.0, 0.0, 0.0)
    assert ok
    assert orientation is not None

    ok, odometry = drone_odometry_local.DroneOdometryLocal.create(position, orientation)
    assert ok
    assert odometry is not None

    ok, stamped_odometry = odometry_and_time.OdometryAndTime.create(odometry)
    assert ok
    assert stamped_odometry is not None

    # Replace whatever timestamp create() assigned with the caller-supplied one
    stamped_odometry.timestamp = timestamp

    odometry_queue.queue.put(stamped_odometry)
def main() -> int:
    """
    Run the decision worker in a child process and check its output.

    Starts decision_worker.decision_worker in a separate process, feeds it
    two batches of WORK_COUNT simulated odometry and cluster inputs with a
    1 second pause between the batches, requests the worker to exit, and
    then asserts that WORK_COUNT * 2 decisions can be read from the output
    queue without blocking.

    The exit request, the draining of the input queues and the join of the
    worker process run in `finally` blocks, so they still happen when a
    check fails and the child process is not left unsignalled and unjoined.

    Return: 0 on success. Failures propagate as exceptions: AssertionError
        from a failed check, or the exception raised by get_nowait() when
        the output queue is empty (presumably queue.Empty - confirm against
        QueueProxyWrapper).
    """
    controller = worker_controller.WorkerController()

    mp_manager = mp.Manager()

    cluster_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
    odometry_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager, maxsize=1)
    decision_output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)

    worker = mp.Process(
        target=decision_worker.decision_worker,
        args=(
            DISTANCE_SQUARED_THRESHOLD,
            TOLERANCE,
            CAMERA_FOV_FORWARDS,
            CAMERA_FOV_SIDEWAYS,
            SEARCH_HEIGHT,
            SEARCH_OVERLAP,
            SMALL_ADJUSTMENT,
            odometry_input_queue,
            cluster_input_queue,
            decision_output_queue,
            controller,
        ),
    )

    # Starts the decision worker
    worker.start()

    try:
        try:
            # Simulate odometry data and cluster estimation
            for i in range(WORK_COUNT):
                simulate_flight_interface_worker(i, odometry_input_queue)
                simulate_cluster_estimation_worker(cluster_input_queue)

            time.sleep(1)

            for i in range(WORK_COUNT):
                simulate_flight_interface_worker(i, odometry_input_queue)
                simulate_cluster_estimation_worker(cluster_input_queue)
        finally:
            # Signal exit even if simulation failed, otherwise the join below
            # would wait on a worker that was never told to stop. On the
            # success path this is the same single call, at the same point,
            # as before.
            controller.request_exit()

        # Test
        for _ in range(WORK_COUNT * 2):
            decision_output: decision.Decision = decision_output_queue.queue.get_nowait()
            print(f"Decision output: {decision_output}")
            assert decision_output is not None
    finally:
        # Teardown: runs on success and on failure so the child is always joined
        odometry_input_queue.fill_and_drain_queue()
        cluster_input_queue.fill_and_drain_queue()
        worker.join()

    return 0
if __name__ == "__main__":
    # Script entry point: run the test and report a negative status code as an error
    result_main = main()
    if result_main < 0:
        print(f"ERROR: Status code: {result_main}")

    # Printed regardless of the status code
    print("Done!")