NIFI-15934 Handle oversized FlowFiles and empty header keys in PublishAMQP #11242
ing-mattioni wants to merge 1 commit into
     */
    private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) {
        final byte[] messageContent = new byte[(int) flowFile.getSize()];
        final byte[] messageContent = new byte[Math.toIntExact(flowFile.getSize())];
Is Math.toIntExact required, since we filter out larger FlowFiles beforehand?
Resolved by checking against the configured size limit before allocation, so the remaining cast is bounded.
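The difference behind this question can be sketched in isolation. A plain (int) cast silently wraps a long above Integer.MAX_VALUE, while Math.toIntExact throws ArithmeticException. Once a smaller limit is enforced first, either conversion is safe. The class and method names below are hypothetical and are not taken from the PR.

```java
public class SizeConversionSketch {

    // Hypothetical helper: narrowing cast, which silently wraps on overflow.
    static int castSize(final long size) {
        return (int) size;
    }

    // Hypothetical helper: checked conversion, which throws ArithmeticException on overflow.
    static int exactSize(final long size) {
        return Math.toIntExact(size);
    }

    public static void main(final String[] args) {
        final long tooLarge = Integer.MAX_VALUE + 1L;

        // The cast wraps around to a negative value, which is not a valid array length.
        System.out.println("cast result: " + castSize(tooLarge));

        try {
            exactSize(tooLarge);
        } catch (final ArithmeticException e) {
            // The checked conversion refuses the value instead of wrapping.
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```

With a size check in front of the allocation, the checked conversion becomes a second line of defense and no longer the primary guard.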
if (flowFile.getSize() > Integer.MAX_VALUE) {
    getLogger().error("FlowFile {} with size {} bytes is too large to publish as an AMQP message", flowFile, flowFile.getSize());
    session.transfer(session.penalize(flowFile), REL_FAILURE);
We usually don't penalize when routing to the failure relationship unless we expect the failure relationship to be used as a self-loop on the processor. That is not good flow design in any case, given that retry is available at the framework level.
Removed penalization when routing to failure and added a regression assertion.
    final String key = kv[0].trim();
    if (key.isEmpty()) {
        getLogger().warn("Ignoring AMQP header property with empty key: {}", strEntry);
        continue;
    }
    headers.put(key, kv[1].trim());
} else if (kv.length == 1) {
    headers.put(kv[0].trim(), null);
    final String key = kv[0].trim();
    if (key.isEmpty()) {
        continue;
    }
    headers.put(key, null);
Should the two empty-key branches log at the same level (both warn, or both silent), and could the trim plus empty check be extracted into a small helper so the behavior is identical for key=value and bare-key entries?
Extracted header insertion into a helper so empty keys are handled consistently for both key=value and bare-key entries.
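A minimal sketch of what such a helper could look like, assuming entries are separated by a configurable character and that each entry is split on the first "=". The class name, method names, and parsing details are hypothetical; the PR's actual helper is not shown in this conversation and may also log a warning.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class HeaderParsingSketch {

    // Hypothetical helper: one code path for both key=value and bare-key entries.
    // Trims the key, skips the entry when the key is empty, and trims a non-null value.
    static void putHeader(final Map<String, Object> headers, final String rawKey, final String rawValue) {
        final String key = rawKey.trim();
        if (key.isEmpty()) {
            return; // a real processor could log a warning here
        }
        headers.put(key, rawValue == null ? null : rawValue.trim());
    }

    // Splits the header string into entries and delegates every insertion to putHeader.
    static Map<String, Object> parse(final String headerString, final String separator) {
        final Map<String, Object> headers = new HashMap<>();
        for (final String entry : headerString.split(Pattern.quote(separator))) {
            final String[] kv = entry.split("=", 2);
            if (kv.length == 2) {
                putHeader(headers, kv[0], kv[1]);
            } else {
                putHeader(headers, kv[0], null);
            }
        }
        return headers;
    }

    public static void main(final String[] args) {
        // The " =x" and " " entries have empty keys after trimming and are skipped.
        System.out.println(parse("a=1, =x,b, ,c = 2", ","));
    }
}
```

Routing both branches through one method keeps the trim and empty-key behavior identical, which is the consistency the review comment asks for.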
exceptionfactory left a comment
On initial review, the proposed check on input FlowFile size looks questionable. It may need to be a configurable property.
    return;
}
if (flowFile.getSize() > Integer.MAX_VALUE) {
This does not seem like the right approach. Integer.MAX_VALUE is a very large limit; there should be a more sensible boundary, probably a lower one.
Addressed by adding a configurable outbound body-size property with a 64 MB default and upper bound, so oversized FlowFiles are routed to failure before reading content into memory.
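The check-before-allocate idea can be sketched without any NiFi dependencies. The example below is illustrative only: the class, constant, and method names are hypothetical, the 64 MB default is assumed to mean 64 * 1024 * 1024 bytes, and an exception stands in for routing the FlowFile to failure.

```java
public class BodySizeGuardSketch {

    // Assumed interpretation of the 64 MB default from the discussion, in bytes.
    static final long DEFAULT_MAX_BODY_BYTES = 64L * 1024 * 1024;

    // Returns true when content of the given size may be read into memory.
    static boolean withinLimit(final long flowFileSize, final long maxBodyBytes) {
        return flowFileSize <= maxBodyBytes;
    }

    // Allocates the buffer only after the size check has passed.
    // In a processor the rejected case would be routed to failure instead of throwing.
    static byte[] allocateBody(final long flowFileSize, final long maxBodyBytes) {
        if (!withinLimit(flowFileSize, maxBodyBytes)) {
            throw new IllegalArgumentException(
                    "Size " + flowFileSize + " bytes exceeds limit of " + maxBodyBytes + " bytes");
        }
        // Checked conversion guards against limits configured above Integer.MAX_VALUE.
        return new byte[Math.toIntExact(flowFileSize)];
    }

    public static void main(final String[] args) {
        System.out.println(allocateBody(16, DEFAULT_MAX_BODY_BYTES).length);
        System.out.println(withinLimit(DEFAULT_MAX_BODY_BYTES + 1, DEFAULT_MAX_BODY_BYTES));
    }
}
```

Because the comparison uses only the reported size, an oversized FlowFile is rejected before any content is read or any buffer is allocated.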
21f645b to 6e5b2d4
Thank you @pvillard31 and @exceptionfactory, I made the following changes:
6e5b2d4 to 9754c75
Addressed the Checkstyle failure in the latest update. The issue was caused by tab characters in |
The remaining test failure appears to be unrelated to this PR. The failed test is in
The failure is a timeout while waiting for an asynchronous condition, while this PR only changes |
exceptionfactory left a comment
Thanks for the updates @ing-mattioni. Adding the property looks like a good direction. I provided a comment on the naming, and raised a question about the default value.
public static final PropertyDescriptor MAX_OUTBOUND_MESSAGE_BODY_SIZE = new PropertyDescriptor.Builder()
        .name("Max Outbound Message Body Size")
I recommend naming this Maximum Input FlowFile Size, since that is what it controls.
        .name("Max Outbound Message Body Size")
        .description("Maximum body size of outbound messages. FlowFiles larger than this value are routed to failure without loading content into memory.")
        .required(true)
        .defaultValue("64 MB")
Is there a generally published maximum size for AMQP messages, or some reference for using 64 MB as the default value?
Summary
NIFI-15934
This pull request updates PublishAMQP to improve handling of oversized FlowFiles and AMQP header attributes.
PublishAMQP reads FlowFile content into a byte array before publishing an AMQP message. Since Java byte arrays require an int length, FlowFiles larger than Integer.MAX_VALUE bytes cannot be published using this path. This change validates the FlowFile size before attempting byte-array allocation and routes oversized FlowFiles to failure with penalization.
This pull request also updates AMQP header parsing to ignore empty header keys instead of adding them to the AMQP headers map.
P.S. The existing unit test for AMQP header parsing was extended to cover empty header keys. The oversized FlowFile guard is a defensive validation before byte-array allocation; additional targeted coverage may require avoiding real multi-GB content allocation.
Changes
Integer.MAX_VALUE to failure with penalization
Math.toIntExact() when converting FlowFile size to byte array length
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Verified status
Pull Request Formatting
main branch
Verification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-check
Module-level verification performed:
.\mvnw.cmd -pl :nifi-amqp-processors -am test
Additional verification performed:
.\mvnw.cmd -pl :nifi-amqp-processors -am -P contrib-check install -DskipTests
.\mvnw.cmd -pl :nifi-amqp-nar -am install -DskipTests
Licensing
LICENSE and NOTICE files
No new dependencies were added.
Documentation
No user-facing documentation files were changed.