From ff1088c6867f7d72e1fe6d976e7ec9dea460914e Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 26 Jan 2026 10:53:47 +0100 Subject: [PATCH] MINIFICPP-2688 Move site-to-site tests to modular docker tests --- .../containers/nifi_container.py | 25 +- .../resources/nifi/convert_cert_to_jks.sh | 45 +++ .../minifi/connection.py | 1 + .../minifi/flow_definition.py | 60 ++++ .../minifi/input_port.py | 41 +++ .../minifi/minifi_flow_definition.py | 9 +- .../minifi/nifi_flow_definition.py | 22 +- .../minifi/output_port.py | 41 +++ .../minifi/remote_port.py | 33 +++ .../minifi/remote_process_group.py | 44 +++ .../steps/configuration_steps.py | 5 + .../minifi_test_framework/steps/core_steps.py | 10 + .../steps/flow_building_steps.py | 66 +++++ .../tests}/features/s2s.feature | 276 +++++++----------- 14 files changed, 496 insertions(+), 182 deletions(-) create mode 100755 behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh create mode 100644 behave_framework/src/minifi_test_framework/minifi/input_port.py create mode 100644 behave_framework/src/minifi_test_framework/minifi/output_port.py create mode 100644 behave_framework/src/minifi_test_framework/minifi/remote_port.py create mode 100644 behave_framework/src/minifi_test_framework/minifi/remote_process_group.py rename {docker/test/integration => extensions/standard-processors/tests}/features/s2s.feature (53%) diff --git a/behave_framework/src/minifi_test_framework/containers/nifi_container.py b/behave_framework/src/minifi_test_framework/containers/nifi_container.py index 396b1e1a75..4f6fff3199 100644 --- a/behave_framework/src/minifi_test_framework/containers/nifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/nifi_container.py @@ -15,22 +15,27 @@ import io import gzip -from typing import List, Optional +import logging +import os +from pathlib import Path +from OpenSSL import crypto + from minifi_test_framework.containers.file import File from minifi_test_framework.containers.container import Container from minifi_test_framework.core.helpers import wait_for_condition from minifi_test_framework.core.minifi_test_context import MinifiTestContext from minifi_test_framework.minifi.nifi_flow_definition import NifiFlowDefinition +from minifi_test_framework.containers.host_file import HostFile +from minifi_test_framework.core.ssl_utils import make_server_cert class NifiContainer(Container): - NIFI_VERSION = '2.7.2' - - def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] = None, use_ssl: bool = False): + def __init__(self, test_context: MinifiTestContext, command: list[str] | None = None, use_ssl: bool = False): self.flow_definition = NifiFlowDefinition() name = f"nifi-{test_context.scenario_id}" if use_ssl: - entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' " + entry_command = (r"/scripts/convert_cert_to_jks.sh /tmp/resources /tmp/resources/nifi_client.key /tmp/resources/nifi_client.crt /tmp/resources/root_ca.crt &&" + r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' " r"-e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' " r"-e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' " r"-e 's/^\(nifi.web.https.port\)=.*/\1=8443/' " @@ -68,10 +73,18 @@ def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] if not command: command = ["/bin/sh", "-c", entry_command] - super().__init__("apache/nifi:" + self.NIFI_VERSION, name, test_context.network, entrypoint=command) + super().__init__("apache/nifi:" + NifiFlowDefinition.NIFI_VERSION, name, test_context.network, entrypoint=command) + resource_dir = Path(__file__).resolve().parent / "resources" / "nifi" + self.host_files.append(HostFile("/scripts/convert_cert_to_jks.sh", os.path.join(resource_dir, "convert_cert_to_jks.sh"))) + + nifi_client_cert, nifi_client_key = make_server_cert(common_name=f"nifi-{test_context.scenario_id}", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key) + self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert))) + self.files.append(File("/tmp/resources/nifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=nifi_client_cert))) + self.files.append(File("/tmp/resources/nifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=nifi_client_key))) def deploy(self): flow_config = self.flow_definition.to_json() + logging.info(f"Deploying NiFi container '{self.container_name}' with flow configuration:\n{flow_config}") buffer = io.BytesIO() with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file: diff --git a/behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh b/behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh new file mode 100755 index 0000000000..33c37b677f --- /dev/null +++ b/behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh @@ -0,0 +1,45 @@ +#!/bin/bash +set -euo pipefail + +# Usage: ./create_jks.sh + +DIR=$1 +SSL_KEY_PATH=$2 +SSL_CERT_PATH=$3 +CA_CERT_PATH=$4 + +KEYSTORE="$DIR/keystore.jks" +TRUSTSTORE="$DIR/truststore.jks" +PKCS12_FILE="$DIR/keystore.p12" +PASSWORD="passw0rd1!" + +cat "${CA_CERT_PATH}" >> "${SSL_CERT_PATH}" + +if [ ! -d "$DIR" ]; then + mkdir -p "$DIR" +fi + +openssl pkcs12 -export \ + -inkey "$SSL_KEY_PATH" \ + -in "$SSL_CERT_PATH" \ + -name "nifi-key" \ + -out "$PKCS12_FILE" \ + -password pass:$PASSWORD + +keytool -importkeystore \ + -destkeystore "$KEYSTORE" \ + -deststoretype jks \ + -destalias nifi-key \ + -srckeystore "$PKCS12_FILE" \ + -srcstoretype pkcs12 \ + -srcalias "nifi-key" \ + -storepass "$PASSWORD" \ + -srcstorepass "$PASSWORD" \ + -noprompt + +keytool -importcert \ + -alias "nifi-cert" \ + -file "$CA_CERT_PATH" \ + -keystore "$TRUSTSTORE" \ + -storepass "$PASSWORD" \ + -noprompt diff --git a/behave_framework/src/minifi_test_framework/minifi/connection.py b/behave_framework/src/minifi_test_framework/minifi/connection.py index 8674dfe3b4..7e443e59c3 100644 --- a/behave_framework/src/minifi_test_framework/minifi/connection.py +++ b/behave_framework/src/minifi_test_framework/minifi/connection.py @@ -24,6 +24,7 @@ def __init__(self, source_name: str, source_relationship: str, target_name: str) self.source_name: str = source_name self.source_relationship: str = source_relationship self.target_name: str = target_name + self.drop_empty_flowfiles: bool = False def __repr__(self): return f"({self.source_name}:{self.source_relationship} --> {self.target_name})" diff --git a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py index cbaa4b8de6..a636cf353a 100644 --- a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py +++ b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py @@ -22,6 +22,9 @@ from .funnel import Funnel from .parameter_context import ParameterContext from .processor import Processor +from .remote_process_group import RemoteProcessGroup +from .input_port import InputPort +from .output_port import OutputPort class FlowDefinition(ABC): @@ -32,10 +35,59 @@ def __init__(self, flow_name: str = "MiNiFi Flow"): self.funnels: list[Funnel] = [] self.connections: list[Connection] = [] self.parameter_contexts: list[ParameterContext] = [] + self.remote_process_groups: list[RemoteProcessGroup] = [] + self.input_ports: list[InputPort] = [] + self.output_ports: list[OutputPort] = [] def add_processor(self, processor: Processor): self.processors.append(processor) + def add_remote_process_group(self, address: str, name: str, protocol: str = "RAW"): + rpg = RemoteProcessGroup(name, address, protocol) + self.remote_process_groups.append(rpg) + + def add_input_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False): + rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None) + if rpg: + rpg.add_input_port(port_name, use_compression) + else: + raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.") + + def add_output_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False): + rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None) + if rpg: + rpg.add_output_port(port_name, use_compression) + else: + raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.") + + def get_input_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str: + rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None) + if rpg: + port = next((port for port in rpg.input_ports if port.name == rpg_port_name), None) + if port: + return port.id + else: + raise ValueError(f"InputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.") + else: + raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.") + + def get_output_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str: + rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None) + if rpg: + port = next((port for port in rpg.output_ports if port.name == rpg_port_name), None) + if port: + return port.id + else: + raise ValueError(f"OutputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.") + else: + raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.") + + def add_input_port(self, input_port_id: str, input_port_name: str): + self.input_ports.append(InputPort(input_port_name, input_port_id)) + + def add_output_port(self, output_port_id: str, output_port_name: str): + self.output_ports.append(OutputPort(output_port_name, output_port_id)) + def get_processor(self, processor_name: str) -> Processor | None: return next((proc for proc in self.processors if proc.name == processor_name), None) @@ -46,12 +98,20 @@ def get_parameter_context(self, parameter_context_name: str) -> ParameterContext return next((parameter_context for parameter_context in self.parameter_contexts if parameter_context.name == parameter_context_name), None) + def get_remote_process_group(self, rpg_name: str) -> RemoteProcessGroup | None: + return next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None) + def add_funnel(self, funnel: Funnel): self.funnels.append(funnel) def add_connection(self, connection: Connection): self.connections.append(connection) + def set_drop_empty_for_destination(self, destination_name: str): + for connection in self.connections: + if connection.target_name == destination_name: + connection.drop_empty_flowfiles = True + def to_yaml(self) -> str: raise NotImplementedError("to_yaml method must be implemented in subclasses") diff --git a/behave_framework/src/minifi_test_framework/minifi/input_port.py b/behave_framework/src/minifi_test_framework/minifi/input_port.py new file mode 100644 index 0000000000..1af6d20972 --- /dev/null +++ b/behave_framework/src/minifi_test_framework/minifi/input_port.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class InputPort: + def __init__(self, name: str, id: str): + self.id: str = id + self.name: str = name + + def to_json_dict(self): + data = { + "identifier": self.id, + "instanceIdentifier": self.id, + "name": self.name, + "comments": "", + "position": { + "x": 0, + "y": 0 + }, + "type": "INPUT_PORT", + "concurrentlySchedulableTaskCount": 1, + "scheduledState": "RUNNING", + "allowRemoteAccess": True, + "portFunction": "STANDARD", + "componentType": "INPUT_PORT", + "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8" + } + return data diff --git a/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py index 469a3b8c17..e01a91880a 100644 --- a/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py +++ b/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py @@ -31,8 +31,10 @@ def to_yaml(self) -> str: # This is crucial for finding the source/destination IDs for connections processors_by_name = {p.name: p for p in self.processors} funnels_by_name = {f.name: f for f in self.funnels} + remote_input_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.input_ports} + remote_output_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.output_ports} - connectables_by_name = {**processors_by_name, **funnels_by_name} + connectables_by_name = {**processors_by_name, **funnels_by_name, **remote_input_ports_by_name, **remote_output_ports_by_name} if len(self.parameter_contexts) > 0: parameter_context_name = self.parameter_contexts[0].name @@ -44,7 +46,8 @@ def to_yaml(self) -> str: 'Processors': [p.to_yaml_dict() for p in self.processors], 'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [], 'Controller Services': [c.to_yaml_dict() for c in self.controller_services], - 'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name} + 'Remote Processing Groups': [rpg.to_yaml_dict() for rpg in self.remote_process_groups], + 'Parameter Context Name': parameter_context_name} # Build the connections list by looking up processor IDs for conn in self.connections: @@ -58,6 +61,6 @@ def to_yaml(self) -> str: config['Connections'].append( {'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id, 'source id': source_proc.id, 'source relationship name': conn.source_relationship, - 'destination id': dest_proc.id}) + 'destination id': dest_proc.id, "drop empty": conn.drop_empty_flowfiles}) return yaml.dump(config, sort_keys=False, indent=2, width=120) diff --git a/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py index 8e7faab5ad..a41e248bcf 100644 --- a/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py +++ b/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py @@ -21,7 +21,7 @@ class NifiFlowDefinition(FlowDefinition): - NIFI_VERSION: str = '2.2.0' + NIFI_VERSION: str = '2.7.2' def __init__(self, flow_name: str = "NiFi Flow"): super().__init__(flow_name) @@ -113,9 +113,22 @@ def to_json(self) -> str: connections_node = config["rootGroup"]["connections"] + processors_by_name = {p.name: p for p in self.processors} + input_ports_by_name = {port.name: port for port in self.input_ports} + output_ports_by_name = {port.name: port for port in self.output_ports} + for conn in self.connections: + source_type = "PROCESSOR" source_proc = processors_by_name.get(conn.source_name) + if not source_proc: + source_proc = input_ports_by_name.get(conn.source_name) + source_type = "INPUT_PORT" + conn.source_relationship = "" + dest_type = "PROCESSOR" dest_proc = processors_by_name.get(conn.target_name) + if not dest_proc: + dest_proc = output_ports_by_name.get(conn.target_name) + dest_type = "OUTPUT_PORT" if not source_proc or not dest_proc: raise ValueError( f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'") @@ -126,7 +139,7 @@ def to_json(self) -> str: "name": f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", "source": { "id": source_proc.id, - "type": "PROCESSOR", + "type": source_type, "groupId": "9802c873-3322-3b60-a71d-732d02bd60f8", "name": conn.source_name, "comments": "", @@ -134,7 +147,7 @@ def to_json(self) -> str: }, "destination": { "id": dest_proc.id, - "type": "PROCESSOR", + "type": dest_type, "groupId": "9802c873-3322-3b60-a71d-732d02bd60f8", "name": dest_proc.name, "comments": "", @@ -155,4 +168,7 @@ def to_json(self) -> str: "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8" }) + config["rootGroup"]["inputPorts"] = [input_port.to_json_dict() for input_port in self.input_ports] + config["rootGroup"]["outputPorts"] = [output_port.to_json_dict() for output_port in self.output_ports] + return json.dumps(config) diff --git a/behave_framework/src/minifi_test_framework/minifi/output_port.py b/behave_framework/src/minifi_test_framework/minifi/output_port.py new file mode 100644 index 0000000000..ca2ce57272 --- /dev/null +++ b/behave_framework/src/minifi_test_framework/minifi/output_port.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class OutputPort: + def __init__(self, name: str, id: str): + self.id: str = id + self.name: str = name + + def to_json_dict(self): + data = { + "identifier": self.id, + "instanceIdentifier": self.id, + "name": self.name, + "comments": "", + "position": { + "x": 0, + "y": 0 + }, + "type": "OUTPUT_PORT", + "concurrentlySchedulableTaskCount": 1, + "scheduledState": "RUNNING", + "allowRemoteAccess": True, + "portFunction": "STANDARD", + "componentType": "OUTPUT_PORT", + "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8" + } + return data diff --git a/behave_framework/src/minifi_test_framework/minifi/remote_port.py b/behave_framework/src/minifi_test_framework/minifi/remote_port.py new file mode 100644 index 0000000000..5e0d7ef677 --- /dev/null +++ b/behave_framework/src/minifi_test_framework/minifi/remote_port.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import uuid + + +class RemotePort: + def __init__(self, name: str, use_compression: bool): + self.id: str = str(uuid.uuid4()) + self.name: str = name + self.use_compression: bool = use_compression + self.properties: dict[str, str] = {} + + def add_property(self, property_name: str, property_value: str): + self.properties[property_name] = property_value + + def to_yaml_dict(self): + data = {'id': self.id, 'name': self.name, 'use compression': self.use_compression, 'max concurrent tasks': 1, + 'Properties': self.properties} + return data diff --git a/behave_framework/src/minifi_test_framework/minifi/remote_process_group.py b/behave_framework/src/minifi_test_framework/minifi/remote_process_group.py new file mode 100644 index 0000000000..ae8b697535 --- /dev/null +++ b/behave_framework/src/minifi_test_framework/minifi/remote_process_group.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import uuid +from .remote_port import RemotePort + + +class RemoteProcessGroup: + def __init__(self, name: str, address: str, protocol: str): + self.id: str = str(uuid.uuid4()) + self.name: str = name + self.address: str = address + self.protocol: str = protocol + self.input_ports: list[RemotePort] = [] + self.output_ports: list[RemotePort] = [] + self.properties: dict[str, str] = {} + + def add_input_port(self, port_name: str, use_compression: bool = False): + self.input_ports.append(RemotePort(port_name, use_compression)) + + def add_output_port(self, port_name: str, use_compression: bool = False): + self.output_ports.append(RemotePort(port_name, use_compression)) + + def get_input_port(self, port_name: str) -> RemotePort | None: + return next((port for port in self.input_ports if port.name == port_name), None) + + def to_yaml_dict(self): + data = {'id': self.id, 'name': self.name, 'timeout': '30 sec', 'transport protocol': self.protocol, + 'url': self.address, 'yield period': '3 sec', 'Input Ports': [port.to_yaml_dict() for port in self.input_ports], + 'Output Ports': [port.to_yaml_dict() for port in self.output_ports]} + return data diff --git a/behave_framework/src/minifi_test_framework/steps/configuration_steps.py b/behave_framework/src/minifi_test_framework/steps/configuration_steps.py index 6bedc791c5..878d7d8e0f 100644 --- a/behave_framework/src/minifi_test_framework/steps/configuration_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/configuration_steps.py @@ -50,6 +50,11 @@ def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().fetch_flow_config_from_flow_url() +@given("SSL properties are set in MiNiFi") +def step_impl(context: MinifiTestContext): + context.get_or_create_default_minifi_container().set_up_ssl_proprties() + + @given("ssl properties are set up for MiNiFi C2 server") def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().enable_c2_with_ssl() diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py index 72ef1c1e98..1e57b56b98 100644 --- a/behave_framework/src/minifi_test_framework/steps/core_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py @@ -147,6 +147,16 @@ def step_impl(context: MinifiTestContext): context.containers["nifi"] = NifiContainer(context) +@step("a NiFi container is set up with SSL enabled") +def step_impl(context: MinifiTestContext): + context.containers["nifi"] = NifiContainer(context, use_ssl=True) + + +@when(u'NiFi is started') +def step_impl(context): + assert context.containers["nifi"].deploy() or context.containers["nifi"].log_app_output() + + @given("a MiNiFi C2 server is set up") def step_impl(context: MinifiTestContext): context.containers["minifi-c2-server"] = MinifiC2Server(context) diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index 770f222a7c..a8c07c3be4 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -99,6 +99,17 @@ def step_impl(context: MinifiTestContext, processor_type: str): context.execute_steps(f'given a {processor_type} processor in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') +@given('the "{property_name}" property of the "{port_name}" port in the "{rpg_name}" remote process group is set to "{property_value}"') +def step_impl(context: MinifiTestContext, property_name: str, port_name: str, rpg_name: str, property_value: str): + rpg = context.get_or_create_minifi_container(DEFAULT_MINIFI_CONTAINER_NAME).flow_definition.get_remote_process_group(rpg_name) + if not rpg: + raise ValueError(f"Remote Process Group with name {rpg_name} not found") + port = rpg.get_input_port(port_name) + if not port: + raise ValueError(f"Remote input port with name {port_name} not found in RPG {rpg_name}") + port.add_property(property_name, property_value) + + @given('the "{property_name}" property of the {processor_name} processor is set to "{property_value}" in the "{minifi_container_name}" flow') def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str, minifi_container_name: str): processor = context.get_or_create_minifi_container(minifi_container_name).flow_definition.get_processor(processor_name) @@ -138,6 +149,7 @@ def step_impl(context: MinifiTestContext, funnel_name: str): @step('in the "{minifi_container_name}" flow the "{relationship_name}" relationship of the {source} processor is connected to the {target}') +@step('in the "{minifi_container_name}" flow the "{relationship_name}" relationship of the {source} node is connected to the {target}') def step_impl(context: MinifiTestContext, relationship_name: str, source: str, target: str, minifi_container_name: str): connection = Connection(source_name=source, source_relationship=relationship_name, target_name=target) context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_connection(connection) @@ -150,10 +162,17 @@ def step_impl(context: MinifiTestContext, relationship_name: str, source: str, t @step('the "{relationship_name}" relationship of the {source} processor is connected to the {target}') +@step('the "{relationship_name}" relationship of the {source} node is connected to the {target}') def step_impl(context: MinifiTestContext, relationship_name: str, source: str, target: str): context.execute_steps(f'given in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow the "{relationship_name}" relationship of the {source} processor is connected to the {target}') +@step("the output port \"{port_name}\" is connected to the {destination_name} processor") +def step_impl(context: MinifiTestContext, port_name: str, destination_name: str): + connection = Connection(source_name=port_name, source_relationship="undefined", target_name=destination_name) + context.get_or_create_minifi_container(DEFAULT_MINIFI_CONTAINER_NAME).flow_definition.add_connection(connection) + + @step('the Funnel with the name "{funnel_name}" is connected to the {target}') def step_impl(context: MinifiTestContext, funnel_name: str, target: str): connection = Connection(source_name=funnel_name, source_relationship="success", target_name=target) @@ -351,3 +370,50 @@ def step_impl(context: MinifiTestContext, processor_name): add_ssl_context_service_for_minifi(context, "minifi_client", use_system_cert_store=True) processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name) processor.add_property('SSL Context Service', 'SSLContextService') + + +@given("a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\" with transport protocol set to \"{transport_protocol}\"") +def step_impl(context: MinifiTestContext, rpg_name: str, address: str, transport_protocol: str): + context.get_or_create_default_minifi_container().flow_definition.add_remote_process_group(address, rpg_name, transport_protocol) + + +@given("a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\"") +def step_impl(context: MinifiTestContext, rpg_name: str, address: str): + context.execute_steps(f"given a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\" with transport protocol set to \"RAW\"") + + +@given("an input port with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"") +def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str): + context.get_or_create_default_minifi_container().flow_definition.add_input_port_to_rpg(rpg_name, port_name) + + +@given("an input port using compression with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"") +def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str): + context.get_or_create_default_minifi_container().flow_definition.add_input_port_to_rpg(rpg_name, port_name, use_compression=True) + + +@given("an output port with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"") +def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str): + context.get_or_create_default_minifi_container().flow_definition.add_output_port_to_rpg(rpg_name, port_name) + + +@given("an output port using compression with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"") +def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str): + context.get_or_create_default_minifi_container().flow_definition.add_output_port_to_rpg(rpg_name, port_name, use_compression=True) + + +@given("a NiFi flow is receiving data from the RemoteProcessGroup named \"{rpg_name}\" in an input port named \"{input_port_name}\" which has the same id as the port named \"{rpg_port_name}\"") +def step_impl(context: MinifiTestContext, input_port_name: str, rpg_port_name: str, rpg_name: str): + input_port_id = context.get_or_create_default_minifi_container().flow_definition.get_input_port_id_of_rpg(rpg_name, rpg_port_name) + context.containers["nifi"].flow_definition.add_input_port(input_port_id, input_port_name) + + +@given("a NiFi flow is sending data to an output port named \"{port_name}\" with the id of the port named \"{rpg_port_name}\" from the RemoteProcessGroup named \"{rpg_name}\"") +def step_impl(context: MinifiTestContext, port_name: str, rpg_port_name: str, rpg_name: str): + output_port_id = context.get_or_create_default_minifi_container().flow_definition.get_output_port_id_of_rpg(rpg_name, rpg_port_name) + context.containers["nifi"].flow_definition.add_output_port(output_port_id, port_name) + + +@given("the connection going to {destination} has \"drop empty\" set") +def step_impl(context: MinifiTestContext, destination: str): + context.get_or_create_default_minifi_container().flow_definition.set_drop_empty_for_destination(destination) diff --git a/docker/test/integration/features/s2s.feature b/extensions/standard-processors/tests/features/s2s.feature similarity index 53% rename from docker/test/integration/features/s2s.feature rename to extensions/standard-processors/tests/features/s2s.feature index 9b3321d4f9..4e56b4affa 100644 --- a/docker/test/integration/features/s2s.feature +++ b/extensions/standard-processors/tests/features/s2s.feature @@ -19,354 +19,290 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol As a user of MiNiFi I need to have RemoteProcessGroup flow nodes - Background: - Given the content of "/tmp/output" is monitored - Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GetFile processor is connected to the to_nifi + And a NiFi container is set up And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A MiNiFi instance produces and transfers a large data file to a NiFi instance via s2s Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "this is a very long file we want to send by site-to-site" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GetFile processor is connected to the to_nifi + And a NiFi container is set up And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then a flowfile with the content "this is a very long file we want to send by site-to-site" is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container a single file with the content "this is a very long file we want to send by site-to-site" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false + # "drop empty" is only supported with yaml config Given a MiNiFi CPP server with yaml config And a GenerateFlowFile processor with the "File Size" property set to "0B" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GenerateFlowFile processor is connected to the to_nifi + And a NiFi container is set up And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container at least one empty file is placed in the "/tmp/output" directory in less than 90 seconds Scenario: Zero length files are not transfered between via s2s if the "drop empty" connection property is true # "drop empty" is only supported with yaml config Given a MiNiFi CPP server with yaml config And a GenerateFlowFile processor with the "File Size" property set to "0B" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GenerateFlowFile processor is connected to the to_nifi - And the connection going to the RemoteProcessGroup has "drop empty" set + And the connection going to to_nifi has "drop empty" set + And a NiFi container is set up And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then no files are placed in the monitored directory in 50 seconds of running time + Then in the "nifi" container no files are placed in the "/tmp/output" directory in 50 seconds of running time Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL - Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And the "Keep Source File" property of the GetFile processor is set to "true" - And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" - And a SSL context service is set up for the following remote process group: "RemoteProcessGroup" - And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile processor is connected to the to_nifi - - And SSL is enabled in NiFi flow - And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" - And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile - - When NiFi is started - And all instances start up - - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds - And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds - - Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL with YAML config - Given a MiNiFi CPP server with yaml config + Given an ssl context service is set up And a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" - And a SSL context service is set up for the following remote process group: "RemoteProcessGroup" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" + And the "SSL Context Service" property of the "to_nifi" port in the "RemoteProcessGroup" remote process group is set to "SSLContextService" And the "success" relationship of the GetFile processor is connected to the to_nifi - And SSL is enabled in NiFi flow + And a NiFi container is set up with SSL enabled And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL config defined in minifi.properties Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GetFile processor is connected to the to_nifi And SSL properties are set in MiNiFi - And SSL is enabled in NiFi flow + And a NiFi container is set up with SSL enabled And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds - And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds - - Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using YAML config and SSL config defined in minifi.properties - Given a MiNiFi CPP server with yaml config - And a GetFile processor with the "Input Directory" property set to "/tmp/input" - And the "Keep Source File" property of the GetFile processor is set to "true" - And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" - And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile processor is connected to the to_nifi - And SSL properties are set in MiNiFi - - And SSL is enabled in NiFi flow - And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" - And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile - - When NiFi is started - And all instances start up - - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using HTTP protocol Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" with transport protocol set to "HTTP" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" with transport protocol set to "HTTP" And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GetFile processor is connected to the to_nifi + And a NiFi container is set up And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds - And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds - - Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using HTTP with YAML config and SSL config defined in minifi.properties - Given a MiNiFi CPP server with yaml config - And a GetFile processor with the "Input Directory" property set to "/tmp/input" - And the "Keep Source File" property of the GetFile processor is set to "true" - And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" with transport protocol set to "HTTP" - And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile processor is connected to the to_nifi - And SSL properties are set in MiNiFi - - And SSL is enabled in NiFi flow - And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" - And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile - - When NiFi is started - And all instances start up - - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s - Given a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the output port "from_nifi" is connected to the PutFile processor + And PutFile's success relationship is auto-terminated - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine + And a NiFi container is set up + And a file with the content "test" is present in "/tmp/input" in the "nifi" flow + And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi + And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using HTTP protocol - Given a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" with transport protocol set to "HTTP" + Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" with transport protocol set to "HTTP" And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the output port "from_nifi" is connected to the PutFile processor + And PutFile's success relationship is auto-terminated - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine + And a NiFi container is set up + And a file with the content "test" is present in "/tmp/input" in the "nifi" flow + And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi + And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s with SSL config defined in minifi.properties - Given a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" + Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi" And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the output port "from_nifi" is connected to the PutFile processor And SSL properties are set in MiNiFi + And PutFile's success relationship is auto-terminated - And SSL is enabled in NiFi flow - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine + And a NiFi container is set up with SSL enabled + And a file with the content "test" is present in "/tmp/input" in the "nifi" flow + And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi + And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using HTTP protocol with SSL config defined in minifi.properties - Given a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" with transport protocol set to "HTTP" + Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi" with transport protocol set to "HTTP" And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the output port "from_nifi" is connected to the PutFile processor And SSL properties are set in MiNiFi + And PutFile's success relationship is auto-terminated - And SSL is enabled in NiFi flow - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine + And a NiFi container is set up with SSL enabled + And a file with the content "test" is present in "/tmp/input" in the "nifi" flow + And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi + And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi When NiFi is started And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using compression Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an input port using compression with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" And the "success" relationship of the GetFile processor is connected to the to_nifi + And a NiFi container is set up And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow - When both instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds - And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds - - Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using compression in YAML config - Given a MiNiFi CPP server with yaml config - And a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" - And an input port using compression with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile processor is connected to the to_nifi - - And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi" - And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the from-minifi is connected to the PutFile + When NiFi is started + And all instances start up - When both instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using compression - Given a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" + Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" And an output port using compression with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the output port "from_nifi" is connected to the PutFile processor + And PutFile's success relationship is auto-terminated - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine + And a NiFi container is set up + And a file with the content "test" is present in "/tmp/input" in the "nifi" flow + And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi + And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi - When both instances start up + When NiFi is started + And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using compression and HTTP protocol - Given a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" with transport protocol set to "HTTP" + Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" with transport protocol set to "HTTP" And an output port using compression with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the output port "from_nifi" is connected to the PutFile processor + And PutFile's success relationship is auto-terminated - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine + And a NiFi container is set up + And a file with the content "test" is present in "/tmp/input" in the "nifi" flow + And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi + And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi - When both instances start up - - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds - And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds - - Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using compression with YAML config and SSL config defined in minifi.properties - Given a MiNiFi CPP server with yaml config - And a file with the content "test" is present in "/tmp/input" - And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" - And an output port using compression with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup" - And "from_nifi" port is a start node - And a PutFile processor with the "Directory" property set to "/tmp/output" - And the output port "from_nifi" is connected to the PutFile processor - And SSL properties are set in MiNiFi - - And SSL is enabled in NiFi flow - And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine - And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup" - And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi - - When both instances start up + When NiFi is started + And all instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds