diff --git a/behave_framework/src/minifi_test_framework/containers/nifi_container.py b/behave_framework/src/minifi_test_framework/containers/nifi_container.py
index 396b1e1a75..4f6fff3199 100644
--- a/behave_framework/src/minifi_test_framework/containers/nifi_container.py
+++ b/behave_framework/src/minifi_test_framework/containers/nifi_container.py
@@ -15,22 +15,27 @@
import io
import gzip
-from typing import List, Optional
+import logging
+import os
+from pathlib import Path
+from OpenSSL import crypto
+
from minifi_test_framework.containers.file import File
from minifi_test_framework.containers.container import Container
from minifi_test_framework.core.helpers import wait_for_condition
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from minifi_test_framework.minifi.nifi_flow_definition import NifiFlowDefinition
+from minifi_test_framework.containers.host_file import HostFile
+from minifi_test_framework.core.ssl_utils import make_server_cert
class NifiContainer(Container):
- NIFI_VERSION = '2.7.2'
-
- def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] = None, use_ssl: bool = False):
+ def __init__(self, test_context: MinifiTestContext, command: list[str] | None = None, use_ssl: bool = False):
self.flow_definition = NifiFlowDefinition()
name = f"nifi-{test_context.scenario_id}"
if use_ssl:
- entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' "
+ entry_command = (r"/scripts/convert_cert_to_jks.sh /tmp/resources /tmp/resources/nifi_client.key /tmp/resources/nifi_client.crt /tmp/resources/root_ca.crt &&"
+ r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' "
r"-e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' "
r"-e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' "
r"-e 's/^\(nifi.web.https.port\)=.*/\1=8443/' "
@@ -68,10 +73,18 @@ def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]]
if not command:
command = ["/bin/sh", "-c", entry_command]
- super().__init__("apache/nifi:" + self.NIFI_VERSION, name, test_context.network, entrypoint=command)
+ super().__init__("apache/nifi:" + NifiFlowDefinition.NIFI_VERSION, name, test_context.network, entrypoint=command)
+ resource_dir = Path(__file__).resolve().parent / "resources" / "nifi"
+ self.host_files.append(HostFile("/scripts/convert_cert_to_jks.sh", os.path.join(resource_dir, "convert_cert_to_jks.sh")))
+
+ nifi_client_cert, nifi_client_key = make_server_cert(common_name=f"nifi-{test_context.scenario_id}", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
+ self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)))
+ self.files.append(File("/tmp/resources/nifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=nifi_client_cert)))
+ self.files.append(File("/tmp/resources/nifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=nifi_client_key)))
def deploy(self):
flow_config = self.flow_definition.to_json()
+ logging.info(f"Deploying NiFi container '{self.container_name}' with flow configuration:\n{flow_config}")
buffer = io.BytesIO()
with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file:
diff --git a/behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh b/behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh
new file mode 100755
index 0000000000..33c37b677f
--- /dev/null
+++ b/behave_framework/src/minifi_test_framework/containers/resources/nifi/convert_cert_to_jks.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+set -euo pipefail
+
+# Usage: ./create_jks.sh
+
+DIR=$1
+SSL_KEY_PATH=$2
+SSL_CERT_PATH=$3
+CA_CERT_PATH=$4
+
+KEYSTORE="$DIR/keystore.jks"
+TRUSTSTORE="$DIR/truststore.jks"
+PKCS12_FILE="$DIR/keystore.p12"
+PASSWORD="passw0rd1!"
+
+cat "${CA_CERT_PATH}" >> "${SSL_CERT_PATH}"
+
+if [ ! -d "$DIR" ]; then
+ mkdir -p "$DIR"
+fi
+
+openssl pkcs12 -export \
+ -inkey "$SSL_KEY_PATH" \
+ -in "$SSL_CERT_PATH" \
+ -name "nifi-key" \
+ -out "$PKCS12_FILE" \
+ -password pass:$PASSWORD
+
+keytool -importkeystore \
+ -destkeystore "$KEYSTORE" \
+ -deststoretype jks \
+ -destalias nifi-key \
+ -srckeystore "$PKCS12_FILE" \
+ -srcstoretype pkcs12 \
+ -srcalias "nifi-key" \
+ -storepass "$PASSWORD" \
+ -srcstorepass "$PASSWORD" \
+ -noprompt
+
+keytool -importcert \
+ -alias "nifi-cert" \
+ -file "$CA_CERT_PATH" \
+ -keystore "$TRUSTSTORE" \
+ -storepass "$PASSWORD" \
+ -noprompt
diff --git a/behave_framework/src/minifi_test_framework/minifi/connection.py b/behave_framework/src/minifi_test_framework/minifi/connection.py
index 8674dfe3b4..7e443e59c3 100644
--- a/behave_framework/src/minifi_test_framework/minifi/connection.py
+++ b/behave_framework/src/minifi_test_framework/minifi/connection.py
@@ -24,6 +24,7 @@ def __init__(self, source_name: str, source_relationship: str, target_name: str)
self.source_name: str = source_name
self.source_relationship: str = source_relationship
self.target_name: str = target_name
+ self.drop_empty_flowfiles: bool = False
def __repr__(self):
return f"({self.source_name}:{self.source_relationship} --> {self.target_name})"
diff --git a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py
index cbaa4b8de6..a636cf353a 100644
--- a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py
+++ b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py
@@ -22,6 +22,9 @@
from .funnel import Funnel
from .parameter_context import ParameterContext
from .processor import Processor
+from .remote_process_group import RemoteProcessGroup
+from .input_port import InputPort
+from .output_port import OutputPort
class FlowDefinition(ABC):
@@ -32,10 +35,59 @@ def __init__(self, flow_name: str = "MiNiFi Flow"):
self.funnels: list[Funnel] = []
self.connections: list[Connection] = []
self.parameter_contexts: list[ParameterContext] = []
+ self.remote_process_groups: list[RemoteProcessGroup] = []
+ self.input_ports: list[InputPort] = []
+ self.output_ports: list[OutputPort] = []
def add_processor(self, processor: Processor):
self.processors.append(processor)
+ def add_remote_process_group(self, address: str, name: str, protocol: str = "RAW"):
+ rpg = RemoteProcessGroup(name, address, protocol)
+ self.remote_process_groups.append(rpg)
+
+ def add_input_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False):
+ rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
+ if rpg:
+ rpg.add_input_port(port_name, use_compression)
+ else:
+ raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
+
+ def add_output_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False):
+ rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
+ if rpg:
+ rpg.add_output_port(port_name, use_compression)
+ else:
+ raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
+
+ def get_input_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str:
+ rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
+ if rpg:
+ port = next((port for port in rpg.input_ports if port.name == rpg_port_name), None)
+ if port:
+ return port.id
+ else:
+ raise ValueError(f"InputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.")
+ else:
+ raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
+
+ def get_output_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str:
+ rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
+ if rpg:
+ port = next((port for port in rpg.output_ports if port.name == rpg_port_name), None)
+ if port:
+ return port.id
+ else:
+ raise ValueError(f"OutputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.")
+ else:
+ raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
+
+ def add_input_port(self, input_port_id: str, input_port_name: str):
+ self.input_ports.append(InputPort(input_port_name, input_port_id))
+
+ def add_output_port(self, output_port_id: str, output_port_name: str):
+ self.output_ports.append(OutputPort(output_port_name, output_port_id))
+
def get_processor(self, processor_name: str) -> Processor | None:
return next((proc for proc in self.processors if proc.name == processor_name), None)
@@ -46,12 +98,20 @@ def get_parameter_context(self, parameter_context_name: str) -> ParameterContext
return next((parameter_context for parameter_context in self.parameter_contexts if
parameter_context.name == parameter_context_name), None)
+ def get_remote_process_group(self, rpg_name: str) -> RemoteProcessGroup | None:
+ return next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
+
def add_funnel(self, funnel: Funnel):
self.funnels.append(funnel)
def add_connection(self, connection: Connection):
self.connections.append(connection)
+ def set_drop_empty_for_destination(self, destination_name: str):
+ for connection in self.connections:
+ if connection.target_name == destination_name:
+ connection.drop_empty_flowfiles = True
+
def to_yaml(self) -> str:
raise NotImplementedError("to_yaml method must be implemented in subclasses")
diff --git a/behave_framework/src/minifi_test_framework/minifi/input_port.py b/behave_framework/src/minifi_test_framework/minifi/input_port.py
new file mode 100644
index 0000000000..1af6d20972
--- /dev/null
+++ b/behave_framework/src/minifi_test_framework/minifi/input_port.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class InputPort:
+ def __init__(self, name: str, id: str):
+ self.id: str = id
+ self.name: str = name
+
+ def to_json_dict(self):
+ data = {
+ "identifier": self.id,
+ "instanceIdentifier": self.id,
+ "name": self.name,
+ "comments": "",
+ "position": {
+ "x": 0,
+ "y": 0
+ },
+ "type": "INPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "scheduledState": "RUNNING",
+ "allowRemoteAccess": True,
+ "portFunction": "STANDARD",
+ "componentType": "INPUT_PORT",
+ "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
+ }
+ return data
diff --git a/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py
index 469a3b8c17..e01a91880a 100644
--- a/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py
+++ b/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py
@@ -31,8 +31,10 @@ def to_yaml(self) -> str:
# This is crucial for finding the source/destination IDs for connections
processors_by_name = {p.name: p for p in self.processors}
funnels_by_name = {f.name: f for f in self.funnels}
+ remote_input_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.input_ports}
+ remote_output_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.output_ports}
- connectables_by_name = {**processors_by_name, **funnels_by_name}
+ connectables_by_name = {**processors_by_name, **funnels_by_name, **remote_input_ports_by_name, **remote_output_ports_by_name}
if len(self.parameter_contexts) > 0:
parameter_context_name = self.parameter_contexts[0].name
@@ -44,7 +46,8 @@ def to_yaml(self) -> str:
'Processors': [p.to_yaml_dict() for p in self.processors],
'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [],
'Controller Services': [c.to_yaml_dict() for c in self.controller_services],
- 'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name}
+ 'Remote Processing Groups': [rpg.to_yaml_dict() for rpg in self.remote_process_groups],
+ 'Parameter Context Name': parameter_context_name}
# Build the connections list by looking up processor IDs
for conn in self.connections:
@@ -58,6 +61,6 @@ def to_yaml(self) -> str:
config['Connections'].append(
{'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id,
'source id': source_proc.id, 'source relationship name': conn.source_relationship,
- 'destination id': dest_proc.id})
+ 'destination id': dest_proc.id, "drop empty": conn.drop_empty_flowfiles})
return yaml.dump(config, sort_keys=False, indent=2, width=120)
diff --git a/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py
index 8e7faab5ad..a41e248bcf 100644
--- a/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py
+++ b/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py
@@ -21,7 +21,7 @@
class NifiFlowDefinition(FlowDefinition):
- NIFI_VERSION: str = '2.2.0'
+ NIFI_VERSION: str = '2.7.2'
def __init__(self, flow_name: str = "NiFi Flow"):
super().__init__(flow_name)
@@ -113,9 +113,22 @@ def to_json(self) -> str:
connections_node = config["rootGroup"]["connections"]
+ processors_by_name = {p.name: p for p in self.processors}
+ input_ports_by_name = {port.name: port for port in self.input_ports}
+ output_ports_by_name = {port.name: port for port in self.output_ports}
+
for conn in self.connections:
+ source_type = "PROCESSOR"
source_proc = processors_by_name.get(conn.source_name)
+ if not source_proc:
+ source_proc = input_ports_by_name.get(conn.source_name)
+ source_type = "INPUT_PORT"
+ conn.source_relationship = ""
+ dest_type = "PROCESSOR"
dest_proc = processors_by_name.get(conn.target_name)
+ if not dest_proc:
+ dest_proc = output_ports_by_name.get(conn.target_name)
+ dest_type = "OUTPUT_PORT"
if not source_proc or not dest_proc:
raise ValueError(
f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'")
@@ -126,7 +139,7 @@ def to_json(self) -> str:
"name": f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}",
"source": {
"id": source_proc.id,
- "type": "PROCESSOR",
+ "type": source_type,
"groupId": "9802c873-3322-3b60-a71d-732d02bd60f8",
"name": conn.source_name,
"comments": "",
@@ -134,7 +147,7 @@ def to_json(self) -> str:
},
"destination": {
"id": dest_proc.id,
- "type": "PROCESSOR",
+ "type": dest_type,
"groupId": "9802c873-3322-3b60-a71d-732d02bd60f8",
"name": dest_proc.name,
"comments": "",
@@ -155,4 +168,7 @@ def to_json(self) -> str:
"groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
})
+ config["rootGroup"]["inputPorts"] = [input_port.to_json_dict() for input_port in self.input_ports]
+ config["rootGroup"]["outputPorts"] = [output_port.to_json_dict() for output_port in self.output_ports]
+
return json.dumps(config)
diff --git a/behave_framework/src/minifi_test_framework/minifi/output_port.py b/behave_framework/src/minifi_test_framework/minifi/output_port.py
new file mode 100644
index 0000000000..ca2ce57272
--- /dev/null
+++ b/behave_framework/src/minifi_test_framework/minifi/output_port.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class OutputPort:
+ def __init__(self, name: str, id: str):
+ self.id: str = id
+ self.name: str = name
+
+ def to_json_dict(self):
+ data = {
+ "identifier": self.id,
+ "instanceIdentifier": self.id,
+ "name": self.name,
+ "comments": "",
+ "position": {
+ "x": 0,
+ "y": 0
+ },
+ "type": "OUTPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "scheduledState": "RUNNING",
+ "allowRemoteAccess": True,
+ "portFunction": "STANDARD",
+ "componentType": "OUTPUT_PORT",
+ "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
+ }
+ return data
diff --git a/behave_framework/src/minifi_test_framework/minifi/remote_port.py b/behave_framework/src/minifi_test_framework/minifi/remote_port.py
new file mode 100644
index 0000000000..5e0d7ef677
--- /dev/null
+++ b/behave_framework/src/minifi_test_framework/minifi/remote_port.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import uuid
+
+
+class RemotePort:
+ def __init__(self, name: str, use_compression: bool):
+ self.id: str = str(uuid.uuid4())
+ self.name: str = name
+ self.use_compression: bool = use_compression
+ self.properties: dict[str, str] = {}
+
+ def add_property(self, property_name: str, property_value: str):
+ self.properties[property_name] = property_value
+
+ def to_yaml_dict(self):
+ data = {'id': self.id, 'name': self.name, 'use compression': self.use_compression, 'max concurrent tasks': 1,
+ 'Properties': self.properties}
+ return data
diff --git a/behave_framework/src/minifi_test_framework/minifi/remote_process_group.py b/behave_framework/src/minifi_test_framework/minifi/remote_process_group.py
new file mode 100644
index 0000000000..ae8b697535
--- /dev/null
+++ b/behave_framework/src/minifi_test_framework/minifi/remote_process_group.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import uuid
+from .remote_port import RemotePort
+
+
+class RemoteProcessGroup:
+ def __init__(self, name: str, address: str, protocol: str):
+ self.id: str = str(uuid.uuid4())
+ self.name: str = name
+ self.address: str = address
+ self.protocol: str = protocol
+ self.input_ports: list[RemotePort] = []
+ self.output_ports: list[RemotePort] = []
+ self.properties: dict[str, str] = {}
+
+ def add_input_port(self, port_name: str, use_compression: bool = False):
+ self.input_ports.append(RemotePort(port_name, use_compression))
+
+ def add_output_port(self, port_name: str, use_compression: bool = False):
+ self.output_ports.append(RemotePort(port_name, use_compression))
+
+ def get_input_port(self, port_name: str) -> RemotePort | None:
+ return next((port for port in self.input_ports if port.name == port_name), None)
+
+ def to_yaml_dict(self):
+ data = {'id': self.id, 'name': self.name, 'timeout': '30 sec', 'transport protocol': self.protocol,
+ 'url': self.address, 'yield period': '3 sec', 'Input Ports': [port.to_yaml_dict() for port in self.input_ports],
+ 'Output Ports': [port.to_yaml_dict() for port in self.output_ports]}
+ return data
diff --git a/behave_framework/src/minifi_test_framework/steps/configuration_steps.py b/behave_framework/src/minifi_test_framework/steps/configuration_steps.py
index 6bedc791c5..878d7d8e0f 100644
--- a/behave_framework/src/minifi_test_framework/steps/configuration_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/configuration_steps.py
@@ -50,6 +50,11 @@ def step_impl(context: MinifiTestContext):
context.get_or_create_default_minifi_container().fetch_flow_config_from_flow_url()
+@given("SSL properties are set in MiNiFi")
+def step_impl(context: MinifiTestContext):
+ context.get_or_create_default_minifi_container().set_up_ssl_proprties()
+
+
@given("ssl properties are set up for MiNiFi C2 server")
def step_impl(context: MinifiTestContext):
context.get_or_create_default_minifi_container().enable_c2_with_ssl()
diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py
index 72ef1c1e98..1e57b56b98 100644
--- a/behave_framework/src/minifi_test_framework/steps/core_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py
@@ -147,6 +147,16 @@ def step_impl(context: MinifiTestContext):
context.containers["nifi"] = NifiContainer(context)
+@step("a NiFi container is set up with SSL enabled")
+def step_impl(context: MinifiTestContext):
+ context.containers["nifi"] = NifiContainer(context, use_ssl=True)
+
+
+@when(u'NiFi is started')
+def step_impl(context):
+ assert context.containers["nifi"].deploy() or context.containers["nifi"].log_app_output()
+
+
@given("a MiNiFi C2 server is set up")
def step_impl(context: MinifiTestContext):
context.containers["minifi-c2-server"] = MinifiC2Server(context)
diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
index 770f222a7c..a8c07c3be4 100644
--- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
@@ -99,6 +99,17 @@ def step_impl(context: MinifiTestContext, processor_type: str):
context.execute_steps(f'given a {processor_type} processor in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow')
+@given('the "{property_name}" property of the "{port_name}" port in the "{rpg_name}" remote process group is set to "{property_value}"')
+def step_impl(context: MinifiTestContext, property_name: str, port_name: str, rpg_name: str, property_value: str):
+ rpg = context.get_or_create_minifi_container(DEFAULT_MINIFI_CONTAINER_NAME).flow_definition.get_remote_process_group(rpg_name)
+ if not rpg:
+ raise ValueError(f"Remote Process Group with name {rpg_name} not found")
+ port = rpg.get_input_port(port_name)
+ if not port:
+ raise ValueError(f"Remote input port with name {port_name} not found in RPG {rpg_name}")
+ port.add_property(property_name, property_value)
+
+
@given('the "{property_name}" property of the {processor_name} processor is set to "{property_value}" in the "{minifi_container_name}" flow')
def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str, minifi_container_name: str):
processor = context.get_or_create_minifi_container(minifi_container_name).flow_definition.get_processor(processor_name)
@@ -138,6 +149,7 @@ def step_impl(context: MinifiTestContext, funnel_name: str):
@step('in the "{minifi_container_name}" flow the "{relationship_name}" relationship of the {source} processor is connected to the {target}')
+@step('in the "{minifi_container_name}" flow the "{relationship_name}" relationship of the {source} node is connected to the {target}')
def step_impl(context: MinifiTestContext, relationship_name: str, source: str, target: str, minifi_container_name: str):
connection = Connection(source_name=source, source_relationship=relationship_name, target_name=target)
context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_connection(connection)
@@ -150,10 +162,17 @@ def step_impl(context: MinifiTestContext, relationship_name: str, source: str, t
@step('the "{relationship_name}" relationship of the {source} processor is connected to the {target}')
+@step('the "{relationship_name}" relationship of the {source} node is connected to the {target}')
def step_impl(context: MinifiTestContext, relationship_name: str, source: str, target: str):
context.execute_steps(f'given in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow the "{relationship_name}" relationship of the {source} processor is connected to the {target}')
+@step("the output port \"{port_name}\" is connected to the {destination_name} processor")
+def step_impl(context: MinifiTestContext, port_name: str, destination_name: str):
+ connection = Connection(source_name=port_name, source_relationship="undefined", target_name=destination_name)
+ context.get_or_create_minifi_container(DEFAULT_MINIFI_CONTAINER_NAME).flow_definition.add_connection(connection)
+
+
@step('the Funnel with the name "{funnel_name}" is connected to the {target}')
def step_impl(context: MinifiTestContext, funnel_name: str, target: str):
connection = Connection(source_name=funnel_name, source_relationship="success", target_name=target)
@@ -351,3 +370,50 @@ def step_impl(context: MinifiTestContext, processor_name):
add_ssl_context_service_for_minifi(context, "minifi_client", use_system_cert_store=True)
processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name)
processor.add_property('SSL Context Service', 'SSLContextService')
+
+
+@given("a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\" with transport protocol set to \"{transport_protocol}\"")
+def step_impl(context: MinifiTestContext, rpg_name: str, address: str, transport_protocol: str):
+ context.get_or_create_default_minifi_container().flow_definition.add_remote_process_group(address, rpg_name, transport_protocol)
+
+
+@given("a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\"")
+def step_impl(context: MinifiTestContext, rpg_name: str, address: str):
+ context.execute_steps(f"given a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\" with transport protocol set to \"RAW\"")
+
+
+@given("an input port with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
+def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str):
+ context.get_or_create_default_minifi_container().flow_definition.add_input_port_to_rpg(rpg_name, port_name)
+
+
+@given("an input port using compression with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
+def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str):
+ context.get_or_create_default_minifi_container().flow_definition.add_input_port_to_rpg(rpg_name, port_name, use_compression=True)
+
+
+@given("an output port with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
+def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str):
+ context.get_or_create_default_minifi_container().flow_definition.add_output_port_to_rpg(rpg_name, port_name)
+
+
+@given("an output port using compression with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
+def step_impl(context: MinifiTestContext, port_name: str, rpg_name: str):
+ context.get_or_create_default_minifi_container().flow_definition.add_output_port_to_rpg(rpg_name, port_name, use_compression=True)
+
+
+@given("a NiFi flow is receiving data from the RemoteProcessGroup named \"{rpg_name}\" in an input port named \"{input_port_name}\" which has the same id as the port named \"{rpg_port_name}\"")
+def step_impl(context: MinifiTestContext, input_port_name: str, rpg_port_name: str, rpg_name: str):
+ input_port_id = context.get_or_create_default_minifi_container().flow_definition.get_input_port_id_of_rpg(rpg_name, rpg_port_name)
+ context.containers["nifi"].flow_definition.add_input_port(input_port_id, input_port_name)
+
+
+@given("a NiFi flow is sending data to an output port named \"{port_name}\" with the id of the port named \"{rpg_port_name}\" from the RemoteProcessGroup named \"{rpg_name}\"")
+def step_impl(context: MinifiTestContext, port_name: str, rpg_port_name: str, rpg_name: str):
+ output_port_id = context.get_or_create_default_minifi_container().flow_definition.get_output_port_id_of_rpg(rpg_name, rpg_port_name)
+ context.containers["nifi"].flow_definition.add_output_port(output_port_id, port_name)
+
+
+@given("the connection going to {destination} has \"drop empty\" set")
+def step_impl(context: MinifiTestContext, destination: str):
+ context.get_or_create_default_minifi_container().flow_definition.set_drop_empty_for_destination(destination)
diff --git a/docker/test/integration/features/s2s.feature b/extensions/standard-processors/tests/features/s2s.feature
similarity index 53%
rename from docker/test/integration/features/s2s.feature
rename to extensions/standard-processors/tests/features/s2s.feature
index 9b3321d4f9..4e56b4affa 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/extensions/standard-processors/tests/features/s2s.feature
@@ -19,354 +19,290 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
As a user of MiNiFi
I need to have RemoteProcessGroup flow nodes
- Background:
- Given the content of "/tmp/output" is monitored
-
Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GetFile processor is connected to the to_nifi
+ And a NiFi container is set up
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A MiNiFi instance produces and transfers a large data file to a NiFi instance via s2s
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "this is a very long file we want to send by site-to-site" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GetFile processor is connected to the to_nifi
+ And a NiFi container is set up
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then a flowfile with the content "this is a very long file we want to send by site-to-site" is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container a single file with the content "this is a very long file we want to send by site-to-site" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false
+ # "drop empty" is only supported with yaml config
Given a MiNiFi CPP server with yaml config
And a GenerateFlowFile processor with the "File Size" property set to "0B"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GenerateFlowFile processor is connected to the to_nifi
+ And a NiFi container is set up
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container at least one empty file is placed in the "/tmp/output" directory in less than 90 seconds
Scenario: Zero length files are not transfered between via s2s if the "drop empty" connection property is true
# "drop empty" is only supported with yaml config
Given a MiNiFi CPP server with yaml config
And a GenerateFlowFile processor with the "File Size" property set to "0B"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GenerateFlowFile processor is connected to the to_nifi
- And the connection going to the RemoteProcessGroup has "drop empty" set
+ And the connection going to to_nifi has "drop empty" set
+ And a NiFi container is set up
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then no files are placed in the monitored directory in 50 seconds of running time
+ Then in the "nifi" container no files are placed in the "/tmp/output" directory in 50 seconds of running time
Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL
- Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
- And the "Keep Source File" property of the GetFile processor is set to "true"
- And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi"
- And a SSL context service is set up for the following remote process group: "RemoteProcessGroup"
- And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile processor is connected to the to_nifi
-
- And SSL is enabled in NiFi flow
- And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
- And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
-
- When NiFi is started
- And all instances start up
-
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
- And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
-
- Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL with YAML config
- Given a MiNiFi CPP server with yaml config
+ Given an ssl context service is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And the "Keep Source File" property of the GetFile processor is set to "true"
And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi"
- And a SSL context service is set up for the following remote process group: "RemoteProcessGroup"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
+ And the "SSL Context Service" property of the "to_nifi" port in the "RemoteProcessGroup" remote process group is set to "SSLContextService"
And the "success" relationship of the GetFile processor is connected to the to_nifi
- And SSL is enabled in NiFi flow
+ And a NiFi container is set up with SSL enabled
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL config defined in minifi.properties
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And the "Keep Source File" property of the GetFile processor is set to "true"
And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GetFile processor is connected to the to_nifi
And SSL properties are set in MiNiFi
- And SSL is enabled in NiFi flow
+ And a NiFi container is set up with SSL enabled
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
- And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
-
- Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using YAML config and SSL config defined in minifi.properties
- Given a MiNiFi CPP server with yaml config
- And a GetFile processor with the "Input Directory" property set to "/tmp/input"
- And the "Keep Source File" property of the GetFile processor is set to "true"
- And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi"
- And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile processor is connected to the to_nifi
- And SSL properties are set in MiNiFi
-
- And SSL is enabled in NiFi flow
- And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
- And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
-
- When NiFi is started
- And all instances start up
-
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using HTTP protocol
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" with transport protocol set to "HTTP"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" with transport protocol set to "HTTP"
And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GetFile processor is connected to the to_nifi
+ And a NiFi container is set up
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
- And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
-
- Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using HTTP with YAML config and SSL config defined in minifi.properties
- Given a MiNiFi CPP server with yaml config
- And a GetFile processor with the "Input Directory" property set to "/tmp/input"
- And the "Keep Source File" property of the GetFile processor is set to "true"
- And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" with transport protocol set to "HTTP"
- And an input port with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile processor is connected to the to_nifi
- And SSL properties are set in MiNiFi
-
- And SSL is enabled in NiFi flow
- And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
- And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
-
- When NiFi is started
- And all instances start up
-
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s
- Given a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And PutFile is EVENT_DRIVEN
And the output port "from_nifi" is connected to the PutFile processor
+ And PutFile's success relationship is auto-terminated
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
+ And a NiFi container is set up
+ And a file with the content "test" is present in "/tmp/input" in the "nifi" flow
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow
And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
+ And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using HTTP protocol
- Given a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" with transport protocol set to "HTTP"
+ Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" with transport protocol set to "HTTP"
And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And PutFile is EVENT_DRIVEN
And the output port "from_nifi" is connected to the PutFile processor
+ And PutFile's success relationship is auto-terminated
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
+ And a NiFi container is set up
+ And a file with the content "test" is present in "/tmp/input" in the "nifi" flow
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow
And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
+ And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s with SSL config defined in minifi.properties
- Given a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi"
+ Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi"
And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And PutFile is EVENT_DRIVEN
And the output port "from_nifi" is connected to the PutFile processor
And SSL properties are set in MiNiFi
+ And PutFile's success relationship is auto-terminated
- And SSL is enabled in NiFi flow
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
+ And a NiFi container is set up with SSL enabled
+ And a file with the content "test" is present in "/tmp/input" in the "nifi" flow
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow
And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
+ And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using HTTP protocol with SSL config defined in minifi.properties
- Given a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi" with transport protocol set to "HTTP"
+ Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${scenario_id}:8443/nifi" with transport protocol set to "HTTP"
And an output port with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And PutFile is EVENT_DRIVEN
And the output port "from_nifi" is connected to the PutFile processor
And SSL properties are set in MiNiFi
+ And PutFile's success relationship is auto-terminated
- And SSL is enabled in NiFi flow
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
+ And a NiFi container is set up with SSL enabled
+ And a file with the content "test" is present in "/tmp/input" in the "nifi" flow
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow
And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
+ And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi
When NiFi is started
And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using compression
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an input port using compression with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
And the "success" relationship of the GetFile processor is connected to the to_nifi
+ And a NiFi container is set up
And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ And in the "nifi" flow the "success" relationship of the from-minifi node is connected to the PutFile
+ And PutFile's success relationship is auto-terminated in the "nifi" flow
+ And PutFile's failure relationship is auto-terminated in the "nifi" flow
- When both instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
- And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
-
- Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using compression in YAML config
- Given a MiNiFi CPP server with yaml config
- And a GetFile processor with the "Input Directory" property set to "/tmp/input"
- And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
- And an input port using compression with name "to_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile processor is connected to the to_nifi
-
- And a NiFi flow is receiving data from the RemoteProcessGroup named "RemoteProcessGroup" in an input port named "from-minifi" which has the same id as the port named "to_nifi"
- And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
- And the "success" relationship of the from-minifi is connected to the PutFile
+ When NiFi is started
+ And all instances start up
- When both instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then in the "nifi" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using compression
- Given a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi"
+ Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi"
And an output port using compression with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And PutFile is EVENT_DRIVEN
And the output port "from_nifi" is connected to the PutFile processor
+ And PutFile's success relationship is auto-terminated
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
+ And a NiFi container is set up
+ And a file with the content "test" is present in "/tmp/input" in the "nifi" flow
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow
And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
+ And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi
- When both instances start up
+ When NiFi is started
+ And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using compression and HTTP protocol
- Given a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${feature_id}:8080/nifi" with transport protocol set to "HTTP"
+ Given a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "http://nifi-${scenario_id}:8080/nifi" with transport protocol set to "HTTP"
And an output port using compression with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And PutFile is EVENT_DRIVEN
And the output port "from_nifi" is connected to the PutFile processor
+ And PutFile's success relationship is auto-terminated
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
+ And a NiFi container is set up
+ And a file with the content "test" is present in "/tmp/input" in the "nifi" flow
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow
And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
+ And in the "nifi" flow the "success" relationship of the GetFile processor is connected to the to-minifi-in-nifi
- When both instances start up
-
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
- And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
-
- Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s using compression with YAML config and SSL config defined in minifi.properties
- Given a MiNiFi CPP server with yaml config
- And a file with the content "test" is present in "/tmp/input"
- And a RemoteProcessGroup node with name "RemoteProcessGroup" is opened on "https://nifi-${feature_id}:8443/nifi"
- And an output port using compression with name "from_nifi" is created on the RemoteProcessGroup named "RemoteProcessGroup"
- And "from_nifi" port is a start node
- And a PutFile processor with the "Directory" property set to "/tmp/output"
- And the output port "from_nifi" is connected to the PutFile processor
- And SSL properties are set in MiNiFi
-
- And SSL is enabled in NiFi flow
- And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "nifi" flow using the "nifi" engine
- And a NiFi flow is sending data to an output port named "to-minifi-in-nifi" with the id of the port named "from_nifi" from the RemoteProcessGroup named "RemoteProcessGroup"
- And the "success" relationship of the GetFile is connected to the to-minifi-in-nifi
-
- When both instances start up
+ When NiFi is started
+ And all instances start up
- Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds