Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 79 additions & 36 deletions robotics_application_manager/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def __init__(self, host: str, port: int):
self.world_type = None
self.robot_launcher = None
self.tools_launcher = None
self.application_process = None
self.application_processes = {}
self.running = True
self.linter = Lint()

Expand Down Expand Up @@ -722,6 +722,14 @@ def on_change_style(self, event):
except Exception as e:
LogManager.logger.exception(f"Error refreshing GTK applications: {e}")

def _kill_all_applications(self):
for name, proc in self.application_processes.items():
try:
stop_process_and_children(proc)
except Exception:
pass
self.application_processes = {}

def on_run_application(self, event):
"""
Handle the 'run_application' event.
Expand All @@ -734,28 +742,25 @@ def on_run_application(self, event):
event: The event object containing application configuration and code data.
"""
# Kill already running code
try:
proc = psutil.Process(self.application_process.pid)
proc.suspend()
proc.kill()
except Exception:
pass
self._kill_all_applications()

# Delete old files
if os.path.exists("/workspace/code"):
shutil.rmtree("/workspace/code", ignore_errors=False)
os.mkdir("/workspace/code")

# Extract app config
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing comment. Please revert

app_cfg = event.kwargs.get("data", {})
entrypoint = app_cfg["entrypoint"]
to_lint = app_cfg["linter"]

# Unzip the app
# the code comes as a base64 data-uri from the browser, strip the header part
if app_cfg["code"].startswith("data:"):
_, _, code = app_cfg["code"].partition("base64,")
with open("/workspace/code/app.zip", "wb") as result:
result.write(base64.b64decode(code))

# just extract evrything to /workspace/code — if theres a processB/ folder
# inside the zip it'll end up at /workspace/code/processB/ on its own
zip_ref = zipfile.ZipFile("/workspace/code/app.zip", "r")
zip_ref.extractall("/workspace/code")
zip_ref.close()
Expand Down Expand Up @@ -786,9 +791,8 @@ def on_run_application(self, event):
if returncode != 0:
raise Exception("Failed to compile")

self.unpause_sim()
if entrypoint.endswith(".launch.py"):
self.application_process = subprocess.Popen(
proc = subprocess.Popen(
[
f"source /workspace/code/install/setup.bash && ros2 launch {entrypoint}"
],
Expand All @@ -801,8 +805,7 @@ def on_run_application(self, event):
executable="/bin/bash",
)
else:

self.application_process = subprocess.Popen(
proc = subprocess.Popen(
[
"source /workspace/code/install/setup.bash && ros2 run academy academyCode"
],
Expand All @@ -814,6 +817,8 @@ def on_run_application(self, event):
shell=True,
executable="/bin/bash",
)
self.application_processes["agentA"] = proc
self.unpause_sim()
return

# Pass the linter
Expand All @@ -831,15 +836,48 @@ def on_run_application(self, event):
fds = os.listdir("/dev/pts/")
console_fd = str(max(map(int, fds[:-1])))

self.unpause_sim()
self.application_process = subprocess.Popen(
proc = subprocess.Popen(
["python3", entrypoint],
stdin=open("/dev/pts/" + console_fd, "r"),
stdout=open("/dev/pts/" + console_fd, "w"),
stderr=sys.stdout,
bufsize=1024,
universal_newlines=True,
)
self.application_processes["agentA"] = proc

# check if theres a second agent in processB/ subdir — this one is
# pre-programmed from the server side so no need to lint it
processb_entrypoint = os.path.join(
"/workspace/code/processB", os.path.basename(entrypoint)
)
if os.path.isfile(processb_entrypoint):
proc_b = subprocess.Popen(
["python3", processb_entrypoint],
stdin=open("/dev/pts/" + console_fd, "r"),
stdout=open("/dev/pts/" + console_fd, "w"),
stderr=sys.stdout,
bufsize=1024,
universal_newlines=True,
)
self.application_processes["agentB"] = proc_b

# SIGSTOP all the procs first, then unpause gazebo, then SIGCONT them togther.
# using finally so even if unpause fails we dont leave frozen processes behind
for p in self.application_processes.values():
try:
os.kill(p.pid, signal.SIGSTOP)
except ProcessLookupError:
pass

try:
self.unpause_sim()
finally:
for p in self.application_processes.values():
try:
os.kill(p.pid, signal.SIGCONT)
except ProcessLookupError:
pass

LogManager.logger.info("Run application transition finished")

Expand All @@ -853,10 +891,9 @@ def on_terminate_application(self, event):
Parameters:
event: The event object associated with the termination request.
"""
if self.application_process:
if self.application_processes:
try:
stop_process_and_children(self.application_process)
self.application_process = None
self._kill_all_applications()
self.pause_sim()
self.reset_sim()
except Exception:
Expand Down Expand Up @@ -894,10 +931,9 @@ def on_disconnect(self, event):
terminates launchers, and restarts the script.
"""

if self.application_process:
if self.application_processes:
try:
stop_process_and_children(self.application_process)
self.application_process = None
self._kill_all_applications()
except Exception as e:
LogManager.logger.exception("Exception stopping application process")

Expand Down Expand Up @@ -929,13 +965,17 @@ def process_message(self, message):
self.consumer.send_message(message.response(response))

def on_pause(self, msg):
if self.application_process is not None:
proc = psutil.Process(self.application_process.pid)
children = proc.children(recursive=True)
children.append(proc)
for p in children:
if self.application_processes:
for proc in self.application_processes.values():
try:
p.suspend()
p = psutil.Process(proc.pid)
children = p.children(recursive=True)
children.append(p)
for child in children:
try:
child.suspend()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
self.pause_sim()
Expand All @@ -953,13 +993,17 @@ def on_resume(self, msg):
Parameters:
msg: The event or message triggering the resume action.
"""
if self.application_process is not None:
proc = psutil.Process(self.application_process.pid)
children = proc.children(recursive=True)
children.append(proc)
for p in children:
if self.application_processes:
for proc in self.application_processes.values():
try:
p.resume()
p = psutil.Process(proc.pid)
children = p.children(recursive=True)
children.append(p)
for child in children:
try:
child.resume()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
self.unpause_sim()
Expand Down Expand Up @@ -1028,10 +1072,9 @@ def signal_handler(sign, frame):
except Exception as e:
LogManager.logger.exception("Exception stopping consumer")

if self.application_process:
if self.application_processes:
try:
stop_process_and_children(self.application_process)
self.application_process = None
self._kill_all_applications()
except Exception as e:
LogManager.logger.exception(
"Exception stopping application process"
Expand Down