This repository was archived by the owner on Nov 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathauto_landing_worker.py
More file actions
82 lines (67 loc) · 2.45 KB
/
auto_landing_worker.py
File metadata and controls
82 lines (67 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
Auto-landing worker.
"""
import os
import pathlib
import queue
import time
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import auto_landing
from ..common.modules.logger import logger
def auto_landing_worker(
fov_x: float,
fov_y: float,
im_h: float,
im_w: float,
period: float,
detection_strategy: auto_landing.DetectionSelectionStrategy,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process for auto-landing operations.
fov_x: Horizontal field of view in degrees.
fov_y: Vertical field of view in degrees.
im_h: Image height in pixels.
im_w: Image width in pixels.
period: Wait time in seconds between processing cycles.
detection_strategy: Strategy for selecting detection when multiple targets are present.
input_queue: Queue for receiving merged odometry detections.
output_queue: Queue for sending auto-landing information.
controller: Worker controller for pause/exit management.
"""
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return
# Get Pylance to stop complaining
assert local_logger is not None
local_logger.info("Logger initialized", True)
# Create auto-landing instance
result, auto_lander = auto_landing.AutoLanding.create(
fov_x, fov_y, im_h, im_w, local_logger, detection_strategy
)
if not result:
local_logger.error("Worker failed to create AutoLanding object", True)
return
# Get Pylance to stop complaining
assert auto_lander is not None
local_logger.info("Auto-landing worker initialized successfully", True)
while not controller.is_exit_requested():
controller.check_pause()
# Process detections if available
input_data = None
try:
input_data = input_queue.queue.get_nowait()
except queue.Empty:
# No data available, continue
continue
if input_data is not None:
result, landing_info = auto_lander.run(input_data)
if result and landing_info:
output_queue.queue.put(landing_info)
time.sleep(period)