diff --git a/build.gradle b/build.gradle index 8a1ffbe..d9299df 100644 --- a/build.gradle +++ b/build.gradle @@ -153,7 +153,7 @@ subprojects { } test { - jvmArgs = [ "-javaagent:${configurations.agent.singleFile}" ] + jvmArgs "-javaagent:${configurations.agent.singleFile}" } } diff --git a/plugin-transform-json/build.gradle b/plugin-transform-json/build.gradle index 24337f1..efe3e33 100644 --- a/plugin-transform-json/build.gradle +++ b/plugin-transform-json/build.gradle @@ -1,5 +1,12 @@ project.description = 'Kestra Plugin Transformation for Json.' +// -Xss512k matches the constrained stack that triggered the production crash (Windows default is +// ~256 KB; 512k is slightly above the HotSpot minimum and safely above the Kestra framework needs). +// Without this, the Linux default thread stack (~8 MB) is far too large to reproduce the overflow. +test { + jvmArgs '-Xss512k' +} + jar { manifest { attributes( diff --git a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java index fd6b3e3..d117376 100644 --- a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java +++ b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java @@ -20,6 +20,11 @@ import lombok.experimental.SuperBuilder; import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import io.kestra.core.models.enums.MonacoLanguages; import io.kestra.core.models.annotations.PluginProperty; @@ -32,16 +37,59 @@ public abstract class Transform extends Task implements JSONataInterface, RunnableTask { private static final ObjectMapper MAPPER = JacksonMapper.ofJson(); + // 4 MB: isolates StackOverflowError inside the eval thread so the worker 
thread never crashes. + private static final long EVAL_THREAD_STACK_SIZE = 4 * 1024 * 1024; @PluginProperty(language = MonacoLanguages.JAVASCRIPT, group = "advanced") private Property expression; @Builder.Default - private Property maxDepth = Property.ofValue(200); + private Property maxDepth = Property.ofValue(1000); @Getter(AccessLevel.PRIVATE) private Jsonata parsedExpression; + // Lazy-initialized; lifecycle managed by evalExecutor() / shutdownEvalExecutor(). + // Assumption: Flux pipelines in subclasses are sequential (no parallel()/publishOn). + @Getter(AccessLevel.NONE) + @ToString.Exclude + @EqualsAndHashCode.Exclude + private transient ExecutorService evalExecutor; + + @Getter(AccessLevel.NONE) + @ToString.Exclude + @EqualsAndHashCode.Exclude + private transient Thread evalThread; + + private ExecutorService evalExecutor() { + if (this.evalExecutor == null) { + this.evalExecutor = Executors.newSingleThreadExecutor(r -> { + evalThread = new Thread(null, r, "jsonata-eval", EVAL_THREAD_STACK_SIZE); + evalThread.setDaemon(true); + return evalThread; + }); + } + return this.evalExecutor; + } + + protected void shutdownEvalExecutor() { + if (this.evalExecutor != null) { + this.evalExecutor.shutdown(); + try { + this.evalExecutor.awaitTermination(1, TimeUnit.SECONDS); + // awaitTermination only guarantees tasks finished — the thread itself may still + // be exiting. Join to ensure it's fully dead before returning. 
+ if (this.evalThread != null) { + this.evalThread.join(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + this.evalExecutor = null; + this.evalThread = null; + } + } + public void init(RunContext runContext) throws Exception { var exprString = runContext.render(this.expression).as(String.class).orElseThrow(); try { @@ -62,12 +110,43 @@ protected JsonNode evaluateExpression(RunContext runContext, JsonNode jsonNode) var frame = this.parsedExpression.createFrame(); frame.setRuntimeBounds(timeoutInMilli, rMaxDepth); - var result = this.parsedExpression.evaluate(data, frame); - if (result == null) { - return NullNode.getInstance(); + var resultRef = new AtomicReference(); + var errorRef = new AtomicReference(); + + // Eval runs on a dedicated executor thread (4 MB stack) that is reused across all records + // in the same task run. This serves two purposes: + // 1. Normal case: worker stack size (e.g. 256 KB on Windows) cannot constrain the evaluator. + // 2. Edge case (user sets very high maxDepth): if a StackOverflowError occurs in the eval + // thread, it is contained there. The worker thread reads the stored error and throws a + // clean RuntimeException — the worker never crashes. + // The catch is intentionally Throwable: Errors such as StackOverflowError and OutOfMemoryError + // must land in errorRef so failures on the reused eval thread are handled uniformly below. + // (An Error escaping the Callable would still reach the worker wrapped in ExecutionException + // from future.get(); capturing it here keeps the original Throwable as the direct cause.) + var future = evalExecutor().submit(() -> { + try { + var result = this.parsedExpression.evaluate(data, frame); + resultRef.set(result != null ? 
MAPPER.valueToTree(result) : NullNode.getInstance()); + } catch (Throwable t) { + errorRef.set(t); + } + return null; + }); + + try { + future.get(); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to evaluate expression", e.getCause()); + } + + if (errorRef.get() != null) { + throw new RuntimeException("Failed to evaluate expression", errorRef.get()); } - return MAPPER.valueToTree(result); - } catch (JException | IllegalVariableEvaluationException e) { + return resultRef.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("JSONata evaluation interrupted", e); + } catch (IllegalVariableEvaluationException e) { throw new RuntimeException("Failed to evaluate expression", e); } } diff --git a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java index 9d976ff..05bbae1 100644 --- a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java +++ b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java @@ -120,40 +120,44 @@ public Output run(RunContext runContext) throws Exception { init(runContext); - final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow()); - - try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)) { - Flux flux = FileSerde.readAll(reader, new TypeReference<>() { - }); - final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); - try (Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(outputFilePath)))) { - - // transform - Flux values = flux.map(node -> this.evaluateExpression(runContext, node)); - - if (runContext.render(explodeArray).as(Boolean.class).orElseThrow()) { - values = values.flatMap(jsonNode -> { - if (jsonNode.isArray()) 
{ - Iterable iterable = jsonNode::elements; - return Flux.fromStream(StreamSupport.stream(iterable.spliterator(), false)); - } - return Mono.just(jsonNode); - }); + try { + final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow()); + + try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)) { + Flux flux = FileSerde.readAll(reader, new TypeReference<>() { + }); + final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); + try (Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(outputFilePath)))) { + + // transform + Flux values = flux.map(node -> this.evaluateExpression(runContext, node)); + + if (runContext.render(explodeArray).as(Boolean.class).orElseThrow()) { + values = values.flatMap(jsonNode -> { + if (jsonNode.isArray()) { + Iterable iterable = jsonNode::elements; + return Flux.fromStream(StreamSupport.stream(iterable.spliterator(), false)); + } + return Mono.just(jsonNode); + }); + } + + Long processedItemsTotal = FileSerde.writeAll(writer, values).block(); + + URI uri = runContext.storage().putFile(outputFilePath.toFile()); + + // output + return Output + .builder() + .uri(uri) + .processedItemsTotal(processedItemsTotal) + .build(); + } finally { + Files.deleteIfExists(outputFilePath); // ensure temp file is deleted in case of error } - - Long processedItemsTotal = FileSerde.writeAll(writer, values).block(); - - URI uri = runContext.storage().putFile(outputFilePath.toFile()); - - // output - return Output - .builder() - .uri(uri) - .processedItemsTotal(processedItemsTotal) - .build(); - } finally { - Files.deleteIfExists(outputFilePath); // ensure temp file is deleted in case of error } + } finally { + shutdownEvalExecutor(); } } diff --git a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java 
b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java index c4ea096..797f654 100644 --- a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java +++ b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java @@ -101,13 +101,17 @@ public class TransformValue extends Transform implements public Output run(RunContext runContext) throws Exception { init(runContext); - final JsonNode from = parseJson(runContext.render(this.from).as(String.class).orElseThrow()); + try { + final JsonNode from = parseJson(runContext.render(this.from).as(String.class).orElseThrow()); - // transform - JsonNode transformed = evaluateExpression(runContext, from); + // transform + JsonNode transformed = evaluateExpression(runContext, from); - // output - return Output.builder().value(transformed).build(); + // output + return Output.builder().value(transformed).build(); + } finally { + shutdownEvalExecutor(); + } } private static JsonNode parseJson(String from) { diff --git a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java index be985fd..f36cf47 100644 --- a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java +++ b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java @@ -19,6 +19,8 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -123,6 +125,104 @@ void shouldGetSingleRecordForValidExprReturningArrayGivenExplodeFalse() throws E Assertions.assertEquals(2, transformationResult.getFirst().size()); } + @Test + void shouldReuseEvalThreadAcrossRecords() throws Exception { + // Verifies executor reuse: after run() completes, 
awaitTermination in shutdownEvalExecutor() + // guarantees the jsonata-eval thread is gone. If the old per-call new Thread() approach were + // used, 3 threads would be started and could still be alive briefly, making liveAfter > 0 + // probabilistically — so this assertion is a reliable regression guard. + RunContext runContext = runContextFactory.of(); + final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); + try (final Writer writer = new OutputStreamWriter(Files.newOutputStream(outputFilePath))) { + FileSerde.writeAll(writer, Flux.just( + Map.of("v", 1), + Map.of("v", 2), + Map.of("v", 3) + )).block(); + writer.flush(); + } + URI uri = runContext.storage().putFile(outputFilePath.toFile()); + + TransformItems task = TransformItems.builder() + .from(Property.ofValue(uri.toString())) + .expression(Property.ofValue("$")) + .build(); + + task.run(runContext); + + long liveAfter = Thread.getAllStackTraces().keySet().stream() + .filter(t -> "jsonata-eval".equals(t.getName())) + .count(); + + Assertions.assertEquals(0, liveAfter, "jsonata-eval thread should be terminated after run()"); + } + + @Test + void shouldHandleLargeDatasetWithFlatFieldLookupOnConstrainedStack() throws Exception { + // Regression: TransformItems crashed the Windows worker with StackOverflowError when processing + // a large LDAP-like dataset. The crash was in Jsonata$Frame.lookup() scope-chain recursion — + // unrelated to user-defined function depth, so lowering maxDepth had no effect. + // The fix is the 4 MB eval thread. This test JVM runs at -Xss512k (build.gradle) to simulate + // the constrained Windows stack. 
+ RunContext runContext = runContextFactory.of(); + final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); + + int recordCount = 5_000; + List> records = new ArrayList<>(recordCount); + for (int i = 0; i < recordCount; i++) { + Map attributes = new HashMap<>(); + attributes.put("mail", List.of("user" + i + "@example.com")); + attributes.put("cn", List.of("User " + i)); + attributes.put("displayName", List.of("Display User " + i)); + attributes.put("givenName", List.of("First" + i)); + attributes.put("sn", List.of("Last" + i)); + attributes.put("uid", List.of("uid" + i)); + attributes.put("employeenumber", List.of("EMP" + i)); + attributes.put("tCID", List.of("CID" + i)); + attributes.put("tWrID", List.of("WR" + i)); + attributes.put("tMainWrID", List.of("MWR" + i)); + attributes.put("tisActive", List.of("TRUE")); + attributes.put("tStatusOfEmployment", List.of("active")); + attributes.put("preferredLanguage", List.of("en")); + // Multi-value field — mirrors the isMemberOf array the customer used $join() on + attributes.put("isMemberOf", List.of("cn=group1,ou=groups", "cn=group2,ou=groups", "cn=group3,ou=groups")); + records.add(Map.of("dn", "uid=user" + i + ",ou=Account,o=DTAG", "attributes", attributes)); + } + + try (Writer writer = new OutputStreamWriter(Files.newOutputStream(outputFilePath))) { + FileSerde.writeAll(writer, Flux.fromIterable(records)).block(); + writer.flush(); + } + URI uri = runContext.storage().putFile(outputFilePath.toFile()); + + TransformItems task = TransformItems.builder() + .from(Property.ofValue(uri.toString())) + .expression(Property.ofValue(""" + { + "DN": dn ? $string(dn) : null, + "MAIL": attributes.mail[0] ? $string(attributes.mail[0]) : null, + "CN": attributes.cn[0] ? $string(attributes.cn[0]) : null, + "DISPLAY_NAME": attributes.displayName[0] ? $string(attributes.displayName[0]) : null, + "GIVEN_NAME": attributes.givenName[0] ? $string(attributes.givenName[0]) : null, + "SN": attributes.sn[0] ? 
$string(attributes.sn[0]) : null, + "UID": attributes.uid[0] ? $string(attributes.uid[0]) : null, + "EMPLOYEENUMBER": attributes.employeenumber[0] ? $string(attributes.employeenumber[0]) : null, + "TCID": attributes.tCID[0] ? $string(attributes.tCID[0]) : null, + "TWRID": attributes.tWrID[$ != attributes.tMainWrID[0]][0] ? $string(attributes.tWrID[$ != attributes.tMainWrID[0]][0]) : (attributes.tWrID[0] ? $string(attributes.tWrID[0]) : null), + "TMAINWRID": attributes.tMainWrID[0] ? $string(attributes.tMainWrID[0]) : null, + "TIS_ACTIVE": attributes.tisActive[0] ? $string(attributes.tisActive[0]) : null, + "TSTATUS_OF_EMPLOYMENT": attributes.tStatusOfEmployment[0] ? $string(attributes.tStatusOfEmployment[0]) : null, + "PREFERREDLANGUAGE": attributes.preferredLanguage[0] ? $string(attributes.preferredLanguage[0]) : null, + "ISMEMBEROF": attributes.isMemberOf ? $join(attributes.isMemberOf, "|") : null + } + """)) + .build(); + + TransformItems.Output output = task.run(runContext); + + Assertions.assertEquals(recordCount, output.getProcessedItemsTotal()); + } + @Test void shouldTransformJsonInputWithDefaultIonMapper() throws Exception { // Given diff --git a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java index 99583c0..4bc0b62 100644 --- a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java +++ b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.transform.jsonata; +import com.dashjoin.jsonata.JException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.kestra.core.junit.annotations.KestraTest; @@ -9,8 +10,11 @@ import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; @KestraTest class TransformValueTest { @@ -133,4 +137,93 @@ void shouldHandleNestedArrayExpressionFromIssue40() throws Exception { assertThat(result.get(1).isArray()).isTrue(); assertThat(result.get(1).get(2).asText()).isEqualTo("8796977843745/8796995341857/8796999765537"); } + + // Regression tests for StackOverflow protection in evaluateExpression(). + // + // Root cause: each JSONata recursion level pushes ~8 JVM frames. On 256 KB worker stacks + // (~300 usable frames), even maxDepth=200 allows 200 × 8 = 1600 frames — far past overflow. + // + // Fix (two layers): + // 1. The runtime bounds check throws JException at maxDepth before any stack risk (the + // default maxDepth is now 1000, raised from 200 — comfortably within the 4 MB eval stack). + // 2. Evaluation runs on a dedicated thread with a 4 MB stack. If the user sets a high + // maxDepth that allows overflow, the StackOverflowError is caught as Throwable inside + // the dedicated eval thread. The worker thread reads the stored error and throws a + // clean RuntimeException — the worker never crashes. + // + // Production crash: Windows worker default stack ~256 KB, crashed at depth=999. + // Test JVM is pinned to -Xss512k (see build.gradle). + // "+ 0" makes the expression non-tail-recursive, preventing TCO, so frames stay live. + + @ParameterizedTest + @ValueSource(ints = {50, 200, 500, 1000}) + void shouldNeverThrowStackOverflowForCommonMaxDepthValues(int maxDepth) throws Exception { + // Each maxDepth value runs on a 4 MB eval thread. The bounds check fires at maxDepth + // (JException) well before the stack could overflow, regardless of worker stack size. 
+ RunContext runContext = runContextFactory.of(); + TransformValue task = TransformValue.builder() + .from(Property.ofValue("{}")) + .expression(Property.ofValue( + "($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(10000))" + )) + .maxDepth(Property.ofValue(maxDepth)) + .build(); + + assertThatThrownBy(() -> task.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to evaluate expression") + .hasCauseInstanceOf(JException.class); + } + + @Test + void shouldContinueWorkingAfterStackOverflowError() throws Exception { + // Validates that a StackOverflowError in one run does not poison the executor or the task. + // Each call to run() creates a fresh executor (via init + shutdownEvalExecutor in finally), + // so the second run always gets a clean state. + RunContext runContext = runContextFactory.of(); + + TransformValue taskWithHighDepth = TransformValue.builder() + .from(Property.ofValue("{}")) + .expression(Property.ofValue( + "($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(49999))" + )) + .maxDepth(Property.ofValue(50000)) + .build(); + + assertThatThrownBy(() -> taskWithHighDepth.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(StackOverflowError.class); + + // Second run with a simple expression must succeed — no lingering poisoned state. + RunContext runContext2 = runContextFactory.of(); + TransformValue simpleTask = TransformValue.builder() + .from(Property.ofValue("{\"x\": 42}")) + .expression(Property.ofValue("x")) + .build(); + + TransformValue.Output output = simpleTask.run(runContext2); + assertThat(output.getValue()).isNotNull(); + assertThat(output.getValue().toString()).isEqualTo("42"); + } + + @Test + void shouldIsolateStackOverflowInEvalThreadWhenMaxDepthExceedsStackCapacity() throws Exception { + // User sets maxDepth high enough that bounds check never fires before stack exhaustion. + // On 4 MB eval thread (~40k safe levels), $f(49999) overflows the eval thread. 
+ // StackOverflowError is caught as Throwable inside the eval thread; worker thread gets + // a clean RuntimeException instead of crashing. + RunContext runContext = runContextFactory.of(); + TransformValue task = TransformValue.builder() + .from(Property.ofValue("{}")) + .expression(Property.ofValue( + "($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(49999))" + )) + .maxDepth(Property.ofValue(50000)) + .build(); + + assertThatThrownBy(() -> task.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to evaluate expression") + .hasCauseInstanceOf(StackOverflowError.class); + } } \ No newline at end of file