NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Arr…#10981
agturley wants to merge 3 commits into apache:main
…ay, and Single JSON input formats with size-based batching
pvillard31
left a comment
A few comments after a quick look through the changes.
...-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
chunkBytes += docBytes;
totalBytesAccumulated += docBytes;
if (chunkBytes >= maxBatchBytes) {
    flushChunk(operations, operationFlowFiles, errorFlowFiles, flowFile, pendingBulkErrors, context, session);
When flushChunk throws an ElasticsearchException, the handler combines operationFlowFiles and allProcessedFlowFiles and sends all of them to retry/failure. The FlowFiles in allProcessedFlowFiles have already been successfully indexed by prior flushChunk calls. Routing them to retry could cause duplicate indexing when re-processed. Not saying this should be done differently but just mentioning that it could cause duplicates.
If such behaviour is retained, I'd suggest it be clearly documented, as I think it would change how things were handled previously, and the duplication could cause problems for some systems.
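To make the concern concrete, here is a minimal sketch of size-based chunking in which a later flush fails after an earlier one has already succeeded. It is not the processor's code: `FakeBulkClient` and `send` are hypothetical stand-ins, and only the `chunkBytes >= maxBatchBytes` check mirrors the diff above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ChunkRetrySketch {

    /** Hypothetical stand-in for a bulk client; throws on one chosen call. */
    public static class FakeBulkClient {
        public final List<String> indexed = new ArrayList<>();
        private final int failOnCall;
        private int calls = 0;

        public FakeBulkClient(int failOnCall) {
            this.failOnCall = failOnCall;
        }

        public void bulk(List<String> docs) {
            calls++;
            if (calls == failOnCall) {
                throw new IllegalStateException("simulated bulk failure");
            }
            indexed.addAll(docs);
        }
    }

    /** Flushes a chunk once its byte size reaches maxBatchBytes; returns false if any flush threw. */
    public static boolean send(List<String> docs, long maxBatchBytes, FakeBulkClient client) {
        List<String> chunk = new ArrayList<>();
        long chunkBytes = 0;
        try {
            for (String doc : docs) {
                chunk.add(doc);
                chunkBytes += doc.getBytes(StandardCharsets.UTF_8).length;
                if (chunkBytes >= maxBatchBytes) {
                    client.bulk(chunk);
                    chunk = new ArrayList<>();
                    chunkBytes = 0;
                }
            }
            if (!chunk.isEmpty()) {
                client.bulk(chunk);
            }
            return true;
        } catch (IllegalStateException e) {
            // In this sketch the caller would send the whole input to retry,
            // including documents from chunks that were already flushed.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> docs = List.of("{\"id\":1}", "{\"id\":2}", "{\"id\":3}", "{\"id\":4}");
        FakeBulkClient client = new FakeBulkClient(2); // the second flush fails

        boolean firstAttempt = send(docs, 16, client);
        boolean retry = send(docs, 16, client);

        System.out.println("first attempt ok: " + firstAttempt);
        System.out.println("retry ok: " + retry);
        System.out.println("times id 1 was sent: "
                + Collections.frequency(client.indexed, "{\"id\":1}"));
    }
}
```

In this sketch the first two documents are sent twice across the failed attempt and the retry. Whether that shows up as duplicate documents in Elasticsearch depends on the operation and on whether explicit document IDs are supplied; an index operation without an ID creates a new document each time.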
I'm not a huge fan of the potential for duplicates, and this seems to work. When a bulk request comes back from Elasticsearch, we look at which individual documents failed. Instead of buffering the raw bytes of every document as we go (which would hold the entire file in memory twice), we just record the index of each failed document: a small integer per error, regardless of document size.
Once all chunks for a FlowFile are processed, we route it:
- No errors: clone the original FlowFile straight to REL_SUCCESSFUL. Zero extra I/O.
- Single JSON input (one doc per FlowFile): if it errored, clone it to REL_ERRORS. If not, clone to REL_SUCCESSFUL. Again, no re-read needed.
- NDJSON or JSON Array input with at least one error: we do a single re-read of the original FlowFile and split it in one pass into two streams: one for the failed records (REL_ERRORS) and one for the successful records (REL_SUCCESSFUL). Both outputs are written as clean NDJSON with no trailing newline.
The re-read only happens for FlowFiles that actually had partial failures in NDJSON/JSON Array format, so the common happy path (all docs succeed) is just a cheap clone with no extra I/O.
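The one-pass split for the NDJSON case could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in this PR: `PartialFailureSplitSketch`, `Split`, and `split` are hypothetical names, the input is held as a `String` rather than streamed from a FlowFile, and blank lines are assumed not to count as documents when indices are assigned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PartialFailureSplitSketch {

    /** Hypothetical holder for the two outputs of the split. */
    public static final class Split {
        public final String successful;
        public final String errors;

        public Split(String successful, String errors) {
            this.successful = successful;
            this.errors = errors;
        }
    }

    /**
     * Splits NDJSON into successful and failed records in one pass,
     * using only the zero-based indices of the failed documents.
     */
    public static Split split(String ndjson, Set<Integer> failedIndices) {
        List<String> ok = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        int index = 0;
        for (String line : ndjson.split("\n")) {
            if (line.trim().isEmpty()) {
                continue; // assumption: blank lines are skipped and do not consume an index
            }
            (failedIndices.contains(index) ? failed : ok).add(line);
            index++;
        }
        // Joining with "\n" leaves no trailing newline on either output.
        return new Split(String.join("\n", ok), String.join("\n", failed));
    }

    public static void main(String[] args) {
        String input = "{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
        Split result = split(input, Set.of(1)); // pretend the second document failed
        System.out.println("successful:\n" + result.successful);
        System.out.println("errors:\n" + result.errors);
    }
}
```

Only the set of failed indices is kept between the bulk response and the split, which is what keeps the memory cost at one small integer per error.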
Finished round 1 of your suggestions. Please let me know your thoughts on the error handling shenanigans I'm trying. I'll be doing high-volume testing tomorrow and will report back.
Summary
NIFI-15681
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Verified status
Pull Request Formatting
main branch
Verification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-check
Licensing
LICENSE and NOTICE files
Documentation