Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,27 @@

import io
import gzip
from typing import List, Optional
import logging
import os
from pathlib import Path
from OpenSSL import crypto

from minifi_test_framework.containers.file import File
from minifi_test_framework.containers.container import Container
from minifi_test_framework.core.helpers import wait_for_condition
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from minifi_test_framework.minifi.nifi_flow_definition import NifiFlowDefinition
from minifi_test_framework.containers.host_file import HostFile
from minifi_test_framework.core.ssl_utils import make_server_cert


class NifiContainer(Container):
NIFI_VERSION = '2.7.2'

def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] = None, use_ssl: bool = False):
def __init__(self, test_context: MinifiTestContext, command: list[str] | None = None, use_ssl: bool = False):
self.flow_definition = NifiFlowDefinition()
name = f"nifi-{test_context.scenario_id}"
if use_ssl:
entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' "
entry_command = (r"/scripts/convert_cert_to_jks.sh /tmp/resources /tmp/resources/nifi_client.key /tmp/resources/nifi_client.crt /tmp/resources/root_ca.crt &&"
r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' "
r"-e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' "
r"-e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' "
r"-e 's/^\(nifi.web.https.port\)=.*/\1=8443/' "
Expand Down Expand Up @@ -68,10 +73,18 @@ def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]]
if not command:
command = ["/bin/sh", "-c", entry_command]

super().__init__("apache/nifi:" + self.NIFI_VERSION, name, test_context.network, entrypoint=command)
super().__init__("apache/nifi:" + NifiFlowDefinition.NIFI_VERSION, name, test_context.network, entrypoint=command)
resource_dir = Path(__file__).resolve().parent / "resources" / "nifi"
self.host_files.append(HostFile("/scripts/convert_cert_to_jks.sh", os.path.join(resource_dir, "convert_cert_to_jks.sh")))

