
NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… #10964

Open

markap14 wants to merge 6 commits into apache:main from markap14:NIFI-15669

Conversation

markap14 (Contributor) commented Mar 4, 2026:

…is provides much faster startup times and drastically reduces heap utilization when using Enhanced Fan Out (EFO) mode.

Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

exceptionfactory self-assigned this Mar 4, 2026

exceptionfactory (Contributor) left a comment:

Thanks for the extensive work on redesigning this Processor @markap14!

I plan to do a more thorough review; for now, I am highlighting an integration test failure that may point to some unstable expectations.

Error:  Tests run: 18, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 130.6 s <<< FAILURE! -- in org.apache.nifi.processors.aws.kinesis.ConsumeKinesisIT
Error:  org.apache.nifi.processors.aws.kinesis.ConsumeKinesisIT.testKplMultipleAggregatedRecords -- Time elapsed: 4.039 s <<< FAILURE!
org.opentest4j.AssertionFailedError: 3 aggregated records x 5 sub-records each ==> expected: <15> but was: <0>
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:158)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:139)
	at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:201)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:152)
	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:590)
	at org.apache.nifi.processors.aws.kinesis.ConsumeKinesisIT.testKplMultipleAggregatedRecords(ConsumeKinesisIT.java:618)

markap14 force-pushed the NIFI-15669 branch 3 times, most recently from 62fb403 to a7366a0 on March 5, 2026 at 19:40
awelless (Contributor) left a comment:

Reviewed source files only so far.
Stylistic comments are marked with "nit".

final TableSchema destinationSchema) {
return switch (destinationSchema) {
case NEW -> item;
case LEGACY -> convertToLegacyItem(item);

What's the intention in supporting lease table conversion to the old KCL format?

markap14 (author) replied:

Yeah good catch. After I did some refactoring in how the migration works, this is no longer actually necessary. Will remove.

}

final AttributeValue sequenceNumber = item.get("sequenceNumber");
final String leaseKey = streamName.s() + ":" + shardIdValue;

This depends on whether KCL is configured in a single- or multi-stream mode. If it's single stream, only shard id is a part of the lease key.
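For illustration of the distinction (the mode flag and helper method here are hypothetical, not code from the PR), the two lease key formats might be sketched as:

```java
// Sketch of the KCL lease key distinction the reviewer describes.
// In multi-stream mode the lease key includes the stream identifier;
// in single-stream mode it is the shard id alone.
final class LeaseKeys {
    static String leaseKey(final boolean multiStreamMode, final String streamName, final String shardId) {
        return multiStreamMode ? streamName + ":" + shardId : shardId;
    }

    public static void main(final String[] args) {
        System.out.println(leaseKey(true, "my-stream", "shardId-0001"));  // my-stream:shardId-0001
        System.out.println(leaseKey(false, "my-stream", "shardId-0001")); // shardId-0001
    }
}
```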

The maximum size of the buffer is controlled by the 'Max Bytes to Buffer' property.
In addition, the processor may cache some amount of data for each shard when the processor's buffer is full.""")
ConsumeKinesis buffers Kinesis Records in memory until they can be processed. \
The maximum size of the buffer is controlled by the 'Max Batch Size' property.""")

Nit: Max Batch Size determines how much data we write in a single task execution.
It doesn't configure the buffer caches, which hold 500 GetRecords results for polling and {number of active shards} results for EFO.

private static final long QUEUE_POLL_TIMEOUT_MILLIS = 100;
private static final Duration API_CALL_TIMEOUT = Duration.ofSeconds(30);
private static final Duration API_CALL_ATTEMPT_TIMEOUT = Duration.ofSeconds(10);
private static final byte[] NEWLINE_DELIMITER = new byte[] {'\n'};

Nit: shall we use System.lineSeparator() instead?

markap14 (author) replied:

No. The separator should not depend on the OS of the host.
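A tiny illustration of this point (the constant name matches the excerpt above; the comparison code itself is illustrative): a fixed byte delimiter yields identical output on every host, whereas the platform separator varies.

```java
// Why a fixed byte delimiter: System.lineSeparator() is host-dependent
// ("\n" on Linux/macOS, "\r\n" on Windows), while the data written to
// FlowFiles should be byte-identical regardless of the host OS.
final class DelimiterChoice {
    static final byte[] NEWLINE_DELIMITER = {'\n'};

    public static void main(final String[] args) {
        // The fixed delimiter is always exactly one byte.
        System.out.println(NEWLINE_DELIMITER.length);
        // The platform separator's length depends on the OS running this JVM.
        System.out.println(System.lineSeparator().length());
    }
}
```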


Using a larger value may increase the throughput, but will do so at the expense of using more memory.
""")
static final PropertyDescriptor MAX_RECORDS_PER_REQUEST = new PropertyDescriptor.Builder()

This property is used only when CONSUMER_TYPE is SHARED_THROUGHPUT. We shouldn't display it for EFO consumers.

Comment on lines +362 to +364
private volatile String lastQueuedSequenceNumber;
private volatile String lastOnNextMaxSequence;
private final AtomicReference<String> lastAcknowledgedSequenceNumber = new AtomicReference<>();

lastQueuedSequenceNumber and lastAcknowledgedSequenceNumber are used only to calculate a sequence number to start reading data from during a subscription restart.
It seems that lastQueuedSequenceNumber can always be used, since we don't purge queues in KinesisConsumerClient.

Also lastQueuedSequenceNumber is the same as lastOnNextMaxSequence.

}
shardConsumers.clear();

deregisterEfoConsumer();

When NiFi scales down, the processor is stopped on the node being decommissioned, right?
Meaning this node will deregister the consumer while the other active nodes are still subscribed to it.
The EFO consumer is created in initialize only, so after decommissioning a node the processors will be stuck until restarted.
If the above is correct, then we shouldn't deregister the consumer if there are other nodes using it.

markap14 (author) replied:

Good catch! I hadn't considered that case. Will update it to only deregister in @OnRemoved so that if the processor is deleted from the canvas it will deregister it.

}
}

if (totalQueuedResults() >= MAX_QUEUED_RESULTS) {

This is not fair. There is a risk that a consumer of a particular shard will always be sleeping.
Shall we either:

  1. Not track totalQueuedResults and use a batch per shard approach as done in efo consumer?
  2. Use a fair semaphore to handle cache limits instead of simple sleep?
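The fair-semaphore option could look roughly like this. FairBufferLimit and its method names are illustrative, not code from the PR; the point is that `new Semaphore(n, true)` grants permits in FIFO arrival order, so no shard's fetch loop is starved the way a shared sleep-poll can starve it.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch of option 2: a fair Semaphore as the buffer limit,
// so shard fetch loops block for permits in FIFO order instead of
// sleep-polling a shared totalQueuedResults counter.
final class FairBufferLimit {
    private final Semaphore permits;

    FairBufferLimit(final int maxQueuedResults) {
        // 'true' -> fair mode: waiting threads acquire permits in arrival
        // order, so no single shard's consumer is starved indefinitely.
        this.permits = new Semaphore(maxQueuedResults, true);
    }

    void beforeEnqueue() throws InterruptedException {
        permits.acquire(); // blocks until buffer space is available
    }

    void afterDequeue() {
        permits.release(); // frees a slot once a result is consumed
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(final String[] args) throws InterruptedException {
        final FairBufferLimit limit = new FairBufferLimit(2);
        limit.beforeEnqueue();
        limit.beforeEnqueue();
        System.out.println(limit.available()); // 0
        limit.afterDequeue();
        System.out.println(limit.available()); // 1
    }
}
```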

return null;
} catch (final SdkClientException e) {
if (!state.isStopped()) {
logger.warn("GetRecords timed out for shard {}; will retry with existing iterator", shardId);

Nit: this isn't necessarily a timeout error, right?

for (final ShardFetchResult result : results) {
final PollingShardState state = pollingShardStates.get(result.shardId());
if (state != null) {
state.requestReset();

Should we roll back to the latest sequence number instead? The records are still kept in the queue in KinesisConsumerClient.
This might cause out-of-order delivery - test with repro.


We should drain the queue while still holding the shard lease, i.e. here, in rollbackResults.
Since the reset doesn't happen immediately, there is a window in which the lease can be acquired and subsequent records polled from the queue before the reset happens. Repro.

But we need to make sure that while draining the queue we don't fetch a new batch of records. Otherwise we'd have to drain it as well.

Comment on lines +587 to +588
shardManager.writeCheckpoints(batch.checkpoints());
consumerClient.acknowledgeResults(accepted);

If writeCheckpoints fails, we don't call acknowledgeResults in this callback. It seems the EFO consumer will be stuck in that situation, as we request the next records in the acknowledgement. Shall we swap these operations? Or use a try { } finally { } ladder.
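The try/finally ladder being suggested might be sketched as follows. The ShardManager and ConsumerClient interfaces here are hypothetical stand-ins for the PR's types; the point is that the acknowledgement (which drives the next-records request) runs even when checkpointing throws.

```java
// Illustrative sketch: ensure acknowledgeResults always runs, even when
// writeCheckpoints throws, so the EFO consumer is not left waiting for
// an acknowledgement that never arrives. Interfaces are stand-ins.
interface ShardManager { void writeCheckpoints() throws Exception; }
interface ConsumerClient { void acknowledgeResults(); }

final class CheckpointThenAck {
    static boolean process(final ShardManager shardManager, final ConsumerClient consumerClient) {
        try {
            shardManager.writeCheckpoints();
            return true;
        } catch (final Exception e) {
            return false; // checkpoint failed; caller may retry it later
        } finally {
            consumerClient.acknowledgeResults(); // always request next records
        }
    }

    public static void main(final String[] args) {
        final boolean[] acked = {false};
        final boolean ok = process(
                () -> { throw new Exception("checkpoint failure"); },
                () -> acked[0] = true);
        System.out.println(ok);       // false: checkpoint failed
        System.out.println(acked[0]); // true: acknowledgement still ran
    }
}
```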

…is provides much faster startup times and drastically reduces heap utilization when using Enhanced Fan-Out (EFO) mode.
markap14 (author) commented:

Thanks for the thorough review @awelless! I did several refactorings of this PR before pushing it, and it looks like I did a pretty poor job of cleaning up a couple of the approaches that I'd taken. It should be in much better shape now, and you caught a few interesting points that I hadn't considered as well. I pushed a new commit that I think addresses everything, and added some additional tests. I pushed 30,085,000 records to a Kinesis Stream and then consumed them all using both EFO and Shared Throughput modes, to ensure that all data was consumed in exactly the correct order, without any duplicates, and that performance was as expected. All looks good!

…use we always include all sub-records within a single ProcessSession so we don't need to checkpoint partial sequences
awelless (Contributor) left a comment:

The comment about the race condition is the one requiring attention. The rest is optional.

@Override
public void migrateProperties(final PropertyConfiguration config) {
ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
config.renameProperty("Max Bytes to Buffer", "Max Batch Size");

Should we really move Max Bytes to Buffer to Max Batch Size? These are different properties.
The default value of 100 MB for buffer size might be too much for the batch size.

.build();

static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP, ProxySpec.HTTP_AUTH);
static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()

I see. Then should we have separate endpoints for Kinesis and DynamoDB?
In Localstack that's the same endpoint for each service, but this might not be the case for production scenarios.

return;
}

final Set<String> ownedShardIds = new HashSet<>();

Nit:

Suggested change
final Set<String> ownedShardIds = new HashSet<>();
final Set<String> ownedShardIds = HashSet.newHashSet(ownedShards.size());

private void shutdownScheduler() {
if (kinesisScheduler.shutdownComplete()) {
@OnRemoved
public void onRemoved(final ProcessContext context) {

This approach to deregistering consumers looks good to me.
What should happen when the processor's CONSUMER_TYPE changes? Now we wait for the processor to be removed. Furthermore, if APPLICATION_NAME is changed, the old consumer will be orphaned.

Should we deregister the consumer immediately when CONSUMER_TYPE changes to SHARED_THROUGHPUT, or when CONSUMER_TYPE is EFO and APPLICATION_NAME changes?

markap14 (author) replied:

It's possible, but I don't think it's worth the effort. If a user does that and wants to reclaim the slot, they can do so manually in the AWS console.

resultsByShard.computeIfAbsent(result.shardId(), k -> new ArrayList<>()).add(result);
}
for (final List<ShardFetchResult> shardResults : resultsByShard.values()) {
shardResults.sort(Comparator.comparing(ShardFetchResult::firstSequenceNumber));

I wonder if we really need to sort the results here.
Since we're consuming data from a queue in KinesisConsumerClient, we should have the data ordered by sequence numbers already, right? We operate on lists everywhere, so the order is preserved.


exceptionfactory (Contributor) left a comment:

Thanks for putting together this major refactor @markap14, the approach looks good in general. I'm planning on further review, but noted a handful of mostly minor recommendations thus far.

Thread.sleep(TABLE_POLL_MILLIS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessException("Interrupted while waiting for DynamoDB table to become ACTIVE", e);

Suggested change
throw new ProcessException("Interrupted while waiting for DynamoDB table to become ACTIVE", e);
throw new ProcessException("Interrupted while waiting for DynamoDB table [%s] to become ACTIVE".formatted(tableName), e);

Thread.sleep(TABLE_POLL_MILLIS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessException("Interrupted while waiting for DynamoDB table deletion", e);

Suggested change
throw new ProcessException("Interrupted while waiting for DynamoDB table deletion", e);
throw new ProcessException("Interrupted while waiting for DynamoDB table [%s] deletion".formatted(tableName), e);

if (keySchema.size() == 2
&& hasKey(keySchema, "streamName", KeyType.HASH)
&& hasKey(keySchema, "shardId", KeyType.RANGE)) {
return TableSchema.NEW;

streamName and shardId appear to be used multiple times in multiple methods; they look like good candidates for private static final Strings.

* @param data the user payload bytes
* @param approximateArrivalTimestamp approximate time the enclosing record arrived at Kinesis
*/
record DeaggregatedRecord(

Based on the description, what do you think about naming this UserRecord, DistinctRecord or DataRecord? The User prefix sounds a bit related to identity, but aligns with the description. Distinct or similar may be an option.

* per shard via HTTP/2. Uses Reactive Streams demand-driven backpressure to control the
* rate of event delivery.
*/
final class EfoKinesisClient extends KinesisConsumerClient {

What do you think about spelling out EnhancedFanOutKinesisClient for clarity, since Efo is not a common acronym?

}
} else if (!existing.isExhausted() && !existing.isStopped() && !existing.isLoopRunning()
&& existing.tryStartLoop()) {
logger.warn("Restarting dead fetch loop for shard {}", shardId);

It would be helpful to include the streamName.

});
} catch (final RejectedExecutionException e) {
state.markLoopStopped();
logger.debug("Executor shut down; cannot start fetch loop for shard {}", shardId);

Suggested change
logger.debug("Executor shut down; cannot start fetch loop for shard {}", shardId);
logger.debug("Executor shut down; cannot start fetch loop for stream [{}] shard [{}]", streamName, shardId);

}
} catch (final Exception e) {
if (!state.isStopped()) {
logger.error("Unexpected error in fetch loop for shard {}; will retry", shardId, e);

Should this be logged as a warning if it is going to be retried?

Comment on lines +48 to +50
"streamName", AttributeValue.builder().s("my-stream").build(),
"shardId", AttributeValue.builder().s("shardId-0001").build(),
"sequenceNumber", AttributeValue.builder().s("12345").build());

Recommend declaring static variables for the map keys and values that can be reused across methods.


<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>

This library brings in Apache HTTP Client 4, which has limited updates. The url-connection-client does not have all the flexibility, but what do you think about using it instead?

markap14 (author) replied:

It looks like the url-connection-client does not support proxies directly. And it doesn't support connection pooling with a max, which we're depending on here. Fortunately, though, we can upgrade to Apache HTTP Client 5, which I think makes a lot of sense.

markap14 (author) commented:

Thanks @exceptionfactory I think all of your feedback makes sense. I pushed a new commit that incorporates all of it and switches to Apache HTTP Client 5 instead of version 4.

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<artifactId>apache5-client</artifactId>

Currently nifi-aws-service-api-nar doesn't bring apache5-client as a dependency. We should either add it to that nar or remove it from this list.

Before adding it to nifi-aws-service-api, I was getting a ClassNotFoundException for the Apache 5 HTTP client.

markap14 (author) replied:

D'oh! I added it to the api nar, but it looks like I didn't include that in the commit 🤦 Will have that up shortly.
