Skip to content

Commit c63665d

Browse files
committed
MINIFICPP-2590 Add Sparkplug B support for PublishMQTT processor
- Add SparkplugBWriter record writer controller service - Add Record Reader and Record Writer properties to PublishMQTT
1 parent 66efb04 commit c63665d

18 files changed

Lines changed: 998 additions & 108 deletions

File tree

CONTROLLERS.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ limitations under the License.
3030
- [RocksDbStateStorage](#RocksDbStateStorage)
3131
- [SmbConnectionControllerService](#SmbConnectionControllerService)
3232
- [SparkplugBReader](#SparkplugBReader)
33+
- [SparkplugBWriter](#SparkplugBWriter)
3334
- [SSLContextService](#SSLContextService)
3435
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
3536
- [VolatileMapStateStorage](#VolatileMapStateStorage)
@@ -292,6 +293,20 @@ In the list below, the names of required properties appear in bold. Any other pr
292293
|------|---------------|------------------|-------------|
293294

294295

296+
## SparkplugBWriter
297+
298+
### Description
299+
300+
Serializes recordset to Sparkplug B messages and writes them into a FlowFile. This writer is typically used with MQTT processors like PublishMQTT.
301+
302+
### Properties
303+
304+
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
305+
306+
| Name | Default Value | Allowable Values | Description |
307+
|------|---------------|------------------|-------------|
308+
309+
295310
## SSLContextService
296311

297312
### Description

PROCESSORS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2164,6 +2164,8 @@ In the list below, the names of required properties appear in bold. Any other pr
21642164
| Retain | false | true<br/>false | Retain published message in broker |
21652165
| Message Expiry Interval | | | Time while message is valid and will be forwarded by broker. MQTT 5.x only. |
21662166
| Content Type | | | Content type of the message. MQTT 5.x only.<br/>**Supports Expression Language: true** |
2167+
| Record Reader | | | The Record Reader to use for parsing the incoming FlowFile into Records. |
2168+
| Record Writer | | | The Record Writer to use for serializing Records before publishing them as an MQTT Message. |
21672169
| **Quality of Service** | 0 | 0<br/>1<br/>2 | The Quality of Service (QoS) of messages. |
21682170
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
21692171
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
8585
| Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON |
8686
| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON |
8787
| Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON |
88-
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader) | -DENABLE_MQTT=ON |
88+
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader)<br/>[SparkplugBWriter](PROCESSORS.md#sparkplugbwriter) | -DENABLE_MQTT=ON |
8989
| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON |
9090
| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON |
9191
| PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON |

docker/test/integration/features/mqtt.feature

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,3 +539,44 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
539539
| version |
540540
| 3.1.1 |
541541
| 5.0 |
542+
543+
Scenario Outline: A MiNiFi instance publishes then consumes Sparkplug message through MQTT broker
544+
Given an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
545+
546+
And a SparkplugBReader controller service is set up in the "consumer-client" flow
547+
And a JsonRecordSetWriter controller service is set up with "Array" output grouping in the "consumer-client" flow
548+
And a SparkplugBWriter controller service is set up in the "publisher-client" flow
549+
And a JsonTreeReader controller service is set up in the "publisher-client" flow
550+
551+
And a file with the content '{"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","uuid":"test-uuid"}' is present in '/tmp/input'
552+
553+
And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
554+
And a PublishMQTT processor in the "publisher-client" flow
555+
And the "Topic" property of the PublishMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
556+
And the "MQTT Version" property of the PublishMQTT processor is set to "<version>"
557+
And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
558+
And the "Record Writer" property of the PublishMQTT processor is set to "SparkplugBWriter"
559+
560+
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
561+
562+
And a ConsumeMQTT processor in the "consumer-client" flow
563+
And the "Topic" property of the ConsumeMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
564+
And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
565+
And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader"
566+
And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
567+
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
568+
And a LogAttribute processor with the "Log Payload" property set to "true" in the "consumer-client" flow
569+
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
570+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
571+
572+
When all instances start up
573+
574+
Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
575+
And a flowfile with the JSON content '[{"_isRetained":false,"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","_topic":"spBv1.0/TestGroup/DDATA/TestNode/TestDevice","_topic.segment.0":"spBv1.0","uuid":"test-uuid","_topic.segment.1":"TestGroup","_topic.segment.3":"TestNode","_qos":0,"_topic.segment.4":"TestDevice","_topic.segment.2":"DDATA","_isDuplicate":false}]' is placed in the monitored directory in less than 60 seconds
576+
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
577+
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
578+
579+
Examples: MQTT versions
580+
| version |
581+
| 3.1.1 |
582+
| 5.0 |

docker/test/integration/features/steps/steps.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from minifi.controllers.JsonTreeReader import JsonTreeReader
2828
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
2929
from minifi.controllers.SparkplugBReader import SparkplugBReader
30+
from minifi.controllers.SparkplugBWriter import SparkplugBWriter
3031

3132
from behave import given, then, when
3233
from behave.model_describe import ModelDescriptor
@@ -392,20 +393,30 @@ def step_impl(context, processor_name):
392393

393394

394395
# Record set reader and writer
395-
@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping")
396-
def step_impl(context, output_grouping: str):
396+
@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping in the \"{minifi_container_name}\" flow")
397+
def step_impl(context, output_grouping: str, minifi_container_name: str):
397398
json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", output_grouping=output_grouping)
398-
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
399+
container = context.test.acquire_container(context=context, name=minifi_container_name)
399400
container.add_controller(json_record_set_writer)
400401

401402

402-
@given("a JsonTreeReader controller service is set up")
403-
def step_impl(context):
403+
@given("a JsonTreeReader controller service is set up in the \"{minifi_container_name}\" flow")
404+
def step_impl(context, minifi_container_name: str):
404405
json_record_set_reader = JsonTreeReader("JsonTreeReader")
405-
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
406+
container = context.test.acquire_container(context=context, name=minifi_container_name)
406407
container.add_controller(json_record_set_reader)
407408

408409

410+
@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping")
411+
def step_impl(context, output_grouping: str):
412+
context.execute_steps(f"given a JsonRecordSetWriter controller service is set up with \"{output_grouping}\" output grouping in the \"minifi-cpp-flow\" flow")
413+
414+
415+
@given("a JsonTreeReader controller service is set up")
416+
def step_impl(context):
417+
context.execute_steps("given a JsonTreeReader controller service is set up in the \"minifi-cpp-flow\" flow")
418+
419+
409420
# Kubernetes
410421
def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
411422
kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
@@ -436,13 +447,30 @@ def step_impl(context):
436447
context.test.start('mqtt-broker')
437448

438449

439-
@given("a SparkplugBReader controller service is set up")
440-
def step_impl(context):
450+
@given("a SparkplugBReader controller service is set up in the \"{minifi_container_name}\" flow")
451+
def step_impl(context, minifi_container_name: str):
441452
json_record_set_reader = SparkplugBReader("SparkplugBReader")
442-
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
453+
container = context.test.acquire_container(context=context, name=minifi_container_name)
443454
container.add_controller(json_record_set_reader)
444455

445456

457+
@given("a SparkplugBWriter controller service is set up in the \"{minifi_container_name}\" flow")
458+
def step_impl(context, minifi_container_name: str):
459+
json_record_set_reader = SparkplugBWriter("SparkplugBWriter")
460+
container = context.test.acquire_container(context=context, name=minifi_container_name)
461+
container.add_controller(json_record_set_reader)
462+
463+
464+
@given("a SparkplugBReader controller service is set up")
465+
def step_impl(context):
466+
context.execute_steps("given a SparkplugBReader controller service is set up in the \"minifi-cpp-flow\" flow")
467+
468+
469+
@given("a SparkplugBWriter controller service is set up")
470+
def step_impl(context):
471+
context.execute_steps("given a SparkplugBWriter controller service is set up in the \"minifi-cpp-flow\" flow")
472+
473+
446474
@when("a test Sparkplug payload is published to the topic \"{topic}\"")
447475
def step_impl(context, topic):
448476
context.test.publish_test_sparkplug_payload(topic)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
from ..core.ControllerService import ControllerService
18+
19+
20+
class SparkplugBWriter(ControllerService):
21+
def __init__(self, name=None):
22+
super(SparkplugBWriter, self).__init__(name=name)
23+
self.service_class = 'SparkplugBWriter'

extension-utils/include/utils/ProcessorConfigUtils.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,18 @@ template<typename ControllerServiceType>
170170
std::shared_ptr<ControllerServiceType> parseControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) {
171171
return parseOptionalControllerService<ControllerServiceType>(context, prop, processor_uuid) | utils::orThrow("Required Controller Service");
172172
}
173+
174+
template<typename RecordSetIO>
175+
std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property,
176+
const utils::Identifier& processor_uuid) {
177+
std::string service_name = context.getProperty(property).value_or("");
178+
if (!IsNullOrEmpty(service_name)) {
179+
auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid));
180+
if (!record_set_io) {
181+
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("'{}' property is set to invalid controller service '{}'", property.name, service_name));
182+
}
183+
return record_set_io;
184+
}
185+
return nullptr;
186+
}
173187
} // namespace org::apache::nifi::minifi::utils

0 commit comments

Comments
 (0)