This repository was archived by the owner on Nov 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathtest_flight_interface_worker.py
More file actions
158 lines (125 loc) · 4.72 KB
/
test_flight_interface_worker.py
File metadata and controls
158 lines (125 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""
To test, start Mission Planner and forward MAVLink over TCP.
"""
import multiprocessing as mp
import queue
import time
from modules.flight_interface import flight_interface_worker
from modules import decision_command
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550"
FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds
FLIGHT_ENABLE_HITL = False # bool
FLIGHT_IMAGE_PATH = "tests/brightspot_example" # file path
def apply_decision_test(
in_queue: queue_proxy_wrapper.QueueProxyWrapper,
out_queue: queue_proxy_wrapper.QueueProxyWrapper,
) -> bool:
"""
Test the apply_decision method by sending DecisionCommands via input_queue and verifying odometry data from output_queue.
"""
# Test MOVE_TO_RELATIVE_POSITION command
decision_command_move_relative = (
decision_command.DecisionCommand.create_move_to_relative_position_command(
15.0, 0.0, -5.0 # Move 10 meters north, 0 meters east, ascend 5 meters
)
)
print("Applying MOVE_TO_RELATIVE_POSITION command...")
in_queue.queue.put(decision_command_move_relative)
# Wait for the drone to move
time.sleep(15)
# Verify the drone's position
try:
odometry_and_time_data = out_queue.queue.get_nowait()
print("Drone position after MOVE_TO_RELATIVE_POSITION command:")
print(odometry_and_time_data.odometry_data.position)
except queue.Empty:
print("No odometry data received after MOVE_TO_RELATIVE_POSITION command.")
return False
# Test MOVE_TO_ABSOLUTE_POSITION command
decision_command_move_absolute = (
decision_command.DecisionCommand.create_move_to_absolute_position_command(
43.4337659, -80.5769169, 400 # Somewhere at WRESTRC
)
)
print("Applying MOVE_TO_ABSOLUTE_POSITION command...")
in_queue.queue.put(decision_command_move_absolute)
# Wait for the drone to move
time.sleep(15)
# Verify the drone's position
try:
odometry_and_time_data = out_queue.queue.get_nowait()
print("Drone position after MOVE_TO_ABSOLUTE_POSITION command:")
print(odometry_and_time_data.odometry_data.position)
except queue.Empty:
print("No odometry data received after MOVE_TO_ABSOLUTE_POSITION command.")
return False
# Test LAND_AT_RELATIVE_POSITION command
decision_command_land_relative = (
decision_command.DecisionCommand.create_land_at_relative_position_command(-20, 10, 0)
)
print("Applying LAND_AT_RELATIVE_POSITION command...")
in_queue.queue.put(decision_command_land_relative)
# Wait for the drone to land
time.sleep(15)
# Verify that the drone has landed
try:
odometry_and_time_data = out_queue.queue.get_nowait()
print("Drone position after LAND_AT_RELATIVE_POSITION command:")
print(odometry_and_time_data.odometry_data.position)
except queue.Empty:
print("No odometry data received after LAND_AT_RELATIVE_POSITION command.")
return False
print("apply_decision tests completed successfully.")
return True
def main() -> int:
"""
Main function.
"""
# Setup
controller = worker_controller.WorkerController()
mp_manager = mp.Manager()
out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
worker = mp.Process(
target=flight_interface_worker.flight_interface_worker,
args=(
MAVLINK_CONNECTION_ADDRESS,
FLIGHT_INTERFACE_TIMEOUT,
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
FLIGHT_ENABLE_HITL,
FLIGHT_IMAGE_PATH,
in_queue, # Added input_queue
communications_in_queue,
out_queue,
home_position_out_queue,
controller,
),
)
# Run
worker.start()
time.sleep(3)
# Test
home_position = home_position_out_queue.queue.get()
assert home_position is not None
# Run the apply_decision tests
test_result = apply_decision_test(in_queue, out_queue)
if not test_result:
print("apply_decision tests failed.")
worker.terminate()
return -1
# Teardown
controller.request_exit()
worker.join()
return 0
if __name__ == "__main__":
result_main = main()
if result_main < 0:
print(f"ERROR: Status code: {result_main}")
print("Done!")