From 5d9945f90f873dd72124dd9e242e00457b1a8c26 Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 27 Apr 2026 13:23:35 -0300 Subject: [PATCH 1/7] core: zenoh: run query handlers on a shared asyncio loop --- .../src/commonwealth/utils/zenoh_helper.py | 114 ++++++++++++++---- 1 file changed, 90 insertions(+), 24 deletions(-) diff --git a/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py b/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py index 71914cf469..e7f31fa3e3 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py +++ b/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py @@ -1,8 +1,9 @@ import asyncio import json import re -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Callable +import threading +from concurrent.futures import Future +from typing import Any, Callable, Coroutine import fastapi import zenoh @@ -12,12 +13,17 @@ from .Singleton import Singleton PARAM_REGEX = r"{[a-zA-Z0-9_]+}" +_LOOP_START_TIMEOUT_S = 5.0 +_LOOP_JOIN_TIMEOUT_S = 2.0 class ZenohSession(metaclass=Singleton): session: zenoh.Session | None = None config: zenoh.Config - _executor: ThreadPoolExecutor | None = None + _loop: asyncio.AbstractEventLoop | None = None + _loop_thread: threading.Thread | None = None + _loop_ready: threading.Event + _loop_start_error: Exception | None = None def __init__(self, service_name: str) -> None: if self.session is not None: @@ -26,27 +32,83 @@ def __init__(self, service_name: str) -> None: self.zenoh_config(service_name) self.session = zenoh.open(self.config) - self._executor = ThreadPoolExecutor( - max_workers=4, - thread_name_prefix="zenoh-", + self._loop_ready = threading.Event() + self._loop_start_error = None + self._loop_thread = threading.Thread( + target=self._run_loop, + name="zenoh-loop", + daemon=True, ) - - def submit_to_executor(self, func: Callable[..., Any]) -> None: - if self._executor is None: - logger.warning("Zenoh session executor is not available, task will not be initialized.") - return + self._loop_thread.start() + if not self._loop_ready.wait(timeout=_LOOP_START_TIMEOUT_S): + raise RuntimeError(f"Zenoh event loop did not signal readiness within {_LOOP_START_TIMEOUT_S}s") + if self._loop_start_error is not None: + raise RuntimeError( + f"Zenoh event loop failed to start: {self._loop_start_error}" + ) from self._loop_start_error + + def _run_loop(self) -> None: try: - self._executor.submit(func) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self._loop = loop except Exception as e: - logger.error(f"Error submitting task to zenoh session executor: {e}") + self._loop_start_error = e + self._loop_ready.set() + logger.exception("Failed to initialize Zenoh event loop") + return + self._loop_ready.set() + try: + loop.run_forever() + finally: + try: + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete(loop.shutdown_default_executor()) + finally: + loop.close() + + def submit_coroutine(self, coroutine: Coroutine[Any, Any, Any]) -> Future[Any] | None: + + loop = self._loop + if loop is None or not loop.is_running(): + logger.warning("Zenoh session loop is not available, task will not be scheduled.") + coroutine.close() + return None + try: + future = asyncio.run_coroutine_threadsafe(coroutine, loop) + except RuntimeError as e: + logger.warning(f"Could not schedule coroutine on Zenoh loop: {e}") + coroutine.close() + return None + + def _log_if_failed(fut: Future[Any]) -> None: + if fut.cancelled(): + return + exc = fut.exception() + if exc is not None: + logger.opt(exception=exc).error("Unhandled error in Zenoh background task") + + future.add_done_callback(_log_if_failed) + return future def close(self) -> None: if self.session: self.session.close() # type: ignore[no-untyped-call] self.session = None - if self._executor: - self._executor.shutdown(wait=False, cancel_futures=True) - self._executor = None + loop = self._loop + if loop is not None and loop.is_running(): + loop.call_soon_threadsafe(loop.stop) + if self._loop_thread is not None: + self._loop_thread.join(timeout=_LOOP_JOIN_TIMEOUT_S) + if self._loop_thread.is_alive(): + logger.warning(f"Zenoh loop thread did not terminate within {_LOOP_JOIN_TIMEOUT_S}") + self._loop_thread = None + self._loop = None def zenoh_config(self, service_name: str) -> None: configuration = { @@ -78,24 +140,28 @@ def add_queryable(self, path: str, func: Callable[..., Any]) -> None: def wrapper(query: zenoh.Query) -> None: params = dict(query.parameters) # type: ignore + key_expr = query.selector.key_expr async def _handle_async() -> None: try: response = await func(**params) if response is not None: - query.reply(query.selector.key_expr, json.dumps(response, default=str)) + query.reply(key_expr, json.dumps(response, default=str)) except Exception as e: - logger.exception(f"Error in zenoh query handler: {query.selector.key_expr}") + logger.exception(f"Error in zenoh query handler: {key_expr}") error_response = { "error": str(e), "error_type": type(e).__name__, } - query.reply(query.selector.key_expr, json.dumps(error_response)) - - def run_async() -> None: - asyncio.run(_handle_async()) - - self.zenoh_session.submit_to_executor(run_async) + try: + query.reply(key_expr, json.dumps(error_response)) + except Exception: + logger.exception(f"Failed to send error reply for {key_expr}") + finally: + query.drop() # type: ignore[no-untyped-call] + + if self.zenoh_session.submit_coroutine(_handle_async()) is None: + query.drop() # type: ignore[no-untyped-call] if self.zenoh_session.session: self.zenoh_session.session.declare_queryable(full_path, wrapper) From c573eab71b665e00336b46715d5cda11ab576e9a Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 27 Apr 2026 13:52:20 -0300 Subject: [PATCH 2/7] kraken: add Container endpoints to zenoh --- core/services/kraken/zenoh_handlers/app.py | 8 +++--- ...ension_handler.py => container_handler.py} | 27 ++++++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) rename core/services/kraken/zenoh_handlers/{extension_handler.py => container_handler.py} (64%) diff --git a/core/services/kraken/zenoh_handlers/app.py b/core/services/kraken/zenoh_handlers/app.py index 67c55d351d..3a30286c82 100644 --- a/core/services/kraken/zenoh_handlers/app.py +++ b/core/services/kraken/zenoh_handlers/app.py @@ -1,10 +1,10 @@ from commonwealth.utils.zenoh_helper import ZenohRouter, ZenohSession from config import SERVICE_NAME -from zenoh_handlers.extension_handler import ExtensionHandlers +from zenoh_handlers.container_handler import ContainerHandlers session = ZenohSession(SERVICE_NAME) router = ZenohRouter(SERVICE_NAME) -# Extension -extension_handlers = ExtensionHandlers(router) -extension_handlers.register_queryables() +# Container +container_handlers = ContainerHandlers(router) +container_handlers.register_queryables() \ No newline at end of file diff --git a/core/services/kraken/zenoh_handlers/extension_handler.py b/core/services/kraken/zenoh_handlers/container_handler.py similarity index 64% rename from core/services/kraken/zenoh_handlers/extension_handler.py rename to core/services/kraken/zenoh_handlers/container_handler.py index 26a513582b..ac4100b191 100644 --- a/core/services/kraken/zenoh_handlers/extension_handler.py +++ b/core/services/kraken/zenoh_handlers/container_handler.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, List from commonwealth.utils.zenoh_helper import ZenohRouter from extension_logs import ExtensionLogPublisher @@ -7,7 +7,7 @@ from settings import get_extension_settings -class ExtensionHandlers: +class ContainerHandlers: def __init__(self, router: ZenohRouter) -> None: self.router = router @@ -17,7 +17,10 @@ async def logs_request_handler(self, extension_name: str) -> dict[str, Any]: try: extensions = get_extension_settings() - extension = next((ext for ext in extensions if extension_name in (ext.identifier, ext.name)), None) + extension = next( + (ext for ext in extensions if extension_name in (ext.identifier, ext.name)), + None, + ) if not extension: return {"error": f"Extension {extension_name} not found"} @@ -49,5 +52,21 @@ async def logs_request_handler(self, extension_name: str) -> dict[str, Any]: logger.exception(f"Error handling logs request for {extension_name}") return {"error": str(e), "error_type": type(e).__name__} + async def list_containers_handler(self) -> List[dict[str, Any]]: + """ + List all running containers. + """ + containers = await ContainerManager.get_running_containers() + return [container.dict() for container in containers] + + async def container_stats_handler(self) -> dict[str, dict[str, Any]]: + """ + List stats of all running containers. + """ + stats = await ContainerManager.get_containers_stats() + return {name: usage.dict() for name, usage in stats.items()} + def register_queryables(self) -> None: - self.router.add_queryable("extension/logs/request", self.logs_request_handler) + self.router.add_queryable("container/logs/request", self.logs_request_handler) + self.router.add_queryable("container/fetch", self.list_containers_handler) + self.router.add_queryable("container/stats", self.container_stats_handler) From 403fbf8cb1711641d9a9fed4418bb7fa9949269c Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 27 Apr 2026 13:55:18 -0300 Subject: [PATCH 3/7] frontend: kraken: move container endpoints to zenoh --- .../src/components/kraken/KrakenManager.ts | 36 +++++++++---------- .../src/views/ExtensionManagerView.vue | 28 ++++++++++----- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/core/frontend/src/components/kraken/KrakenManager.ts b/core/frontend/src/components/kraken/KrakenManager.ts index 451c4b6d54..df41cf012b 100644 --- a/core/frontend/src/components/kraken/KrakenManager.ts +++ b/core/frontend/src/components/kraken/KrakenManager.ts @@ -13,6 +13,8 @@ import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts' const KRAKEN_BASE_URL = '/kraken' const KRAKEN_API_V2_URL = `${KRAKEN_BASE_URL}/v2.0` +const KRAKEN_BASE_ZENOH = 'kraken' +const ZENOH_QUERY_STANDARD_TIMEOUT = 10000 /** * List details of all installed extensions. @@ -295,29 +297,25 @@ export async function getInstalledExtensions(): Promise { - const response = await back_axios({ - method: 'GET', - url: `${KRAKEN_API_V2_URL}/container/`, - timeout: 30000, - }) - - return response.data as RunningContainer[] +export async function listContainers(): Promise { + return zenoh.query( + `${KRAKEN_BASE_ZENOH}/container/fetch`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * List all stats of all running containers from kraken, uses API v2 + * List all stats of all running containers from kraken, uses zenoh. */ -export async function getContainersStats(): Promise { - const response = await back_axios({ - method: 'GET', - url: `${KRAKEN_API_V2_URL}/container/stats`, - timeout: 20000, - }) - - return response.data +export async function getContainersStats(): Promise { + return zenoh.query( + `${KRAKEN_BASE_ZENOH}/container/stats`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** @@ -395,7 +393,7 @@ export async function finalizeExtension( * @returns {Promise} */ export async function getHistoricalLogsForExtension(identifier: string, timeout: number): Promise { - const queryKey = `kraken/extension/logs/request?extension_name=${identifier}` + const queryKey = `${KRAKEN_BASE_ZENOH}/container/logs/request?extension_name=${identifier}` return await zenoh.query(queryKey, QueryTarget.BestMatching, timeout) } diff --git a/core/frontend/src/views/ExtensionManagerView.vue b/core/frontend/src/views/ExtensionManagerView.vue index 91bcc2674f..c16e4a1b90 100644 --- a/core/frontend/src/views/ExtensionManagerView.vue +++ b/core/frontend/src/views/ExtensionManagerView.vue @@ -636,6 +636,10 @@ export default Vue.extend({ console.error('[ExtensionManagerView] Failed to open Zenoh session:', error) } }, + applyRunningContainers(containers: RunningContainer[]): void { + this.running_containers = containers + this.clearStaleInstallingState() + }, clearEditedExtension() { this.edited_extension = null }, @@ -863,12 +867,16 @@ export default Vue.extend({ }, async fetchRunningContainers(): Promise { try { - this.running_containers = await kraken.listContainers() - this.clearStaleInstallingState() + const containers = await kraken.listContainers() + if (!Array.isArray(containers)) { + throw new Error('Unexpected response while fetching running containers') + } + this.applyRunningContainers(containers) } catch (error) { notifier.pushBackError('RUNNING_CONTAINERS_FETCH_FAIL', error) } }, + clearStaleInstallingState(): void { if (!this.active_operation_identifier || this.show_pull_output) return const ext = this.installed_extensions[this.active_operation_identifier] @@ -883,13 +891,15 @@ export default Vue.extend({ } }, async fetchContainersStats(): Promise { - kraken.getContainersStats() - .then((response) => { - this.metrics = response - }) - .catch((error) => { - notifier.pushBackError('EXTENSIONS_METRICS_FETCH_FAIL', error) - }) + try { + const stats = await kraken.getContainersStats() + if (!stats || typeof stats !== 'object' || stats.error) { + throw new Error(stats?.error ?? 'Unexpected response while fetching container stats') + } + this.metrics = stats + } catch (error) { + notifier.pushBackError('EXTENSIONS_METRICS_FETCH_FAIL', error) + } }, getContainerName(extension: InstalledExtensionData): string | null { return this.getContainer(extension)?.name ?? null From 2b9f3f036e92688de41d19dffc43226c6193de61 Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 27 Apr 2026 17:29:41 -0300 Subject: [PATCH 4/7] core: zenoh: add queryable support for install operations --- .../src/commonwealth/utils/zenoh_helper.py | 96 ++++++++++++++----- 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py b/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py index e7f31fa3e3..b3bb9a247a 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py +++ b/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py @@ -3,7 +3,7 @@ import re import threading from concurrent.futures import Future -from typing import Any, Callable, Coroutine +from typing import Any, AsyncGenerator, Callable, Coroutine, Optional import fastapi import zenoh @@ -128,44 +128,90 @@ def zenoh_config(self, service_name: str) -> None: class ZenohRouter: prefix: str zenoh_session: ZenohSession + _publishers: dict[str, zenoh.Publisher] def __init__(self, service_name: str): self.prefix = service_name self.zenoh_session = ZenohSession(service_name) + self._publishers = {} + + def close(self) -> None: + for publisher in self._publishers.values(): + try: + publisher.undeclare() # type: ignore[no-untyped-call] + except Exception: + logger.exception(f"Failed to undeclare publisher {publisher.key_expr}.") + self._publishers.clear() + + def ensure_publisher(self, path: str) -> None: + full_path = self.get_path(path) + + if self.zenoh_session.session is None: + logger.warning(f"Zenoh session unavailable, cannot declare publisher for {full_path}") + return + + if full_path not in self._publishers: + self._publishers[full_path] = self.zenoh_session.session.declare_publisher(full_path) + + def get_publisher(self, path: str) -> zenoh.Publisher | None: + full_path = self.get_path(path) + return self._publishers.get(full_path) def add_queryable(self, path: str, func: Callable[..., Any]) -> None: - full_path = self.prefix - if path: - full_path += f"/{path}" + full_path = self.get_path(path) def wrapper(query: zenoh.Query) -> None: params = dict(query.parameters) # type: ignore key_expr = query.selector.key_expr - async def _handle_async() -> None: - try: - response = await func(**params) - if response is not None: - query.reply(key_expr, json.dumps(response, default=str)) - except Exception as e: - logger.exception(f"Error in zenoh query handler: {key_expr}") - error_response = { - "error": str(e), - "error_type": type(e).__name__, - } + async def _handle_async(q: zenoh.Query) -> None: + with q: try: - query.reply(key_expr, json.dumps(error_response)) - except Exception: - logger.exception(f"Failed to send error reply for {key_expr}") - finally: - query.drop() # type: ignore[no-untyped-call] - - if self.zenoh_session.submit_coroutine(_handle_async()) is None: + response = await func(**params) + if response is not None: + q.reply(key_expr, json.dumps(response, default=str)) + except Exception as e: + logger.exception(f"Error in zenoh query handler: {key_expr}") + error_response = { + "error": str(e), + "error_type": type(e).__name__, + } + try: + q.reply(key_expr, json.dumps(error_response)) + except Exception: + logger.exception(f"Failed to send error reply for {key_expr}") + + if self.zenoh_session.submit_coroutine(_handle_async(query)) is None: query.drop() # type: ignore[no-untyped-call] if self.zenoh_session.session: self.zenoh_session.session.declare_queryable(full_path, wrapper) + def publish_from_generator( + self, + topic: str, + generator: AsyncGenerator[str, None], + on_complete: Optional[str] = None, + ) -> None: + async def _run() -> None: + publisher = self.get_publisher(topic) + if publisher is None: + logger.warning(f"Publisher for {topic} not found, dropping generator") + async for _ in generator: + pass + return + + try: + async for chunk in generator: + publisher.put(chunk) + except Exception: + logger.exception(f"Error while publishing from generator on {topic}") + finally: + if on_complete is not None: + publisher.put(on_complete) + + self.zenoh_session.submit_coroutine(_run()) + def add_routes_to_zenoh(self, app: fastapi.FastAPI) -> None: queryables = [] for route in app.router.routes: @@ -181,6 +227,12 @@ def add_routes_to_zenoh(self, app: fastapi.FastAPI) -> None: for path, func in queryables: self.add_queryable(path, func) + def get_path(self, path: str) -> str: + full_path = self.prefix + if path: + full_path += f"/{path}" + return full_path + def clean_path(path: str) -> str: path = path.removeprefix("/").removesuffix("/") From 2a9d15f99c6ef94360710ee6845e8e1980abb18d Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 27 Apr 2026 14:00:32 -0300 Subject: [PATCH 5/7] kraken: zenoh: add handlers for the extension endpoints --- core/services/kraken/zenoh_handlers/app.py | 7 +- .../zenoh_handlers/extension_handler.py | 101 ++++++++++++++++++ 2 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 core/services/kraken/zenoh_handlers/extension_handler.py diff --git a/core/services/kraken/zenoh_handlers/app.py b/core/services/kraken/zenoh_handlers/app.py index 3a30286c82..f15230a3d7 100644 --- a/core/services/kraken/zenoh_handlers/app.py +++ b/core/services/kraken/zenoh_handlers/app.py @@ -1,10 +1,15 @@ from commonwealth.utils.zenoh_helper import ZenohRouter, ZenohSession from config import SERVICE_NAME from zenoh_handlers.container_handler import ContainerHandlers +from zenoh_handlers.extension_handler import ExtensionHandlers session = ZenohSession(SERVICE_NAME) router = ZenohRouter(SERVICE_NAME) +# Extension +extension_handlers = ExtensionHandlers(router) +extension_handlers.register_queryables() + # Container container_handlers = ContainerHandlers(router) -container_handlers.register_queryables() \ No newline at end of file +container_handlers.register_queryables() diff --git a/core/services/kraken/zenoh_handlers/extension_handler.py b/core/services/kraken/zenoh_handlers/extension_handler.py new file mode 100644 index 0000000000..050c18bbed --- /dev/null +++ b/core/services/kraken/zenoh_handlers/extension_handler.py @@ -0,0 +1,101 @@ +import asyncio +import json +from typing import Any, AsyncGenerator, List, cast + +from commonwealth.utils.zenoh_helper import ZenohRouter +from extension.extension import Extension +from loguru import logger + + +class ExtensionHandlers: + INSTALL_PROGRESS_TOPIC = "extension/install/progress" + + def __init__(self, router: ZenohRouter) -> None: + self.router = router + self.router.ensure_publisher(self.INSTALL_PROGRESS_TOPIC) + + @staticmethod + async def _install_progress_stream(identifier: str, extension: Extension) -> AsyncGenerator[str, None]: + try: + async for chunk in extension.install(): + try: + payload = json.loads(chunk) + except (TypeError, ValueError) as e: + logger.debug(f"Failed to parse install progress chunk: {e}") + continue + payload["identifier"] = identifier + yield json.dumps(payload, default=str) + except Exception as error: + logger.exception(f"Install of {identifier} failed") + yield json.dumps({"identifier": identifier, "error": str(error)}, default=str) + + async def install_handler(self, identifier: str, tag: str = "", stable: str = "true") -> dict[str, str]: + """ + Install an extension by its identifier and tag, if tag is not provided it will install the latest stable version. + """ + if tag: + extension = cast(Extension, await Extension.from_manifest(identifier, tag)) + else: + extension = await Extension.from_latest(identifier, stable.lower() == "true") + + on_complete = json.dumps({"identifier": identifier, "status": "complete"}) + self.router.publish_from_generator( + self.INSTALL_PROGRESS_TOPIC, + self._install_progress_stream(identifier, extension), + on_complete=on_complete, + ) + return {"status": "started", "identifier": identifier} + + async def uninstall_handler(self, identifier: str, tag: str = "") -> None: + """ + Uninstall all versions of an extension by its identifier or just a specific version if a tag is provided. + """ + if tag: + extension = cast(Extension, await Extension.from_settings(identifier, tag)) + await extension.uninstall() + else: + extensions = cast(List[Extension], await Extension.from_settings(identifier)) + await asyncio.gather(*[ext.uninstall() for ext in extensions]) + + async def enable_handler(self, identifier: str, tag: str) -> None: + """ + Enables an extension by its identifier and tag. + """ + extension = cast(Extension, await Extension.from_settings(identifier, tag)) + await extension.enable() + + async def disable_handler(self, identifier: str) -> None: + """ + Disables current running extension by its identifier. + """ + extension = await Extension.from_running(identifier) + await extension.disable() + + async def restart_handler(self, identifier: str) -> None: + """ + Restart current running extension by its identifier. + """ + extension = await Extension.from_running(identifier) + await extension.restart() + + async def fetch_handler(self) -> list[dict[str, Any]]: + """ + List details of all installed extensions. + """ + extensions = cast(List[Extension], await Extension.from_settings()) + return [ext.source.model_dump() for ext in extensions if ext.source.identifier != ""] + + async def keep_uploaded_extension_alive_handler(self, temp_tag: str) -> None: + """ + Refresh the keep-alive timestamp for a temporary extension while the user is editing metadata. + """ + Extension.keep_temporary_extension_alive(temp_tag) + + def register_queryables(self) -> None: + self.router.add_queryable("extension/fetch", self.fetch_handler) + self.router.add_queryable("extension/install", self.install_handler) + self.router.add_queryable("extension/uninstall", self.uninstall_handler) + self.router.add_queryable("extension/enable", self.enable_handler) + self.router.add_queryable("extension/disable", self.disable_handler) + self.router.add_queryable("extension/restart", self.restart_handler) + self.router.add_queryable("extension/upload/keep-alive", self.keep_uploaded_extension_alive_handler) From ac0437a06af1e7657adcc0608544bde4258a6d91 Mon Sep 17 00:00:00 2001 From: Nico Date: Thu, 30 Apr 2026 12:47:57 -0300 Subject: [PATCH 6/7] frontend: utils: add defered pattern --- core/frontend/src/utils/deferred.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 core/frontend/src/utils/deferred.ts diff --git a/core/frontend/src/utils/deferred.ts b/core/frontend/src/utils/deferred.ts new file mode 100644 index 0000000000..8fde32480c --- /dev/null +++ b/core/frontend/src/utils/deferred.ts @@ -0,0 +1,15 @@ +interface Deferred { + promise: Promise + resolve: (value: T) => void + reject: (reason: Error) => void +} + +export function createDeferred(): Deferred { + let resolve!: (value: T) => void + let reject!: (reason: Error) => void + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + return { promise, resolve, reject } +} From cea7cfbaf04e16724a2f8c0df9488850735c56ac Mon Sep 17 00:00:00 2001 From: Nico Date: Thu, 30 Apr 2026 12:47:34 -0300 Subject: [PATCH 7/7] frontend: kraken: move extension endpoints to zenoh --- .../src/components/kraken/KrakenManager.ts | 216 ++++++++++++------ .../src/views/ExtensionManagerView.vue | 76 +++--- 2 files changed, 177 insertions(+), 115 deletions(-) diff --git a/core/frontend/src/components/kraken/KrakenManager.ts b/core/frontend/src/components/kraken/KrakenManager.ts index df41cf012b..7b0f93f106 100644 --- a/core/frontend/src/components/kraken/KrakenManager.ts +++ b/core/frontend/src/components/kraken/KrakenManager.ts @@ -1,3 +1,5 @@ +import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts' + import zenoh from '@/libs/zenoh' import { ExtensionData, @@ -9,26 +11,14 @@ import { UploadProgressEvent, } from '@/types/kraken' import back_axios from '@/utils/api' -import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts' +import { createDeferred } from '@/utils/deferred' const KRAKEN_BASE_URL = '/kraken' const KRAKEN_API_V2_URL = `${KRAKEN_BASE_URL}/v2.0` const KRAKEN_BASE_ZENOH = 'kraken' +const INSTALL_PROGRESS_TOPIC = `${KRAKEN_BASE_ZENOH}/extension/install/progress` const ZENOH_QUERY_STANDARD_TIMEOUT = 10000 -/** - * List details of all installed extensions. - * @returns {Promise} - */ -export async function fetchInstalledExtensions(): Promise { - const response = await back_axios({ - method: 'get', - url: `${KRAKEN_API_V2_URL}/extension/`, - timeout: 10000, - }) - - return response.data as InstalledExtensionData[] -} /** * List all manifest sources from kraken, uses API v2 @@ -190,78 +180,161 @@ export async function setManifestSourceOrder(identifier: string, order: number): }) } +function buildInstallQueryKey(identifier: string, tag: string | undefined, stable: boolean): string { + let key = `${KRAKEN_BASE_ZENOH}/extension/install?identifier=${encodeURIComponent(identifier)}` + if (tag) key += `;tag=${encodeURIComponent(tag)}` + if (!stable) key += ';stable=false' + return key +} + +type InstallSample = + | { kind: 'error'; message: string } + | { kind: 'complete' } + | { kind: 'progress'; raw: string } + | null + +function parseInstallSample(raw: string, identifier: string): InstallSample { + let data: { identifier?: string; status?: string; error?: string } + try { + data = JSON.parse(raw) + } catch { + return null + } + if (data.identifier !== identifier) return null + if (data.error) return { kind: 'error', message: data.error } + if (data.status === 'complete') return { kind: 'complete' } + return { kind: 'progress', raw } +} + /** - * Install an extension to the latest version available - * @param {InstalledExtensionData} extension The extension to be installed + * Install an extension to the latest version available. + * The backend publishes the pull progress on `INSTALL_PROGRESS_TOPIC`. + * + * @param {string} identifier The identifier of the extension * @param {function} progressHandler The progress handler for the download + * @param {string} tag The tag of the extension + * @param {boolean} stable If true, will install the latest stable version, default is true + * @param {number} timeout The timeout for the install */ export async function installExtension( - extension: InstalledExtensionData, - progressHandler: (event: any) => void, + identifier: string, + progressHandler?: (fragment: string) => void, + tag?: string, + stable = true, + timeout = 600000, ): Promise { - await back_axios({ - url: `${KRAKEN_API_V2_URL}/extension/install`, - method: 'POST', - data: { - identifier: extension.identifier, - name: extension.name, - docker: extension.docker, - tag: extension.tag, - enabled: true, - permissions: extension?.permissions ?? '', - user_permissions: extension?.user_permissions ?? '', - }, - timeout: 600000, - onDownloadProgress: progressHandler, - }) + const deferred = createDeferred() + let subscriber: Subscriber | null = null + let timer: ReturnType | null = null + + async function cleanup(): Promise { + if (timer !== null) { + clearTimeout(timer) + timer = null + } + try { + await subscriber?.undeclare() + } catch { + // The subscriber may already be gone. Ignore cleanup errors. + } + subscriber = null + } + + async function handleSample(sample: Sample): Promise { + const result = parseInstallSample(sample.payload().to_string(), identifier) + if (result === null) return + switch (result.kind) { + case 'error': + cleanup().finally(() => deferred.reject(new Error(result.message))) + break + case 'complete': + cleanup().finally(() => deferred.resolve()) + break + case 'progress': + progressHandler?.(result.raw) + break + default: + break + } + } + + // Subscribe before triggering the install. + subscriber = await zenoh.subscriber(INSTALL_PROGRESS_TOPIC, handleSample) + if (!subscriber) { + throw new Error('Failed to subscribe to install progress topic') + } + timer = setTimeout( + () => cleanup().finally(() => deferred.reject(new Error(`Install timed out after ${timeout}ms`))), + timeout, + ) + + try { + const reply = await zenoh.query( + buildInstallQueryKey(identifier, tag, stable), + QueryTarget.BestMatching, + timeout, + ) + if (!reply || reply.error) { + throw new Error(reply?.error ?? 'Install query failed') + } + } catch (error) { + await cleanup() + throw error + } + + return deferred.promise } /** - * Enable an extension by its identifier and tag, uses API v2 + * Enable an extension by its identifier and tag, uses zenoh * @param {string} identifier The identifier of the extension * @param {string} tag The tag of the extension */ export async function enableExtension(identifier: string, tag: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}/enable`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/enable?identifier=${encodeURIComponent(identifier)};tag=${encodeURIComponent(tag)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * Disable an extension by its identifier, uses API v2 + * Disable an extension by its identifier, uses zenoh * @param {string} identifier The identifier of the extension */ export async function disableExtension(identifier: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}/disable`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/disable?identifier=${encodeURIComponent(identifier)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * Uninstall an extension by its identifier, uses API v2 + * Uninstall an extension by its identifier, uses zenoh * @param {string} identifier The identifier of the extension */ -export async function uninstallExtension(identifier: string): Promise { - await back_axios({ - method: 'DELETE', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}`, - }) +export async function uninstallExtension(identifier: string, tag?: string): Promise { + let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${encodeURIComponent(identifier)}` + if (tag) queryKey += `;tag=${encodeURIComponent(tag)}` + + await zenoh.query( + queryKey, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * Restart an extension by its identifier, uses API v2 + * Restart an extension by its identifier, uses zenoh * @param {string} identifier The identifier of the extension */ export async function restartExtension(identifier: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}/restart`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/restart?identifier=${encodeURIComponent(identifier)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** @@ -284,16 +357,15 @@ export async function updateExtensionToVersion( } /** - * List all installed extensions from kraken, uses API v2 + * List details of all installed extensions. + * @returns {Promise | null} */ -export async function getInstalledExtensions(): Promise { - const response = await back_axios({ - method: 'GET', - url: `${KRAKEN_API_V2_URL}/extension/`, - timeout: 30000, - }) - - return response.data as InstalledExtensionData[] +export async function fetchInstalledExtensions(): Promise { + return zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/fetch`, + QueryTarget.BestMatching, + 30000, + ) } /** @@ -350,11 +422,11 @@ export async function uploadExtensionTarFile( * @returns {Promise} */ export async function keepTemporaryExtensionAlive(tempTag: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/upload/keep-alive?temp_tag=${tempTag}`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/upload/keep-alive?temp_tag=${encodeURIComponent(tempTag)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** @@ -422,9 +494,7 @@ export default { disabledManifestSource, setManifestSourcesOrders, setManifestSourceOrder, - updateExtensionToVersion, installExtension, - getInstalledExtensions, enableExtension, disableExtension, uninstallExtension, diff --git a/core/frontend/src/views/ExtensionManagerView.vue b/core/frontend/src/views/ExtensionManagerView.vue index c16e4a1b90..3b7ea53fc0 100644 --- a/core/frontend/src/views/ExtensionManagerView.vue +++ b/core/frontend/src/views/ExtensionManagerView.vue @@ -378,7 +378,6 @@