Skip to content

Commit 2f491ab

Browse files
committed
NIFI-15258: Addressed review feedback
1 parent 98fbc04 commit 2f491ab

6 files changed

Lines changed: 56 additions & 36 deletions

File tree

src/main/java/org/apache/nifi/components/connector/AbstractConnector.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.nifi.components.ValidationResult;
2323
import org.apache.nifi.components.connector.components.ConnectionFacade;
2424
import org.apache.nifi.components.connector.components.ControllerServiceFacade;
25-
import org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy;
25+
import org.apache.nifi.components.connector.components.ComponentHierarchyScope;
2626
import org.apache.nifi.components.connector.components.ControllerServiceReferenceScope;
2727
import org.apache.nifi.components.connector.components.FlowContext;
2828
import org.apache.nifi.components.connector.components.ProcessGroupFacade;
@@ -57,6 +57,15 @@ public abstract class AbstractConnector implements Connector {
5757
private volatile CompletableFuture<Void> prepareUpdateFuture;
5858
private String description; // effectively final
5959

60+
/**
61+
* Called whenever a specific configuration step has been configured. This allows the Connector to perform any necessary
62+
* actions in response to the configuration step being configured. A typical pattern is to update the flow so that in subsequent
63+
* configuration steps, the properties of the step are available for use when verifying configuration or fetching allowable values.
64+
*
65+
* @param stepName the name of the configuration step that has been configured
66+
* @param workingContext the working flow context that is being used for the configuration step
67+
* @throws FlowUpdateException if there is an error performing the necessary actions in response to the configuration step being configured
68+
*/
6069
protected abstract void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException;
6170

6271

@@ -92,20 +101,20 @@ public void start(final FlowContext context) throws FlowUpdateException {
92101
final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle();
93102
final CompletableFuture<Void> enableServicesFuture = lifecycle.enableControllerServices(
94103
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
95-
ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
104+
ComponentHierarchyScope.INCLUDE_CHILD_GROUPS);
96105

97106
try {
98107
enableServicesFuture.get();
99108
} catch (final Exception e) {
100-
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
109+
lifecycle.disableControllerServices(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS);
101110
throw new FlowUpdateException("Failed to enable Controller Services while starting Connector", e);
102111
}
103112

104113
try {
105-
lifecycle.startProcessors(true).get();
106-
lifecycle.startPorts(true).get();
107-
lifecycle.startStatelessGroups(true).get();
108-
lifecycle.startRemoteProcessGroups(true).get();
114+
lifecycle.startProcessors(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get();
115+
lifecycle.startPorts(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get();
116+
lifecycle.startStatelessGroups(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get();
117+
lifecycle.startRemoteProcessGroups(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get();
109118
} catch (final Exception e) {
110119
logger.error("Failed to start components for {}", this, e);
111120
try {
@@ -134,7 +143,7 @@ private CompletableFuture<Void> stopAsync(final FlowContext context) {
134143
final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle();
135144

136145
try {
137-
lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES);
146+
lifecycle.stopProcessors(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get(1, TimeUnit.MINUTES);
138147
} catch (final TimeoutException e) {
139148
final List<ProcessorFacade> running = findProcessors(rootGroup, processor ->
140149
processor.getLifecycle().getState() != ProcessorState.STOPPED && processor.getLifecycle().getState() != ProcessorState.DISABLED);
@@ -147,10 +156,10 @@ private CompletableFuture<Void> stopAsync(final FlowContext context) {
147156
throw new RuntimeException("Failed to stop all Processors", e.getCause());
148157
}
149158

150-
lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES);
151-
lifecycle.stopRemoteProcessGroups(true).get(1, TimeUnit.MINUTES);
152-
lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES);
153-
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2, TimeUnit.MINUTES);
159+
lifecycle.stopPorts(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get(1, TimeUnit.MINUTES);
160+
lifecycle.stopRemoteProcessGroups(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get(1, TimeUnit.MINUTES);
161+
lifecycle.stopStatelessGroups(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get(2, TimeUnit.MINUTES);
162+
lifecycle.disableControllerServices(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get(2, TimeUnit.MINUTES);
154163

155164
result.complete(null);
156165
} catch (final Exception e) {
@@ -221,7 +230,7 @@ private void completeDrain(final CompletableFuture<Void> result, final FlowConte
221230
int iterations = 0;
222231
while (!isGroupDrained(flowContext.getRootGroup())) {
223232
if (result.isDone()) {
224-
getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain");
233+
getLogger().info("Drain cancelled: no longer waiting for FlowFiles to drain");
225234
break;
226235
}
227236

@@ -274,7 +283,7 @@ private void startNonSourceComponents(final CompletableFuture<Void> result, fina
274283

275284
final CompletableFuture<Void> enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices(
276285
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
277-
ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
286+
ComponentHierarchyScope.INCLUDE_CHILD_GROUPS);
278287

279288
try {
280289
// Wait for all referenced services to be enabled.
@@ -286,7 +295,7 @@ private void startNonSourceComponents(final CompletableFuture<Void> result, fina
286295
}
287296
} catch (final Exception e) {
288297
try {
289-
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
298+
flowContext.getRootGroup().getLifecycle().disableControllerServices(ComponentHierarchyScope.INCLUDE_CHILD_GROUPS).get();
290299
} catch (final Exception e1) {
291300
e.addSuppressed(e1);
292301
}
@@ -350,7 +359,6 @@ public List<ValidationResult> validate(final FlowContext flowContext, final Conn
350359
return results;
351360
}
352361

353-
354362
protected List<ValidationResult> validateComponents(final FlowContext context, final ProcessGroupFacade group, final ConnectorValidationContext validationContext) {
355363
final List<ValidationResult> validationResults = new ArrayList<>();
356364
validateComponents(context, group, validationContext, validationResults);
@@ -378,7 +386,7 @@ private void validateComponents(final FlowContext context, final ProcessGroupFac
378386

379387
final Set<ControllerServiceFacade> referencedServices = group.getControllerServices(
380388
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
381-
ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY);
389+
ComponentHierarchyScope.IMMEDIATE_GROUP_ONLY);
382390

383391
for (final ControllerServiceFacade service : referencedServices) {
384392
final List<ValidationResult> serviceResults = service.validate();
@@ -437,9 +445,9 @@ private CompletableFuture<Void> startNonSourceComponents(final ProcessGroupFacad
437445
}
438446

439447
final ProcessGroupLifecycle lifecycle = group.getLifecycle();
440-
startFutures.add(lifecycle.startPorts(false));
441-
startFutures.add(lifecycle.startRemoteProcessGroups(false));
442-
startFutures.add(lifecycle.startStatelessGroups(false));
448+
startFutures.add(lifecycle.startPorts(ComponentHierarchyScope.IMMEDIATE_GROUP_ONLY));
449+
startFutures.add(lifecycle.startRemoteProcessGroups(ComponentHierarchyScope.IMMEDIATE_GROUP_ONLY));
450+
startFutures.add(lifecycle.startStatelessGroups(ComponentHierarchyScope.IMMEDIATE_GROUP_ONLY));
443451

444452
for (final ProcessGroupFacade childGroup : group.getProcessGroups()) {
445453
startFutures.add(startNonSourceComponents(childGroup));

src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDescriptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public ValidationResult validate(final String stepName, final String groupName,
107107
.subject(name)
108108
.input(value)
109109
.valid(false)
110-
.explanation("Failed to fetch allowable values: " + e.getMessage())
110+
.explanation("Failed to fetch allowable values: " + e)
111111
.build();
112112
}
113113
} else {

src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceHierarchy.java renamed to src/main/java/org/apache/nifi/components/connector/components/ComponentHierarchyScope.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717

1818
package org.apache.nifi.components.connector.components;
1919

20-
public enum ControllerServiceReferenceHierarchy {
20+
/**
21+
* Describes the scope of a component operation within the Process Group hierarchy.
22+
*/
23+
public enum ComponentHierarchyScope {
2124

2225
/**
23-
* Interact only with Controller Services that are directly within the Process Group.
26+
* Interact only with components that are immediately within the Process Group.
2427
*/
25-
DIRECT_SERVICES_ONLY,
28+
IMMEDIATE_GROUP_ONLY,
2629

2730
/**
28-
* Interact with Controller Services within the Process Group and all child Process Groups, recursively.
31+
* Interact with components within the Process Group and all child Process Groups, recursively.
2932
*/
3033
INCLUDE_CHILD_GROUPS;
3134
}

src/main/java/org/apache/nifi/components/connector/components/FlowContextType.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,16 @@
1919

2020
public enum FlowContextType {
2121

22+
/**
23+
* The ACTIVE Flow Context represents the version of the flow that able to be started and stopped and run
24+
* the dataflow.
25+
*/
2226
ACTIVE,
2327

28+
/**
29+
* The WORKING Flow Context represents the "in process" version of the flow that is being configured. This
30+
* version of the flow is not directly started or stopped by the user, but is used to verify configuration,
31+
* fetch allowable values, and perform other configuration-related operations.
32+
*/
2433
WORKING;
2534
}

src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public interface ProcessGroupFacade {
3838

3939
Set<ControllerServiceFacade> getControllerServices();
4040

41-
Set<ControllerServiceFacade> getControllerServices(ControllerServiceReferenceScope referenceScope, ControllerServiceReferenceHierarchy hierarchy);
41+
Set<ControllerServiceFacade> getControllerServices(ControllerServiceReferenceScope referenceScope, ComponentHierarchyScope hierarchyScope);
4242

4343
ConnectionFacade getConnection(String id);
4444

src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,33 +22,33 @@
2222

2323
public interface ProcessGroupLifecycle {
2424

25-
CompletableFuture<Void> enableControllerServices(ControllerServiceReferenceScope scope, ControllerServiceReferenceHierarchy hierarchy);
25+
CompletableFuture<Void> enableControllerServices(ControllerServiceReferenceScope referenceScope, ComponentHierarchyScope hierarchyScope);
2626

2727
CompletableFuture<Void> enableControllerServices(Collection<String> serviceIdentifiers);
2828

29-
CompletableFuture<Void> disableControllerServices(ControllerServiceReferenceHierarchy hierarchy);
29+
CompletableFuture<Void> disableControllerServices(ComponentHierarchyScope scope);
3030

3131
CompletableFuture<Void> disableControllerServices(Collection<String> serviceIdentifiers);
3232

33-
CompletableFuture<Void> startProcessors(boolean recursive);
33+
CompletableFuture<Void> startProcessors(ComponentHierarchyScope scope);
3434

3535
CompletableFuture<Void> start(ControllerServiceReferenceScope serviceReferenceScope);
3636

3737
CompletableFuture<Void> stop();
3838

39-
CompletableFuture<Void> stopProcessors(boolean recursive);
39+
CompletableFuture<Void> stopProcessors(ComponentHierarchyScope scope);
4040

41-
CompletableFuture<Void> startPorts(boolean recursive);
41+
CompletableFuture<Void> startPorts(ComponentHierarchyScope scope);
4242

43-
CompletableFuture<Void> stopPorts(boolean recursive);
43+
CompletableFuture<Void> stopPorts(ComponentHierarchyScope scope);
4444

45-
CompletableFuture<Void> startRemoteProcessGroups(boolean recursive);
45+
CompletableFuture<Void> startRemoteProcessGroups(ComponentHierarchyScope scope);
4646

47-
CompletableFuture<Void> stopRemoteProcessGroups(boolean recursive);
47+
CompletableFuture<Void> stopRemoteProcessGroups(ComponentHierarchyScope scope);
4848

49-
CompletableFuture<Void> startStatelessGroups(boolean recursive);
49+
CompletableFuture<Void> startStatelessGroups(ComponentHierarchyScope scope);
5050

51-
CompletableFuture<Void> stopStatelessGroups(boolean recursive);
51+
CompletableFuture<Void> stopStatelessGroups(ComponentHierarchyScope scope);
5252

5353
int getActiveThreadCount();
5454
}

0 commit comments

Comments
 (0)