Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 160 additions & 92 deletions core/frontend/src/components/kraken/KrakenManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts'

import zenoh from '@/libs/zenoh'
import {
ExtensionData,
Expand All @@ -9,24 +11,14 @@ import {
UploadProgressEvent,
} from '@/types/kraken'
import back_axios from '@/utils/api'
import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts'
import { createDeferred } from '@/utils/deferred'

const KRAKEN_BASE_URL = '/kraken'
const KRAKEN_API_V2_URL = `${KRAKEN_BASE_URL}/v2.0`
const KRAKEN_BASE_ZENOH = 'kraken'
const INSTALL_PROGRESS_TOPIC = `${KRAKEN_BASE_ZENOH}/extension/install/progress`
const ZENOH_QUERY_STANDARD_TIMEOUT = 10000

/**
* List details of all installed extensions.
* @returns {Promise<InstalledExtensionData[]>}
*/
export async function fetchInstalledExtensions(): Promise<InstalledExtensionData[]> {
const response = await back_axios({
method: 'get',
url: `${KRAKEN_API_V2_URL}/extension/`,
timeout: 10000,
})

return response.data as InstalledExtensionData[]
}

/**
* List all manifest sources from kraken, uses API v2
Expand Down Expand Up @@ -188,78 +180,161 @@ export async function setManifestSourceOrder(identifier: string, order: number):
})
}

function buildInstallQueryKey(identifier: string, tag: string | undefined, stable: boolean): string {
let key = `${KRAKEN_BASE_ZENOH}/extension/install?identifier=${encodeURIComponent(identifier)}`
if (tag) key += `;tag=${encodeURIComponent(tag)}`
if (!stable) key += ';stable=false'
return key
}

type InstallSample =
| { kind: 'error'; message: string }
| { kind: 'complete' }
| { kind: 'progress'; raw: string }
| null

function parseInstallSample(raw: string, identifier: string): InstallSample {
let data: { identifier?: string; status?: string; error?: string }
try {
data = JSON.parse(raw)
} catch {
return null
}
if (data.identifier !== identifier) return null
if (data.error) return { kind: 'error', message: data.error }
if (data.status === 'complete') return { kind: 'complete' }
return { kind: 'progress', raw }
}

/**
* Install an extension to the latest version available
* @param {InstalledExtensionData} extension The extension to be installed
* Install an extension to the latest version available.
* The backend publishes the pull progress on `INSTALL_PROGRESS_TOPIC`.
*
* @param {string} identifier The identifier of the extension
* @param {function} progressHandler The progress handler for the download
* @param {string} tag The tag of the extension
* @param {boolean} stable If true, will install the latest stable version, default is true
* @param {number} timeout The timeout for the install
*/
export async function installExtension(
extension: InstalledExtensionData,
progressHandler: (event: any) => void,
identifier: string,
progressHandler?: (fragment: string) => void,
tag?: string,
stable = true,
timeout = 600000,
): Promise<void> {
await back_axios({
url: `${KRAKEN_API_V2_URL}/extension/install`,
method: 'POST',
data: {
identifier: extension.identifier,
name: extension.name,
docker: extension.docker,
tag: extension.tag,
enabled: true,
permissions: extension?.permissions ?? '',
user_permissions: extension?.user_permissions ?? '',
},
timeout: 600000,
onDownloadProgress: progressHandler,
})
const deferred = createDeferred<void>()
let subscriber: Subscriber | null = null
let timer: ReturnType<typeof setTimeout> | null = null

async function cleanup(): Promise<void> {
if (timer !== null) {
clearTimeout(timer)
timer = null
}
try {
await subscriber?.undeclare()
} catch {
// The subscriber may already be gone. Ignore cleanup errors.
}
subscriber = null
}

async function handleSample(sample: Sample): Promise<void> {
const result = parseInstallSample(sample.payload().to_string(), identifier)
if (result === null) return
switch (result.kind) {
case 'error':
cleanup().finally(() => deferred.reject(new Error(result.message)))
break
case 'complete':
cleanup().finally(() => deferred.resolve())
break
case 'progress':
progressHandler?.(result.raw)
break
default:
break
}
}

// Subscribe before triggering the install.
subscriber = await zenoh.subscriber(INSTALL_PROGRESS_TOPIC, handleSample)
if (!subscriber) {
throw new Error('Failed to subscribe to install progress topic')
}
timer = setTimeout(
() => cleanup().finally(() => deferred.reject(new Error(`Install timed out after ${timeout}ms`))),
timeout,
)

try {
const reply = await zenoh.query(
buildInstallQueryKey(identifier, tag, stable),
QueryTarget.BestMatching,
timeout,
)
if (!reply || reply.error) {
throw new Error(reply?.error ?? 'Install query failed')
}
} catch (error) {
await cleanup()
throw error
}

return deferred.promise
}

/**
* Enable an extension by its identifier and tag, uses API v2
* Enable an extension by its identifier and tag, uses zenoh
* @param {string} identifier The identifier of the extension
* @param {string} tag The tag of the extension
*/
export async function enableExtension(identifier: string, tag: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}/enable`,
timeout: 10000,
})
await zenoh.query(
Comment thread
nicoschmdt marked this conversation as resolved.
`${KRAKEN_BASE_ZENOH}/extension/enable?identifier=${encodeURIComponent(identifier)};tag=${encodeURIComponent(tag)}`,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
* Disable an extension by its identifier, uses API v2
* Disable an extension by its identifier, uses zenoh
* @param {string} identifier The identifier of the extension
*/
export async function disableExtension(identifier: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/disable`,
timeout: 10000,
})
await zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/disable?identifier=${encodeURIComponent(identifier)}`,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
* Uninstall an extension by its identifier, uses API v2
* Uninstall an extension by its identifier, uses zenoh
* @param {string} identifier The identifier of the extension
*/
export async function uninstallExtension(identifier: string): Promise<void> {
await back_axios({
method: 'DELETE',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}`,
})
export async function uninstallExtension(identifier: string, tag?: string): Promise<void> {
let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${encodeURIComponent(identifier)}`
if (tag) queryKey += `;tag=${encodeURIComponent(tag)}`

await zenoh.query(
queryKey,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
* Restart an extension by its identifier, uses API v2
* Restart an extension by its identifier, uses zenoh
* @param {string} identifier The identifier of the extension
*/
export async function restartExtension(identifier: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/restart`,
timeout: 10000,
})
await zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/restart?identifier=${encodeURIComponent(identifier)}`,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
Expand All @@ -282,42 +357,37 @@ export async function updateExtensionToVersion(
}

/**
* List all installed extensions from kraken, uses API v2
* List details of all installed extensions.
* @returns {Promise<InstalledExtensionData[]> | null}
*/
export async function getInstalledExtensions(): Promise<InstalledExtensionData[]> {
const response = await back_axios({
method: 'GET',
url: `${KRAKEN_API_V2_URL}/extension/`,
timeout: 30000,
})

return response.data as InstalledExtensionData[]
export async function fetchInstalledExtensions(): Promise<InstalledExtensionData[] | null> {
return zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/fetch`,
QueryTarget.BestMatching,
30000,
)
}