nifi_client_cert, nifi_client_key = make_server_cert(common_name=f"nifi-{test_context.scenario_id}", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)))
self.files.append(File("/tmp/resources/nifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=nifi_client_cert)))
self.files.append(File("/tmp/resources/nifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=nifi_client_key)))

def deploy(self):
flow_config = self.flow_definition.to_json()
logging.info(f"Deploying NiFi container '{self.container_name}' with flow configuration:\n{flow_config}")
buffer = io.BytesIO()

with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash
set -euo pipefail

# Convert a PEM private key + certificate + CA certificate into the JKS
# keystore/truststore pair that NiFi expects.
#
# Usage: ./convert_cert_to_jks.sh <base directory> <ssl_key_path> <ssl_cert_path> <ca_cert_path>
#
# Produces in <base directory>:
#   keystore.jks   - server key + full certificate chain (alias "nifi-key")
#   truststore.jks - CA certificate only (alias "nifi-cert")

DIR=$1
SSL_KEY_PATH=$2
SSL_CERT_PATH=$3
CA_CERT_PATH=$4

KEYSTORE="$DIR/keystore.jks"
TRUSTSTORE="$DIR/truststore.jks"
PKCS12_FILE="$DIR/keystore.p12"
CHAIN_FILE="$DIR/cert_chain.pem"
PASSWORD="passw0rd1!"

# Make sure the output directory exists before writing anything into it
# (previously it was created only after the chain had already been written).
mkdir -p "$DIR"

# Build the leaf+CA chain in a scratch file instead of appending the CA to
# the input certificate in place: the in-place append corrupted the source
# cert file and grew it on every re-run (non-idempotent).
cat "$SSL_CERT_PATH" "$CA_CERT_PATH" > "$CHAIN_FILE"

# Wrap key + chain into an intermediate PKCS#12 bundle.
openssl pkcs12 -export \
    -inkey "$SSL_KEY_PATH" \
    -in "$CHAIN_FILE" \
    -name "nifi-key" \
    -out "$PKCS12_FILE" \
    -password "pass:$PASSWORD"

# Convert the PKCS#12 bundle into the JKS keystore.
keytool -importkeystore \
    -destkeystore "$KEYSTORE" \
    -deststoretype jks \
    -destalias nifi-key \
    -srckeystore "$PKCS12_FILE" \
    -srcstoretype pkcs12 \
    -srcalias "nifi-key" \
    -storepass "$PASSWORD" \
    -srcstorepass "$PASSWORD" \
    -noprompt

# Import the CA certificate into the truststore.
keytool -importcert \
    -alias "nifi-cert" \
    -file "$CA_CERT_PATH" \
    -keystore "$TRUSTSTORE" \
    -storepass "$PASSWORD" \
    -noprompt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, source_name: str, source_relationship: str, target_name: str)
self.source_name: str = source_name
self.source_relationship: str = source_relationship
self.target_name: str = target_name
self.drop_empty_flowfiles: bool = False

def __repr__(self):
    # Human-readable "(source:relationship --> target)" form for logs/debugging.
    return "({0}:{1} --> {2})".format(self.source_name, self.source_relationship, self.target_name)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from .funnel import Funnel
from .parameter_context import ParameterContext
from .processor import Processor
from .remote_process_group import RemoteProcessGroup
from .input_port import InputPort
from .output_port import OutputPort


class FlowDefinition(ABC):
Expand All @@ -32,10 +35,59 @@ def __init__(self, flow_name: str = "MiNiFi Flow"):
self.funnels: list[Funnel] = []
self.connections: list[Connection] = []
self.parameter_contexts: list[ParameterContext] = []
self.remote_process_groups: list[RemoteProcessGroup] = []
self.input_ports: list[InputPort] = []
self.output_ports: list[OutputPort] = []

def add_processor(self, processor: Processor):
    """Register *processor* in this flow definition."""
    self.processors.append(processor)

def add_remote_process_group(self, address: str, name: str, protocol: str = "RAW"):
    """Create a RemoteProcessGroup and register it on this flow.

    NOTE(review): the signature takes (address, name) but the constructor is
    called as (name, address) — presumably RemoteProcessGroup(name, address,
    protocol) is its declared order; confirm against its definition.
    """
    self.remote_process_groups.append(RemoteProcessGroup(name, address, protocol))

def add_input_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False):
    """Add an input port named *port_name* to the remote process group *rpg_name*.

    Raises:
        ValueError: if no remote process group with that name exists.
    """
    for group in self.remote_process_groups:
        if group.name == rpg_name:
            group.add_input_port(port_name, use_compression)
            return
    raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")

def add_output_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False):
    """Add an output port named *port_name* to the remote process group *rpg_name*.

    Raises:
        ValueError: if no remote process group with that name exists.
    """
    for group in self.remote_process_groups:
        if group.name == rpg_name:
            group.add_output_port(port_name, use_compression)
            return
    raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")

def get_input_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str:
    """Return the id of input port *rpg_port_name* on the RPG named *rpg_name*.

    Raises:
        ValueError: if the remote process group or the port cannot be found.
    """
    group = next((g for g in self.remote_process_groups if g.name == rpg_name), None)
    if not group:
        raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
    for port in group.input_ports:
        if port.name == rpg_port_name:
            return port.id
    raise ValueError(f"InputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.")

def get_output_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str:
    """Return the id of output port *rpg_port_name* on the RPG named *rpg_name*.

    Raises:
        ValueError: if the remote process group or the port cannot be found.
    """
    group = next((g for g in self.remote_process_groups if g.name == rpg_name), None)
    if not group:
        raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
    for port in group.output_ports:
        if port.name == rpg_port_name:
            return port.id
    raise ValueError(f"OutputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.")

def add_input_port(self, input_port_id: str, input_port_name: str):
    """Create an InputPort with the given id/name and register it on this flow."""
    port = InputPort(input_port_name, input_port_id)
    self.input_ports.append(port)

def add_output_port(self, output_port_id: str, output_port_name: str):
    """Create an OutputPort with the given id/name and register it on this flow."""
    port = OutputPort(output_port_name, output_port_id)
    self.output_ports.append(port)

def get_processor(self, processor_name: str) -> Processor | None:
    """Return the processor named *processor_name*, or None if absent."""
    for candidate in self.processors:
        if candidate.name == processor_name:
            return candidate
    return None

Expand All @@ -46,12 +98,20 @@ def get_parameter_context(self, parameter_context_name: str) -> ParameterContext
return next((parameter_context for parameter_context in self.parameter_contexts if
parameter_context.name == parameter_context_name), None)

def get_remote_process_group(self, rpg_name: str) -> RemoteProcessGroup | None:
    """Return the remote process group named *rpg_name*, or None if absent."""
    for candidate in self.remote_process_groups:
        if candidate.name == rpg_name:
            return candidate
    return None

def add_funnel(self, funnel: Funnel):
    """Register *funnel* in this flow definition."""
    self.funnels.append(funnel)

def add_connection(self, connection: Connection):
    """Register *connection* in this flow definition."""
    self.connections.append(connection)

def set_drop_empty_for_destination(self, destination_name: str):
    """Mark every connection targeting *destination_name* to drop empty flowfiles."""
    targeting = (c for c in self.connections if c.target_name == destination_name)
    for connection in targeting:
        connection.drop_empty_flowfiles = True

def to_yaml(self) -> str:
    """Serialize this flow definition to YAML; concrete subclasses must override."""
    raise NotImplementedError("to_yaml method must be implemented in subclasses")

Expand Down
41 changes: 41 additions & 0 deletions behave_framework/src/minifi_test_framework/minifi/input_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

class InputPort:
    """A NiFi root-group input port that can be serialized into flow-definition JSON."""

    def __init__(self, name: str, id: str):
        self.id: str = id
        self.name: str = name

    def to_json_dict(self):
        """Return the JSON-serializable dict describing this port.

        Keys are emitted in the same order as the original literal; the
        groupIdentifier is the fixed root-group id used throughout the flow.
        """
        return dict(
            identifier=self.id,
            instanceIdentifier=self.id,
            name=self.name,
            comments="",
            position={"x": 0, "y": 0},
            type="INPUT_PORT",
            concurrentlySchedulableTaskCount=1,
            scheduledState="RUNNING",
            allowRemoteAccess=True,
            portFunction="STANDARD",
            componentType="INPUT_PORT",
            groupIdentifier="9802c873-3322-3b60-a71d-732d02bd60f8",
        )
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ def to_yaml(self) -> str:
# This is crucial for finding the source/destination IDs for connections
processors_by_name = {p.name: p for p in self.processors}
funnels_by_name = {f.name: f for f in self.funnels}
remote_input_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.input_ports}
remote_output_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.output_ports}

