Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.consumer.RebalanceCallback;
import org.apache.nifi.kafka.service.api.consumer.SessionContext;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
Expand Down Expand Up @@ -430,77 +432,87 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final OffsetTracker offsetTracker = new OffsetTracker();
boolean recordsReceived = false;

while (System.currentTimeMillis() < stopTime) {
try {
final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
if (maxWaitDuration.toMillis() <= 0) {
break;
}
final RebalanceSessionHolder sessionHolder = new RebalanceSessionHolder(session, offsetTracker);
consumerService.setSessionContext(sessionHolder);

final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
if (!consumerRecords.hasNext()) {
getLogger().trace("No Kafka Records consumed: {}", pollingContext);
// Check if a rebalance occurred during poll - if so, break to commit what we have
if (consumerService.hasRevokedPartitions()) {
getLogger().debug("Rebalance detected with revoked partitions, breaking to commit session");
try {
while (System.currentTimeMillis() < stopTime) {
try {
final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
if (maxWaitDuration.toMillis() <= 0) {
break;
}
continue;
}

recordsReceived = true;
processConsumerRecords(context, session, offsetTracker, consumerRecords);
final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
if (!consumerRecords.hasNext()) {
getLogger().trace("No Kafka Records consumed: {}", pollingContext);
// Check if a rebalance occurred during poll - if so, break to commit what we have
if (consumerService.hasRevokedPartitions()) {
getLogger().debug("Rebalance detected with revoked partitions, breaking to commit session");
break;
}
continue;
}

// Check if a rebalance occurred during poll - if so, break to commit what we have
if (consumerService.hasRevokedPartitions()) {
getLogger().debug("Rebalance detected with revoked partitions, breaking to commit session");
break;
}
recordsReceived = true;
processConsumerRecords(context, session, offsetTracker, consumerRecords);

if (maxUncommittedSizeConfigured) {
// Stop consuming before reaching Max Uncommitted Time when exceeding Max Uncommitted Size
final long totalRecordSize = offsetTracker.getTotalRecordSize();
if (totalRecordSize > maxUncommittedSize) {
// Check if a rebalance occurred during poll - if so, break to commit what we have
if (consumerService.hasRevokedPartitions()) {
getLogger().debug("Rebalance detected with revoked partitions, breaking to commit session");
break;
}

if (maxUncommittedSizeConfigured) {
// Stop consuming before reaching Max Uncommitted Time when exceeding Max Uncommitted Size
final long totalRecordSize = offsetTracker.getTotalRecordSize();
if (totalRecordSize > maxUncommittedSize) {
break;
}
}
} catch (final Exception e) {
getLogger().error("Failed to consume Kafka Records", e);
consumerService.rollback();
close(consumerService, "Encountered Exception while consuming or writing out Kafka Records");
context.yield();
// If there are any FlowFiles already created and transferred, roll them back because we're rolling back offsets and
// because we will consume the data again, we don't want to transfer out the FlowFiles.
session.rollback();
return;
}
} catch (final Exception e) {
getLogger().error("Failed to consume Kafka Records", e);
consumerService.rollback();
close(consumerService, "Encountered Exception while consuming or writing out Kafka Records");
context.yield();
// If there are any FlowFiles already created and transferred, roll them back because we're rolling back offsets and
// because we will consume the data again, we don't want to transfer out the FlowFiles.
session.rollback();
return;
}
}

if (!recordsReceived && !consumerService.hasRevokedPartitions()) {
getLogger().trace("No Kafka Records consumed, re-queuing consumer");
consumerServices.offer(consumerService);
return;
}
if (!recordsReceived && !consumerService.hasRevokedPartitions()) {
getLogger().trace("No Kafka Records consumed, re-queuing consumer");
consumerServices.offer(consumerService);
return;
}

// If no records received but we have revoked partitions, we still need to commit their offsets
if (!recordsReceived && consumerService.hasRevokedPartitions()) {
getLogger().debug("No records received but rebalance occurred, committing offsets for revoked partitions");
try {
consumerService.commitOffsetsForRevokedPartitions();
} catch (final Exception e) {
getLogger().warn("Failed to commit offsets for revoked partitions", e);
// If no records received but we have revoked partitions, we still need to commit their offsets.
// Note: When a rebalance callback is registered (which is the case in this processor), offsets for
// revoked partitions are committed synchronously during onPartitionsRevoked(), so hasRevokedPartitions()
// will return false. This code path exists for backward compatibility when no callback is registered.
if (!recordsReceived && consumerService.hasRevokedPartitions()) {
getLogger().debug("No records received but rebalance occurred, committing offsets for revoked partitions");
try {
consumerService.commitOffsetsForRevokedPartitions();
} catch (final Exception e) {
getLogger().warn("Failed to commit offsets for revoked partitions", e);
}
consumerServices.offer(consumerService);
return;
}
consumerServices.offer(consumerService);
return;
}

session.commitAsync(
() -> commitOffsets(consumerService, offsetTracker, pollingContext, session),
throwable -> {
getLogger().error("Failed to commit session; will roll back any uncommitted records", throwable);
rollback(consumerService, offsetTracker, session);
context.yield();
});
session.commitAsync(
() -> commitOffsets(consumerService, offsetTracker, pollingContext, session),
throwable -> {
getLogger().error("Failed to commit session; will roll back any uncommitted records", throwable);
rollback(consumerService, offsetTracker, session);
context.yield();
});
} finally {
consumerService.setSessionContext(null);
}
}

private void commitOffsets(final KafkaConsumerService consumerService, final OffsetTracker offsetTracker, final PollingContext pollingContext, final ProcessSession session) {
Expand All @@ -513,7 +525,9 @@ private void commitOffsets(final KafkaConsumerService consumerService, final Off
});
}

// After successful session commit, also commit offsets for any partitions that were revoked during rebalance
// After successful session commit, also commit offsets for any partitions that were revoked during rebalance.
// Note: When a rebalance callback is registered, this check will always be false since offsets are
// committed synchronously during onPartitionsRevoked(). This code path is for backward compatibility.
if (consumerService.hasRevokedPartitions()) {
getLogger().debug("Committing offsets for partitions revoked during rebalance");
consumerService.commitOffsetsForRevokedPartitions();
Expand Down Expand Up @@ -688,7 +702,43 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {

getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
return connectionService.getConsumerService(pollingContext);
final KafkaConsumerService newService = connectionService.getConsumerService(pollingContext);
newService.setRebalanceCallback(createRebalanceCallback());
return newService;
}

/**
 * Builds the callback that commits the in-flight NiFi session synchronously when Kafka
 * revokes partitions from this consumer during a group rebalance. Written as a lambda,
 * which is valid because RebalanceCallback is a {@code @FunctionalInterface}.
 */
private RebalanceCallback createRebalanceCallback() {
    return (revokedPartitions, sessionContext) -> {
        if (sessionContext == null) {
            // No trigger is currently in flight for this consumer service; nothing to commit
            getLogger().debug("No session context during rebalance callback, nothing to commit");
            return;
        }

        final RebalanceSessionHolder holder = (RebalanceSessionHolder) sessionContext;

        getLogger().info("Rebalance callback invoked for {} revoked partitions, committing session synchronously",
                revokedPartitions.size());

        try {
            // Commit FlowFiles first, while the consumer is still a valid group member
            holder.session.commit();
            getLogger().debug("Session committed successfully during rebalance callback");

            final OffsetTracker tracker = holder.offsetTracker;
            if (tracker != null) {
                // Record per-topic acknowledgement counters, then reset the tracker so the
                // outer onTrigger flow does not act on the same offsets again
                tracker.getRecordCounts().forEach((topic, count) ->
                        holder.session.adjustCounter("Records Acknowledged for " + topic, count, true));
                tracker.clear();
            }
        } catch (final Exception e) {
            // Propagate so the caller (inside onPartitionsRevoked) knows the session commit failed;
            // original cause is preserved
            getLogger().error("Failed to commit session during rebalance callback", e);
            throw new RuntimeException("Failed to commit session during rebalance", e);
        }
    };
}

private int getMaxConsumerCount() {
Expand Down Expand Up @@ -780,4 +830,13 @@ private PollingContext createPollingContext(final ProcessContext context, final
return pollingContext;
}

/**
 * Associates the ProcessSession and OffsetTracker of a single onTrigger invocation with a
 * consumer service instance, so the rebalance callback can commit that exact session
 * synchronously when partitions are revoked. Static nested class implementing the
 * SessionContext marker interface; its private fields are read directly by the enclosing
 * processor (permitted for a nested class in Java).
 */
private static class RebalanceSessionHolder implements SessionContext {
    // Session whose FlowFiles must be committed before Kafka offsets during a rebalance
    private final ProcessSession session;
    // Offset/record-count tracking for the session; cleared once a rebalance commit succeeds
    private final OffsetTracker offsetTracker;

    RebalanceSessionHolder(final ProcessSession session, final OffsetTracker offsetTracker) {
        this.session = session;
        this.offsetTracker = offsetTracker;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ public PollingSummary getPollingSummary(final PollingContext pollingContext) {
}
return pollingSummary;
}

/**
 * Resets all tracked state: pending offsets, per-topic record counts, and the running
 * total record size. Intended to be called after the tracked records have already been
 * acknowledged (e.g. following a synchronous session commit during a rebalance), so the
 * same offsets are not acted on again.
 */
public void clear() {
    offsets.clear();
    recordCounts.clear();
    totalRecordSize.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,44 @@ default void commitOffsetsForRevokedPartitions() {
*/
default void clearRevokedPartitions() {
}

/**
 * Registers a callback invoked during a consumer group rebalance, inside
 * {@code onPartitionsRevoked()} and before this service commits Kafka offsets for the
 * revoked partitions. This allows the processor to commit its NiFi session (FlowFiles)
 * first, while the consumer is still in a valid state to commit offsets — preventing
 * data loss (session committed before offsets) and duplicates (offsets committed before
 * the consumer leaves the group).
 * <p>
 * The default implementation is a no-op, for service implementations that do not
 * support rebalance callbacks.
 *
 * @param callback the callback to invoke during rebalance, or null to clear
 */
default void setRebalanceCallback(RebalanceCallback callback) {
}

/**
 * Associates a processor-supplied session context with this consumer service instance,
 * making processor state (such as the current ProcessSession and offset tracker)
 * available to the rebalance callback registered via
 * {@link #setRebalanceCallback(RebalanceCallback)}. Each service instance carries its
 * own context so that concurrent tasks using different consumer services from a pool
 * do not interfere with one another.
 * <p>
 * The default implementation is a no-op.
 *
 * @param sessionContext the context object to associate with this service, or null to clear
 */
default void setSessionContext(SessionContext sessionContext) {
}

/**
 * Returns the session context previously supplied via
 * {@link #setSessionContext(SessionContext)}.
 * <p>
 * The default implementation always returns null.
 *
 * @return the session context, or null if not set
 */
default SessionContext getSessionContext() {
    return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.api.consumer;

import org.apache.nifi.kafka.service.api.common.PartitionState;

import java.util.Collection;

/**
 * Callback for Kafka consumer group rebalance events in which partitions are revoked.
 * <p>
 * Kafka invokes {@code ConsumerRebalanceListener.onPartitionsRevoked()} while the
 * consumer is still an active member of the group — the only point at which offsets for
 * the revoked partitions can still be committed. After that callback returns, the
 * consumer is no longer part of an active group and commit attempts fail with
 * {@code RebalanceInProgressException}.
 * <p>
 * Implementations are notified at that point so the processor can first commit its own
 * pending work (the NiFi session holding FlowFiles), after which Kafka offsets are
 * committed. Ordering the commits this way prevents both data loss (session committed
 * before offsets) and duplicates (offsets committed while the consumer is still valid).
 */
@FunctionalInterface
public interface RebalanceCallback {

    /**
     * Invoked during {@code onPartitionsRevoked()} for partitions that still have
     * uncommitted offsets.
     * <p>
     * The implementation should commit any pending work (e.g. the NiFi session with
     * FlowFiles) for the specified partitions. After this method returns, Kafka offsets
     * will be committed for these partitions.
     *
     * @param revokedPartitions the partitions being revoked that have uncommitted offsets
     * @param sessionContext    the session context stored in the consumer service, carrying
     *                          processor-specific state (e.g. ProcessSession and OffsetTracker);
     *                          may be null if no context was set
     */
    void onPartitionsRevoked(Collection<PartitionState> revokedPartitions, SessionContext sessionContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.api.consumer;

/**
 * Marker interface for session context objects stored in a {@code KafkaConsumerService}
 * via {@code setSessionContext(SessionContext)}.
 * <p>
 * Implementations carry processor-specific state (such as the active ProcessSession and
 * offset tracking information) that rebalance callbacks need to access. Each consumer
 * service instance should hold its own context so that concurrent tasks drawing
 * different consumer services from a pool remain isolated from one another.
 */
public interface SessionContext {
}
Loading
Loading