diff --git a/core/services/ardupilot_manager/api/v1/routers/index.py b/core/services/ardupilot_manager/api/v1/routers/index.py index 30fbea116e..22aa7571d7 100644 --- a/core/services/ardupilot_manager/api/v1/routers/index.py +++ b/core/services/ardupilot_manager/api/v1/routers/index.py @@ -261,6 +261,23 @@ def available_routers() -> Any: return autopilot.get_available_routers() +@index_router_v1.post("/set_exec_arguments", summary="Set arguments to be passed to autopilot executable") +@index_to_http_exception +def set_exec_arguments(firmware: str, board: str, arguments: dict[str, str]) -> Any: + logger.info(f"Setting execution arguments of {firmware} on board {board} to {arguments}") + autopilot.set_exec_arguments(firmware, board, arguments) + + +@index_router_v1.post( + "/get_exec_arguments", + summary="Get arguments to be passed to specified autopilot executable", +) +@index_to_http_exception +async def get_exec_arguments(firmware: str, board: str) -> Any: + logger.info(f"Getting execution arguments for firmware {firmware} on board {board}") + return autopilot.get_exec_arguments(firmware, board) + + @index_router_v1.post("/stop", summary="Stop the autopilot.") @index_to_http_exception async def stop() -> Any: diff --git a/core/services/ardupilot_manager/autopilot_manager.py b/core/services/ardupilot_manager/autopilot_manager.py index b64682ce7a..6160bb931f 100644 --- a/core/services/ardupilot_manager/autopilot_manager.py +++ b/core/services/ardupilot_manager/autopilot_manager.py @@ -1,4 +1,5 @@ import asyncio +import json import os import pathlib import subprocess @@ -24,6 +25,7 @@ from mavlink_proxy.Manager import Manager as MavlinkManager from settings import Settings from typedefs import ( + EndpointDefinition, Firmware, FlightController, FlightControllerFlags, @@ -438,6 +440,7 @@ async def start_manual_board(self, board: FlightController) -> None: self.ardupilot_subprocess = None await self.start_mavlink_manager(self.master_endpoint) + # pylint: disable=too-many-locals async def start_sitl(self) -> None: self._current_board = BoardDetector.detect_sitl() if not self.firmware_manager.is_firmware_installed(self._current_board): @@ -451,26 +454,52 @@ async def start_sitl(self) -> None: firmware_path = self.firmware_manager.firmware_path(self._current_board.platform) self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform) - # ArduPilot SITL binary will bind TCP port 5760 (server) and the mavlink router will connect to it as a client - master_endpoint = Endpoint( - name="Master", - owner=self.settings.app_name, - connection_type=EndpointType.TCPClient, - place="127.0.0.1", - argument=5760, - protected=True, - ) + if "exec_arguments" not in self.configuration or str(firmware_path) not in self.configuration["exec_arguments"]: + with open(pathlib.Path(__file__).parent.resolve() / "default_arguments.json", "r", encoding="utf-8") as f: + default_config = json.load(f) + logger.warning(f"Setting defaults parameters for SITL to {default_config}") + self.set_exec_arguments(str(firmware_path), "SITL", default_config["SITL"]) + # Refresh configuration, as user may have changed settings since restart + self.settings.load() + self.configuration = deepcopy(self.settings.content) + arguments = self.configuration["exec_arguments"][str(firmware_path)]["SITL"] + + # ArduPilot SITL binary will bind TCP port specified by user (or default to port 5760) and the mavlink router will connect to it as a client + default_endpoint_args = EndpointDefinition() + + endpoint_config = arguments.get("endpoint") + if endpoint_config is None: + logger.warning("Using default endpoint for SITL because none was provided in settings.json.") + master_endpoint_args = default_endpoint_args + elif not isinstance(endpoint_config, dict): + logger.warning( + "Invalid endpoint configuration type '%s'; falling back to default endpoint for SITL.", + type(endpoint_config).__name__, + ) + master_endpoint_args = default_endpoint_args + else: + # Start with defaults, override only known fields, ignore unknown keys + arg_dict = EndpointDefinition().dict() + for key, value in endpoint_config.items(): + if key in default_endpoint_args: + arg_dict[key] = self._sanitize_endpoint_argument(key, value) + else: + logger.debug("Ignoring unknown endpoint field '%s' in settings.json.", key) + master_endpoint_args = EndpointDefinition(**arg_dict) + + master_endpoint = Endpoint(**master_endpoint_args.dict()) + + arg_list = [str(firmware_path)] + for k, v in arguments.items(): + if k == "endpoint": + continue + arg_list.append(str(k)) + if v != "": + arg_list.append(str(v)) + # pylint: disable=consider-using-with self.ardupilot_subprocess = subprocess.Popen( - [ - firmware_path, - "--model", - self.current_sitl_frame.value, - "--base-port", - str(master_endpoint.argument), - "--home", - "-27.563,-48.459,0.0,270.0", - ], + arg_list, shell=False, encoding="utf-8", errors="ignore", @@ -479,6 +508,17 @@ async def start_sitl(self) -> None: await self.start_mavlink_manager(master_endpoint) + def _sanitize_endpoint_argument(self, key: str, value: str) -> int | bool | str: + if key == "argument": + return int(value) + if key == "protected": + if value == "True": + return True + if value == "False": + return False + raise ValueError("Invalid value for 'protected' argument of endpoint") + return value + async def start_mavlink_manager(self, device: Endpoint) -> None: for endpoint in self.autopilot_default_endpoints: try: @@ -518,6 +558,31 @@ def get_preferred_board(self) -> FlightController: raise NoPreferredBoardSet("Preferred board not set yet.") return FlightController(**preferred_board) + def set_exec_arguments(self, firmware_name: str, board: str, settings: dict[str, str]) -> None: + self.settings.load() + self.configuration = deepcopy(self.settings.content) + try: + if "exec_arguments" not in self.configuration: + self.configuration["exec_arguments"] = {} + if firmware_name not in self.configuration["exec_arguments"]: + self.configuration["exec_arguments"][firmware_name] = {} + self.configuration["exec_arguments"][firmware_name][board] = settings + self.settings.save(self.configuration) + logger.info("Execution arguments set.") + except Exception as e: + logger.error("Error while saving execution arguments") + logger.error(repr(e)) + + def get_exec_arguments(self, firmware_name: str, board: str) -> Any: + self.settings.load() + self.configuration = deepcopy(self.settings.content) + try: + return self.configuration["exec_arguments"][firmware_name][board] + except Exception as e: + logger.error("Error while getting execution arguments") + logger.error(repr(e)) + return None + def get_board_to_be_used(self, boards: List[FlightController]) -> FlightController: """Check if preferred board exists and is connected. If so, use it, otherwise, choose by priority.""" try: diff --git a/core/services/ardupilot_manager/default_arguments.json b/core/services/ardupilot_manager/default_arguments.json new file mode 100644 index 0000000000..6e4a80f9a6 --- /dev/null +++ b/core/services/ardupilot_manager/default_arguments.json @@ -0,0 +1,25 @@ +{ + "SITL" : { + "endpoint" : { + "name" : "Master", + "owner" : "Ardupilot Manager", + "connection_type" : "tcpout", + "place" : "127.0.0.1", + "argument" : "5760", + "protected" : "True" + }, + "--model" : "vectored", + "--home" : "-27.563,-48.459,0.0,270.0" + }, + "Linux" : { + "endpoint" : { + "name" : "Master", + "owner" : "Ardupilot Manager", + "connection_type" : "udpin", + "place" : "127.0.0.1", + "argument" : "8852", + "protected" : "True" + }, + "use_defaults" : "True" + } +} \ No newline at end of file diff --git a/core/services/ardupilot_manager/typedefs.py b/core/services/ardupilot_manager/typedefs.py index 4861693a02..a8f05c59c2 100644 --- a/core/services/ardupilot_manager/typedefs.py +++ b/core/services/ardupilot_manager/typedefs.py @@ -209,3 +209,12 @@ def valid_endpoint(cls: Any, value: str) -> str: def __hash__(self) -> int: # make hashable BaseModel subclass return hash(self.port + self.endpoint) + + +class EndpointDefinition(BaseModel): + name: str = "Master" + owner: str = "Ardupilot Manager" + connection_type: str = "tcpout" + place: str = "127.0.0.1" + argument: Optional[int] = 5760 + protected: Optional[bool] = True