diff --git a/packages/components/credentials/JungleGridApi.credential.ts b/packages/components/credentials/JungleGridApi.credential.ts new file mode 100644 index 00000000000..48ed1414e5a --- /dev/null +++ b/packages/components/credentials/JungleGridApi.credential.ts @@ -0,0 +1,33 @@ +import { INodeParams, INodeCredential } from '../src/Interface' + +class JungleGridApiCredential implements INodeCredential { + label: string + name: string + version: number + description: string + inputs: INodeParams[] + + constructor() { + this.label = 'Jungle Grid API' + this.name = 'jungleGridApi' + this.version = 1.0 + this.description = + 'Use a Jungle Grid API key to estimate, submit, monitor, cancel, and retrieve artifacts for long-running workloads.' + this.inputs = [ + { + label: 'Jungle Grid API Key', + name: 'apiKey', + type: 'password' + }, + { + label: 'Jungle Grid API Base URL', + name: 'baseUrl', + type: 'url', + default: 'https://api.junglegrid.dev', + description: 'Override only for development or self-hosted Jungle Grid orchestrators.' + } + ] + } +} + +module.exports = { credClass: JungleGridApiCredential } diff --git a/packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts b/packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts new file mode 100644 index 00000000000..625e9b6ee21 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/JungleGrid.test.ts @@ -0,0 +1,74 @@ +import { secureAxiosRequest } from '../../../src/httpSecurity' +import { convertMultiOptionsToStringArray, getCredentialData, getCredentialParam } from '../../../src/utils' + +jest.mock('../../../src/httpSecurity', () => ({ + secureAxiosRequest: jest.fn() +})) + +jest.mock('../../../src/utils', () => ({ + convertMultiOptionsToStringArray: jest.fn((value) => (Array.isArray(value) ? value : value ? [value] : [])), + getCredentialData: jest.fn(), + getCredentialParam: jest.fn((name, credentialData) => credentialData[name]) +})) + +const mockedSecureAxiosRequest = secureAxiosRequest as jest.MockedFunction +const mockedGetCredentialData = getCredentialData as jest.MockedFunction +const mockedGetCredentialParam = getCredentialParam as jest.MockedFunction +const mockedConvertMultiOptionsToStringArray = convertMultiOptionsToStringArray as jest.MockedFunction< + typeof convertMultiOptionsToStringArray +> + +describe('JungleGrid_Tools', () => { + beforeEach(() => { + mockedSecureAxiosRequest.mockReset() + mockedSecureAxiosRequest.mockResolvedValue({ + status: 200, + data: { available: true } + } as any) + mockedGetCredentialData.mockReset() + mockedGetCredentialParam.mockClear() + mockedConvertMultiOptionsToStringArray.mockClear() + }) + + it('loads Jungle Grid credentials through Flowise credential helpers', async () => { + mockedGetCredentialData.mockResolvedValue({ + apiKey: 'credential-api-key', + baseUrl: 'https://api.example.test/' + } as any) + + const { nodeClass } = require('./JungleGrid') + const node = new nodeClass() + const tools = await node.init( + { + credential: 'credential-id', + inputs: { + actions: ['estimateJob'] + } + } as any, + '', + {} as any + ) + + await tools[0]._call({ workload_type: 'batch', image: 'python:3.11' }) + + expect(mockedGetCredentialData).toHaveBeenCalledWith('credential-id', {}) + expect(mockedGetCredentialParam).toHaveBeenCalledWith( + 'apiKey', + expect.objectContaining({ apiKey: 'credential-api-key' }), + expect.anything() + ) + expect(mockedGetCredentialParam).toHaveBeenCalledWith( + 'baseUrl', + expect.objectContaining({ baseUrl: 'https://api.example.test/' }), + expect.anything() + ) + expect(mockedSecureAxiosRequest).toHaveBeenCalledWith( + expect.objectContaining({ + url: 'https://api.example.test/v1/jobs/estimate', + headers: expect.objectContaining({ + Authorization: 'Bearer credential-api-key' + }) + }) + ) + }) +}) diff --git a/packages/components/nodes/tools/JungleGrid/JungleGrid.ts b/packages/components/nodes/tools/JungleGrid/JungleGrid.ts new file mode 100644 index 00000000000..c3f3e822b91 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/JungleGrid.ts @@ -0,0 +1,69 @@ +import { convertMultiOptionsToStringArray, getCredentialData, getCredentialParam } from '../../../src/utils' +import { createJungleGridTools, DEFAULT_JUNGLE_GRID_BASE_URL, JungleGridAction, JungleGridClient } from './core' +import type { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' + +const ALL_ACTIONS: { label: string; name: JungleGridAction }[] = [ + { label: 'Estimate Job', name: 'estimateJob' }, + { label: 'Submit Job', name: 'submitJob' }, + { label: 'List Jobs', name: 'listJobs' }, + { label: 'Get Job', name: 'getJob' }, + { label: 'Get Job Runtime', name: 'getJobRuntime' }, + { label: 'Cancel Job', name: 'cancelJob' }, + { label: 'Get Job Logs', name: 'getJobLogs' }, + { label: 'List Artifacts', name: 'listArtifacts' }, + { label: 'Get Artifact', name: 'getArtifact' } +] + +class JungleGrid_Tools implements INode { + label: string + name: string + version: number + type: string + icon: string + category: string + description: string + baseClasses: string[] + credential: INodeParams + inputs: INodeParams[] + documentation?: string + + constructor() { + this.label = 'Jungle Grid' + this.name = 'jungleGrid' + this.version = 1.0 + this.type = 'JungleGrid' + this.icon = 'junglegrid.svg' + this.category = 'Tools' + this.description = 'Estimate, submit, monitor, cancel, and retrieve artifacts for asynchronous Jungle Grid workloads' + this.documentation = 'https://junglegrid.dev/docs/api' + this.baseClasses = [this.type, 'Tool'] + this.credential = { + label: 'Jungle Grid Credential', + name: 'credential', + type: 'credential', + credentialNames: ['jungleGridApi'] + } + this.inputs = [ + { + label: 'Actions', + name: 'actions', + type: 'multiOptions', + options: ALL_ACTIONS, + default: ['estimateJob', 'submitJob', 'getJob', 'getJobRuntime', 'getJobLogs', 'listArtifacts', 'getArtifact'], + description: 'Choose which Jungle Grid tools to expose to the agent.' + } + ] + } + + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { + const credentialData = await getCredentialData(nodeData.credential ?? '', options) + const apiKey = getCredentialParam('apiKey', credentialData, nodeData) as string + const baseUrl = (getCredentialParam('baseUrl', credentialData, nodeData) as string) || DEFAULT_JUNGLE_GRID_BASE_URL + const actions = convertMultiOptionsToStringArray(nodeData.inputs?.actions) as JungleGridAction[] + + const client = new JungleGridClient({ apiKey, baseUrl }) + return createJungleGridTools(client, actions) + } +} + +module.exports = { nodeClass: JungleGrid_Tools } diff --git a/packages/components/nodes/tools/JungleGrid/README.md b/packages/components/nodes/tools/JungleGrid/README.md new file mode 100644 index 00000000000..a84e5b9125d --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/README.md @@ -0,0 +1,138 @@ +# Jungle Grid + +The Jungle Grid tool node lets Flowise agents estimate, submit, monitor, cancel, and retrieve outputs for asynchronous Jungle Grid workloads. + +Jungle Grid is live production infrastructure. Submitting jobs may start managed compute and spend credits. Estimate first whenever a user needs cost, capacity, or routing information before execution. + +## Links + +- Website: https://junglegrid.dev +- Docs: https://junglegrid.dev/docs +- API docs: https://junglegrid.dev/docs/api +- MCP docs: https://junglegrid.dev/docs/mcp +- MCP server: https://github.com/Jungle-Grid/mcp-server +- Discord: https://discord.com/invite/kpJqxXFFCs + +## Credentials + +Create a `Jungle Grid API` credential in Flowise: + +- `Jungle Grid API Key`: a scoped Jungle Grid API key from the Jungle Grid portal. +- `Jungle Grid API Base URL`: defaults to `https://api.junglegrid.dev`. Override only for staging, local development, or private deployments. + +Keep the API key in Flowise credentials or a server-side environment variable used to create that credential. Do not place Jungle Grid API keys in prompts, command arguments, exported flows, source code, logs, browser code, or public repositories. + +Recommended API key scopes: + +- `jobs:estimate` for estimates. +- `jobs:submit` or `jobs:write` for submit and cancel. +- `jobs:read` for list, status, runtime, logs, and artifacts. +- `logs:read` for stored job logs. + +## Actions + +- `Estimate Job`: calls `POST /v1/jobs/estimate`. This is read-only and does not start compute. +- `Submit Job`: calls `POST /v1/jobs`. This starts asynchronous remote work and can spend credits. +- `List Jobs`: calls `GET /v1/jobs` with `limit`, `cursor`, and `status` filters. +- `Get Job`: calls `GET /v1/jobs/{job_id}` for status and job details. +- `Get Job Runtime`: calls `GET /v1/jobs/{job_id}/runtime` for runtime tails, exit code, timeout, and diagnostics where available. +- `Get Job Logs`: calls `GET /v1/jobs/{job_id}/logs` with `tail`, `limit`, `cursor`, and `stream` filters. +- `Cancel Job`: calls `POST /v1/jobs/{job_id}/cancel`. Use only for non-terminal jobs. +- `List Artifacts`: calls `GET /v1/jobs/{job_id}/artifacts`. +- `Get Artifact`: calls `POST /v1/jobs/{job_id}/artifacts/{artifact_id}/download` to create temporary download information. + +Live log streaming uses `GET /v1/jobs/{job_id}/logs/live` as Server-Sent Events. Flowise tools execute synchronously, so this integration exposes polling through `Get Job` and `Get Job Logs` instead of holding a long-lived stream. + +## Usage Pattern + +```text +Estimate Job + -> review estimate/capacity/cost fields + -> Submit Job after user approval + -> store job_id + -> Get Job / Get Job Runtime / Get Job Logs + -> wait for completed, failed, rejected, or cancelled + -> List Artifacts + -> Get Artifact +``` + +`Submit Job` returns a `job_id` immediately. It does not wait for completion. Poll until Jungle Grid reports a terminal status before assuming outputs are final. + +## Example Workloads + +Inference estimate: + +```json +{ + "name": "flowise-inference-estimate", + "image": "python:3.11", + "workload_type": "inference", + "model_size_gb": 1, + "optimize_for": "balanced" +} +``` + +Low-cost batch submit: + +```json +{ + "name": "flowise-batch-smoke", + "image": "python:3.11", + "workload_type": "batch", + "model_size_gb": 1, + "optimize_for": "cost", + "command": "python", + "args": ["-c", "print(42)"] +} +``` + +Agent-triggered artifact job: + +```json +{ + "name": "flowise-artifact-job", + "image": "python:3.11", + "workload_type": "batch", + "model_size_gb": 1, + "command": "python", + "args": [ + "-c", + "import json, os; os.makedirs('/workspace/artifacts', exist_ok=True); json.dump({'status':'ok'}, open('/workspace/artifacts/output.json','w'))" + ] +} +``` + +Job monitoring: + +```json +{ + "job_id": "job_..." +} +``` + +Logs retrieval: + +```json +{ + "job_id": "job_...", + "tail": 100, + "stream": "all" +} +``` + +Artifact retrieval: + +```json +{ + "job_id": "job_...", + "artifact_id": "art_..." +} +``` + +## Field Notes + +- `workload_type` supports `inference`, `training`, `fine_tuning` / `fine-tuning`, and `batch`. The integration sends `fine-tuning` to the REST API. +- `routing_mode` is accepted as an agent-friendly alias for `optimize_for`. +- `env` is accepted as an alias for `environment`; values must be strings. +- `callback_url`, `callback_auth_token`, and `callback_metadata` follow the documented per-job callback fields. Treat callback tokens as secrets. +- Managed jobs can upload regular files written under `/workspace/artifacts`. Direct input file upload is not part of the documented public job-submit contract; pass file references through your image, command, environment, or storage system where appropriate. diff --git a/packages/components/nodes/tools/JungleGrid/core.test.ts b/packages/components/nodes/tools/JungleGrid/core.test.ts new file mode 100644 index 00000000000..fcb41f30718 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/core.test.ts @@ -0,0 +1,198 @@ +import { JungleGridClient, createJungleGridTools } from './core' +import { secureAxiosRequest } from '../../../src/httpSecurity' + +jest.mock('../../../src/httpSecurity', () => ({ + secureAxiosRequest: jest.fn() +})) + +const mockedSecureAxiosRequest = secureAxiosRequest as jest.MockedFunction + +describe('JungleGridClient', () => { + beforeEach(() => { + mockedSecureAxiosRequest.mockReset() + mockedSecureAxiosRequest.mockResolvedValue({ + status: 200, + data: { ok: true } + } as any) + }) + + it('uses bearer authentication and the documented estimate route', async () => { + const client = new JungleGridClient({ apiKey: 'test-key', baseUrl: 'https://api.junglegrid.dev/' }) + + await client.estimateJob({ workload_type: 'inference', image: 'python:3.11' }) + + expect(mockedSecureAxiosRequest).toHaveBeenCalledWith( + expect.objectContaining({ + url: 'https://api.junglegrid.dev/v1/jobs/estimate', + method: 'POST', + timeout: 30000, + headers: expect.objectContaining({ + Authorization: 'Bearer test-key', + 'Content-Type': 'application/json' + }), + data: { + workload_type: 'inference', + image: 'python:3.11' + } + }) + ) + }) + + it('normalizes agent-friendly aliases before submit', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + + await client.submitJob({ + name: 'flowise-job', + workload_type: 'fine_tuning', + image: 'python:3.11-slim', + routing_mode: 'cost', + command: 'python', + args: ['-c', 'print(42)'], + env: { CODE: 'print(42)', SHARED: 'env-value' }, + environment: { SHARED: 'environment-value' } + }) + + expect(mockedSecureAxiosRequest).toHaveBeenCalledWith( + expect.objectContaining({ + url: 'https://api.junglegrid.dev/v1/jobs', + data: { + name: 'flowise-job', + workload_type: 'fine-tuning', + image: 'python:3.11-slim', + optimize_for: 'cost', + command: 'python', + args: ['-c', 'print(42)'], + environment: { + CODE: 'print(42)', + SHARED: 'environment-value' + } + } + }) + ) + }) + + it('uses documented production routes for lifecycle, logs, and artifacts', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + + await client.listJobs({ limit: 20, cursor: 42, status: 'running' }) + await client.getJob('job_123') + await client.getJobRuntime('job_123') + await client.getJobLogs('job_123', { tail: 100, cursor: 42, stream: 'stdout' }) + await client.cancelJob('job_123', 'test') + await client.listJobArtifacts('job_123') + await client.getArtifactDownloadUrl('job_123', 'artifact_123') + + const urls = mockedSecureAxiosRequest.mock.calls.map(([config]) => config.url) + expect(urls).toEqual([ + 'https://api.junglegrid.dev/v1/jobs?limit=20&cursor=42&status=running', + 'https://api.junglegrid.dev/v1/jobs/job_123', + 'https://api.junglegrid.dev/v1/jobs/job_123/runtime', + 'https://api.junglegrid.dev/v1/jobs/job_123/logs?tail=100&cursor=42&stream=stdout', + 'https://api.junglegrid.dev/v1/jobs/job_123/cancel', + 'https://api.junglegrid.dev/v1/jobs/job_123/artifacts', + 'https://api.junglegrid.dev/v1/jobs/job_123/artifacts/artifact_123/download' + ]) + }) + + it('formats API errors with Jungle Grid code and message', async () => { + mockedSecureAxiosRequest.mockResolvedValueOnce({ + status: 403, + data: { + error: { + code: 'FORBIDDEN', + message: 'The token is not authorized for this Jungle Grid action.' + } + } + } as any) + const client = new JungleGridClient({ apiKey: 'test-key' }) + + await expect(client.getJob('job_123')).rejects.toThrow( + 'GET /v1/jobs/job_123 failed with status 403: FORBIDDEN: The token is not authorized for this Jungle Grid action.' + ) + }) +}) + +describe('createJungleGridTools', () => { + beforeEach(() => { + mockedSecureAxiosRequest.mockReset() + mockedSecureAxiosRequest.mockResolvedValue({ + status: 200, + data: { ok: true } + } as any) + }) + + it('creates agent-facing tools with async job guidance and current artifact names', () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + const tools = createJungleGridTools(client, ['estimateJob', 'submitJob', 'getJobLogs', 'listArtifacts', 'getArtifact']) + + expect(tools.map((tool) => tool.name)).toEqual([ + 'jungle_grid_estimate_job', + 'jungle_grid_submit_job', + 'jungle_grid_get_job_logs', + 'jungle_grid_list_artifacts', + 'jungle_grid_get_artifact' + ]) + expect(tools[1].description).toContain('returns a job_id immediately') + expect(tools[1].description).toContain('does not mean the job has finished') + }) + + it('validates missing required fields before submit', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + const [submitTool] = createJungleGridTools(client, ['submitJob']) + + const result = await (submitTool as any)._call({ workload_type: 'batch' }) + + expect(result).toContain('jungle_grid_submit_job failed') + expect(result).toContain('name: Required') + expect(result).toContain('image: Required') + expect(mockedSecureAxiosRequest).not.toHaveBeenCalled() + }) + + it('runs estimate and submit flows through tool calls', async () => { + const client = new JungleGridClient({ apiKey: 'test-key' }) + const [estimateTool, submitTool] = createJungleGridTools(client, ['estimateJob', 'submitJob']) + + await (estimateTool as any)._call({ workload_type: 'inference', image: 'python:3.11', model_size_gb: 1 }) + await (submitTool as any)._call({ + name: 'flowise-smoke', + workload_type: 'batch', + image: 'python:3.11', + command: 'python', + args: ['-c', 'print(42)'] + }) + + expect(mockedSecureAxiosRequest).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ url: expect.stringContaining('/v1/jobs/estimate') }) + ) + expect(mockedSecureAxiosRequest).toHaveBeenNthCalledWith(2, expect.objectContaining({ url: expect.stringContaining('/v1/jobs') })) + }) + + it('redacts callback tokens from tool errors and echoed params', async () => { + mockedSecureAxiosRequest.mockResolvedValueOnce({ + status: 400, + data: { + error: { + code: 'INVALID_REQUEST', + message: 'callback_auth_token "callback-secret-fixture" is invalid; Authorization: Bearer api-key-fixture' + } + } + } as any) + const client = new JungleGridClient({ apiKey: 'test-key' }) + const [submitTool] = createJungleGridTools(client, ['submitJob']) + + const result = await (submitTool as any)._call({ + name: 'flowise-smoke', + workload_type: 'batch', + image: 'python:3.11', + callback_auth_token: 'super-secret-callback-token' + }) + + expect(result).toContain('INVALID_REQUEST') + expect(result).toContain('Bearer [REDACTED]') + expect(result).toContain('"callback_auth_token":"[REDACTED]"') + expect(result).not.toContain('super-secret-callback-token') + expect(result).not.toContain('callback-secret-fixture') + expect(result).not.toContain('api-key-fixture') + }) +}) diff --git a/packages/components/nodes/tools/JungleGrid/core.ts b/packages/components/nodes/tools/JungleGrid/core.ts new file mode 100644 index 00000000000..cd4e0235dc2 --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/core.ts @@ -0,0 +1,441 @@ +import { z } from 'zod/v3' +import { DynamicStructuredTool } from '../OpenAPIToolkit/core' +import { formatToolError } from '../../../src/agents' +import { secureAxiosRequest } from '../../../src/httpSecurity' + +export const DEFAULT_JUNGLE_GRID_BASE_URL = 'https://api.junglegrid.dev' + +const jsonHeaders = { + 'Content-Type': 'application/json', + Accept: 'application/json' +} + +const workloadTypeSchema = z.enum(['inference', 'training', 'fine_tuning', 'fine-tuning', 'batch']) +const optimizeForSchema = z.enum(['balanced', 'cost', 'speed']) +const prioritySchema = z.enum(['low', 'balanced', 'high']) +const gpuClassSchema = z.enum(['consumer', 'datacenter']) +const regionModeSchema = z.enum(['prefer', 'strict']) +const jobStatusSchema = z.enum(['pending', 'queued', 'assigned', 'running', 'completed', 'failed', 'rejected', 'cancelled']) +const cursorSchema = z.union([z.string(), z.number()]) + +const constraintsSchema = z + .object({ + max_price_per_hour: z.number().optional().describe('Optional maximum hourly price in USD, as documented by Jungle Grid.'), + preferred_gpu_family: z.string().optional().describe('Optional preferred GPU family, such as l4.'), + avoid_gpu_families: z.array(z.string()).optional().describe('Optional GPU families to avoid.'), + gpu_type: z.string().optional().describe('Optional exact GPU override verified by the official Jungle Grid MCP package.'), + gpu_class: gpuClassSchema.optional().describe('Optional GPU class preference verified by the official Jungle Grid MCP package.'), + region_preference: z.string().optional().describe('Optional preferred region such as us-east or eu-west.'), + region_mode: regionModeSchema.optional().describe('Whether the region preference is preferred or strict.'), + latency_priority: prioritySchema.optional().describe("Latency sensitivity: 'low', 'balanced', or 'high'."), + cost_priority: prioritySchema.optional().describe("Cost sensitivity: 'low', 'balanced', or 'high'.") + }) + .optional() + +const estimateJobSchema = z.object({ + name: z.string().optional().describe('Optional readable job name for the draft workload.'), + workload_type: workloadTypeSchema.describe("Type of workload to estimate. Use 'fine_tuning' or 'fine-tuning' for fine-tuning."), + image: z.string().min(1).describe('Docker image to run.'), + command: z.string().optional().describe('Optional container command to include in the estimate draft, for example python.'), + args: z.array(z.string()).optional().describe('Optional arguments passed to the command.'), + model_size_gb: z.number().optional().describe('Approximate model size in GB.'), + disk_gb: z.number().optional().describe('Optional managed-provider local disk override in GB.'), + optimize_for: optimizeForSchema.optional().describe("Scheduling optimization goal: 'balanced', 'cost', or 'speed'."), + routing_mode: optimizeForSchema.optional().describe('Agent-friendly alias for optimize_for. If both are set, they must match.'), + template: z.string().optional().describe('Optional Jungle Grid template identifier when submitting a template-backed workload.'), + notes: z.string().optional().describe('Optional execution intent notes for the estimator when supported by the API.'), + latency_priority: prioritySchema.optional().describe("Latency sensitivity: 'low', 'balanced', or 'high'."), + cost_priority: prioritySchema.optional().describe("Cost sensitivity: 'low', 'balanced', or 'high'."), + constraints: constraintsSchema.describe('Optional routing constraints.') +}) + +const submitJobSchema = estimateJobSchema.extend({ + name: z.string().min(1).describe('Readable job name. Submit Job starts asynchronous remote work and may spend credits.'), + command: z.string().optional().describe('Container command to run, for example python. Omit to use the image entrypoint or CMD.'), + environment: z + .record(z.string()) + .optional() + .describe('Environment variables injected into the container. Do not include secrets unless the user explicitly approves.'), + env: z.record(z.string()).optional().describe('Agent-friendly alias for environment. If both are set, keys from environment are used.'), + huggingface_credential_id: z.string().optional().describe('Optional saved Hugging Face credential id.'), + callback_url: z.string().optional().describe('Optional documented callback URL for lifecycle events.'), + callback_auth_token: z.string().optional().describe('Optional callback authentication token. Never use this for unrelated secrets.'), + callback_metadata: z.record(z.any()).optional().describe('Optional metadata included with callbacks.'), + metadata: z.record(z.any()).optional().describe('Optional workload metadata when supported by the API.') +}) + +const jobIdSchema = z.object({ + job_id: z.string().min(1).describe('Jungle Grid job_id returned by submit_job.') +}) + +const listJobsSchema = z.object({ + limit: z + .number() + .min(1) + .max(100) + .optional() + .default(20) + .describe('Maximum number of jobs to return. Jungle Grid documents a maximum of 100.'), + cursor: cursorSchema.optional().describe('Optional next_cursor from a previous list_jobs response.'), + status: jobStatusSchema.optional().describe('Optional status filter.') +}) + +const getJobLogsSchema = jobIdSchema.extend({ + tail: z.number().min(1).max(500).optional().describe('Return the most recent 1..500 log entries.'), + limit: z.number().min(1).max(500).optional().describe('Return up to this many log entries, 1..500.'), + cursor: cursorSchema.optional().describe('Optional next_cursor from a previous get_job_logs response.'), + stream: z.enum(['stdout', 'stderr', 'all']).optional().describe("Filter logs by stream. Defaults to the API's behavior.") +}) + +const cancelJobSchema = jobIdSchema.extend({ + reason: z.string().optional().describe('Optional cancellation reason.') +}) + +const artifactDownloadSchema = jobIdSchema.extend({ + artifact_id: z.string().min(1).describe('Artifact ID returned by list_artifacts.') +}) + +export interface JungleGridClientOptions { + apiKey: string + baseUrl?: string +} + +export class JungleGridClient { + private readonly apiKey: string + private readonly baseUrl: string + + constructor({ apiKey, baseUrl }: JungleGridClientOptions) { + if (!apiKey) throw new Error('Jungle Grid API key is required') + this.apiKey = apiKey + this.baseUrl = normalizeBaseUrl(baseUrl) + } + + estimateJob(input: z.infer): Promise { + return this.request('POST', '/v1/jobs/estimate', normalizeJobPayload(input)) + } + + submitJob(input: z.infer): Promise { + return this.request('POST', '/v1/jobs', normalizeJobPayload(input)) + } + + listJobs(input: z.infer): Promise { + const params = new URLSearchParams() + if (input.limit !== undefined) params.set('limit', String(input.limit)) + if (input.cursor !== undefined) params.set('cursor', String(input.cursor)) + if (input.status) params.set('status', input.status) + const query = params.toString() + return this.request('GET', query ? `/v1/jobs?${query}` : '/v1/jobs') + } + + getJob(jobId: string): Promise { + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}`) + } + + getJobRuntime(jobId: string): Promise { + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/runtime`) + } + + getJobLogs(jobId: string, input: Omit, 'job_id'> = {}): Promise { + const params = new URLSearchParams() + if (input.tail !== undefined) params.set('tail', String(input.tail)) + if (input.limit !== undefined) params.set('limit', String(input.limit)) + if (input.cursor !== undefined) params.set('cursor', String(input.cursor)) + if (input.stream) params.set('stream', input.stream) + const query = params.toString() + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/logs${query ? `?${query}` : ''}`) + } + + cancelJob(jobId: string, reason?: string): Promise { + return this.request('POST', `/v1/jobs/${encodeURIComponent(jobId)}/cancel`, { + reason: reason ?? 'Cancelled via Flowise' + }) + } + + listJobArtifacts(jobId: string): Promise { + return this.request('GET', `/v1/jobs/${encodeURIComponent(jobId)}/artifacts`) + } + + getArtifactDownloadUrl(jobId: string, artifactId: string): Promise { + return this.request('POST', `/v1/jobs/${encodeURIComponent(jobId)}/artifacts/${encodeURIComponent(artifactId)}/download`) + } + + private async request(method: string, path: string, body?: any): Promise { + const response = await secureAxiosRequest({ + url: `${this.baseUrl}${path}`, + method, + headers: { + Authorization: `Bearer ${this.apiKey}`, + ...jsonHeaders + }, + data: body !== undefined ? removeUndefined(body) : undefined, + timeout: 30000 + }) + + if (response.status < 200 || response.status >= 300) { + throw new Error(responseErrorMessage(response, method, path)) + } + + if (response.status === 204) return { ok: true } + + if (response.data === undefined || response.data === null || response.data === '') return { ok: true } + return response.data + } +} + +export type JungleGridAction = + | 'estimateJob' + | 'submitJob' + | 'listJobs' + | 'getJob' + | 'getJobRuntime' + | 'cancelJob' + | 'getJobLogs' + | 'listArtifacts' + | 'getArtifact' + +export function createJungleGridTools(client: JungleGridClient, actions: JungleGridAction[]): DynamicStructuredTool[] { + const selectedActions = actions.length > 0 ? actions : (Object.keys(toolFactories) as JungleGridAction[]) + return selectedActions.map((action) => toolFactories[action](client)) +} + +class JungleGridTool extends DynamicStructuredTool { + private readonly handler: (input: any) => Promise + private readonly inputSchema: any + + constructor(args: { name: string; description: string; schema: any; handler: (input: any) => Promise }) { + super({ + name: args.name, + description: args.description, + schema: args.schema, + baseUrl: '', + method: 'GET', + headers: {} + }) + this.handler = args.handler + this.inputSchema = args.schema + } + + async _call(input: any): Promise { + try { + const parsedInput = this.inputSchema?.parse ? this.inputSchema.parse(input ?? {}) : input + return this.handler(parsedInput) + } catch (error) { + return formatToolError(`${this.name} failed: ${formatErrorMessage(error)}`, redactSensitiveParams(input)) + } + } +} + +const toolFactories: Record DynamicStructuredTool> = { + estimateJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_estimate_job', + description: + 'Estimate a Jungle Grid workload before submission. This is read-only and returns route, capacity, cost, queue wait, start window, warning, and screening fields when available. Use this before jungle_grid_submit_job whenever spending credits or capacity selection matters.', + schema: estimateJobSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_estimate_job', input, () => client.estimateJob(input)) + }), + submitJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_submit_job', + description: + 'Submit an asynchronous Jungle Grid workload. This may start managed compute and spend credits. It returns a job_id immediately; a returned job_id does not mean the job has finished. Poll jungle_grid_get_job and jungle_grid_get_job_logs until a terminal status is reached. Never put untrusted secrets in environment variables, command, or args without explicit user permission.', + schema: submitJobSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_submit_job', input, () => client.submitJob(input)) + }), + listJobs: (client) => + new JungleGridTool({ + name: 'jungle_grid_list_jobs', + description: 'List recent Jungle Grid jobs for the authenticated account with limit, cursor, and optional status filtering.', + schema: listJobsSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_list_jobs', input, () => client.listJobs(input)) + }), + getJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_job', + description: + "Get current Jungle Grid job status and details by job_id. Poll this after submit_job until the job reaches a terminal status such as 'completed', 'failed', 'cancelled', or 'rejected'.", + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_get_job', input, () => client.getJob(input.job_id)) + }), + getJobRuntime: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_job_runtime', + description: + 'Retrieve runtime tails, exit code, timeout flag, diagnostics, and runtime availability for a Jungle Grid job. Use this to monitor or diagnose running, failed, or completed work.', + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_get_job_runtime', input, () => client.getJobRuntime(input.job_id)) + }), + cancelJob: (client) => + new JungleGridTool({ + name: 'jungle_grid_cancel_job', + description: + 'Cancel a pending, queued, assigned, or running Jungle Grid job. This uses the verified production cancel route and has no useful effect on already terminal jobs.', + schema: cancelJobSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_cancel_job', input, () => client.cancelJob(input.job_id, input.reason)) + }), + getJobLogs: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_job_logs', + description: + 'Retrieve stored Jungle Grid stdout/stderr log entries for a job. Supports tail, limit, cursor, and stream filters. For live Server-Sent Events use Jungle Grid directly; Flowise tools poll this stored logs endpoint.', + schema: getJobLogsSchema, + handler: async (input) => + safeJsonToolCall('jungle_grid_get_job_logs', input, () => + client.getJobLogs(input.job_id, { + tail: input.tail, + limit: input.limit, + cursor: input.cursor, + stream: input.stream + }) + ) + }), + listArtifacts: (client) => + new JungleGridTool({ + name: 'jungle_grid_list_artifacts', + description: + 'List managed artifacts uploaded by a Jungle Grid job. Retrieve artifacts after successful completion unless the API response explicitly shows partial outputs are available. Managed jobs upload regular files written under /workspace/artifacts.', + schema: jobIdSchema, + handler: async (input) => safeJsonToolCall('jungle_grid_list_artifacts', input, () => client.listJobArtifacts(input.job_id)) + }), + getArtifact: (client) => + new JungleGridTool({ + name: 'jungle_grid_get_artifact', + description: + 'Create temporary download information for a managed Jungle Grid artifact. The signed URL is a temporary secret; do not print it into public logs. Call jungle_grid_list_artifacts first and use an artifact_id from that response.', + schema: artifactDownloadSchema, + handler: async (input) => + safeJsonToolCall('jungle_grid_get_artifact', input, () => client.getArtifactDownloadUrl(input.job_id, input.artifact_id)) + }) +} + +async function safeJsonToolCall(toolName: string, params: any, call: () => Promise): Promise { + try { + const result = await call() + return JSON.stringify(result) + } catch (error) { + const message = formatErrorMessage(error) + return formatToolError(`${toolName} failed: ${message}`, redactSensitiveParams(params)) + } +} + +function normalizeJobPayload(input: any): any { + const payload = removeUndefined(input) + + if (payload.workload_type === 'fine_tuning') { + payload.workload_type = 'fine-tuning' + } + + if (payload.routing_mode !== undefined) { + if (payload.optimize_for !== undefined && payload.optimize_for !== payload.routing_mode) { + throw new Error('optimize_for and routing_mode must match when both are provided') + } + payload.optimize_for = payload.routing_mode + delete payload.routing_mode + } + + if (payload.env !== undefined) { + payload.environment = { + ...payload.env, + ...(payload.environment ?? {}) + } + delete payload.env + } + + return payload +} + +function normalizeBaseUrl(baseUrl?: string): string { + const raw = (baseUrl || DEFAULT_JUNGLE_GRID_BASE_URL).trim() + if (!raw) return DEFAULT_JUNGLE_GRID_BASE_URL + return raw.replace(/\/+$/, '') +} + +function removeUndefined(value: any): any { + if (Array.isArray(value)) { + return value.map(removeUndefined) + } + if (value != null && typeof value === 'object' && (value.constructor === Object || Object.getPrototypeOf(value) === null)) { + const output: Record = {} + for (const [key, child] of Object.entries(value)) { + if (child !== undefined) output[key] = removeUndefined(child) + } + return output + } + return value +} + +function redactSensitiveParams(params: any): any { + if (!params || typeof params !== 'object') return params + if (Array.isArray(params)) return params.map(redactSensitiveParams) + const output: Record = {} + for (const [key, value] of Object.entries(params)) { + if (/token|secret|password|api[_-]?key/i.test(key)) { + output[key] = '[REDACTED]' + } else if (Array.isArray(value)) { + output[key] = value.map(redactSensitiveParams) + } else if (value && typeof value === 'object' && !Array.isArray(value)) { + output[key] = redactSensitiveParams(value) + } else { + output[key] = value + } + } + return output +} + +function formatErrorMessage(error: any): string { + if (error?.issues && Array.isArray(error.issues)) { + return error.issues + .map((issue: any) => { + const path = Array.isArray(issue.path) && issue.path.length > 0 ? `${issue.path.join('.')}: ` : '' + return `${path}${issue.message}` + }) + .join('; ') + } + + if (error instanceof Error) return redactErrorMessage(error.message) + if (typeof error === 'string') return redactErrorMessage(error) + return 'Unknown Jungle Grid API error' +} + +function redactErrorMessage(message: string): string { + return message + .replace(/jg_[A-Za-z0-9._-]+/g, 'jg_[REDACTED]') + .replace(/Bearer\s+[A-Za-z0-9._-]+/gi, 'Bearer [REDACTED]') + .replace(/("(?:callback_auth_token|apiKey|api_key|token|secret|password)"\s*:\s*")([^"]+)(")/gi, '$1[REDACTED]$3') + .replace(/((?:callback_auth_token|apiKey|api_key|token|secret|password)\s*[:=]?\s*["'])([^"']+)(["'])/gi, '$1[REDACTED]$3') +} + +function responseErrorMessage(response: any, method: string, path: string): string { + const fallback = `${method} ${path} failed with status ${response.status}` + const payload = response.data + if (payload === undefined || payload === null || payload === '') return fallback + if (typeof payload === 'string') { + const trimmed = payload.trim() + return trimmed ? `${fallback}: ${redactErrorMessage(trimmed)}` : fallback + } + + const detail = formatApiError(payload) + return detail ? `${fallback}: ${redactErrorMessage(detail)}` : fallback +} + +function formatApiError(payload: any): string | undefined { + if (!payload || typeof payload !== 'object') return undefined + if (typeof payload.error === 'string' && payload.error.trim()) return payload.error.trim() + if (payload.error && typeof payload.error === 'object') { + const code = stringField(payload.error.code) + const message = stringField(payload.error.message) + if (code && message) return `${code}: ${message}` + return message ?? code + } + + const code = stringField(payload.code) + const message = stringField(payload.message) + if (code && message) return `${code}: ${message}` + return message ?? code +} + +function stringField(value: any): string | undefined { + if (typeof value !== 'string') return undefined + const trimmed = value.trim() + return trimmed || undefined +} diff --git a/packages/components/nodes/tools/JungleGrid/junglegrid.svg b/packages/components/nodes/tools/JungleGrid/junglegrid.svg new file mode 100644 index 00000000000..3dc64a7bfdc --- /dev/null +++ b/packages/components/nodes/tools/JungleGrid/junglegrid.svg @@ -0,0 +1,7 @@ + + + + + + + diff --git a/packages/components/src/httpSecurity.ts b/packages/components/src/httpSecurity.ts index 9f09e000ab1..60b89501b4e 100644 --- a/packages/components/src/httpSecurity.ts +++ b/packages/components/src/httpSecurity.ts @@ -336,7 +336,11 @@ function createPinnedAgent(target: ResolvedTarget, options?: { ca?: string | str const Agent = target.protocol === 'https' ? https.Agent : http.Agent return new Agent({ - lookup: (_host, _opts, cb) => { + lookup: (_host, opts, cb) => { + if (typeof opts === 'object' && opts?.all) { + cb(null, [{ address: target.ip, family: target.family }]) + return + } cb(null, target.ip, target.family) }, ...options