From ddc892ab0e69daa3b6f354c0f6a9bb7104da0518 Mon Sep 17 00:00:00 2001 From: dejaguarkyng Date: Mon, 1 Jun 2026 01:03:38 +0000 Subject: [PATCH 1/5] feat: add Jungle Grid integration --- .../credentials/JungleGridApi.credential.ts | 33 ++ .../nodes/tools/JungleGrid/JungleGrid.ts | 77 ++++ .../nodes/tools/JungleGrid/README.md | 75 ++++ .../nodes/tools/JungleGrid/core.test.ts | 69 ++++ .../components/nodes/tools/JungleGrid/core.ts | 348 ++++++++++++++++++ .../nodes/tools/JungleGrid/junglegrid.svg | 7 + packages/components/src/httpSecurity.ts | 6 +- 7 files changed, 614 insertions(+), 1 deletion(-) create mode 100644 packages/components/credentials/JungleGridApi.credential.ts create mode 100644 packages/components/nodes/tools/JungleGrid/JungleGrid.ts create mode 100644 packages/components/nodes/tools/JungleGrid/README.md create mode 100644 packages/components/nodes/tools/JungleGrid/core.test.ts create mode 100644 packages/components/nodes/tools/JungleGrid/core.ts create mode 100644 packages/components/nodes/tools/JungleGrid/junglegrid.svg diff --git a/packages/components/credentials/JungleGridApi.credential.ts b/packages/components/credentials/JungleGridApi.credential.ts new file mode 100644 index 00000000000..48ed1414e5a --- /dev/null +++ b/packages/components/credentials/JungleGridApi.credential.ts @@ -0,0 +1,33 @@ +import { INodeParams, INodeCredential } from '../src/Interface' + +class JungleGridApiCredential implements INodeCredential { + label: string + name: string + version: number + description: string + inputs: INodeParams[] + + constructor() { + this.label = 'Jungle Grid API' + this.name = 'jungleGridApi' + this.version = 1.0 + this.description = + 'Use a Jungle Grid API key to estimate, submit, monitor, cancel, and retrieve artifacts for long-running workloads.' + this.inputs = [ + { + label: 'Jungle Grid API Key', + name: 'apiKey', + type: 'password' + }, + { + label: 'Jungle Grid API Base URL', + name: 'baseUrl', + type: 'url', + default: 'https://api.junglegrid.dev', + description: 'Override only for development or self-hosted Jungle Grid orchestrators.' + } + ] + } +} + +module.exports = { credClass: JungleGridApiCredential } diff --git a/packages/components/nodes/tools/JungleGrid/JungleGrid.ts b/packages/components/nodes/tools/JungleGrid/JungleGrid.ts new file mode 100644 index 00000000000..2c5a5d037ce --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/JungleGrid.ts @@ -0,0 +1,77 @@ +import { convertMultiOptionsToStringArray, getCredentialData, getCredentialParam } from '../../../src/utils' +import { createJungleGridTools, DEFAULT_JUNGLE_GRID_BASE_URL, JungleGridAction, JungleGridClient } from './core' +import type { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' + +const ALL_ACTIONS: { label: string; name: JungleGridAction }[] = [ + { label: 'Estimate Job', name: 'estimateJob' }, + { label: 'Submit Job', name: 'submitJob' }, + { label: 'List Jobs', name: 'listJobs' }, + { label: 'Get Job', name: 'getJob' }, + { label: 'Get Job Runtime', name: 'getJobRuntime' }, + { label: 'Cancel Job', name: 'cancelJob' }, + { label: 'Get Job Logs', name: 'getJobLogs' }, + { label: 'List Job Artifacts', name: 'listJobArtifacts' }, + { label: 'Get Artifact Download URL', name: 'getArtifactDownloadUrl' } +] + +class JungleGrid_Tools implements INode { + label: string + name: string + version: number + type: string + icon: string + category: string + description: string + baseClasses: string[] + credential: INodeParams + inputs: INodeParams[] + documentation?: string + + constructor() { + this.label = 'Jungle Grid' + this.name = 'jungleGrid' + this.version = 1.0 + this.type = 'JungleGrid' + this.icon = 'junglegrid.svg' + this.category = 'Tools' + this.description = 'Estimate, submit, monitor, cancel, and retrieve artifacts for asynchronous Jungle Grid workloads' + this.documentation = 'https://junglegrid.dev/docs/mcp' + this.baseClasses = [this.type, 'Tool'] + this.credential = { + label: 'Jungle Grid Credential', + name: 'credential', + type: 'credential', + credentialNames: ['jungleGridApi'] + } + this.inputs = [ + { + label: 'Actions', + name: 'actions', + type: 'multiOptions', + options: ALL_ACTIONS, + default: [ + 'estimateJob', + 'submitJob', + 'getJob', + 'getJobRuntime', + 'getJobLogs', + 'listJobArtifacts', + 'getArtifactDownloadUrl' + ], + description: 'Choose which Jungle Grid tools to expose to the agent.' + } + ] + } + + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { + const credentialData = await getCredentialData(nodeData.credential ?? '', options) + const apiKey = getCredentialParam('apiKey', credentialData, nodeData) as string + const baseUrl = (getCredentialParam('baseUrl', credentialData, nodeData) as string) || DEFAULT_JUNGLE_GRID_BASE_URL + const actions = convertMultiOptionsToStringArray(nodeData.inputs?.actions) as JungleGridAction[] + + const client = new JungleGridClient({ apiKey, baseUrl }) + return createJungleGridTools(client, actions) + } +} + +module.exports = { nodeClass: JungleGrid_Tools } diff --git a/packages/components/nodes/tools/JungleGrid/README.md b/packages/components/nodes/tools/JungleGrid/README.md new file mode 100644 index 00000000000..9b0dd500db9 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/README.md @@ -0,0 +1,75 @@ +# Jungle Grid + +The Jungle Grid tool node lets Flowise agents estimate, submit, monitor, cancel, and retrieve artifacts for asynchronous Jungle Grid workloads. + +Jungle Grid acts as the durable execution layer for long-running AI workloads while Flowise remains the orchestration and visual agent-building layer. + +## Credentials + +Create a `Jungle Grid API` credential with: + +- `Jungle Grid API Key`: a Jungle Grid API key. +- `Jungle Grid API Base URL`: defaults to `https://api.junglegrid.dev`. Override only for development or self-hosted orchestrators. + +Do not place Jungle Grid API keys in prompts, command arguments, environment variables, source code, exported flows, or logs. + +## Actions + +- `Estimate Job`: calls `POST /v1/jobs/estimate` before starting work. Use this when cost, capacity, routing, or GPU tier matters. +- `Submit Job`: calls `POST /v1/jobs` and returns immediately with a `job_id`. A returned `job_id` does not mean the job is complete. +- `List Jobs`: calls `GET /v1/jobs` with verified `limit` and `status` filters. +- `Get Job`: calls `GET /v1/jobs/{job_id}` to retrieve current status and details. +- `Get Job Runtime`: calls `GET /v1/jobs/{job_id}/runtime` for stdout/stderr tails, exit information, diagnostics, and runtime availability. +- `Cancel Job`: calls `POST /v1/jobs/{job_id}/cancel` with an optional reason. +- `Get Job Logs`: uses the verified runtime endpoint to return available stdout/stderr and exit information. +- `List Job Artifacts`: calls `GET /v1/jobs/{job_id}/artifacts`. +- `Get Artifact Download URL`: calls `POST /v1/jobs/{job_id}/artifacts/{artifact_id}/download`. + +Live log streaming is intentionally not exposed as a Flowise tool action. The official stream route is long-lived Server-Sent Events, while Flowise agent tools execute synchronously. Polling `Get Job`, `Get Job Runtime`, and `Get Job Logs` is the production-safe Flowise path. + +## Usage Pattern + +```text +Estimate Job + -> Submit Job + -> store job_id + -> Get Job / Get Job Runtime / Get Job Logs + -> wait for terminal status + -> List Job Artifacts + -> Get Artifact Download URL +``` + +`Submit Job` is asynchronous. It returns a `job_id` immediately, but that does not mean the workload has completed. Poll `Get Job`, `Get Job Runtime`, or `Get Job Logs` until Jungle Grid reports a terminal status. Retrieve artifacts after successful completion unless the API response explicitly shows partial outputs are available. + +## Example Workloads + +Minimal inference smoke-test shape: + +```json +{ + "name": "flowise-jungle-grid-smoke-test", + "image": "python:3.11", + "workload_type": "inference", + "model_size_gb": 1, + "command": "python", + "args": ["-c", "print(42)"] +} +``` + +Artifact-producing shape: + +```json +{ + "name": "flowise-jungle-grid-artifact-test", + "image": "python:3.11", + "workload_type": "batch", + "model_size_gb": 1, + "command": "python", + "args": [ + "-c", + "import json, os; os.makedirs('/workspace/artifacts', exist_ok=True); json.dump({'status':'ok'}, open('/workspace/artifacts/output.json','w'))" + ] +} +``` + +Common use cases include running inference workloads, batch jobs, evaluation workloads, artifact-producing container jobs, and agent-monitored long-running compute work. diff --git a/packages/components/nodes/tools/JungleGrid/core.test.ts b/packages/components/nodes/tools/JungleGrid/core.test.ts new file mode 100644 index 00000000000..8d8a7efec62 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/core.test.ts @@ -0,0 +1,69 @@ +import { JungleGridClient, createJungleGridTools } from './core' +import { secureAxiosRequest } from '../../../src/httpSecurity' + +jest.mock('../../../src/httpSecurity', () => ({ + secureAxiosRequest: jest.fn() +})) + +const mockedSecureAxiosRequest = secureAxiosRequest as jest.MockedFunction + +describe('JungleGridClient', () => { + beforeEach(() => { + mockedSecureAxiosRequest.mockReset() + mockedSecureAxiosRequest.mockResolvedValue({ + status: 200, + data: { ok: true } + } as any) + }) + + it('uses bearer authentication and the documented estimate route', async () => { + const client = new JungleGridClient({ apiKey: 'test-key', baseUrl: 'https://api.junglegrid.dev/' }) + + await client.estimateJob({ workload_type: 'inference', image: 'python:3.11' }) + + expect(mockedSecureAxiosRequest).toHaveBeenCalledWith( + expect.objectContaining({ + url: 'https://api.junglegrid.dev/v1/jobs/estimate', + method: 'POST', + headers: expect.objectContaining({ + Authorization: 'Bearer test-key', + 'Content-Type': 'application/json' + }) + }) + ) + }) + + it('uses verified production routes for lifecycle and artifact operations', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + + await client.submitJob({ workload_type: 'batch', image: 'python:3.11', command: 'python', args: ['-c', 'print(42)'] }) + await client.listJobs({ limit: 20, status: 'running' }) + await client.getJob('job_123') + await client.getJobRuntime('job_123') + await client.cancelJob('job_123', 'test') + await client.listJobArtifacts('job_123') + await client.getArtifactDownloadUrl('job_123', 'artifact_123') + + const urls = mockedSecureAxiosRequest.mock.calls.map(([config]) => config.url) + expect(urls).toEqual([ + 'https://api.junglegrid.dev/v1/jobs', + 'https://api.junglegrid.dev/v1/jobs?limit=20&status=running', + 'https://api.junglegrid.dev/v1/jobs/job_123', + 'https://api.junglegrid.dev/v1/jobs/job_123/runtime', + 'https://api.junglegrid.dev/v1/jobs/job_123/cancel', + 'https://api.junglegrid.dev/v1/jobs/job_123/artifacts', + 'https://api.junglegrid.dev/v1/jobs/job_123/artifacts/artifact_123/download' + ]) + }) +}) + +describe('createJungleGridTools', () => { + it('creates agent-facing tools with async job guidance', () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + const tools = createJungleGridTools(client, ['estimateJob', 'submitJob', 'getJob']) + + expect(tools.map((tool) => tool.name)).toEqual(['jungle_grid_estimate_job', 'jungle_grid_submit_job', 'jungle_grid_get_job']) + expect(tools[1].description).toContain('returns a job_id immediately') + expect(tools[1].description).toContain('does not mean the job has finished') + }) +}) diff --git a/packages/components/nodes/tools/JungleGrid/core.ts b/packages/components/nodes/tools/JungleGrid/core.ts new file mode 100644 index 00000000000..b5f19e3835e --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/core.ts @@ -0,0 +1,348 @@ +import { z } from 'zod/v3' +import { DynamicStructuredTool } from '../OpenAPIToolkit/core' +import { formatToolError } from '../../../src/agents' +import { secureAxiosRequest } from '../../../src/httpSecurity' + +export const DEFAULT_JUNGLE_GRID_BASE_URL = 'https://api.junglegrid.dev' + +const jsonHeaders = { + 'Content-Type': 'application/json', + Accept: 'application/json' +} + +const workloadTypeSchema = z.enum(['inference', 'training', 'fine-tuning', 'batch']) +const optimizeForSchema = z.enum(['balanced', 'cost', 'speed']) +const prioritySchema = z.enum(['low', 'balanced', 'high']) +const gpuClassSchema = z.enum(['consumer', 'datacenter']) +const regionModeSchema = z.enum(['prefer', 'strict']) +const jobStatusSchema = z.enum(['pending', 'queued', 'assigned', 'running', 'completed', 'failed', 'rejected', 'cancelled']) + +const constraintsSchema = z + .object({ + max_price_per_hour: z.number().optional().describe('Optional maximum hourly price in USD, as documented by Jungle Grid.'), + preferred_gpu_family: z.string().optional().describe('Optional preferred GPU family, such as l4.'), + avoid_gpu_families: z.array(z.string()).optional().describe('Optional GPU families to avoid.'), + gpu_type: z.string().optional().describe('Optional exact GPU override verified by the official Jungle Grid MCP package.'), + gpu_class: gpuClassSchema.optional().describe('Optional GPU class preference verified by the official Jungle Grid MCP package.'), + region_preference: z.string().optional().describe('Optional preferred region such as us-east or eu-west.'), + region_mode: regionModeSchema.optional().describe('Whether the region preference is preferred or strict.'), + latency_priority: prioritySchema.optional().describe("Latency sensitivity: 'low', 'balanced', or 'high'."), + cost_priority: prioritySchema.optional().describe("Cost sensitivity: 'low', 'balanced', or 'high'.") + }) + .optional() + +const estimateJobSchema = z.object({ + name: z.string().optional().describe('Optional readable job name for the draft workload.'), + workload_type: workloadTypeSchema.describe('Type of workload to estimate.'), + image: z.string().min(1).describe('Docker image to run.'), + command: z.string().optional().describe('Optional container command to include in the estimate draft, for example python.'), + args: z.array(z.string()).optional().describe('Optional arguments passed to the command.'), + model_size_gb: z.number().optional().describe('Approximate model size in GB.'), + disk_gb: z.number().optional().describe('Optional managed-provider local disk override in GB.'), + optimize_for: optimizeForSchema.optional().describe("Scheduling optimization goal: 'balanced', 'cost', or 'speed'."), + latency_priority: prioritySchema.optional().describe("Latency sensitivity: 'low', 'balanced', or 'high'."), + cost_priority: prioritySchema.optional().describe("Cost sensitivity: 'low', 'balanced', or 'high'."), + constraints: constraintsSchema.describe('Optional routing constraints.') +}) + +const submitJobSchema = estimateJobSchema.extend({ + command: z.string().min(1).describe('Container command to run, for example python. Submit Job starts asynchronous remote work.'), + environment: z + .record(z.string()) + .optional() + .describe('Environment variables injected into the container. Do not include secrets unless the user explicitly approves.'), + huggingface_credential_id: z.string().optional().describe('Optional saved Hugging Face credential id.'), + webhook_url: z.string().optional().describe('Optional HTTPS URL to receive signed lifecycle event callbacks.'), + callback_url: z.string().optional().describe('Optional documented callback URL for lifecycle events.'), + callback_auth_token: z.string().optional().describe('Optional callback authentication token. Never use this for unrelated secrets.'), + callback_metadata: z.record(z.any()).optional().describe('Optional metadata included with callbacks.') +}) + +const jobIdSchema = z.object({ + job_id: z.string().min(1).describe('Jungle Grid job_id returned by submit_job.') +}) + +const listJobsSchema = z.object({ + limit: z + .number() + .optional() + .default(20) + .describe('Maximum number of jobs to return. The official MCP package defaults to 20 and documents max 100.'), + status: jobStatusSchema.optional().describe('Optional status filter.') +}) + +const cancelJobSchema = jobIdSchema.extend({ + reason: z.string().optional().describe('Optional cancellation reason.') +}) + +const artifactDownloadSchema = jobIdSchema.extend({ + artifact_id: z.string().min(1).describe('Artifact ID returned by list_job_artifacts.') +}) + +export interface JungleGridClientOptions { + apiKey: string + baseUrl?: string +} + +export class JungleGridClient { + private readonly apiKey: string + private readonly baseUrl: string + + constructor({ apiKey, baseUrl }: JungleGridClientOptions) { + if (!apiKey) throw new Error('Jungle Grid API key is required') + this.apiKey = apiKey + this.baseUrl = normalizeBaseUrl(baseUrl) + } + + estimateJob(input: z.infer): Promise { + return this.request('POST', '/v1/jobs/estimate', input) + } + + submitJob(input: z.infer): Promise { + return this.request('POST', '/v1/jobs', input) + } + + listJobs(input: z.infer): Promise { + const params = new URLSearchParams() + if (input.limit !== undefined) params.set('limit', String(input.limit)) + if (input.status) params.set('status', input.status) + const query = params.toString() + return this.request('GET', query ? `/v1/jobs?${query}` : '/v1/jobs') + } + + getJob(jobId: string): Promise { + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}`) + } + + getJobRuntime(jobId: string): Promise { + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/runtime`) + } + + cancelJob(jobId: string, reason?: string): Promise { + return this.request('POST', `/v1/jobs/${encodeURIComponent(jobId)}/cancel`, { + reason: reason ?? 'Cancelled via Flowise' + }) + } + + listJobArtifacts(jobId: string): Promise { + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/artifacts`) + } + + getArtifactDownloadUrl(jobId: string, artifactId: string): Promise { + return this.request('POST', `/v1/jobs/${encodeURIComponent(jobId)}/artifacts/${encodeURIComponent(artifactId)}/download`) + } + + private async request(method: string, path: string, body?: any): Promise { + const response = await secureAxiosRequest({ + url: `${this.baseUrl}${path}`, + method, + headers: { + Authorization: `Bearer ${this.apiKey}`, + ...jsonHeaders + }, + data: body !== undefined ? removeUndefined(body) : undefined + }) + + if (response.status < 200 || response.status >= 300) { + throw new Error(await responseErrorMessage(response, method, path)) + } + + if (response.status === 204) return { ok: true } + + if (response.data === undefined || response.data === null || response.data === '') return { ok: true } + return response.data + } +} + +export type JungleGridAction = + | 'estimateJob' + | 'submitJob' + | 'listJobs' + | 'getJob' + | 'getJobRuntime' + | 'cancelJob' + | 'getJobLogs' + | 'listJobArtifacts' + | 'getArtifactDownloadUrl' + +export function createJungleGridTools(client: JungleGridClient, actions: JungleGridAction[]): DynamicStructuredTool[] { + const selectedActions = actions.length > 0 ? actions : (Object.keys(toolFactories) as JungleGridAction[]) + return selectedActions.map((action) => toolFactories[action](client)) +} + +class JungleGridTool extends DynamicStructuredTool { + private readonly handler: (input: any) => Promise + + constructor(args: { name: string; description: string; schema: any; handler: (input: any) => Promise }) { + super({ + name: args.name, + description: args.description, + schema: args.schema, + baseUrl: '', + method: 'GET', + headers: {} + }) + this.handler = args.handler + } + + async _call(input: any): Promise { + return this.handler(input) + } +} + +const toolFactories: Record DynamicStructuredTool> = { + estimateJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_estimate_job', + description: + 'Estimate a Jungle Grid workload before submission when cost, capacity, routing, or GPU tier matters. Accepts the same verified draft workload fields as submit_job and returns the real production estimate response, including hourly rate/range, runtime, queue/start window, route/capacity, and constraint-relaxation fields when available. Use this before jungle_grid_submit_job if spending credits or capacity selection matters.', + schema: estimateJobSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_estimate_job', input, () => client.estimateJob(input)) + }), + submitJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_submit_job', + description: + 'Submit an asynchronous Jungle Grid workload. This starts remote work and returns a job_id immediately; a returned job_id does not mean the job has finished. Poll jungle_grid_get_job and use jungle_grid_get_job_runtime or jungle_grid_get_job_logs until a terminal status is reached. Never put untrusted secrets in environment variables, command, or args without explicit user permission.', + schema: submitJobSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_submit_job', input, () => client.submitJob(input)) + }), + listJobs: (client) => + new JungleGridTool({ + name: 'jungle_grid_list_jobs', + description: 'List recent Jungle Grid jobs for the authenticated account. Use this to find job IDs and recent statuses.', + schema: listJobsSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_list_jobs', input, () => client.listJobs(input)) + }), + getJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_job', + description: + "Get current Jungle Grid job status and details by job_id. Poll this after submit_job until the job reaches a terminal status such as 'completed', 'failed', 'cancelled', or 'rejected'.", + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_get_job', input, () => client.getJob(input.job_id)) + }), + getJobRuntime: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_job_runtime', + description: + 'Retrieve runtime tails, exit code, timeout flag, diagnostics, and runtime availability for a Jungle Grid job. Use this to monitor or diagnose running, failed, or completed work.', + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_get_job_runtime', input, () => client.getJobRuntime(input.job_id)) + }), + cancelJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_cancel_job', + description: + 'Cancel a pending, queued, assigned, or running Jungle Grid job. This uses the verified production cancel route and has no useful effect on already terminal jobs.', + schema: cancelJobSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_cancel_job', input, () => client.cancelJob(input.job_id, input.reason)) + }), + getJobLogs: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_job_logs', + description: + 'Retrieve available stdout/stderr tails and exit information for a Jungle Grid job. The official MCP package verifies this as the same production runtime endpoint used by get_job_runtime. Use it after submit_job while polling or after terminal status for diagnostics.', + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_get_job_logs', input, () => client.getJobRuntime(input.job_id)) + }), + listJobArtifacts: (client) => + new JungleGridTool({ + name: 'jungle_grid_list_job_artifacts', + description: + 'List managed artifacts uploaded by a Jungle Grid job. Retrieve artifacts after successful completion unless the API response explicitly shows partial outputs are available. Managed jobs upload regular files written under /workspace/artifacts.', + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_list_job_artifacts', input, () => client.listJobArtifacts(input.job_id)) + }), + getArtifactDownloadUrl: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_artifact_download_url', + description: + 'Create a temporary signed download URL for a managed Jungle Grid artifact. Call jungle_grid_list_job_artifacts first and use an artifact_id from that response.', + schema: artifactDownloadSchema, + handler: async (input) => + safeJsonToolCall('jungle_grid_get_artifact_download_url', input, () => + client.getArtifactDownloadUrl(input.job_id, input.artifact_id) + ) + }) +} + +async function safeJsonToolCall(toolName: string, params: any, call: () => Promise): Promise { + try { + const result = await call() + return JSON.stringify(result) + } catch (error) { + const message = error instanceof Error ? error.message : typeof error === 'string' ? error : 'Unknown Jungle Grid API error' + return formatToolError(`${toolName} failed: ${message}`, redactSensitiveParams(params)) + } +} + +function normalizeBaseUrl(baseUrl?: string): string { + const raw = (baseUrl || DEFAULT_JUNGLE_GRID_BASE_URL).trim() + if (!raw) return DEFAULT_JUNGLE_GRID_BASE_URL + return raw.replace(/\/+$/, '') +} + +function removeUndefined(value: any): any { + if (Array.isArray(value)) { + return value.map(removeUndefined) + } + if (value && typeof value === 'object') { + const output: Record = {} + for (const [key, child] of Object.entries(value)) { + if (child !== undefined) output[key] = removeUndefined(child) + } + return output + } + return value +} + +function redactSensitiveParams(params: any): any { + if (!params || typeof params !== 'object') return params + const output: Record = {} + for (const [key, value] of Object.entries(params)) { + if (/token|secret|password|api[_-]?key/i.test(key)) { + output[key] = '[REDACTED]' + } else if (value && typeof value === 'object' && !Array.isArray(value)) { + output[key] = redactSensitiveParams(value) + } else { + output[key] = value + } + } + return output +} + +async function responseErrorMessage(response: any, method: string, path: string): Promise { + const fallback = `${method} ${path} failed with status ${response.status}` + const payload = response.data + if (payload === undefined || payload === null || payload === '') return fallback + if (typeof payload === 'string') { + const trimmed = payload.trim() + return trimmed ? `${fallback}: ${trimmed}` : fallback + } + + const detail = formatApiError(payload) + return detail ? `${fallback}: ${detail}` : fallback +} + +function formatApiError(payload: any): string | undefined { + if (!payload || typeof payload !== 'object') return undefined + if (typeof payload.error === 'string' && payload.error.trim()) return payload.error.trim() + if (payload.error && typeof payload.error === 'object') { + const code = stringField(payload.error.code) + const message = stringField(payload.error.message) + if (code && message) return `${code}: ${message}` + return message ?? code + } + + const code = stringField(payload.code) + const message = stringField(payload.message) + if (code && message) return `${code}: ${message}` + return message ?? code +} + +function stringField(value: any): string | undefined { + if (typeof value !== 'string') return undefined + const trimmed = value.trim() + return trimmed || undefined +} diff --git a/packages/components/nodes/tools/JungleGrid/junglegrid.svg b/packages/components/nodes/tools/JungleGrid/junglegrid.svg new file mode 100644 index 00000000000..3dc64a7bfdc --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/junglegrid.svg @@ -0,0 +1,7 @@ + + + + + + + diff --git a/packages/components/src/httpSecurity.ts b/packages/components/src/httpSecurity.ts index 9f09e000ab1..60b89501b4e 100644 --- a/packages/components/src/httpSecurity.ts +++ b/packages/components/src/httpSecurity.ts @@ -336,7 +336,11 @@ function createPinnedAgent(target: ResolvedTarget, options?: { ca?: string | str const Agent = target.protocol === 'https' ? https.Agent : http.Agent return new Agent({ - lookup: (_host, _opts, cb) => { + lookup: (_host, opts, cb) => { + if (typeof opts === 'object' && opts?.all) { + cb(null, [{ address: target.ip, family: target.family }]) + return + } cb(null, target.ip, target.family) }, ...options From bc4004755efdcc538251718808afdd0633e0f524 Mon Sep 17 00:00:00 2001 From: Benedict Blessing Gbogr Date: Mon, 1 Jun 2026 01:15:09 +0000 Subject: [PATCH 2/5] Update packages/components/nodes/tools/JungleGrid/core.ts Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- packages/components/nodes/tools/JungleGrid/core.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/components/nodes/tools/JungleGrid/core.ts b/packages/components/nodes/tools/JungleGrid/core.ts index b5f19e3835e..d1972dab08b 100644 --- a/packages/components/nodes/tools/JungleGrid/core.ts +++ b/packages/components/nodes/tools/JungleGrid/core.ts @@ -144,7 +144,7 @@ export class JungleGridClient { }) if (response.status < 200 || response.status >= 300) { - throw new Error(await responseErrorMessage(response, method, path)) + throw new Error(responseErrorMessage(response, method, path)) } if (response.status === 204) return { ok: true } From f595f27ead449ca1e910c2d12f806867dd2ecbb9 Mon Sep 17 00:00:00 2001 From: Benedict Blessing Gbogr Date: Mon, 1 Jun 2026 01:15:21 +0000 Subject: [PATCH 3/5] Update packages/components/nodes/tools/JungleGrid/core.ts Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- packages/components/nodes/tools/JungleGrid/core.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/components/nodes/tools/JungleGrid/core.ts b/packages/components/nodes/tools/JungleGrid/core.ts index d1972dab08b..f8542c75d83 100644 --- a/packages/components/nodes/tools/JungleGrid/core.ts +++ b/packages/components/nodes/tools/JungleGrid/core.ts @@ -312,7 +312,7 @@ function redactSensitiveParams(params: any): any { return output } -async function responseErrorMessage(response: any, method: string, path: string): Promise { +function responseErrorMessage(response: any, method: string, path: string): string { const fallback = `${method} ${path} failed with status ${response.status}` const payload = response.data if (payload === undefined || payload === null || payload === '') return fallback From db206aba022dfacb7d4eb7379c514ec25f738607 Mon Sep 17 00:00:00 2001 From: Benedict Blessing Gbogr Date: Mon, 1 Jun 2026 01:16:15 +0000 Subject: [PATCH 4/5] Update packages/components/nodes/tools/JungleGrid/core.ts Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- packages/components/nodes/tools/JungleGrid/core.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/components/nodes/tools/JungleGrid/core.ts b/packages/components/nodes/tools/JungleGrid/core.ts index f8542c75d83..1dc36dadef1 100644 --- a/packages/components/nodes/tools/JungleGrid/core.ts +++ b/packages/components/nodes/tools/JungleGrid/core.ts @@ -287,7 +287,7 @@ function removeUndefined(value: any): any { if (Array.isArray(value)) { return value.map(removeUndefined) } - if (value && typeof value === 'object') { + if (value != null && typeof value === 'object' && (value.constructor === Object || Object.getPrototypeOf(value) === null)) { const output: Record = {} for (const [key, child] of Object.entries(value)) { if (child !== undefined) output[key] = removeUndefined(child) From ed751d480cab164d8460e660d9a41d14dfb64e55 Mon Sep 17 00:00:00 2001 From: dejaguarkyng Date: Sat, 6 Jun 2026 03:30:09 +0000 Subject: [PATCH 5/5] feat: harden Jungle Grid integration --- .../nodes/tools/JungleGrid/JungleGrid.test.ts | 74 +++++++++ .../nodes/tools/JungleGrid/JungleGrid.ts | 16 +- .../nodes/tools/JungleGrid/README.md | 115 ++++++++++--- .../nodes/tools/JungleGrid/core.test.ts | 147 ++++++++++++++++- .../components/nodes/tools/JungleGrid/core.ts | 155 ++++++++++++++---- 5 files changed, 429 insertions(+), 78 deletions(-) create mode 100644 packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts diff --git a/packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts b/packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts new file mode 100644 index 00000000000..625e9b6ee21 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts @@ -0,0 +1,74 @@ +import { secureAxiosRequest } from '../../../src/httpSecurity' +import { convertMultiOptionsToStringArray, getCredentialData, getCredentialParam } from '../../../src/utils' + +jest.mock('../../../src/httpSecurity', () => ({ + secureAxiosRequest: jest.fn() +})) + +jest.mock('../../../src/utils', () => ({ + convertMultiOptionsToStringArray: jest.fn((value) => (Array.isArray(value) ? value : value ? [value] : [])), + getCredentialData: jest.fn(), + getCredentialParam: jest.fn((name, credentialData) => credentialData[name]) +})) + +const mockedSecureAxiosRequest = secureAxiosRequest as jest.MockedFunction +const mockedGetCredentialData = getCredentialData as jest.MockedFunction +const mockedGetCredentialParam = getCredentialParam as jest.MockedFunction +const mockedConvertMultiOptionsToStringArray = convertMultiOptionsToStringArray as jest.MockedFunction< + typeof convertMultiOptionsToStringArray +> + +describe('JungleGrid_Tools', () => { + beforeEach(() => { + mockedSecureAxiosRequest.mockReset() + mockedSecureAxiosRequest.mockResolvedValue({ + status: 200, + data: { available: true } + } as any) + mockedGetCredentialData.mockReset() + mockedGetCredentialParam.mockClear() + mockedConvertMultiOptionsToStringArray.mockClear() + }) + + it('loads Jungle Grid credentials through Flowise credential helpers', async () => { + mockedGetCredentialData.mockResolvedValue({ + apiKey: 'credential-api-key', + baseUrl: 'https://api.example.test/' + } as any) + + const { nodeClass } = require('./JungleGrid') + const node = new nodeClass() + const tools = await node.init( + { + credential: 'credential-id', + inputs: { + actions: ['estimateJob'] + } + } as any, + '', + {} as any + ) + + await tools[0]._call({ workload_type: 'batch', image: 'python:3.11' }) + + expect(mockedGetCredentialData).toHaveBeenCalledWith('credential-id', {}) + expect(mockedGetCredentialParam).toHaveBeenCalledWith( + 'apiKey', + expect.objectContaining({ apiKey: 'credential-api-key' }), + expect.anything() + ) + expect(mockedGetCredentialParam).toHaveBeenCalledWith( + 'baseUrl', + expect.objectContaining({ baseUrl: 'https://api.example.test/' }), + expect.anything() + ) + expect(mockedSecureAxiosRequest).toHaveBeenCalledWith( + expect.objectContaining({ + url: 'https://api.example.test/v1/jobs/estimate', + headers: expect.objectContaining({ + Authorization: 'Bearer credential-api-key' + }) + }) + ) + }) +}) diff --git a/packages/components/nodes/tools/JungleGrid/JungleGrid.ts b/packages/components/nodes/tools/JungleGrid/JungleGrid.ts index 2c5a5d037ce..c3f3e822b91 100644 --- a/packages/components/nodes/tools/JungleGrid/JungleGrid.ts +++ b/packages/components/nodes/tools/JungleGrid/JungleGrid.ts @@ -10,8 +10,8 @@ const ALL_ACTIONS: { label: string; name: JungleGridAction }[] = [ { label: 'Get Job Runtime', name: 'getJobRuntime' }, { label: 'Cancel Job', name: 'cancelJob' }, { label: 'Get Job Logs', name: 'getJobLogs' }, - { label: 'List Job Artifacts', name: 'listJobArtifacts' }, - { label: 'Get Artifact Download URL', name: 'getArtifactDownloadUrl' } + { label: 'List Artifacts', name: 'listArtifacts' }, + { label: 'Get Artifact', name: 'getArtifact' } ] class JungleGrid_Tools implements INode { @@ -35,7 +35,7 @@ class JungleGrid_Tools implements INode { this.icon = 'junglegrid.svg' this.category = 'Tools' this.description = 'Estimate, submit, monitor, cancel, and retrieve artifacts for asynchronous Jungle Grid workloads' - this.documentation = 'https://junglegrid.dev/docs/mcp' + this.documentation = 'https://junglegrid.dev/docs/api' this.baseClasses = [this.type, 'Tool'] this.credential = { label: 'Jungle Grid Credential', @@ -49,15 +49,7 @@ class JungleGrid_Tools implements INode { name: 'actions', type: 'multiOptions', options: ALL_ACTIONS, - default: [ - 'estimateJob', - 'submitJob', - 'getJob', - 'getJobRuntime', - 'getJobLogs', - 'listJobArtifacts', - 'getArtifactDownloadUrl' - ], + default: ['estimateJob', 'submitJob', 'getJob', 'getJobRuntime', 'getJobLogs', 'listArtifacts', 'getArtifact'], description: 'Choose which Jungle Grid tools to expose to the agent.' } ] diff --git a/packages/components/nodes/tools/JungleGrid/README.md b/packages/components/nodes/tools/JungleGrid/README.md index 9b0dd500db9..a84e5b9125d 100644 --- a/packages/components/nodes/tools/JungleGrid/README.md +++ b/packages/components/nodes/tools/JungleGrid/README.md @@ -1,66 +1,96 @@ # Jungle Grid -The Jungle Grid tool node lets Flowise agents estimate, submit, monitor, cancel, and retrieve artifacts for asynchronous Jungle Grid workloads. +The Jungle Grid tool node lets Flowise agents estimate, submit, monitor, cancel, and retrieve outputs for asynchronous Jungle Grid workloads. -Jungle Grid acts as the durable execution layer for long-running AI workloads while Flowise remains the orchestration and visual agent-building layer. +Jungle Grid is live production infrastructure. Submitting jobs may start managed compute and spend credits. Estimate first whenever a user needs cost, capacity, or routing information before execution. + +## Links + +- Website: https://junglegrid.dev +- Docs: https://junglegrid.dev/docs +- API docs: https://junglegrid.dev/docs/api +- MCP docs: https://junglegrid.dev/docs/mcp +- MCP server: https://github.com/Jungle-Grid/mcp-server +- Discord: https://discord.com/invite/kpJqxXFFCs ## Credentials -Create a `Jungle Grid API` credential with: +Create a `Jungle Grid API` credential in Flowise: + +- `Jungle Grid API Key`: a scoped Jungle Grid API key from the Jungle Grid portal. +- `Jungle Grid API Base URL`: defaults to `https://api.junglegrid.dev`. Override only for staging, local development, or private deployments. + +Keep the API key in Flowise credentials or a server-side environment variable used to create that credential. Do not place Jungle Grid API keys in prompts, command arguments, exported flows, source code, logs, browser code, or public repositories. -- `Jungle Grid API Key`: a Jungle Grid API key. -- `Jungle Grid API Base URL`: defaults to `https://api.junglegrid.dev`. Override only for development or self-hosted orchestrators. +Recommended API key scopes: -Do not place Jungle Grid API keys in prompts, command arguments, environment variables, source code, exported flows, or logs. +- `jobs:estimate` for estimates. +- `jobs:submit` or `jobs:write` for submit and cancel. +- `jobs:read` for list, status, runtime, logs, and artifacts. +- `logs:read` for stored job logs. ## Actions -- `Estimate Job`: calls `POST /v1/jobs/estimate` before starting work. Use this when cost, capacity, routing, or GPU tier matters. -- `Submit Job`: calls `POST /v1/jobs` and returns immediately with a `job_id`. A returned `job_id` does not mean the job is complete. -- `List Jobs`: calls `GET /v1/jobs` with verified `limit` and `status` filters. -- `Get Job`: calls `GET /v1/jobs/{job_id}` to retrieve current status and details. -- `Get Job Runtime`: calls `GET /v1/jobs/{job_id}/runtime` for stdout/stderr tails, exit information, diagnostics, and runtime availability. -- `Cancel Job`: calls `POST /v1/jobs/{job_id}/cancel` with an optional reason. -- `Get Job Logs`: uses the verified runtime endpoint to return available stdout/stderr and exit information. -- `List Job Artifacts`: calls `GET /v1/jobs/{job_id}/artifacts`. -- `Get Artifact Download URL`: calls `POST /v1/jobs/{job_id}/artifacts/{artifact_id}/download`. +- `Estimate Job`: calls `POST /v1/jobs/estimate`. This is read-only and does not start compute. +- `Submit Job`: calls `POST /v1/jobs`. This starts asynchronous remote work and can spend credits. +- `List Jobs`: calls `GET /v1/jobs` with `limit`, `cursor`, and `status` filters. +- `Get Job`: calls `GET /v1/jobs/{job_id}` for status and job details. +- `Get Job Runtime`: calls `GET /v1/jobs/{job_id}/runtime` for runtime tails, exit code, timeout, and diagnostics where available. +- `Get Job Logs`: calls `GET /v1/jobs/{job_id}/logs` with `tail`, `limit`, `cursor`, and `stream` filters. +- `Cancel Job`: calls `POST /v1/jobs/{job_id}/cancel`. Use only for non-terminal jobs. +- `List Artifacts`: calls `GET /v1/jobs/{job_id}/artifacts`. +- `Get Artifact`: calls `POST /v1/jobs/{job_id}/artifacts/{artifact_id}/download` to create temporary download information. -Live log streaming is intentionally not exposed as a Flowise tool action. The official stream route is long-lived Server-Sent Events, while Flowise agent tools execute synchronously. Polling `Get Job`, `Get Job Runtime`, and `Get Job Logs` is the production-safe Flowise path. +Live log streaming uses `GET /v1/jobs/{job_id}/logs/live` as Server-Sent Events. Flowise tools execute synchronously, so this integration exposes polling through `Get Job` and `Get Job Logs` instead of holding a long-lived stream. ## Usage Pattern ```text Estimate Job - -> Submit Job + -> review estimate/capacity/cost fields + -> Submit Job after user approval -> store job_id -> Get Job / Get Job Runtime / Get Job Logs - -> wait for terminal status - -> List Job Artifacts - -> Get Artifact Download URL + -> wait for completed, failed, rejected, or cancelled + -> List Artifacts + -> Get Artifact ``` -`Submit Job` is asynchronous. It returns a `job_id` immediately, but that does not mean the workload has completed. Poll `Get Job`, `Get Job Runtime`, or `Get Job Logs` until Jungle Grid reports a terminal status. Retrieve artifacts after successful completion unless the API response explicitly shows partial outputs are available. +`Submit Job` returns a `job_id` immediately. It does not wait for completion. Poll until Jungle Grid reports a terminal status before assuming outputs are final. ## Example Workloads -Minimal inference smoke-test shape: +Inference estimate: ```json { - "name": "flowise-jungle-grid-smoke-test", + "name": "flowise-inference-estimate", "image": "python:3.11", "workload_type": "inference", "model_size_gb": 1, + "optimize_for": "balanced" +} +``` + +Low-cost batch submit: + +```json +{ + "name": "flowise-batch-smoke", + "image": "python:3.11", + "workload_type": "batch", + "model_size_gb": 1, + "optimize_for": "cost", "command": "python", "args": ["-c", "print(42)"] } ``` -Artifact-producing shape: +Agent-triggered artifact job: ```json { - "name": "flowise-jungle-grid-artifact-test", + "name": "flowise-artifact-job", "image": "python:3.11", "workload_type": "batch", "model_size_gb": 1, @@ -72,4 +102,37 @@ Artifact-producing shape: } ``` -Common use cases include running inference workloads, batch jobs, evaluation workloads, artifact-producing container jobs, and agent-monitored long-running compute work. +Job monitoring: + +```json +{ + "job_id": "job_..." +} +``` + +Logs retrieval: + +```json +{ + "job_id": "job_...", + "tail": 100, + "stream": "all" +} +``` + +Artifact retrieval: + +```json +{ + "job_id": "job_...", + "artifact_id": "art_..." +} +``` + +## Field Notes + +- `workload_type` supports `inference`, `training`, `fine_tuning` / `fine-tuning`, and `batch`. The integration sends `fine-tuning` to the REST API. +- `routing_mode` is accepted as an agent-friendly alias for `optimize_for`. +- `env` is accepted as an alias for `environment`; values must be strings. +- `callback_url`, `callback_auth_token`, and `callback_metadata` follow the documented per-job callback fields. Treat callback tokens as secrets. +- Managed jobs can upload regular files written under `/workspace/artifacts`. Direct input file upload is not part of the documented public job-submit contract; pass file references through your image, command, environment, or storage system where appropriate. diff --git a/packages/components/nodes/tools/JungleGrid/core.test.ts b/packages/components/nodes/tools/JungleGrid/core.test.ts index 8d8a7efec62..fcb41f30718 100644 --- a/packages/components/nodes/tools/JungleGrid/core.test.ts +++ b/packages/components/nodes/tools/JungleGrid/core.test.ts @@ -25,45 +25,174 @@ describe('JungleGridClient', () => { expect.objectContaining({ url: 'https://api.junglegrid.dev/v1/jobs/estimate', method: 'POST', + timeout: 30000, headers: expect.objectContaining({ Authorization: 'Bearer test-key', 'Content-Type': 'application/json' - }) + }), + data: { + workload_type: 'inference', + image: 'python:3.11' + } }) ) }) - it('uses verified production routes for lifecycle and artifact operations', async () => { + it('normalizes agent-friendly aliases before submit', async () => { const client = new JungleGridClient({ apiKey: 'test-key' }) - await client.submitJob({ workload_type: 'batch', image: 'python:3.11', command: 'python', args: ['-c', 'print(42)'] }) - await client.listJobs({ limit: 20, status: 'running' }) + await client.submitJob({ + name: 'flowise-job', + workload_type: 'fine_tuning', + image: 'python:3.11-slim', + routing_mode: 'cost', + command: 'python', + args: ['-c', 'print(42)'], + env: { CODE: 'print(42)', SHARED: 'env-value' }, + environment: { SHARED: 'environment-value' } + }) + + expect(mockedSecureAxiosRequest).toHaveBeenCalledWith( + expect.objectContaining({ + url: 'https://api.junglegrid.dev/v1/jobs', + data: { + name: 'flowise-job', + workload_type: 'fine-tuning', + image: 'python:3.11-slim', + optimize_for: 'cost', + command: 'python', + args: ['-c', 'print(42)'], + environment: { + CODE: 'print(42)', + SHARED: 'environment-value' + } + } + }) + ) + }) + + it('uses documented production routes for lifecycle, logs, and artifacts', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + + await client.listJobs({ limit: 20, cursor: 42, status: 'running' }) await client.getJob('job_123') await client.getJobRuntime('job_123') + await client.getJobLogs('job_123', { tail: 100, cursor: 42, stream: 'stdout' }) await client.cancelJob('job_123', 'test') await client.listJobArtifacts('job_123') await client.getArtifactDownloadUrl('job_123', 'artifact_123') const urls = mockedSecureAxiosRequest.mock.calls.map(([config]) => config.url) expect(urls).toEqual([ - 'https://api.junglegrid.dev/v1/jobs', - 'https://api.junglegrid.dev/v1/jobs?limit=20&status=running', + 'https://api.junglegrid.dev/v1/jobs?limit=20&cursor=42&status=running', 'https://api.junglegrid.dev/v1/jobs/job_123', 'https://api.junglegrid.dev/v1/jobs/job_123/runtime', + 'https://api.junglegrid.dev/v1/jobs/job_123/logs?tail=100&cursor=42&stream=stdout', 'https://api.junglegrid.dev/v1/jobs/job_123/cancel', 'https://api.junglegrid.dev/v1/jobs/job_123/artifacts', 'https://api.junglegrid.dev/v1/jobs/job_123/artifacts/artifact_123/download' ]) }) + + it('formats API errors with Jungle Grid code and message', async () => { + mockedSecureAxiosRequest.mockResolvedValueOnce({ + status: 403, + data: { + error: { + code: 'FORBIDDEN', + message: 'The token is not authorized for this Jungle Grid action.' + } + } + } as any) + const client = new JungleGridClient({ apiKey: 'test-key' }) + + await expect(client.getJob('job_123')).rejects.toThrow( + 'GET /v1/jobs/job_123 failed with status 403: FORBIDDEN: The token is not authorized for this Jungle Grid action.' + ) + }) }) describe('createJungleGridTools', () => { - it('creates agent-facing tools with async job guidance', () => { + beforeEach(() => { + mockedSecureAxiosRequest.mockReset() + mockedSecureAxiosRequest.mockResolvedValue({ + status: 200, + data: { ok: true } + } as any) + }) + + it('creates agent-facing tools with async job guidance and current artifact names', () => { const client = new JungleGridClient({ apiKey: 'test-key' }) - const tools = createJungleGridTools(client, ['estimateJob', 'submitJob', 'getJob']) + const tools = createJungleGridTools(client, ['estimateJob', 'submitJob', 'getJobLogs', 'listArtifacts', 'getArtifact']) - expect(tools.map((tool) => tool.name)).toEqual(['jungle_grid_estimate_job', 'jungle_grid_submit_job', 'jungle_grid_get_job']) + expect(tools.map((tool) => tool.name)).toEqual([ + 'jungle_grid_estimate_job', + 'jungle_grid_submit_job', + 'jungle_grid_get_job_logs', + 'jungle_grid_list_artifacts', + 'jungle_grid_get_artifact' + ]) expect(tools[1].description).toContain('returns a job_id immediately') expect(tools[1].description).toContain('does not mean the job has finished') }) + + it('validates missing required fields before submit', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + const [submitTool] = createJungleGridTools(client, ['submitJob']) + + const result = await (submitTool as any)._call({ workload_type: 'batch' }) + + expect(result).toContain('jungle_grid_submit_job failed') + expect(result).toContain('name: Required') + expect(result).toContain('image: Required') + expect(mockedSecureAxiosRequest).not.toHaveBeenCalled() + }) + + it('runs estimate and submit flows through tool calls', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + const [estimateTool, submitTool] = createJungleGridTools(client, ['estimateJob', 'submitJob']) + + await (estimateTool as any)._call({ workload_type: 'inference', image: 'python:3.11', model_size_gb: 1 }) + await (submitTool as any)._call({ + name: 'flowise-smoke', + workload_type: 'batch', + image: 'python:3.11', + command: 'python', + args: ['-c', 'print(42)'] + }) + + expect(mockedSecureAxiosRequest).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ url: expect.stringContaining('/v1/jobs/estimate') }) + ) + expect(mockedSecureAxiosRequest).toHaveBeenNthCalledWith(2, expect.objectContaining({ url: expect.stringContaining('/v1/jobs') })) + }) + + it('redacts callback tokens from tool errors and echoed params', async () => { + mockedSecureAxiosRequest.mockResolvedValueOnce({ + status: 400, + data: { + error: { + code: 'INVALID_REQUEST', + message: 'callback_auth_token "callback-secret-fixture" is invalid; Authorization: Bearer api-key-fixture' + } + } + } as any) + const client = new JungleGridClient({ apiKey: 'test-key' }) + const [submitTool] = createJungleGridTools(client, ['submitJob']) + + const result = await (submitTool as any)._call({ + name: 'flowise-smoke', + workload_type: 'batch', + image: 'python:3.11', + callback_auth_token: 'super-secret-callback-token' + }) + + expect(result).toContain('INVALID_REQUEST') + expect(result).toContain('Bearer [REDACTED]') + expect(result).toContain('"callback_auth_token":"[REDACTED]"') + expect(result).not.toContain('super-secret-callback-token') + expect(result).not.toContain('callback-secret-fixture') + expect(result).not.toContain('api-key-fixture') + }) }) diff --git a/packages/components/nodes/tools/JungleGrid/core.ts b/packages/components/nodes/tools/JungleGrid/core.ts index 1dc36dadef1..cd4e0235dc2 100644 --- a/packages/components/nodes/tools/JungleGrid/core.ts +++ b/packages/components/nodes/tools/JungleGrid/core.ts @@ -10,12 +10,13 @@ const jsonHeaders = { Accept: 'application/json' } -const workloadTypeSchema = z.enum(['inference', 'training', 'fine-tuning', 'batch']) +const workloadTypeSchema = z.enum(['inference', 'training', 'fine_tuning', 'fine-tuning', 'batch']) const optimizeForSchema = z.enum(['balanced', 'cost', 'speed']) const prioritySchema = z.enum(['low', 'balanced', 'high']) const gpuClassSchema = z.enum(['consumer', 'datacenter']) const regionModeSchema = z.enum(['prefer', 'strict']) const jobStatusSchema = z.enum(['pending', 'queued', 'assigned', 'running', 'completed', 'failed', 'rejected', 'cancelled']) +const cursorSchema = z.union([z.string(), z.number()]) const constraintsSchema = z .object({ @@ -33,29 +34,34 @@ const constraintsSchema = z const estimateJobSchema = z.object({ name: z.string().optional().describe('Optional readable job name for the draft workload.'), - workload_type: workloadTypeSchema.describe('Type of workload to estimate.'), + workload_type: workloadTypeSchema.describe("Type of workload to estimate. Use 'fine_tuning' or 'fine-tuning' for fine-tuning."), image: z.string().min(1).describe('Docker image to run.'), command: z.string().optional().describe('Optional container command to include in the estimate draft, for example python.'), args: z.array(z.string()).optional().describe('Optional arguments passed to the command.'), model_size_gb: z.number().optional().describe('Approximate model size in GB.'), disk_gb: z.number().optional().describe('Optional managed-provider local disk override in GB.'), optimize_for: optimizeForSchema.optional().describe("Scheduling optimization goal: 'balanced', 'cost', or 'speed'."), + routing_mode: optimizeForSchema.optional().describe('Agent-friendly alias for optimize_for. If both are set, they must match.'), + template: z.string().optional().describe('Optional Jungle Grid template identifier when submitting a template-backed workload.'), + notes: z.string().optional().describe('Optional execution intent notes for the estimator when supported by the API.'), latency_priority: prioritySchema.optional().describe("Latency sensitivity: 'low', 'balanced', or 'high'."), cost_priority: prioritySchema.optional().describe("Cost sensitivity: 'low', 'balanced', or 'high'."), constraints: constraintsSchema.describe('Optional routing constraints.') }) const submitJobSchema = estimateJobSchema.extend({ - command: z.string().min(1).describe('Container command to run, for example python. Submit Job starts asynchronous remote work.'), + name: z.string().min(1).describe('Readable job name. Submit Job starts asynchronous remote work and may spend credits.'), + command: z.string().optional().describe('Container command to run, for example python. Omit to use the image entrypoint or CMD.'), environment: z .record(z.string()) .optional() .describe('Environment variables injected into the container. Do not include secrets unless the user explicitly approves.'), + env: z.record(z.string()).optional().describe('Agent-friendly alias for environment. If both are set, keys from environment are used.'), huggingface_credential_id: z.string().optional().describe('Optional saved Hugging Face credential id.'), - webhook_url: z.string().optional().describe('Optional HTTPS URL to receive signed lifecycle event callbacks.'), callback_url: z.string().optional().describe('Optional documented callback URL for lifecycle events.'), callback_auth_token: z.string().optional().describe('Optional callback authentication token. Never use this for unrelated secrets.'), - callback_metadata: z.record(z.any()).optional().describe('Optional metadata included with callbacks.') + callback_metadata: z.record(z.any()).optional().describe('Optional metadata included with callbacks.'), + metadata: z.record(z.any()).optional().describe('Optional workload metadata when supported by the API.') }) const jobIdSchema = z.object({ @@ -65,18 +71,28 @@ const jobIdSchema = z.object({ const listJobsSchema = z.object({ limit: z .number() + .min(1) + .max(100) .optional() .default(20) - .describe('Maximum number of jobs to return. The official MCP package defaults to 20 and documents max 100.'), + .describe('Maximum number of jobs to return. Jungle Grid documents a maximum of 100.'), + cursor: cursorSchema.optional().describe('Optional next_cursor from a previous list_jobs response.'), status: jobStatusSchema.optional().describe('Optional status filter.') }) +const getJobLogsSchema = jobIdSchema.extend({ + tail: z.number().min(1).max(500).optional().describe('Return the most recent 1..500 log entries.'), + limit: z.number().min(1).max(500).optional().describe('Return up to this many log entries, 1..500.'), + cursor: cursorSchema.optional().describe('Optional next_cursor from a previous get_job_logs response.'), + stream: z.enum(['stdout', 'stderr', 'all']).optional().describe("Filter logs by stream. Defaults to the API's behavior.") +}) + const cancelJobSchema = jobIdSchema.extend({ reason: z.string().optional().describe('Optional cancellation reason.') }) const artifactDownloadSchema = jobIdSchema.extend({ - artifact_id: z.string().min(1).describe('Artifact ID returned by list_job_artifacts.') + artifact_id: z.string().min(1).describe('Artifact ID returned by list_artifacts.') }) export interface JungleGridClientOptions { @@ -95,16 +111,17 @@ export class JungleGridClient { } estimateJob(input: z.infer): Promise { - return this.request('POST', '/v1/jobs/estimate', input) + return this.request('POST', '/v1/jobs/estimate', normalizeJobPayload(input)) } submitJob(input: z.infer): Promise { - return this.request('POST', '/v1/jobs', input) + return this.request('POST', '/v1/jobs', normalizeJobPayload(input)) } listJobs(input: z.infer): Promise { const params = new URLSearchParams() if (input.limit !== undefined) params.set('limit', String(input.limit)) + if (input.cursor !== undefined) params.set('cursor', String(input.cursor)) if (input.status) params.set('status', input.status) const query = params.toString() return this.request('GET', query ? `/v1/jobs?${query}` : '/v1/jobs') @@ -118,6 +135,16 @@ export class JungleGridClient { return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/runtime`) } + getJobLogs(jobId: string, input: Omit, 'job_id'> = {}): Promise { + const params = new URLSearchParams() + if (input.tail !== undefined) params.set('tail', String(input.tail)) + if (input.limit !== undefined) params.set('limit', String(input.limit)) + if (input.cursor !== undefined) params.set('cursor', String(input.cursor)) + if (input.stream) params.set('stream', input.stream) + const query = params.toString() + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/logs${query ? `?${query}` : ''}`) + } + cancelJob(jobId: string, reason?: string): Promise { return this.request('POST', `/v1/jobs/${encodeURIComponent(jobId)}/cancel`, { reason: reason ?? 'Cancelled via Flowise' @@ -140,7 +167,8 @@ export class JungleGridClient { Authorization: `Bearer ${this.apiKey}`, ...jsonHeaders }, - data: body !== undefined ? removeUndefined(body) : undefined + data: body !== undefined ? removeUndefined(body) : undefined, + timeout: 30000 }) if (response.status < 200 || response.status >= 300) { @@ -162,8 +190,8 @@ export type JungleGridAction = | 'getJobRuntime' | 'cancelJob' | 'getJobLogs' - | 'listJobArtifacts' - | 'getArtifactDownloadUrl' + | 'listArtifacts' + | 'getArtifact' export function createJungleGridTools(client: JungleGridClient, actions: JungleGridAction[]): DynamicStructuredTool[] { const selectedActions = actions.length > 0 ? actions : (Object.keys(toolFactories) as JungleGridAction[]) @@ -172,6 +200,7 @@ export function createJungleGridTools(client: JungleGridClient, actions: JungleG class JungleGridTool extends DynamicStructuredTool { private readonly handler: (input: any) => Promise + private readonly inputSchema: any constructor(args: { name: string; description: string; schema: any; handler: (input: any) => Promise }) { super({ @@ -183,10 +212,16 @@ class JungleGridTool extends DynamicStructuredTool { headers: {} }) this.handler = args.handler + this.inputSchema = args.schema } async _call(input: any): Promise { - return this.handler(input) + try { + const parsedInput = this.inputSchema?.parse ? this.inputSchema.parse(input ?? {}) : input + return this.handler(parsedInput) + } catch (error) { + return formatToolError(`${this.name} failed: ${formatErrorMessage(error)}`, redactSensitiveParams(input)) + } } } @@ -195,7 +230,7 @@ const toolFactories: Record Dyn new JungleGridTool({ name: 'jungle_grid_estimate_job', description: - 'Estimate a Jungle Grid workload before submission when cost, capacity, routing, or GPU tier matters. Accepts the same verified draft workload fields as submit_job and returns the real production estimate response, including hourly rate/range, runtime, queue/start window, route/capacity, and constraint-relaxation fields when available. Use this before jungle_grid_submit_job if spending credits or capacity selection matters.', + 'Estimate a Jungle Grid workload before submission. This is read-only and returns route, capacity, cost, queue wait, start window, warning, and screening fields when available. Use this before jungle_grid_submit_job whenever spending credits or capacity selection matters.', schema: estimateJobSchema, handler: async (input) => safeJsonToolCall('jungle_grid_estimate_job', input, () => client.estimateJob(input)) }), @@ -203,14 +238,14 @@ const toolFactories: Record Dyn new JungleGridTool({ name: 'jungle_grid_submit_job', description: - 'Submit an asynchronous Jungle Grid workload. This starts remote work and returns a job_id immediately; a returned job_id does not mean the job has finished. Poll jungle_grid_get_job and use jungle_grid_get_job_runtime or jungle_grid_get_job_logs until a terminal status is reached. Never put untrusted secrets in environment variables, command, or args without explicit user permission.', + 'Submit an asynchronous Jungle Grid workload. This may start managed compute and spend credits. It returns a job_id immediately; a returned job_id does not mean the job has finished. Poll jungle_grid_get_job and jungle_grid_get_job_logs until a terminal status is reached. Never put untrusted secrets in environment variables, command, or args without explicit user permission.', schema: submitJobSchema, handler: async (input) => safeJsonToolCall('jungle_grid_submit_job', input, () => client.submitJob(input)) }), listJobs: (client) => new JungleGridTool({ name: 'jungle_grid_list_jobs', - description: 'List recent Jungle Grid jobs for the authenticated account. Use this to find job IDs and recent statuses.', + description: 'List recent Jungle Grid jobs for the authenticated account with limit, cursor, and optional status filtering.', schema: listJobsSchema, handler: async (input) => safeJsonToolCall('jungle_grid_list_jobs', input, () => client.listJobs(input)) }), @@ -242,28 +277,34 @@ const toolFactories: Record Dyn new JungleGridTool({ name: 'jungle_grid_get_job_logs', description: - 'Retrieve available stdout/stderr tails and exit information for a Jungle Grid job. The official MCP package verifies this as the same production runtime endpoint used by get_job_runtime. Use it after submit_job while polling or after terminal status for diagnostics.', - schema: jobIdSchema, - handler: async (input) => safeJsonToolCall('jungle_grid_get_job_logs', input, () => client.getJobRuntime(input.job_id)) + 'Retrieve stored Jungle Grid stdout/stderr log entries for a job. Supports tail, limit, cursor, and stream filters. For live Server-Sent Events use Jungle Grid directly; Flowise tools poll this stored logs endpoint.', + schema: getJobLogsSchema, + handler: async (input) => + safeJsonToolCall('jungle_grid_get_job_logs', input, () => + client.getJobLogs(input.job_id, { + tail: input.tail, + limit: input.limit, + cursor: input.cursor, + stream: input.stream + }) + ) }), - listJobArtifacts: (client) => + listArtifacts: (client) => new JungleGridTool({ - name: 'jungle_grid_list_job_artifacts', + name: 'jungle_grid_list_artifacts', description: 'List managed artifacts uploaded by a Jungle Grid job. Retrieve artifacts after successful completion unless the API response explicitly shows partial outputs are available. Managed jobs upload regular files written under /workspace/artifacts.', schema: jobIdSchema, - handler: async (input) => safeJsonToolCall('jungle_grid_list_job_artifacts', input, () => client.listJobArtifacts(input.job_id)) + handler: async (input) => safeJsonToolCall('jungle_grid_list_artifacts', input, () => client.listJobArtifacts(input.job_id)) }), - getArtifactDownloadUrl: (client) => + getArtifact: (client) => new JungleGridTool({ - name: 'jungle_grid_get_artifact_download_url', + name: 'jungle_grid_get_artifact', description: - 'Create a temporary signed download URL for a managed Jungle Grid artifact. Call jungle_grid_list_job_artifacts first and use an artifact_id from that response.', + 'Create temporary download information for a managed Jungle Grid artifact. The signed URL is a temporary secret; do not print it into public logs. Call jungle_grid_list_artifacts first and use an artifact_id from that response.', schema: artifactDownloadSchema, handler: async (input) => - safeJsonToolCall('jungle_grid_get_artifact_download_url', input, () => - client.getArtifactDownloadUrl(input.job_id, input.artifact_id) - ) + safeJsonToolCall('jungle_grid_get_artifact', input, () => client.getArtifactDownloadUrl(input.job_id, input.artifact_id)) }) } @@ -272,11 +313,37 @@ async function safeJsonToolCall(toolName: string, params: any, call: () => Promi const result = await call() return JSON.stringify(result) } catch (error) { - const message = error instanceof Error ? error.message : typeof error === 'string' ? error : 'Unknown Jungle Grid API error' + const message = formatErrorMessage(error) return formatToolError(`${toolName} failed: ${message}`, redactSensitiveParams(params)) } } +function normalizeJobPayload(input: any): any { + const payload = removeUndefined(input) + + if (payload.workload_type === 'fine_tuning') { + payload.workload_type = 'fine-tuning' + } + + if (payload.routing_mode !== undefined) { + if (payload.optimize_for !== undefined && payload.optimize_for !== payload.routing_mode) { + throw new Error('optimize_for and routing_mode must match when both are provided') + } + payload.optimize_for = payload.routing_mode + delete payload.routing_mode + } + + if (payload.env !== undefined) { + payload.environment = { + ...payload.env, + ...(payload.environment ?? {}) + } + delete payload.env + } + + return payload +} + function normalizeBaseUrl(baseUrl?: string): string { const raw = (baseUrl || DEFAULT_JUNGLE_GRID_BASE_URL).trim() if (!raw) return DEFAULT_JUNGLE_GRID_BASE_URL @@ -299,10 +366,13 @@ function removeUndefined(value: any): any { function redactSensitiveParams(params: any): any { if (!params || typeof params !== 'object') return params + if (Array.isArray(params)) return params.map(redactSensitiveParams) const output: Record = {} for (const [key, value] of Object.entries(params)) { if (/token|secret|password|api[_-]?key/i.test(key)) { output[key] = '[REDACTED]' + } else if (Array.isArray(value)) { + output[key] = value.map(redactSensitiveParams) } else if (value && typeof value === 'object' && !Array.isArray(value)) { output[key] = redactSensitiveParams(value) } else { @@ -312,17 +382,40 @@ function redactSensitiveParams(params: any): any { return output } +function formatErrorMessage(error: any): string { + if (error?.issues && Array.isArray(error.issues)) { + return error.issues + .map((issue: any) => { + const path = Array.isArray(issue.path) && issue.path.length > 0 ? `${issue.path.join('.')}: ` : '' + return `${path}${issue.message}` + }) + .join('; ') + } + + if (error instanceof Error) return redactErrorMessage(error.message) + if (typeof error === 'string') return redactErrorMessage(error) + return 'Unknown Jungle Grid API error' +} + +function redactErrorMessage(message: string): string { + return message + .replace(/jg_[A-Za-z0-9._-]+/g, 'jg_[REDACTED]') + .replace(/Bearer\s+[A-Za-z0-9._-]+/gi, 'Bearer [REDACTED]') + .replace(/("(?:callback_auth_token|apiKey|api_key|token|secret|password)"\s*:\s*")([^"]+)(")/gi, '$1[REDACTED]$3') + .replace(/((?:callback_auth_token|apiKey|api_key|token|secret|password)\s*[:=]?\s*["'])([^"']+)(["'])/gi, '$1[REDACTED]$3') +} + function responseErrorMessage(response: any, method: string, path: string): string { const fallback = `${method} ${path} failed with status ${response.status}` const payload = response.data if (payload === undefined || payload === null || payload === '') return fallback if (typeof payload === 'string') { const trimmed = payload.trim() - return trimmed ? `${fallback}: ${trimmed}` : fallback + return trimmed ? `${fallback}: ${redactErrorMessage(trimmed)}` : fallback } const detail = formatApiError(payload) - return detail ? `${fallback}: ${detail}` : fallback + return detail ? `${fallback}: ${redactErrorMessage(detail)}` : fallback } function formatApiError(payload: any): string | undefined {