connectables_by_name = {**processors_by_name, **funnels_by_name}
connectables_by_name = {**processors_by_name, **funnels_by_name, **remote_input_ports_by_name, **remote_output_ports_by_name}

if len(self.parameter_contexts) > 0:
parameter_context_name = self.parameter_contexts[0].name
Expand All @@ -44,7 +46,8 @@ def to_yaml(self) -> str:
'Processors': [p.to_yaml_dict() for p in self.processors],
'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [],
'Controller Services': [c.to_yaml_dict() for c in self.controller_services],
'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name}
'Remote Processing Groups': [rpg.to_yaml_dict() for rpg in self.remote_process_groups],
'Parameter Context Name': parameter_context_name}

# Build the connections list by looking up processor IDs
for conn in self.connections:
Expand All @@ -58,6 +61,6 @@ def to_yaml(self) -> str:
config['Connections'].append(
{'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id,
'source id': source_proc.id, 'source relationship name': conn.source_relationship,
'destination id': dest_proc.id})
'destination id': dest_proc.id, "drop empty": conn.drop_empty_flowfiles})

return yaml.dump(config, sort_keys=False, indent=2, width=120)
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


class NifiFlowDefinition(FlowDefinition):
NIFI_VERSION: str = '2.2.0'
NIFI_VERSION: str = '2.7.2'

def __init__(self, flow_name: str = "NiFi Flow"):
super().__init__(flow_name)
Expand Down Expand Up @@ -113,9 +113,22 @@ def to_json(self) -> str:

connections_node = config["rootGroup"]["connections"]

processors_by_name = {p.name: p for p in self.processors}
input_ports_by_name = {port.name: port for port in self.input_ports}
output_ports_by_name = {port.name: port for port in self.output_ports}

for conn in self.connections:
source_type = "PROCESSOR"
source_proc = processors_by_name.get(conn.source_name)
if not source_proc:
source_proc = input_ports_by_name.get(conn.source_name)
source_type = "INPUT_PORT"
conn.source_relationship = ""
dest_type = "PROCESSOR"
dest_proc = processors_by_name.get(conn.target_name)
if not dest_proc:
dest_proc = output_ports_by_name.get(conn.target_name)
dest_type = "OUTPUT_PORT"
if not source_proc or not dest_proc:
raise ValueError(
f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'")
Expand All @@ -126,15 +139,15 @@ def to_json(self) -> str:
"name": f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}",
"source": {
"id": source_proc.id,
"type": "PROCESSOR",
"type": source_type,
"groupId": "9802c873-3322-3b60-a71d-732d02bd60f8",
"name": conn.source_name,
"comments": "",
"instanceIdentifier": source_proc.id
},
"destination": {
"id": dest_proc.id,
"type": "PROCESSOR",
"type": dest_type,
"groupId": "9802c873-3322-3b60-a71d-732d02bd60f8",
"name": dest_proc.name,
"comments": "",
Expand All @@ -155,4 +168,7 @@ def to_json(self) -> str:
"groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
})

config["rootGroup"]["inputPorts"] = [input_port.to_json_dict() for input_port in self.input_ports]
config["rootGroup"]["outputPorts"] = [output_port.to_json_dict() for output_port in self.output_ports]

return json.dumps(config)
41 changes: 41 additions & 0 deletions behave_framework/src/minifi_test_framework/minifi/output_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

class OutputPort:
    """A NiFi root-group output port that can be serialized into flow-definition JSON."""

    def __init__(self, name: str, id: str):
        self.id: str = id
        self.name: str = name

    def to_json_dict(self):
        """Return the JSON-serializable dict describing this port.

        Keys are emitted in the same order as the original literal; the
        groupIdentifier is the fixed root-group id used throughout the flow.
        """
        return dict(
            identifier=self.id,
            instanceIdentifier=self.id,
            name=self.name,
            comments="",
            position={"x": 0, "y": 0},
            type="OUTPUT_PORT",
            concurrentlySchedulableTaskCount=1,
            scheduledState="RUNNING",
            allowRemoteAccess=True,
            portFunction="STANDARD",
            componentType="OUTPUT_PORT",
            groupIdentifier="9802c873-3322-3b60-a71d-732d02bd60f8",
        )
Loading
Loading