Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ public interface NiFiServiceFacade {
Set<ControllerServiceEntity> getConnectorControllerServices(String connectorId, String processGroupId, boolean includeAncestorGroups,
boolean includeDescendantGroups, boolean includeReferencingComponents);

/**
* Returns the parameter context bound to the specified process group within the connector's hierarchy. Sensitive parameter values are masked
* by the underlying DTO factory.
*
* @param connectorId the connector id
* @param processGroupId the process group id within the connector's hierarchy
* @return the parameter context entity with effective parameters (inherited included), or {@code null} if the process group has no bound parameter context
*/
ParameterContextEntity getConnectorParameterContext(String connectorId, String processGroupId);

void verifyCanVerifyConnectorConfigurationStep(String connectorId, String configurationStepName);

List<ConfigVerificationResultDTO> performConnectorConfigurationStepVerification(String connectorId, String configurationStepName, ConfigurationStepConfigurationDTO configurationStepConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3950,6 +3950,26 @@ public Set<ControllerServiceEntity> getConnectorControllerServices(final String
.collect(Collectors.toSet());
}

@Override
public ParameterContextEntity getConnectorParameterContext(final String connectorId, final String processGroupId) {
final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
final ProcessGroup managedProcessGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
final ProcessGroup targetProcessGroup = managedProcessGroup.findProcessGroup(processGroupId);
if (targetProcessGroup == null) {
throw new ResourceNotFoundException("Process Group with ID " + processGroupId + " was not found within Connector " + connectorId);
}

final ParameterContext parameterContext = targetProcessGroup.getParameterContext();
if (parameterContext == null) {
return null;
}

// Connector-managed parameter contexts (and any contexts they inherit from) are not registered with the
// global flow's ParameterContextManager. The DTO factory now resolves a parameter's source context by
// walking the in-memory inheritance graph on the supplied context, so an empty lookup is sufficient here.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: "now resolves" is patch-relative — it describes the code relative to what it used to do rather than its permanent contract. Comments should explain the invariant, not the change (those belong in the commit message).

Suggested rewrite:

// Connector-managed parameter contexts are not registered with the global ParameterContextManager,
// so the DAO-backed lookup would fail for any inherited parameter. The in-memory inheritance graph
// reachable from the supplied context is sufficient to resolve parameter source contexts for
// connector-managed flows, making an empty lookup safe here.

return createParameterContextEntity(parameterContext, true, NiFiUserUtils.getNiFiUser(), ParameterContextLookup.EMPTY);
}

@Override
public void verifyCanVerifyConnectorConfigurationStep(final String connectorId, final String configurationStepName) {
connectorDAO.verifyCanVerifyConfigurationStep(connectorId, configurationStepName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.SearchResultsEntity;
Expand Down Expand Up @@ -1874,6 +1875,59 @@ public Response getControllerServicesFromConnectorProcessGroup(
return generateOkResponse(entity).build();
}

/**
* Retrieves the parameter context bound to the specified process group within a connector.
*
* @param connectorId The id of the connector
* @param processGroupId The process group id within the connector's hierarchy
* @return A parameterContextEntity, or 204 No Content if the process group has no bound parameter context
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{connectorId}/flow/process-groups/{processGroupId}/parameter-context")
@Operation(
summary = "Gets the parameter context bound to a process group within a connector",
responses = {
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ParameterContextEntity.class))),
@ApiResponse(responseCode = "204", description = "The specified process group has no bound parameter context."),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
},
security = {
@SecurityRequirement(name = "Read - /connectors/{uuid}")
},
description = "Returns the parameter context (with effective parameters, including those inherited from other contexts) bound to the " +
"specified process group within the connector's hierarchy. Sensitive parameter values are masked. Returns 204 No Content if the " +
"process group has no bound parameter context."
)
public Response getParameterContextForConnectorProcessGroup(
@Parameter(description = "The connector id.", required = true)
@PathParam("connectorId") final String connectorId,
@Parameter(description = "The process group id.", required = true)
@PathParam("processGroupId") final String processGroupId) {

if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}

// authorize access to the connector
serviceFacade.authorizeAccess(lookup -> {
final Authorizable connector = lookup.getConnector(connectorId);
connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});

final ParameterContextEntity entity = serviceFacade.getConnectorParameterContext(connectorId, processGroupId);
if (entity == null) {
return Response.noContent().build();
}

return generateOkResponse(entity).build();
}

/**
* Retrieves the status for the process group managed by the specified connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1620,8 +1620,7 @@ public ParameterDTO createParameterDto(final ParameterContext parameterContext,
final Set<AffectedComponentEntity> referencingComponentEntities = createAffectedComponentEntities(referencingComponents, revisionManager);
dto.setReferencingComponents(referencingComponentEntities);

final ParameterContext containingParameterContext = (parameter.getParameterContextId() == null)
? parameterContext : parameterContextLookup.getParameterContext(parameter.getParameterContextId());
final ParameterContext containingParameterContext = resolveContainingParameterContext(parameterContext, parameter, parameterContextLookup);

dto.setInherited(!containingParameterContext.getIdentifier().equals(parameterContext.getIdentifier()));

Expand All @@ -1631,6 +1630,51 @@ public ParameterDTO createParameterDto(final ParameterContext parameterContext,
return dto;
}

/**
* Resolves the {@link ParameterContext} where the given parameter was originally defined.
*
* <p>For parameters declared directly on the current context (or whose source id matches the current
* context's identifier), the current context is returned without consulting any external lookup. For
* inherited parameters, the source context is found by walking the in-memory inheritance graph reachable
* from the current context via {@link ParameterContext#getInheritedParameterContexts()}. Only if the
* source context is not reachable on that graph (which is expected only for legacy callers that pass a
* registry-backed lookup) does the provided {@link ParameterContextLookup} get consulted as a fallback.</p>
*/
private ParameterContext resolveContainingParameterContext(final ParameterContext parameterContext, final Parameter parameter,
final ParameterContextLookup parameterContextLookup) {
final String sourceId = parameter.getParameterContextId();
if (sourceId == null || sourceId.equals(parameterContext.getIdentifier())) {
return parameterContext;
}

final ParameterContext fromGraph = findInheritedParameterContext(parameterContext, sourceId, new HashSet<>());
if (fromGraph != null) {
return fromGraph;
}

return parameterContextLookup.getParameterContext(sourceId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolveContainingParameterContext can return null here when both the in-memory graph walk and the lookup come up empty. The connector path in StandardNiFiServiceFacade passes ParameterContextLookup.EMPTY, whose getParameterContext unconditionally returns null. If a parameter carries a parameterContextId that is not reachable in the in-memory graph (e.g. data inconsistency after a reconciliation), this method returns null, and the caller on line 1625 immediately dereferences it with .getIdentifier(), producing an NPE.

Suggested fix — guard the return value:

final ParameterContext result = parameterContextLookup.getParameterContext(sourceId);
return result != null ? result : parameterContext;

Falling back to parameterContext (treating the parameter as locally defined) is conservative and safe. Alternatively, throw an IllegalStateException with a descriptive message so the inconsistency surfaces clearly rather than crashing silently.

}

private ParameterContext findInheritedParameterContext(final ParameterContext parameterContext, final String sourceId, final Set<String> visited) {
if (parameterContext == null || !visited.add(parameterContext.getIdentifier())) {
return null;
}
if (sourceId.equals(parameterContext.getIdentifier())) {
return parameterContext;
}
final List<ParameterContext> inherited = parameterContext.getInheritedParameterContexts();
if (inherited == null) {
return null;
}
for (final ParameterContext inheritedContext : inherited) {
final ParameterContext match = findInheritedParameterContext(inheritedContext, sourceId, visited);
if (match != null) {
return match;
}
}
return null;
}

public ReportingTaskDTO createReportingTaskDto(final ReportingTaskNode reportingTaskNode) {
final BundleCoordinate bundleCoordinate = reportingTaskNode.getBundleCoordinate();
final List<Bundle> compatibleBundles = extensionManager.getBundles(reportingTaskNode.getCanonicalClassName()).stream().filter(bundle -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextLookup;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.flow.FlowRegistryUtil;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
Expand Down Expand Up @@ -101,6 +103,7 @@
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
Expand All @@ -115,6 +118,7 @@
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.CopyRequestEntity;
import org.apache.nifi.web.api.entity.CopyResponseEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.SecretsEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
Expand Down Expand Up @@ -2075,4 +2079,100 @@ public void testGetConnectorClusterNodeRequest() {
assertNotNull(entity.getComponent(), "Component should be populated when clusterNodeRequest is true");
assertEquals("RUNNING", entity.getComponent().getState());
}

@Test
public void testGetConnectorParameterContextReturnsEntityWhenContextBound() {
final String connectorId = "connector-id";
final String processGroupId = "process-group-id";
final String parameterContextId = "parameter-context-id";

final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
final DtoFactory dtoFactory = mock(DtoFactory.class);
final RevisionManager revisionManager = mock(RevisionManager.class);
final EntityFactory entityFactory = new EntityFactory();
serviceFacade.setConnectorDAO(connectorDAO);
serviceFacade.setDtoFactory(dtoFactory);
serviceFacade.setRevisionManager(revisionManager);
serviceFacade.setEntityFactory(entityFactory);

final ConnectorNode connectorNode = mock(ConnectorNode.class);
final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class);
final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
final ProcessGroup targetProcessGroup = mock(ProcessGroup.class);
final ParameterContext parameterContext = mock(ParameterContext.class);

when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
when(managedProcessGroup.findProcessGroup(processGroupId)).thenReturn(targetProcessGroup);
when(targetProcessGroup.getParameterContext()).thenReturn(parameterContext);
when(parameterContext.getIdentifier()).thenReturn(parameterContextId);

final ParameterContextDTO parameterContextDto = new ParameterContextDTO();
parameterContextDto.setId(parameterContextId);
parameterContextDto.setName("context-name");
when(dtoFactory.createParameterContextDto(eq(parameterContext), eq(revisionManager), eq(true), any(ParameterContextLookup.class)))
.thenReturn(parameterContextDto);
when(dtoFactory.createPermissionsDto(eq(parameterContext), any())).thenReturn(null);
when(dtoFactory.createRevisionDTO(any(Revision.class))).thenReturn(new RevisionDTO());
when(revisionManager.getRevision(parameterContextId)).thenReturn(new Revision(1L, null, parameterContextId));

final ParameterContextEntity entity = serviceFacade.getConnectorParameterContext(connectorId, processGroupId);

assertNotNull(entity);
assertEquals(parameterContextId, entity.getId());

final ArgumentCaptor<ParameterContextLookup> lookupCaptor = ArgumentCaptor.forClass(ParameterContextLookup.class);
verify(dtoFactory).createParameterContextDto(eq(parameterContext), eq(revisionManager), eq(true), lookupCaptor.capture());
assertNotNull(lookupCaptor.getValue());
assertNull(lookupCaptor.getValue().getParameterContext("any-id"));
assertFalse(lookupCaptor.getValue().hasParameterContext("any-id"));
}

@Test
public void testGetConnectorParameterContextReturnsNullWhenNoBoundContext() {
final String connectorId = "connector-id";
final String processGroupId = "process-group-id";

final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
final DtoFactory dtoFactory = mock(DtoFactory.class);
serviceFacade.setConnectorDAO(connectorDAO);
serviceFacade.setDtoFactory(dtoFactory);

final ConnectorNode connectorNode = mock(ConnectorNode.class);
final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class);
final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
final ProcessGroup targetProcessGroup = mock(ProcessGroup.class);

when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
when(managedProcessGroup.findProcessGroup(processGroupId)).thenReturn(targetProcessGroup);
when(targetProcessGroup.getParameterContext()).thenReturn(null);

final ParameterContextEntity entity = serviceFacade.getConnectorParameterContext(connectorId, processGroupId);

assertNull(entity);
Mockito.verifyNoInteractions(dtoFactory);
}

@Test
public void testGetConnectorParameterContextThrowsWhenProcessGroupNotFound() {
final String connectorId = "connector-id";
final String processGroupId = "missing-process-group";

final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
serviceFacade.setConnectorDAO(connectorDAO);

final ConnectorNode connectorNode = mock(ConnectorNode.class);
final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class);
final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);

when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
when(managedProcessGroup.findProcessGroup(processGroupId)).thenReturn(null);

assertThrows(ResourceNotFoundException.class, () -> serviceFacade.getConnectorParameterContext(connectorId, processGroupId));
}
}
Loading
Loading