Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -79,9 +82,10 @@
@WritesAttribute(attribute = ValidateJson.ERROR_ATTRIBUTE_KEY, description = "If the flow file is routed to the invalid relationship "
+ ", this attribute will contain the error message resulting from the validation failure.")
})
@CapabilityDescription("Validates the contents of FlowFiles against a configurable JSON Schema. See json-schema.org for specification standards. " +
"This Processor does not support input containing multiple JSON objects, such as newline-delimited JSON. If the input FlowFile contains " +
"newline-delimited JSON, only the first line will be validated."
@CapabilityDescription("""
Validates the contents of FlowFiles against a configurable JSON Schema. See json-schema.org for specification standards.
This Processor supports input containing multiple JSON objects using newline-delimited JSON based on configuration properties,
otherwise if the input FlowFile contains newline-delimited JSON, only the first line will be validated."""
)
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Validating JSON requires reading FlowFile content into memory")
@Restricted(
Expand Down Expand Up @@ -125,7 +129,35 @@ public String getDescription() {
}
}

protected static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
/**
 * Layouts of FlowFile content that can be selected for validation.
 * Each constant carries a display name and a description and exposes
 * its own constant name as the allowable value.
 */
enum InputFormat implements DescribedValue {
    FLOW_FILE("FlowFile", "Validation applied to FlowFile content containing JSON"),
    JSON_LINES("JSON Lines", "Validation applied to FlowFile content containing JSON Lines or NDJSON");

    private final String label;
    private final String summary;

    /**
     * @param label   text returned by {@link #getDisplayName()}
     * @param summary text returned by {@link #getDescription()}
     */
    InputFormat(final String label, final String summary) {
        this.label = label;
        this.summary = summary;
    }

    /**
     * @return the human-readable name of this format
     */
    @Override
    public String getDisplayName() {
        return label;
    }

    /**
     * @return the explanatory text for this format
     */
    @Override
    public String getDescription() {
        return summary;
    }

    /**
     * @return the enum constant name
     */
    @Override
    public String getValue() {
        return name();
    }
}

static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
private static final String SCHEMA_NAME_PROPERTY_NAME = "Schema Name";
private static final String SCHEMA_CONTENT_PROPERTY_NAME = "JSON Schema";
private static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
Expand Down Expand Up @@ -165,6 +197,14 @@ public String getDescription() {
.dependsOn(SCHEMA_ACCESS_STRATEGY, JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
.build();

/**
 * Required property selecting how FlowFile content is interpreted.
 * Allowable values come from {@link InputFormat}; the default is {@link InputFormat#FLOW_FILE}.
 * onTrigger reads this property and calls validateFlowFile for FLOW_FILE and
 * validateJsonLines for any other value.
 */
public static final PropertyDescriptor INPUT_FORMAT = new PropertyDescriptor.Builder()
.name("Input Format")
.description("Specifies the expected format of FlowFile content containing one or more JSON elements")
.allowableValues(InputFormat.class)
.defaultValue(InputFormat.FLOW_FILE)
.required(true)
.build();

public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
.name("Max String Length")
.description("The maximum allowed length of a string value when parsing the JSON document")
Expand All @@ -184,6 +224,7 @@ public String getDescription() {
SCHEMA_REGISTRY,
SCHEMA_CONTENT,
SCHEMA_VERSION,
INPUT_FORMAT,
MAX_STRING_LENGTH
);

Expand Down Expand Up @@ -298,27 +339,38 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

if (schema == null) {
getLogger().error("JSON schema not configured for {}", flowFile);
session.getProvenanceReporter().route(flowFile, REL_FAILURE);
session.transfer(flowFile, REL_FAILURE);
return;
}
Comment thread
dan-s1 marked this conversation as resolved.

final InputFormat inputFormat = context.getProperty(INPUT_FORMAT).asAllowableValue(InputFormat.class);
if (inputFormat == InputFormat.FLOW_FILE) {
validateFlowFile(session, flowFile);
} else {
validateJsonLines(session, flowFile);
}
}

private void validateFlowFile(final ProcessSession session, final FlowFile flowFile) {
final Schema currentSchema = schema;

try (final InputStream in = session.read(flowFile)) {
final JsonNode node = mapper.readTree(in);
final Schema activeSchema = schema;
if (activeSchema == null) {
getLogger().error("JSON schema not configured for {}", flowFile);
session.getProvenanceReporter().route(flowFile, REL_FAILURE);
session.transfer(flowFile, REL_FAILURE);
return;
}
final List<Error> errors = activeSchema.validate(node);
final List<Error> errors = currentSchema.validate(node);

if (errors.isEmpty()) {
getLogger().debug("JSON {} valid", flowFile);
session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID);
} else {
final String validationMessages = errors.toString();
flowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationMessages);
final FlowFile invalidJsonFlowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationMessages);
getLogger().warn("JSON {} invalid: Validation Errors {}", flowFile, validationMessages);
session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.transfer(flowFile, REL_INVALID);
session.getProvenanceReporter().route(invalidJsonFlowFile, REL_INVALID);
session.transfer(invalidJsonFlowFile, REL_INVALID);
}
} catch (final Exception e) {
getLogger().error("JSON processing failed {}", flowFile, e);
Expand All @@ -327,24 +379,57 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

private void validateJsonLines(final ProcessSession session, final FlowFile flowFile) {
final Schema currentSchema = schema;

try (final InputStream in = session.read(flowFile);
final LineNumberReader reader = new LineNumberReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {

String line;

while ((line = reader.readLine()) != null) {
if (line.isBlank()) {
continue;
}

final JsonNode node = mapper.readTree(line);
final List<Error> errors = currentSchema.validate(node);

if (!errors.isEmpty()) {
reader.close(); // NOTE: Must call close otherwise get IllegalStateException indicating FlowFile already in use
// by an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
final String validationMessages = errors.toString();
final String validationErrMsg = "JSON at line %s is invalid: %s".formatted(reader.getLineNumber(), validationMessages);
final FlowFile invalidJsonFlowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationErrMsg);
getLogger().warn("JSON at line {} in {} is invalid: Validation Errors {}", reader.getLineNumber(), flowFile, validationMessages);
session.getProvenanceReporter().route(invalidJsonFlowFile, REL_INVALID);
session.transfer(invalidJsonFlowFile, REL_INVALID);
return;
}
}

session.getProvenanceReporter().route(flowFile, REL_VALID);
Comment thread
dan-s1 marked this conversation as resolved.
session.transfer(flowFile, REL_VALID);

} catch (final Exception e) {
getLogger().error("{} processing failed {}", InputFormat.JSON_LINES.getDisplayName(), flowFile, e);
session.getProvenanceReporter().route(flowFile, REL_FAILURE);
Comment thread
exceptionfactory marked this conversation as resolved.
session.transfer(flowFile, REL_FAILURE);
}
}

/**
 * Builds the explanation reported when a Schema Access Strategy is selected
 * but the property it depends on has not been set.
 *
 * @param schemaAccessStrategy strategy whose value is quoted in the message
 * @param property             property whose display name is named as required
 * @return the formatted explanation
 */
private String getPropertyValidateMessage(JsonSchemaStrategy schemaAccessStrategy, PropertyDescriptor property) {
    final String strategyValue = schemaAccessStrategy.getValue();
    final String propertyDisplayName = property.getDisplayName();
    return "The '%s' Schema Access Strategy requires that the %s property be set.".formatted(strategyValue, propertyDisplayName);
}

/**
 * Maps a {@code SchemaVersion} constant to the {@code SpecificationVersion} constant of the same name.
 * The switch expression lists every handled draft explicitly and has no default branch.
 *
 * @param schemaVersion schema version to translate
 * @return the matching specification version
 */
private SpecificationVersion mapToSpecification(final SchemaVersion schemaVersion) {
    return switch (schemaVersion) {
        case DRAFT_4 -> SpecificationVersion.DRAFT_4;
        case DRAFT_6 -> SpecificationVersion.DRAFT_6;
        case DRAFT_7 -> SpecificationVersion.DRAFT_7;
        case DRAFT_2019_09 -> SpecificationVersion.DRAFT_2019_09;
        case DRAFT_2020_12 -> SpecificationVersion.DRAFT_2020_12;
    };
}

private JsonSchemaStrategy getSchemaAccessStrategy(PropertyContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class TestValidateJson {
private static final String JSON = getFileContent("simple-example.json");
Expand All @@ -62,7 +63,7 @@ public void setUp() {
runner = TestRunners.newTestRunner(ValidateJson.class);
}

@ParameterizedTest(name = "{2}")
@ParameterizedTest
@MethodSource("customValidateArgs")
void testCustomValidateMissingProperty(final ValidateJson.JsonSchemaStrategy strategy) {
runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, strategy);
Expand Down Expand Up @@ -252,6 +253,35 @@ void testSchemaRetrievalFromRegistry() throws InitializationException {
runner.assertTransferCount(ValidateJson.REL_VALID, 1);
}

/**
 * Runs the processor on two-line input whose second line violates the schema
 * ("FieldTwo" is a string instead of a number) and checks routing for the given input format.
 *
 * @param inputFormat   value applied to the Input Format property
 * @param expectedValid whether the FlowFile is expected on the valid relationship
 */
@ParameterizedTest
@MethodSource("multilineJsonArgs")
void testMultilineJsonWhereSecondLineInvalid(ValidateJson.InputFormat inputFormat, boolean expectedValid) {
    final String multilineJson = """
            {"FieldOne":"stringValue","FieldTwo":1234,"FieldThree":[{"arrayField":"arrayValue"}]}
            {"FieldOne":"stringValue","FieldTwo":"NAN","FieldThree":[{"arrayField":"arrayValue"}]}
            """;
    // Exactly one FlowFile is enqueued, so it lands on either valid or invalid.
    final int expectedInvalidCount = expectedValid ? 0 : 1;
    final int expectedValidCount = expectedValid ? 1 : 0;

    runner.setProperty(ValidateJson.SCHEMA_CONTENT, SIMPLE_SCHEMA);
    runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, SCHEMA_VERSION);
    runner.setProperty(ValidateJson.INPUT_FORMAT, inputFormat.getValue());
    runner.enqueue(multilineJson);

    runner.run();

    runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
    runner.assertTransferCount(ValidateJson.REL_INVALID, expectedInvalidCount);
    runner.assertTransferCount(ValidateJson.REL_VALID, expectedValidCount);

    if (!expectedValid) {
        // The warning must identify the failing line by number.
        final boolean invalidLineLogged = runner.getLogger().getWarnMessages().stream()
                .anyMatch(logMessage -> {
                    final String message = logMessage.getMsg();
                    return message.contains("JSON at line 2") && message.contains("is invalid");
                });
        assertTrue(invalidLineLogged);
    }

    runner.clearTransferState();
}

private void assertValidationErrors(Relationship relationship, boolean expected) {
final Map<String, String> attributes = runner.getFlowFilesForRelationship(relationship).getFirst().getAttributes();

Expand All @@ -265,8 +295,15 @@ private void assertValidationErrors(Relationship relationship, boolean expected)

private static Stream<Arguments> customValidateArgs() {
return Stream.of(
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY, "requires that the JSON Schema Registry property be set"),
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY, "requires that the JSON Schema property be set")
Arguments.argumentSet("Require JSON Schema Registry property to be set", ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY),
Arguments.argumentSet("Require JSON Schema property to be set", ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
);
}

private static Stream<Arguments> multilineJsonArgs() {
return Stream.of(
Arguments.argumentSet(ValidateJson.InputFormat.FLOW_FILE.getDisplayName(), ValidateJson.InputFormat.FLOW_FILE.getValue(), true),
Comment thread
exceptionfactory marked this conversation as resolved.
Arguments.argumentSet(ValidateJson.InputFormat.JSON_LINES.getDisplayName(), ValidateJson.InputFormat.JSON_LINES.getValue(), false)
);
}

Expand Down
Loading