
NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Array, and Single JSON input formats with size-based batching #10981

Open
agturley wants to merge 3 commits into apache:main from agturley:NIFI-15681

Conversation

agturley (Contributor) commented Mar 8, 2026


Summary

NIFI-15681

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

…ay, and Single JSON input formats with size-based batching
pvillard31 (Contributor) left a comment


A few comments after having a quick look through the changes.

    chunkBytes += docBytes;
    totalBytesAccumulated += docBytes;
    if (chunkBytes >= maxBatchBytes) {
        flushChunk(operations, operationFlowFiles, errorFlowFiles, flowFile, pendingBulkErrors, context, session);
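For context, the quoted flush logic follows a standard accumulate-then-flush pattern: documents are gathered until their combined byte size reaches a limit, then the batch is emitted. A minimal self-contained sketch under that assumption (the class and field names here are illustrative, not the actual processor code):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of size-based batching, not the actual processor code.
class SizeBatcher {
    private final long maxBatchBytes;
    private final List<String> chunk = new ArrayList<>();
    private long chunkBytes = 0;
    // Completed batches, in flush order (exposed here only for demonstration).
    final List<List<String>> flushed = new ArrayList<>();

    SizeBatcher(long maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    void add(String doc) {
        long docBytes = doc.getBytes(StandardCharsets.UTF_8).length;
        chunk.add(doc);
        chunkBytes += docBytes;
        // Flush once the accumulated chunk reaches the configured byte limit.
        if (chunkBytes >= maxBatchBytes) {
            flush();
        }
    }

    void flush() {
        if (!chunk.isEmpty()) {
            flushed.add(new ArrayList<>(chunk));
            chunk.clear();
            chunkBytes = 0;
        }
    }
}
```

A final `flush()` after the input is exhausted emits the last partial batch, mirroring the trailing flush a processor would perform at the end of a FlowFile.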
Contributor commented:

When flushChunk throws an ElasticsearchException, the handler combines operationFlowFiles and allProcessedFlowFiles and sends all of them to retry/failure. The FlowFiles in allProcessedFlowFiles have already been successfully indexed by prior flushChunk calls. Routing them to retry could cause duplicate indexing when re-processed. Not saying this should be done differently but just mentioning that it could cause duplicates.
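To make the risk concrete, here is a small hypothetical simulation (not processor code) of what happens when a FlowFile whose earlier chunks were already indexed is routed back to retry in its entirety:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the duplicate-indexing risk described above.
class DuplicateRiskDemo {
    static List<String> simulate() {
        List<List<String>> chunks =
                List.of(List.of("doc1"), List.of("doc2"), List.of("doc3"));
        List<String> indexed = new ArrayList<>();
        // First attempt: chunks 0 and 1 flush successfully, chunk 2 throws.
        indexed.addAll(chunks.get(0));
        indexed.addAll(chunks.get(1));
        // Retry re-processes every chunk, including the already-indexed ones.
        for (List<String> chunk : chunks) {
            indexed.addAll(chunk);
        }
        return indexed; // "doc1" and "doc2" now appear twice
    }
}
```

One common mitigation (not something this PR necessarily does) is assigning deterministic document IDs, so a re-indexed document overwrites its earlier copy instead of creating a duplicate.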

Contributor commented:

If such behaviour is retained, I'd suggest it needs to be clearly documented, as I think it would be a change from how things were handled previously, and the duplication could cause problems for some systems.

agturley (Contributor, Author) commented Mar 10, 2026

I'm not a huge fan of the potential for duplicates, but this approach seems to work. When a bulk request comes back from Elasticsearch, we look at which individual documents failed. Instead of buffering the raw bytes of every document as we go (which would hold the entire file in memory twice), we record only the index of each failed document: a small integer per error, regardless of document size.

Once all chunks for a FlowFile are processed, we route it:

- No errors: clone the original FlowFile straight to REL_SUCCESSFUL. Zero extra I/O.
- Single JSON input (one doc per FlowFile): if it errored, clone it to REL_ERRORS; otherwise, clone it to REL_SUCCESSFUL. Again, no re-read needed.
- NDJSON or JSON Array input with at least one error: we do a single re-read of the original FlowFile and split it in one pass into two streams: one for the failed records (REL_ERRORS) and one for the successful records (REL_SUCCESSFUL). Both outputs are written as clean NDJSON with no trailing newline.

The re-read only happens for FlowFiles that actually had partial failures in NDJSON/JSON Array format, so the common happy path (all docs succeed) is just a cheap clone with no extra I/O.
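The single-pass split described above can be sketched as follows; `NdjsonSplitter` and its method name are illustrative, assuming the failed document indices were recorded while walking the bulk responses:

```java
import java.util.Set;

// Illustrative sketch of the single-pass NDJSON split, not the actual
// processor code: partition the original content into a "failed" stream
// and a "successful" stream using the recorded failed-document indices.
class NdjsonSplitter {
    // Both outputs are written as clean NDJSON with no trailing newline.
    static String[] split(String ndjson, Set<Integer> failedIndices) {
        StringBuilder failed = new StringBuilder();
        StringBuilder succeeded = new StringBuilder();
        String[] lines = ndjson.split("\n");
        for (int i = 0; i < lines.length; i++) {
            StringBuilder target = failedIndices.contains(i) ? failed : succeeded;
            if (target.length() > 0) {
                target.append('\n'); // separator only between records
            }
            target.append(lines[i]);
        }
        return new String[] { failed.toString(), succeeded.toString() };
    }
}
```

In the real processor the two outputs would be streamed to the REL_ERRORS and REL_SUCCESSFUL FlowFiles rather than buffered in memory; the StringBuilders here just keep the sketch self-contained.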

agturley commented Mar 10, 2026

Finished round 1 of your suggestions; please let me know your thoughts on the error-handling approach I'm trying. I'll be doing high-volume testing tomorrow and will report back.

@agturley agturley requested a review from pvillard31 March 10, 2026 01:48