/**
* List all running containers from kraken, uses API v2
* List all running containers from kraken, uses zenoh.
*/
export async function listContainers(): Promise<RunningContainer[]> {
const response = await back_axios({
method: 'GET',
url: `${KRAKEN_API_V2_URL}/container/`,
timeout: 30000,
})

return response.data as RunningContainer[]
export async function listContainers(): Promise<RunningContainer[] | null> {
return zenoh.query(
`${KRAKEN_BASE_ZENOH}/container/fetch`,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
* List all stats of all running containers from kraken, uses API v2
* List all stats of all running containers from kraken, uses zenoh.
*/
export async function getContainersStats(): Promise<any> {
const response = await back_axios({
method: 'GET',
url: `${KRAKEN_API_V2_URL}/container/stats`,
timeout: 20000,
})

return response.data
export async function getContainersStats(): Promise<any | null> {
return zenoh.query(
`${KRAKEN_BASE_ZENOH}/container/stats`,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
Expand Down Expand Up @@ -352,11 +422,11 @@ export async function uploadExtensionTarFile(
* @returns {Promise<void>}
*/
export async function keepTemporaryExtensionAlive(tempTag: string): Promise<void> {
await back_axios({
method: 'POST',
url: `${KRAKEN_API_V2_URL}/extension/upload/keep-alive?temp_tag=${tempTag}`,
timeout: 10000,
})
await zenoh.query(
`${KRAKEN_BASE_ZENOH}/extension/upload/keep-alive?temp_tag=${encodeURIComponent(tempTag)}`,
QueryTarget.BestMatching,
ZENOH_QUERY_STANDARD_TIMEOUT,
)
}

/**
Expand Down Expand Up @@ -395,7 +465,7 @@ export async function finalizeExtension(
* @returns {Promise<any | null>}
*/
export async function getHistoricalLogsForExtension(identifier: string, timeout: number): Promise<any | null> {
const queryKey = `kraken/extension/logs/request?extension_name=${identifier}`
const queryKey = `${KRAKEN_BASE_ZENOH}/container/logs/request?extension_name=${identifier}`
return await zenoh.query(queryKey, QueryTarget.BestMatching, timeout)
}

Expand Down Expand Up @@ -424,9 +494,7 @@ export default {
disabledManifestSource,
setManifestSourcesOrders,
setManifestSourceOrder,
updateExtensionToVersion,
installExtension,
getInstalledExtensions,
enableExtension,
disableExtension,
uninstallExtension,
Expand Down
15 changes: 15 additions & 0 deletions core/frontend/src/utils/deferred.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
interface Deferred<T> {
promise: Promise<T>
resolve: (value: T) => void
reject: (reason: Error) => void
}

export function createDeferred<T>(): Deferred<T> {
let resolve!: (value: T) => void
let reject!: (reason: Error) => void
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}
Loading