From 2b4b582fdbacb1f1b6a23f54adfff89fc3a3ed95 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 30 Apr 2026 09:04:26 +0200 Subject: [PATCH 01/10] fix: Patch NiFi to avoid FlowSynchronizationException issue --- ...e-disabled-before-calling-updateCont.patch | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch diff --git a/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch b/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch new file mode 100644 index 000000000..bf20b01b9 --- /dev/null +++ b/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch @@ -0,0 +1,65 @@ +From bc590ca0aceb644e0bd03d48ff48e77897bccfad Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Wed, 29 Apr 2026 17:32:02 +0200 +Subject: check services are disabled before calling updateControllerService + +--- + ...tandardVersionedComponentSynchronizer.java | 40 ++++++++++++++++--- + 1 file changed, 34 insertions(+), 6 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index 092d2f7e7b..dbdbe166f3 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -745,17 +745,45 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may be ENABLED here if the outer "affected components" pass did not ++ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet). ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final FlowSynchronizationException | TimeoutException e) { ++ LOG.error("Failed to stop Controller Service {} before updating", service, e); ++ throw new RuntimeException(e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ LOG.error("Interrupted while stopping Controller Service {}", service, e); ++ throw new RuntimeException(e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ } ++ } + } + } + From ce80df87b539ef7d9d74ec82790236d4b912259c Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 30 Apr 2026 13:40:33 +0200 Subject: [PATCH 02/10] propagate exception --- ...e-disabled-before-calling-updateCont.patch | 76 ++++++++++++++++--- 1 file changed, 66 insertions(+), 10 deletions(-) diff --git a/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch b/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch index bf20b01b9..f8f6f1082 100644 --- a/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch +++ b/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch @@ -1,17 +1,66 @@ -From bc590ca0aceb644e0bd03d48ff48e77897bccfad Mon Sep 17 00:00:00 2001 +From 8f7007ac0ac2f83e27666d865727335156b5053b Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 29 Apr 2026 17:32:02 +0200 Subject: check services are disabled before calling updateControllerService --- - ...tandardVersionedComponentSynchronizer.java | 40 ++++++++++++++++--- - 1 file changed, 34 insertions(+), 6 deletions(-) + ...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++----- + 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -index 092d2f7e7b..dbdbe166f3 100644 +index 092d2f7e7b..7cbcc8c6dc 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -@@ -745,17 +745,45 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen +@@ -270,8 +270,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -386,8 +386,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -416,7 +416,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -691,7 +691,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -711,7 +711,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -745,17 +745,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen updateControllerService(addedService, proposedService, topLevelGroup); } @@ -40,13 +89,11 @@ index 092d2f7e7b..dbdbe166f3 100644 + stopControllerService(service, proposedService, stopTimeout, + syncOptions.getComponentStopTimeoutAction(), + referencesToRestart, servicesToRestart, syncOptions); -+ } catch (final FlowSynchronizationException | TimeoutException e) { -+ LOG.error("Failed to stop Controller Service {} before updating", service, e); -+ throw new RuntimeException(e); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); -+ LOG.error("Interrupted while stopping Controller Service {}", service, e); -+ throw new RuntimeException(e); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); + } + updateControllerService(service, proposedService, topLevelGroup); + createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); @@ -63,3 +110,12 @@ index 092d2f7e7b..dbdbe166f3 100644 } } +@@ -1379,7 +1405,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final String connectorId = destination.getConnectorIdentifier().orElse(null); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId); From 8cc799f34ad6fa68393c5b36aaee32d1adaf8288 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 30 Apr 2026 18:09:01 +0200 Subject: [PATCH 03/10] add patch for 2.7.2 --- ...re-disabled-before-calling-updateCon.patch | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 nifi/stackable/patches/2.7.2/0008-check-services-are-disabled-before-calling-updateCon.patch diff --git a/nifi/stackable/patches/2.7.2/0008-check-services-are-disabled-before-calling-updateCon.patch b/nifi/stackable/patches/2.7.2/0008-check-services-are-disabled-before-calling-updateCon.patch new file mode 100644 index 000000000..06414c398 --- /dev/null +++ b/nifi/stackable/patches/2.7.2/0008-check-services-are-disabled-before-calling-updateCon.patch @@ -0,0 +1,121 @@ +From 3893ce8d059f7d9365ba48b3f5e4f898701b71e8 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Thu, 30 Apr 2026 18:05:11 +0200 +Subject: check services are disabled before calling updateControllerService + +--- + ...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++----- + 1 file changed, 40 insertions(+), 14 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index b79ee4d6e8..d5b4d8c36e 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -741,17 +741,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may be ENABLED here if the outer "affected components" pass did not ++ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet). ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ } ++ } + } + } + +@@ -1375,7 +1401,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id); + group.setVersionedComponentId(proposed.getIdentifier()); From 6c9b67ec90744f2f11f0970abe80541364faed5c Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 4 May 2026 11:38:02 +0200 Subject: [PATCH 04/10] add patch for 2.6.0 --- ...re-disabled-before-calling-updateCon.patch | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 nifi/stackable/patches/2.6.0/0008-check-services-are-disabled-before-calling-updateCon.patch diff --git a/nifi/stackable/patches/2.6.0/0008-check-services-are-disabled-before-calling-updateCon.patch b/nifi/stackable/patches/2.6.0/0008-check-services-are-disabled-before-calling-updateCon.patch new file mode 100644 index 000000000..73b5dacc5 --- /dev/null +++ b/nifi/stackable/patches/2.6.0/0008-check-services-are-disabled-before-calling-updateCon.patch @@ -0,0 +1,121 @@ +From f19aa11944fa70bf6706aeb619813faece1a606d Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Mon, 4 May 2026 11:33:36 +0200 +Subject: check services are disabled before calling updateControllerService + +--- + ...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++----- + 1 file changed, 40 insertions(+), 14 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index b79ee4d6e8..0310e57be9 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -741,17 +741,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may be ENABLED here if the outer "affected components" pass did not ++ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet). ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ } ++ } + } + } + +@@ -1375,7 +1401,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id); + group.setVersionedComponentId(proposed.getIdentifier()); From e449e667d78e322cbb70c972fb2c09bbd3efae03 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 4 May 2026 17:58:30 +0200 Subject: [PATCH 05/10] rename patches to be trackable --- ...01-check-services-are-disabled-before-calling-updateCon.patch} | 0 ...01-check-services-are-disabled-before-calling-updateCon.patch} | 0 ...1-check-services-are-disabled-before-calling-updateCont.patch} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename nifi/stackable/patches/2.6.0/{0008-check-services-are-disabled-before-calling-updateCon.patch => 0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch} (100%) rename nifi/stackable/patches/2.7.2/{0008-check-services-are-disabled-before-calling-updateCon.patch => 0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch} (100%) rename nifi/stackable/patches/2.9.0/{0006-check-services-are-disabled-before-calling-updateCont.patch => 0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch} (100%) diff --git a/nifi/stackable/patches/2.6.0/0008-check-services-are-disabled-before-calling-updateCon.patch b/nifi/stackable/patches/2.6.0/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch similarity index 100% rename from nifi/stackable/patches/2.6.0/0008-check-services-are-disabled-before-calling-updateCon.patch rename to nifi/stackable/patches/2.6.0/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch diff --git a/nifi/stackable/patches/2.7.2/0008-check-services-are-disabled-before-calling-updateCon.patch b/nifi/stackable/patches/2.7.2/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch similarity index 100% rename from nifi/stackable/patches/2.7.2/0008-check-services-are-disabled-before-calling-updateCon.patch rename to nifi/stackable/patches/2.7.2/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch diff --git a/nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch b/nifi/stackable/patches/2.9.0/0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch similarity index 100% rename from nifi/stackable/patches/2.9.0/0006-check-services-are-disabled-before-calling-updateCont.patch rename to nifi/stackable/patches/2.9.0/0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch From 32b0272ecb9f4212962d59fdb7f1d773da8379c1 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 5 May 2026 10:10:16 +0200 Subject: [PATCH 06/10] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c27667d1..eaefb8030 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file. - hadoop: Add precompiled hadoop for later reuse in dependent images ([#1466], [#1474]). - nifi: Add version `2.9.0` ([#1463]). +- nifi: Backport NIFI-15901 to 2.x versions ([#1481]). ### Changed @@ -29,6 +30,7 @@ All notable changes to this project will be documented in this file. [#1471]: https://github.com/stackabletech/docker-images/pull/1471 [#1474]: https://github.com/stackabletech/docker-images/pull/1474 [#1476]: https://github.com/stackabletech/docker-images/pull/1476 +[#1481]: https://github.com/stackabletech/docker-images/pull/1481 ## [26.3.0] - 2026-03-16 From d013bb6047f9c5811042ffa6185ac27169a311f8 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 5 May 2026 16:59:48 +0200 Subject: [PATCH 07/10] add patch for sibling issue NIFI-15801 before (updated) NIFI-15901 patch --- ...rocessors-in-synchronizeProcessors-.patch} | 108 +++++++++--------- ...le-controller-services-before-updati.patch | 76 ++++++++++++ 2 files changed, 128 insertions(+), 56 deletions(-) rename nifi/stackable/patches/2.9.0/{0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch => 0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch} (56%) create mode 100644 nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch diff --git a/nifi/stackable/patches/2.9.0/0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch b/nifi/stackable/patches/2.9.0/0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch similarity index 56% rename from nifi/stackable/patches/2.9.0/0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch rename to nifi/stackable/patches/2.9.0/0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch index f8f6f1082..b497941d0 100644 --- a/nifi/stackable/patches/2.9.0/0006-NIFI-15901-check-services-are-disabled-before-calling-updateCont.patch +++ b/nifi/stackable/patches/2.9.0/0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch @@ -1,14 +1,14 @@ -From 8f7007ac0ac2f83e27666d865727335156b5053b Mon Sep 17 00:00:00 2001 +From 2cf0721126a21eb429500000de96372272d9d3b5 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy -Date: Wed, 29 Apr 2026 17:32:02 +0200 -Subject: check services are disabled before calling updateControllerService +Date: Tue, 5 May 2026 16:48:22 +0200 +Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating --- - ...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++----- - 1 file changed, 40 insertions(+), 14 deletions(-) + ...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++------- + 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -index 092d2f7e7b..7cbcc8c6dc 100644 +index 092d2f7e7b..2c64fa8cae 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -270,8 +270,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen @@ -51,71 +51,67 @@ index 092d2f7e7b..7cbcc8c6dc 100644 for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); -@@ -711,7 +711,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - } - - private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, -- final ProcessGroup topLevelGroup) { -+ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { - // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller - // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding - // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each -@@ -745,17 +745,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - updateControllerService(addedService, proposedService, topLevelGroup); - } +@@ -1193,21 +1193,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen -- // Update all of the Controller Services to match the VersionedControllerService -+ // Update all Controller Services to match the VersionedControllerService. -+ // Services may be ENABLED here if the outer "affected components" pass did not -+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet). -+ // We must disable before calling updateControllerService, which calls setProperties -+ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. -+ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); - for (final Map.Entry entry : services.entrySet()) { - final ControllerServiceNode service = entry.getKey(); - final VersionedControllerService proposedService = entry.getValue(); + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { - if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { -- updateControllerService(service, proposedService, topLevelGroup); +- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { +- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); +- if (processor == null) { +- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); +- LOG.info("Added {} to {}", added, group); +- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { +- updateProcessor(processor, proposedProcessor, topLevelGroup); - // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, - // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state -- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); -- LOG.info("Updated {}", service); -+ final Set referencesToRestart = new HashSet<>(); -+ final Set servicesToRestart = new HashSet<>(); +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); +- LOG.info("Updated {}", processor); +- } else { +- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ final Set processorsToRestart = new HashSet<>(); + -+ try { ++ try { ++ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { ++ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); ++ if (processor == null) { ++ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); ++ LOG.info("Added {} to {}", added, group); ++ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { ++ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + try { -+ stopControllerService(service, proposedService, stopTimeout, -+ syncOptions.getComponentStopTimeoutAction(), -+ referencesToRestart, servicesToRestart, syncOptions); ++ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); ++ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { ++ processorsToRestart.add(processor); ++ } + } catch (final TimeoutException e) { -+ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); -+ } catch (final InterruptedException e) { -+ Thread.currentThread().interrupt(); -+ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); -+ } -+ updateControllerService(service, proposedService, topLevelGroup); -+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); -+ LOG.info("Updated {}", service); -+ } finally { -+ // Re-enable services and restart components that were stopped for the update, -+ // restoring the controller to its pre-update running state. -+ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { -+ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart); -+ context.getControllerServiceProvider().scheduleReferencingComponents( -+ service, referencesToRestart, context.getComponentScheduler()); ++ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); + } ++ updateProcessor(processor, proposedProcessor, topLevelGroup); ++ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, ++ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); ++ LOG.info("Updated {}", processor); ++ } else { ++ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + } ++ } ++ } finally { ++ for (final ProcessorNode processor : processorsToRestart) { ++ processor.getProcessGroup().startProcessor(processor, false); ++ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } - -@@ -1379,7 +1405,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } +@@ -1379,7 +1397,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, final Map versionedParameterContexts, - final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { -+ final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) ++ throws ProcessorInstantiationException, FlowSynchronizationException { final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); final String connectorId = destination.getConnectorIdentifier().orElse(null); final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId); diff --git a/nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch b/nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch new file mode 100644 index 000000000..94c196c98 --- /dev/null +++ b/nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch @@ -0,0 +1,76 @@ +From 18fd66eeba5ebf61048576cc48500611a3e2e5ba Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 16:57:20 +0200 +Subject: NIFI-15901 Disable controller services before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++--- + 1 file changed, 37 insertions(+), 7 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index 2c64fa8cae..028dfbf90b 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -711,7 +711,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -745,17 +745,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may still be ENABLED here because not all callers disable them before sync-ing. ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ // Use the component scheduler (not the provider directly) which has ++ // already been paused, avoiding a race with the enable loop below. ++ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart); ++ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ referencesToRestart.forEach(componentNode -> ++ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); ++ } ++ } + } + } + From 719d1f4046f35240ac0c9b7ea79e051bd1994973 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 5 May 2026 17:27:35 +0200 Subject: [PATCH 08/10] add patch for sibling issue NIFI-15801 before (updated) NIFI-15901 patch: version 2.7.2 --- ...rocessors-in-synchronizeProcessors-.patch} | 108 ++++++++-------- ...re-disabled-before-calling-updateCon.patch | 121 ------------------ ...le-controller-services-before-updati.patch | 76 +++++++++++ 3 files changed, 128 insertions(+), 177 deletions(-) rename nifi/stackable/patches/{2.6.0/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch => 2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch} (56%) delete mode 100644 nifi/stackable/patches/2.7.2/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch create mode 100644 nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch diff --git a/nifi/stackable/patches/2.6.0/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch b/nifi/stackable/patches/2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch similarity index 56% rename from nifi/stackable/patches/2.6.0/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch rename to nifi/stackable/patches/2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch index 73b5dacc5..aafba6b22 100644 --- a/nifi/stackable/patches/2.6.0/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch +++ b/nifi/stackable/patches/2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch @@ -1,14 +1,14 @@ -From f19aa11944fa70bf6706aeb619813faece1a606d Mon Sep 17 00:00:00 2001 +From 7dd6e0295bce03dcc86a0c671a73b67641c7af25 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy -Date: Mon, 4 May 2026 11:33:36 +0200 -Subject: check services are disabled before calling updateControllerService +Date: Tue, 5 May 2026 17:19:10 +0200 +Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating --- - ...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++----- - 1 file changed, 40 insertions(+), 14 deletions(-) + ...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++------- + 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -index b79ee4d6e8..0310e57be9 100644 +index b79ee4d6e8..d8f2d2da6d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen @@ -51,71 +51,67 @@ index b79ee4d6e8..0310e57be9 100644 for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); -@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - } - - private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, -- final ProcessGroup topLevelGroup) { -+ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { - // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller - // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding - // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each -@@ -741,17 +741,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - updateControllerService(addedService, proposedService, topLevelGroup); - } +@@ -1189,21 +1189,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen -- // Update all of the Controller Services to match the VersionedControllerService -+ // Update all Controller Services to match the VersionedControllerService. -+ // Services may be ENABLED here if the outer "affected components" pass did not -+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet). -+ // We must disable before calling updateControllerService, which calls setProperties -+ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. -+ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); - for (final Map.Entry entry : services.entrySet()) { - final ControllerServiceNode service = entry.getKey(); - final VersionedControllerService proposedService = entry.getValue(); + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { - if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { -- updateControllerService(service, proposedService, topLevelGroup); +- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { +- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); +- if (processor == null) { +- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); +- LOG.info("Added {} to {}", added, group); +- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { +- updateProcessor(processor, proposedProcessor, topLevelGroup); - // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, - // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state -- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); -- LOG.info("Updated {}", service); -+ final Set referencesToRestart = new HashSet<>(); -+ final Set servicesToRestart = new HashSet<>(); +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); +- LOG.info("Updated {}", processor); +- } else { +- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ final Set processorsToRestart = new HashSet<>(); + -+ try { ++ try { ++ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { ++ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); ++ if (processor == null) { ++ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); ++ LOG.info("Added {} to {}", added, group); ++ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { ++ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + try { -+ stopControllerService(service, proposedService, stopTimeout, -+ syncOptions.getComponentStopTimeoutAction(), -+ referencesToRestart, servicesToRestart, syncOptions); ++ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); ++ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { ++ processorsToRestart.add(processor); ++ } + } catch (final TimeoutException e) { -+ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); -+ } catch (final InterruptedException e) { -+ Thread.currentThread().interrupt(); -+ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); -+ } -+ updateControllerService(service, proposedService, topLevelGroup); -+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); -+ LOG.info("Updated {}", service); -+ } finally { -+ // Re-enable services and restart components that were stopped for the update, -+ // restoring the controller to its pre-update running state. -+ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { -+ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart); -+ context.getControllerServiceProvider().scheduleReferencingComponents( -+ service, referencesToRestart, context.getComponentScheduler()); ++ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); + } ++ updateProcessor(processor, proposedProcessor, topLevelGroup); ++ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, ++ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); ++ LOG.info("Updated {}", processor); ++ } else { ++ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + } ++ } ++ } finally { ++ for (final ProcessorNode processor : processorsToRestart) { ++ processor.getProcessGroup().startProcessor(processor, false); ++ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } - -@@ -1375,7 +1401,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } +@@ -1375,7 +1393,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, final Map versionedParameterContexts, - final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { -+ final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) ++ throws ProcessorInstantiationException, FlowSynchronizationException { final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); final ProcessGroup group = context.getFlowManager().createProcessGroup(id); group.setVersionedComponentId(proposed.getIdentifier()); diff --git a/nifi/stackable/patches/2.7.2/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch b/nifi/stackable/patches/2.7.2/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch deleted file mode 100644 index 06414c398..000000000 --- a/nifi/stackable/patches/2.7.2/0008-NIFI-15901-check-services-are-disabled-before-calling-updateCon.patch +++ /dev/null @@ -1,121 +0,0 @@ -From 3893ce8d059f7d9365ba48b3f5e4f898701b71e8 Mon Sep 17 00:00:00 2001 -From: Andrew Kenworthy -Date: Thu, 30 Apr 2026 18:05:11 +0200 -Subject: check services are disabled before calling updateControllerService - ---- - ...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++----- - 1 file changed, 40 insertions(+), 14 deletions(-) - -diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -index b79ee4d6e8..d5b4d8c36e 100644 ---- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java -@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), - additions.getParameterContexts(), additions.getParameterProviders(), group); - additionsBuilder.addProcessGroup(newProcessGroup); -- } catch (final ProcessorInstantiationException pie) { -- throw new RuntimeException(pie); -+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { -+ throw new RuntimeException(e); - } - }); - -@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); - synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), - parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); -- } catch (final ProcessorInstantiationException pie) { -- throw new RuntimeException(pie); -+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { -+ throw new RuntimeException(e); - } - }); - -@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - - private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, - final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) -- throws ProcessorInstantiationException { -+ throws ProcessorInstantiationException, FlowSynchronizationException { - - // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we - // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, -@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - - private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, - final Map childGroupsByVersionedId, final Map parameterProviderReferences, -- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { -+ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { - - for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { - final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); -@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - } - - private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, -- final ProcessGroup topLevelGroup) { -+ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { - // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller - // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding - // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each -@@ -741,17 +741,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - updateControllerService(addedService, proposedService, topLevelGroup); - } - -- // Update all of the Controller Services to match the VersionedControllerService -+ // Update all Controller Services to match the VersionedControllerService. -+ // Services may be ENABLED here if the outer "affected components" pass did not -+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet). -+ // We must disable before calling updateControllerService, which calls setProperties -+ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. -+ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); - for (final Map.Entry entry : services.entrySet()) { - final ControllerServiceNode service = entry.getKey(); - final VersionedControllerService proposedService = entry.getValue(); - - if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { -- updateControllerService(service, proposedService, topLevelGroup); -- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, -- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state -- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); -- LOG.info("Updated {}", service); -+ final Set referencesToRestart = new HashSet<>(); -+ final Set servicesToRestart = new HashSet<>(); -+ -+ try { -+ try { -+ stopControllerService(service, proposedService, stopTimeout, -+ syncOptions.getComponentStopTimeoutAction(), -+ referencesToRestart, servicesToRestart, syncOptions); -+ } catch (final TimeoutException e) { -+ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); -+ } catch (final InterruptedException e) { -+ Thread.currentThread().interrupt(); -+ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); -+ } -+ updateControllerService(service, proposedService, topLevelGroup); -+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); -+ LOG.info("Updated {}", service); -+ } finally { -+ // Re-enable services and restart components that were stopped for the update, -+ // restoring the controller to its pre-update running state. -+ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { -+ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart); -+ context.getControllerServiceProvider().scheduleReferencingComponents( -+ service, referencesToRestart, context.getComponentScheduler()); -+ } -+ } - } - } - -@@ -1375,7 +1401,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen - - private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, - final Map versionedParameterContexts, -- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { -+ final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { - final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); - final ProcessGroup group = context.getFlowManager().createProcessGroup(id); - group.setVersionedComponentId(proposed.getIdentifier()); diff --git a/nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch b/nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch new file mode 100644 index 000000000..34186aabe --- /dev/null +++ b/nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch @@ -0,0 +1,76 @@ +From 5a16796127e103b181a3cdb760f566c65bab0851 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:25:46 +0200 +Subject: NIFI-15901 Disable controller services before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++--- + 1 file changed, 37 insertions(+), 7 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index d8f2d2da6d..01ffa09a4b 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -741,17 +741,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may still be ENABLED here because not all callers disable them before sync-ing. ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ // Use the component scheduler (not the provider directly) which has ++ // already been paused, avoiding a race with the enable loop below. ++ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart); ++ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ referencesToRestart.forEach(componentNode -> ++ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); ++ } ++ } + } + } + From 20bc7fe45457495fdae6c6b90fc621f65501d228 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 5 May 2026 17:53:15 +0200 Subject: [PATCH 09/10] add patch for sibling issue NIFI-15801 before (updated) NIFI-15901 patch: version 2.6.0 --- ...processors-in-synchronizeProcessors-.patch | 117 ++++++++++++++++++ ...le-controller-services-before-updati.patch | 76 ++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch create mode 100644 nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch diff --git a/nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch b/nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch new file mode 100644 index 000000000..34d1806c5 --- /dev/null +++ b/nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch @@ -0,0 +1,117 @@ +From ec247bbf5e8b607267abaa2b302dc4a355e9767e Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:44:42 +0200 +Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++------- + 1 file changed, 40 insertions(+), 21 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index b79ee4d6e8..c3e059171e 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -1189,21 +1189,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + +- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { +- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); +- if (processor == null) { +- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); +- LOG.info("Added {} to {}", added, group); +- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { +- updateProcessor(processor, proposedProcessor, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); +- LOG.info("Updated {}", processor); +- } else { +- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ final Set processorsToRestart = new HashSet<>(); ++ ++ try { ++ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { ++ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); ++ if (processor == null) { ++ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); ++ LOG.info("Added {} to {}", added, group); ++ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { ++ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ try { ++ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); ++ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { ++ processorsToRestart.add(processor); ++ } ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); ++ } ++ updateProcessor(processor, proposedProcessor, topLevelGroup); ++ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, ++ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); ++ LOG.info("Updated {}", processor); ++ } else { ++ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ } ++ } ++ } finally { ++ for (final ProcessorNode processor : processorsToRestart) { ++ processor.getProcessGroup().startProcessor(processor, false); ++ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); + } + } + } +@@ -1375,7 +1393,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) ++ throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id); + group.setVersionedComponentId(proposed.getIdentifier()); diff --git a/nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch b/nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch new file mode 100644 index 000000000..c2b0a196f --- /dev/null +++ b/nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch @@ -0,0 +1,76 @@ +From 67b404db6005a90b9ef6fce018524b181dae9b98 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:50:06 +0200 +Subject: NIFI-15901 Disable controller services before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++--- + 1 file changed, 37 insertions(+), 7 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index c3e059171e..030f7f5a90 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -741,17 +741,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may still be ENABLED here because not all callers disable them before sync-ing. ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ // Use the component scheduler (not the provider directly) which has ++ // already been paused, avoiding a race with the enable loop below. ++ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart); ++ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ referencesToRestart.forEach(componentNode -> ++ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); ++ } ++ } + } + } + From 08c284648f335732b9bbca6311352986133f340c Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 5 May 2026 17:53:52 +0200 Subject: [PATCH 10/10] updated changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eaefb8030..6f391a122 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file. - hadoop: Add precompiled hadoop for later reuse in dependent images ([#1466], [#1474]). - nifi: Add version `2.9.0` ([#1463]). +- nifi: Backport NIFI-15801 to 2.x versions ([#1481]). - nifi: Backport NIFI-15901 to 2.x versions ([#1481]). ### Changed