diff --git a/core/frontend/src/components/kraken/KrakenManager.ts b/core/frontend/src/components/kraken/KrakenManager.ts index 451c4b6d54..7b0f93f106 100644 --- a/core/frontend/src/components/kraken/KrakenManager.ts +++ b/core/frontend/src/components/kraken/KrakenManager.ts @@ -1,3 +1,5 @@ +import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts' + import zenoh from '@/libs/zenoh' import { ExtensionData, @@ -9,24 +11,14 @@ import { UploadProgressEvent, } from '@/types/kraken' import back_axios from '@/utils/api' -import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts' +import { createDeferred } from '@/utils/deferred' const KRAKEN_BASE_URL = '/kraken' const KRAKEN_API_V2_URL = `${KRAKEN_BASE_URL}/v2.0` +const KRAKEN_BASE_ZENOH = 'kraken' +const INSTALL_PROGRESS_TOPIC = `${KRAKEN_BASE_ZENOH}/extension/install/progress` +const ZENOH_QUERY_STANDARD_TIMEOUT = 10000 -/** - * List details of all installed extensions. - * @returns {Promise} - */ -export async function fetchInstalledExtensions(): Promise { - const response = await back_axios({ - method: 'get', - url: `${KRAKEN_API_V2_URL}/extension/`, - timeout: 10000, - }) - - return response.data as InstalledExtensionData[] -} /** * List all manifest sources from kraken, uses API v2 @@ -188,78 +180,161 @@ export async function setManifestSourceOrder(identifier: string, order: number): }) } +function buildInstallQueryKey(identifier: string, tag: string | undefined, stable: boolean): string { + let key = `${KRAKEN_BASE_ZENOH}/extension/install?identifier=${encodeURIComponent(identifier)}` + if (tag) key += `;tag=${encodeURIComponent(tag)}` + if (!stable) key += ';stable=false' + return key +} + +type InstallSample = + | { kind: 'error'; message: string } + | { kind: 'complete' } + | { kind: 'progress'; raw: string } + | null + +function parseInstallSample(raw: string, identifier: string): InstallSample { + let data: { identifier?: string; status?: string; error?: string } + try { + data = JSON.parse(raw) + } catch { + return null + } + if (data.identifier !== identifier) return null + if (data.error) return { kind: 'error', message: data.error } + if (data.status === 'complete') return { kind: 'complete' } + return { kind: 'progress', raw } +} + /** - * Install an extension to the latest version available - * @param {InstalledExtensionData} extension The extension to be installed + * Install an extension to the latest version available. + * The backend publishes the pull progress on `INSTALL_PROGRESS_TOPIC`. + * + * @param {string} identifier The identifier of the extension * @param {function} progressHandler The progress handler for the download + * @param {string} tag The tag of the extension + * @param {boolean} stable If true, will install the latest stable version, default is true + * @param {number} timeout The timeout for the install */ export async function installExtension( - extension: InstalledExtensionData, - progressHandler: (event: any) => void, + identifier: string, + progressHandler?: (fragment: string) => void, + tag?: string, + stable = true, + timeout = 600000, ): Promise { - await back_axios({ - url: `${KRAKEN_API_V2_URL}/extension/install`, - method: 'POST', - data: { - identifier: extension.identifier, - name: extension.name, - docker: extension.docker, - tag: extension.tag, - enabled: true, - permissions: extension?.permissions ?? '', - user_permissions: extension?.user_permissions ?? '', - }, - timeout: 600000, - onDownloadProgress: progressHandler, - }) + const deferred = createDeferred() + let subscriber: Subscriber | null = null + let timer: ReturnType | null = null + + async function cleanup(): Promise { + if (timer !== null) { + clearTimeout(timer) + timer = null + } + try { + await subscriber?.undeclare() + } catch { + // The subscriber may already be gone. Ignore cleanup errors. + } + subscriber = null + } + + async function handleSample(sample: Sample): Promise { + const result = parseInstallSample(sample.payload().to_string(), identifier) + if (result === null) return + switch (result.kind) { + case 'error': + cleanup().finally(() => deferred.reject(new Error(result.message))) + break + case 'complete': + cleanup().finally(() => deferred.resolve()) + break + case 'progress': + progressHandler?.(result.raw) + break + default: + break + } + } + + // Subscribe before triggering the install. + subscriber = await zenoh.subscriber(INSTALL_PROGRESS_TOPIC, handleSample) + if (!subscriber) { + throw new Error('Failed to subscribe to install progress topic') + } + timer = setTimeout( + () => cleanup().finally(() => deferred.reject(new Error(`Install timed out after ${timeout}ms`))), + timeout, + ) + + try { + const reply = await zenoh.query( + buildInstallQueryKey(identifier, tag, stable), + QueryTarget.BestMatching, + timeout, + ) + if (!reply || reply.error) { + throw new Error(reply?.error ?? 'Install query failed') + } + } catch (error) { + await cleanup() + throw error + } + + return deferred.promise } /** - * Enable an extension by its identifier and tag, uses API v2 + * Enable an extension by its identifier and tag, uses zenoh * @param {string} identifier The identifier of the extension * @param {string} tag The tag of the extension */ export async function enableExtension(identifier: string, tag: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}/enable`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/enable?identifier=${encodeURIComponent(identifier)};tag=${encodeURIComponent(tag)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * Disable an extension by its identifier, uses API v2 + * Disable an extension by its identifier, uses zenoh * @param {string} identifier The identifier of the extension */ export async function disableExtension(identifier: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}/disable`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/disable?identifier=${encodeURIComponent(identifier)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * Uninstall an extension by its identifier, uses API v2 + * Uninstall an extension by its identifier, uses zenoh * @param {string} identifier The identifier of the extension */ -export async function uninstallExtension(identifier: string): Promise { - await back_axios({ - method: 'DELETE', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}`, - }) +export async function uninstallExtension(identifier: string, tag?: string): Promise { + let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${encodeURIComponent(identifier)}` + if (tag) queryKey += `;tag=${encodeURIComponent(tag)}` + + await zenoh.query( + queryKey, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * Restart an extension by its identifier, uses API v2 + * Restart an extension by its identifier, uses zenoh * @param {string} identifier The identifier of the extension */ export async function restartExtension(identifier: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/${identifier}/restart`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/restart?identifier=${encodeURIComponent(identifier)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** @@ -282,42 +357,37 @@ export async function updateExtensionToVersion( } /** - * List all installed extensions from kraken, uses API v2 + * List details of all installed extensions. + * @returns {Promise | null} */ -export async function getInstalledExtensions(): Promise { - const response = await back_axios({ - method: 'GET', - url: `${KRAKEN_API_V2_URL}/extension/`, - timeout: 30000, - }) - - return response.data as InstalledExtensionData[] +export async function fetchInstalledExtensions(): Promise { + return zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/fetch`, + QueryTarget.BestMatching, + 30000, + ) } /** - * List all running containers from kraken, uses API v2 + * List all running containers from kraken, uses zenoh. */ -export async function listContainers(): Promise { - const response = await back_axios({ - method: 'GET', - url: `${KRAKEN_API_V2_URL}/container/`, - timeout: 30000, - }) - - return response.data as RunningContainer[] +export async function listContainers(): Promise { + return zenoh.query( + `${KRAKEN_BASE_ZENOH}/container/fetch`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** - * List all stats of all running containers from kraken, uses API v2 + * List all stats of all running containers from kraken, uses zenoh. */ -export async function getContainersStats(): Promise { - const response = await back_axios({ - method: 'GET', - url: `${KRAKEN_API_V2_URL}/container/stats`, - timeout: 20000, - }) - - return response.data +export async function getContainersStats(): Promise { + return zenoh.query( + `${KRAKEN_BASE_ZENOH}/container/stats`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** @@ -352,11 +422,11 @@ export async function uploadExtensionTarFile( * @returns {Promise} */ export async function keepTemporaryExtensionAlive(tempTag: string): Promise { - await back_axios({ - method: 'POST', - url: `${KRAKEN_API_V2_URL}/extension/upload/keep-alive?temp_tag=${tempTag}`, - timeout: 10000, - }) + await zenoh.query( + `${KRAKEN_BASE_ZENOH}/extension/upload/keep-alive?temp_tag=${encodeURIComponent(tempTag)}`, + QueryTarget.BestMatching, + ZENOH_QUERY_STANDARD_TIMEOUT, + ) } /** @@ -395,7 +465,7 @@ export async function finalizeExtension( * @returns {Promise} */ export async function getHistoricalLogsForExtension(identifier: string, timeout: number): Promise { - const queryKey = `kraken/extension/logs/request?extension_name=${identifier}` + const queryKey = `${KRAKEN_BASE_ZENOH}/container/logs/request?extension_name=${identifier}` return await zenoh.query(queryKey, QueryTarget.BestMatching, timeout) } @@ -424,9 +494,7 @@ export default { disabledManifestSource, setManifestSourcesOrders, setManifestSourceOrder, - updateExtensionToVersion, installExtension, - getInstalledExtensions, enableExtension, disableExtension, uninstallExtension, diff --git a/core/frontend/src/utils/deferred.ts b/core/frontend/src/utils/deferred.ts new file mode 100644 index 0000000000..8fde32480c --- /dev/null +++ b/core/frontend/src/utils/deferred.ts @@ -0,0 +1,15 @@ +interface Deferred { + promise: Promise + resolve: (value: T) => void + reject: (reason: Error) => void +} + +export function createDeferred(): Deferred { + let resolve!: (value: T) => void + let reject!: (reason: Error) => void + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + return { promise, resolve, reject } +} diff --git a/core/frontend/src/views/ExtensionManagerView.vue b/core/frontend/src/views/ExtensionManagerView.vue index 91bcc2674f..3b7ea53fc0 100644 --- a/core/frontend/src/views/ExtensionManagerView.vue +++ b/core/frontend/src/views/ExtensionManagerView.vue @@ -378,7 +378,6 @@