Skip to content

NIFI-15934 Handle oversized FlowFiles and empty header keys in PublishAMQP#11242

Open
ing-mattioni wants to merge 1 commit into
apache:mainfrom
ing-mattioni:NIFI-15934-publishamqp-header-size
Open

NIFI-15934 Handle oversized FlowFiles and empty header keys in PublishAMQP#11242
ing-mattioni wants to merge 1 commit into
apache:mainfrom
ing-mattioni:NIFI-15934-publishamqp-header-size

Conversation

@ing-mattioni
Copy link
Copy Markdown

@ing-mattioni ing-mattioni commented May 13, 2026

Summary

NIFI-15934

This pull request updates PublishAMQP to improve handling of oversized FlowFiles and AMQP header attributes.

PublishAMQP reads FlowFile content into a byte array before publishing an AMQP message. Since Java byte arrays require an int length, FlowFiles larger than Integer.MAX_VALUE cannot be published using this path. This change validates the FlowFile size before attempting byte-array allocation and routes oversized FlowFiles to failure with penalization.

This pull request also updates AMQP header parsing to ignore empty header keys instead of adding them to the AMQP headers map.

P.S. The existing unit test for AMQP header parsing was extended to cover empty header keys. The oversized FlowFile guard is a defensive validation before byte-array allocation; additional targeted coverage may require avoiding real multi-GB content allocation.

Changes

  • Validates FlowFile size before extracting message content into a byte array
  • Routes FlowFiles larger than Integer.MAX_VALUE to failure with penalization
  • Uses Math.toIntExact() when converting FlowFile size to byte array length
  • Ignores empty AMQP header keys when parsing AMQP header attributes
  • Extends AMQP header parsing test coverage for empty header keys

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Module-level verification performed:

.\mvnw.cmd -pl :nifi-amqp-processors -am test

Additional verification performed:

.\mvnw.cmd -pl :nifi-amqp-processors -am -P contrib-check install -DskipTests

.\mvnw.cmd -pl :nifi-amqp-nar -am install -DskipTests

Licensing

  • New dependencies are compatible with the Apache License 2.0
  • New dependencies are documented in applicable LICENSE and NOTICE files

No new dependencies were added.

Documentation

  • Documentation formatting appears as expected in rendered files

No user-facing documentation files were changed.

*/
private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) {
final byte[] messageContent = new byte[(int) flowFile.getSize()];
final byte[] messageContent = new byte[Math.toIntExact(flowFile.getSize())];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is Math.toIntExact required since we filter out larger flowfiles before?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved by checking against the configured size limit before allocation, so the remaining cast is bounded.


if (flowFile.getSize() > Integer.MAX_VALUE) {
getLogger().error("FlowFile {} with size {} bytes is too large to publish as an AMQP message", flowFile, flowFile.getSize());
session.transfer(session.penalize(flowFile), REL_FAILURE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we usually don't penalize when sending into the failure relationship unless we believe that the failure relationship is used as a self-loop on the processor (but this is not a good practice in terms of flow design given that we have the retry at framework level)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed penalization when routing to failure and added a regression assertion.

Comment on lines +326 to +337
final String key = kv[0].trim();
if (key.isEmpty()) {
getLogger().warn("Ignoring AMQP header property with empty key: {}", strEntry);
continue;
}
headers.put(key, kv[1].trim());
} else if (kv.length == 1) {
headers.put(kv[0].trim(), null);
final String key = kv[0].trim();
if (key.isEmpty()) {
continue;
}
headers.put(key, null);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the two empty-key branches log at the same level (both warn, or both silent), and could the trim plus empty check be extracted into a small helper so the behavior is identical for key=value and bare-key entries?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted header insertion into a helper so empty keys are handled consistently for both key=value and bare-key entries.

Copy link
Copy Markdown
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On initial review, the proposed check on input FlowFile size looks questionable. It may need to be a configurable property.

return;
}

if (flowFile.getSize() > Integer.MAX_VALUE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem like the right approach, the Integer.MAX_VALUE is a very large limit, but there should be other more sensible boundaries, probably lower.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by adding a configurable outbound body-size property with a 64 MB default and upper bound, so oversized FlowFiles are routed to failure before reading content into memory.

@ing-mattioni ing-mattioni force-pushed the NIFI-15934-publishamqp-header-size branch from 21f645b to 6e5b2d4 Compare May 18, 2026 07:45
@ing-mattioni
Copy link
Copy Markdown
Author

Thank you @pvillard31 and @exceptionfactory , I made the following changes:

  • Added configurable Max Outbound Message Body Size in nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:100, default/bounded at 64 MB.
  • Oversized FlowFiles now route to failure before content is read into memory.
  • Removed penalization when routing publish failures to failure.
  • Centralized AMQP header insertion with consistent empty-key skip/warn behavior.
  • Added tests for oversized FlowFiles, non-penalized failures, and empty header keys in nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java:177.

@ing-mattioni ing-mattioni force-pushed the NIFI-15934-publishamqp-header-size branch from 6e5b2d4 to 9754c75 Compare May 18, 2026 11:04
@ing-mattioni
Copy link
Copy Markdown
Author

Addressed the Checkstyle failure in the latest update.

The issue was caused by tab characters in PublishAMQP.java, which resulted in FileTabCharacter and indentation violations. I replaced the tabs with spaces, reran the local validation, and force-pushed the updated signed commit.

@ing-mattioni
Copy link
Copy Markdown
Author

The remaining test failure appears to be unrelated to this PR. The failed test is in nifi-aws-processors:

org.apache.nifi.processors.aws.kinesis.stream.pause.TestStandardRecordProcessorBlocker.testBlockAndUnblock

The failure is a timeout while waiting for an asynchronous condition, while this PR only changes nifi-amqp-processors.

Copy link
Copy Markdown
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @ing-mattioni. Adding the property looks like a good direction. I provided a comment on the naming, and raised a question about the default value.

Comment on lines +100 to +101
public static final PropertyDescriptor MAX_OUTBOUND_MESSAGE_BODY_SIZE = new PropertyDescriptor.Builder()
.name("Max Outbound Message Body Size")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend naming this Maximum Input FlowFile Size since that it what is controls.

.name("Max Outbound Message Body Size")
.description("Maximum body size of outbound messages. FlowFiles larger than this value are routed to failure without loading content into memory.")
.required(true)
.defaultValue("64 MB")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a general published maximum size of AMQP messages, or some reference for using 64 MB is the default value?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants