Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/services/ardupilot_manager/api/v1/routers/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,23 @@ def available_routers() -> Any:
return autopilot.get_available_routers()


@index_router_v1.post("/set_exec_arguments", summary="Set arguments to be passed to autopilot executable")
@index_to_http_exception
def set_exec_arguments(firmware: str, board: str, arguments: dict[str, str]) -> Any:
logger.info(f"Setting execution arguments of {firmware} on board {board} to {arguments}")
autopilot.set_exec_arguments(firmware, board, arguments)


@index_router_v1.post(
"/get_exec_arguments",
summary="Get arguments to be passed to specified autopilot executable",
)
@index_to_http_exception
async def get_exec_arguments(firmware: str, board: str) -> Any:
logger.info(f"Getting execution arguments for firmware {firmware} on board {board}")
return autopilot.get_exec_arguments(firmware, board)


@index_router_v1.post("/stop", summary="Stop the autopilot.")
@index_to_http_exception
async def stop() -> Any:
Expand Down
100 changes: 83 additions & 17 deletions core/services/ardupilot_manager/autopilot_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import os
import pathlib
import subprocess
Expand All @@ -24,6 +25,7 @@
from mavlink_proxy.Manager import Manager as MavlinkManager
from settings import Settings
from typedefs import (
EndpointDefinition,
Firmware,
FlightController,
FlightControllerFlags,
Expand Down Expand Up @@ -438,6 +440,7 @@ async def start_manual_board(self, board: FlightController) -> None:
self.ardupilot_subprocess = None
await self.start_mavlink_manager(self.master_endpoint)

# pylint: disable=too-many-locals
async def start_sitl(self) -> None:
self._current_board = BoardDetector.detect_sitl()
if not self.firmware_manager.is_firmware_installed(self._current_board):
Expand All @@ -451,26 +454,53 @@ async def start_sitl(self) -> None:
firmware_path = self.firmware_manager.firmware_path(self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform)

if "exec_arguments" not in self.configuration or str(firmware_path) not in self.configuration["exec_arguments"]:
with open(pathlib.Path(__file__).parent.resolve() / "default_arguments.json", "r", encoding="utf-8") as f:
default_config = json.load(f)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
logger.warning(f"Setting defaults parameters for SITL to {default_config}")
self.set_exec_arguments(str(firmware_path), "SITL", default_config["SITL"])
# Refresh configuration, as user may have changed settings since restart
self.settings.load()
self.configuration = deepcopy(self.settings.content)
arguments = self.configuration["exec_arguments"][str(firmware_path)]["SITL"]

# ArduPilot SITL binary will bind TCP port 5760 (server) and the mavlink router will connect to it as a client
master_endpoint = Endpoint(
name="Master",
owner=self.settings.app_name,
connection_type=EndpointType.TCPClient,
place="127.0.0.1",
argument=5760,
protected=True,
)
default_endpoint_args = EndpointDefinition()

endpoint_config = arguments.get("endpoint")
if endpoint_config is None:
logger.warning("Using default endpoint for SITL because none was provided in settings.json.")
master_endpoint_args = default_endpoint_args
else:
if not isinstance(endpoint_config, dict):
logger.warning(
"Invalid endpoint configuration type '%s'; falling back to default endpoint for SITL.",
type(endpoint_config).__name__,
)
master_endpoint_args = default_endpoint_args
else:
# Start with defaults, override only known fields, ignore unknown keys
arg_dict = EndpointDefinition().dict()
for key, value in endpoint_config.items():
if key in default_endpoint_args:
arg_dict[key] = self._sanitize_endpoint_argument(key, value)
else:
logger.debug("Ignoring unknown endpoint field '%s' in settings.json.", key)
master_endpoint_args = EndpointDefinition(**arg_dict)

master_endpoint = Endpoint(**master_endpoint_args.dict())

arg_list = [str(firmware_path)]
for k, v in arguments.items():
if k == "endpoint":
continue
arg_list.append(str(k))
if v != "":
arg_list.append(str(v))

# pylint: disable=consider-using-with
self.ardupilot_subprocess = subprocess.Popen(
[
firmware_path,
"--model",
self.current_sitl_frame.value,
"--base-port",
str(master_endpoint.argument),
"--home",
"-27.563,-48.459,0.0,270.0",
],
arg_list,
shell=False,
encoding="utf-8",
errors="ignore",
Expand All @@ -479,6 +509,17 @@ async def start_sitl(self) -> None:

await self.start_mavlink_manager(master_endpoint)

def _sanitize_endpoint_argument(self, key: str, value: str) -> int | bool | str:
if key == "argument":
return int(value)
if key == "protected":
if value == "True":
return True
if value == "False":
return False
raise ValueError("Invalid value for 'protected' argument of endpoint")
return value

async def start_mavlink_manager(self, device: Endpoint) -> None:
for endpoint in self.autopilot_default_endpoints:
try:
Expand Down Expand Up @@ -518,6 +559,31 @@ def get_preferred_board(self) -> FlightController:
raise NoPreferredBoardSet("Preferred board not set yet.")
return FlightController(**preferred_board)

def set_exec_arguments(self, firmware_name: str, board: str, settings: dict[str, str]) -> None:
self.settings.load()
self.configuration = deepcopy(self.settings.content)
try:
if "exec_arguments" not in self.configuration:
self.configuration["exec_arguments"] = {}
if firmware_name not in self.configuration["exec_arguments"]:
self.configuration["exec_arguments"][firmware_name] = {}
self.configuration["exec_arguments"][firmware_name][board] = settings
self.settings.save(self.configuration)
logger.info("Execution arguments set.")
except Exception as e:
logger.error("Error while saving execution arguments")
logger.error(repr(e))

def get_exec_arguments(self, firmware_name: str, board: str) -> Any:
self.settings.load()
self.configuration = deepcopy(self.settings.content)
try:
return self.configuration["exec_arguments"][firmware_name][board]
except Exception as e:
logger.error("Error while getting execution arguments")
logger.error(repr(e))
return None

def get_board_to_be_used(self, boards: List[FlightController]) -> FlightController:
"""Check if preferred board exists and is connected. If so, use it, otherwise, choose by priority."""
try:
Expand Down
25 changes: 25 additions & 0 deletions core/services/ardupilot_manager/default_arguments.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"SITL" : {
"endpoint" : {
"name" : "Master",
"owner" : "Ardupilot Manager",
"connection_type" : "tcpout",
"place" : "127.0.0.1",
"argument" : "5760",
"protected" : "True"
},
"--model" : "vectored",
"--home" : "-27.563,-48.459,0.0,270.0"
},
"Linux" : {
"endpoint" : {
"name" : "Master",
"owner" : "Ardupilot Manager",
"connection_type" : "udpin",
"place" : "127.0.0.1",
"argument" : "8852",
"protected" : "True"
},
"use_defaults" : "True"
}
}
9 changes: 9 additions & 0 deletions core/services/ardupilot_manager/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,12 @@ def valid_endpoint(cls: Any, value: str) -> str:

def __hash__(self) -> int: # make hashable BaseModel subclass
return hash(self.port + self.endpoint)


class EndpointDefinition(BaseModel):
name: str = "Master"
owner: str = "Ardupilot Manager"
connection_type: str = "tcpout"
place: str = "127.0.0.1"
argument: Optional[int] = 5760
protected: Optional[bool] = True