
Commit f0b83dc

committed
Add Document Type property
1 parent cda0f92 commit f0b83dc

13 files changed

Lines changed: 91 additions & 16 deletions


PROCESSORS.md

Lines changed: 1 addition & 0 deletions
@@ -2218,6 +2218,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 | **Bucket Name** | default | | The name of bucket to access.<br/>**Supports Expression Language: true** |
 | Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.<br/>**Supports Expression Language: true** |
 | Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.<br/>**Supports Expression Language: true** |
+| **Document Type** | Json | Json<br/>Binary<br/>String | Content type to store data as. |
 | Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, the FlowFile UUID will be used.<br/>**Supports Expression Language: true** |
 | **Persist To** | NONE | NONE<br/>ACTIVE<br/>ONE<br/>TWO<br/>THREE<br/>FOUR | Durability constraint about disk persistence. |
 | **Replicate To** | NONE | NONE<br/>ONE<br/>TWO<br/>THREE | Durability constraint about replication. |

docker/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -12,3 +12,4 @@ azure-storage-blob==12.13.0
 prometheus-api-client==0.5.0
 humanfriendly==10.0
 requests<2.29 # https://github.com/docker/docker-py/issues/3113
+couchbase==4.3.2

docker/test/integration/cluster/DockerTestCluster.py

Lines changed: 5 additions & 0 deletions
@@ -35,6 +35,7 @@
 from .checkers.SplunkChecker import SplunkChecker
 from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
 from .checkers.ModbusChecker import ModbusChecker
+from .checkers.CouchbaseChecker import CouchbaseChecker
 from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check

@@ -54,6 +55,7 @@ def __init__(self, context, feature_id):
         self.grafana_loki_checker = GrafanaLokiChecker()
         self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator)
         self.modbus_checker = ModbusChecker(self.container_communicator)
+        self.couchbase_checker = CouchbaseChecker()

     def cleanup(self):
         self.container_store.cleanup()
@@ -424,3 +426,6 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):

     def enable_ssl_in_nifi(self):
         self.container_store.enable_ssl_in_nifi()
+
+    def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
+        return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
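
The `CouchbaseChecker` implementation is not among the hunks shown here. Purely as an illustration, a type-aware comparison behind `is_data_present_in_couchbase` might look like the sketch below. The helper name `stored_value_matches` is hypothetical, and so is the assumption about the shape of the fetched value (parsed or raw JSON for `Json`, raw bytes for `Binary`, text for `String`); the real checker would fetch the document from a running cluster rather than receive it as an argument.

```python
import json


def stored_value_matches(stored, expected_data: str, expected_data_type: str) -> bool:
    """Hypothetical helper: compare a fetched document value with the expected test data."""
    if expected_data_type == "Json":
        # Compare parsed structures so key order and whitespace do not matter.
        if isinstance(stored, (bytes, str)):
            stored = json.loads(stored)
        return stored == json.loads(expected_data)
    if expected_data_type == "Binary":
        # Assumption: binary documents come back as raw bytes.
        return isinstance(stored, (bytes, bytearray)) and bytes(stored) == expected_data.encode("utf-8")
    if expected_data_type == "String":
        return isinstance(stored, str) and stored == expected_data
    raise ValueError(f"Unsupported data type: {expected_data_type}")
```

Comparing parsed JSON rather than raw text keeps the `Json` check independent of formatting, while the `Binary` check stays strict about both type and bytes.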

docker/test/integration/cluster/containers/CouchbaseServerContainer.py

Lines changed: 1 addition & 0 deletions
@@ -51,4 +51,5 @@ def deploy(self):
             detach=True,
             name=self.name,
             network=self.network.name,
+            ports={'11210/tcp': 11210},
             entrypoint=self.command)

docker/test/integration/features/MiNiFi_integration_test_driver.py

Lines changed: 3 additions & 0 deletions
@@ -445,3 +445,6 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):

     def enable_ssl_in_nifi(self):
         self.cluster.enable_ssl_in_nifi()
+
+    def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
+        assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)

docker/test/integration/features/couchbase.feature

Lines changed: 27 additions & 1 deletion
@@ -18,12 +18,13 @@ Feature: Executing Couchbase operations from MiNiFi-C++
   Background:
     Given the content of "/tmp/output" is monitored

-  Scenario: A MiNiFi instance can insert data to test bucket with PutCouchbaseKey processor
+  Scenario: A MiNiFi instance can insert json data to test bucket with PutCouchbaseKey processor
     Given a Couchbase server is set up
     And a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
     And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
     And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
+    And the "Document Type" property of the PutCouchbaseKey processor is set to "Json"
     And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
     And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"

@@ -39,3 +40,28 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the Minifi logs match the following regex: "key:couchbase.doc.sequence.number value:[1-9][0-9]*" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
+    And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1", "field2": "value2"}' of type "Json" in Couchbase
+
+  Scenario: A MiNiFi instance can insert binary data to test bucket with PutCouchbaseKey processor
+    Given a Couchbase server is set up
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content '{"field1": "value1"}' is present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
+    And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
+    And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 60 seconds
+    And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.sequence.number value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
+    And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1"}' of type "Binary" in Couchbase

docker/test/integration/features/steps/steps.py

Lines changed: 5 additions & 0 deletions
@@ -1377,3 +1377,8 @@ def step_impl(context, processor_name, service_name):
     processor = context.test.get_node_by_name(processor_name)
     processor.controller_services.append(couchbase_cluster_controller_service)
     processor.set_property("Couchbase Cluster Controller Service", couchbase_cluster_controller_service.name)
+
+
+@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
+def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
+    context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)
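
The new step pattern captures four placeholders (`doc_id`, `bucket_name`, `data`, `data_type`) from the scenario text. As a rough sketch of that mapping, the snippet below uses a plain regular expression as a stand-in for the test framework's own step matcher; `STEP_PATTERN` and `parse_step` are hypothetical names introduced only for this illustration.

```python
import re

# Stand-in for the step pattern: double-quoted placeholders for id, bucket and
# type, and a single-quoted placeholder for the data (which itself contains
# double quotes when it is JSON).
STEP_PATTERN = re.compile(
    r'a document with id "(?P<doc_id>[^"]+)" in bucket "(?P<bucket_name>[^"]+)" '
    r"is present with data '(?P<data>[^']+)' "
    r'of type "(?P<data_type>[^"]+)" in Couchbase'
)


def parse_step(text: str):
    """Return the captured step parameters, or None if the text does not match."""
    match = STEP_PATTERN.fullmatch(text)
    return match.groupdict() if match else None


step = (
    'a document with id "test_doc_id" in bucket "test_bucket" is present with data '
    "'{\"field1\": \"value1\"}' of type \"Binary\" in Couchbase"
)
params = parse_step(step)
```

Wrapping the data in single quotes is what lets the JSON payload keep its double quotes without ambiguity in the pattern.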

extensions/couchbase/controllerservices/CouchbaseClusterService.cpp

Lines changed: 14 additions & 2 deletions
@@ -17,6 +17,9 @@
  */

 #include "CouchbaseClusterService.h"
+#include "couchbase/codec/raw_binary_transcoder.hxx"
+#include "couchbase/codec/raw_string_transcoder.hxx"
+#include "couchbase/codec/raw_json_transcoder.hxx"

 #include "core/Resource.h"

@@ -39,13 +42,22 @@ nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::g
 }

 nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert(const CouchbaseCollection& collection,
-    const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+    CouchbaseValueType document_type, const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
   auto collection_result = getCollection(collection);
   if (!collection_result.has_value()) {
     return nonstd::make_unexpected(collection_result.error());
   }

-  auto [upsert_err, upsert_resp] = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
+  std::pair<::couchbase::error, ::couchbase::mutation_result> result;
+  if (document_type == CouchbaseValueType::Json) {
+    result = collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, buffer, options).get();
+  } else if (document_type == CouchbaseValueType::String) {
+    std::string data_str(reinterpret_cast<const char*>(buffer.data()), buffer.size());
+    result = collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id, data_str, options).get();
+  } else {
+    result = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
+  }
+  auto& [upsert_err, upsert_resp] = result;
   if (upsert_err.ec()) {
     // ambiguous_timeout should not be retried as we do not know if the insert was successful or not
     if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
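
The branch added to `CouchbaseClient::upsert` selects a transcoder from the document type and, for `String`, first copies the byte buffer into a string. The selection logic can be sketched in Python as follows. This is only an illustration: it returns the transcoder name and payload instead of calling the Couchbase SDK, `select_transcoder` is a hypothetical name, and decoding the bytes as UTF-8 is an assumption made here (the C++ code copies the bytes without decoding them).

```python
from enum import Enum


class CouchbaseValueType(Enum):
    JSON = "Json"
    BINARY = "Binary"
    STRING = "String"


def select_transcoder(document_type: CouchbaseValueType, buffer: bytes):
    """Hypothetical mirror of the C++ branch: pick a transcoder name and payload."""
    if document_type is CouchbaseValueType.JSON:
        # The bytes are passed through as-is and flagged as JSON.
        return "raw_json_transcoder", buffer
    if document_type is CouchbaseValueType.STRING:
        # Assumption for this sketch: the content is UTF-8 text.
        return "raw_string_transcoder", buffer.decode("utf-8")
    # Binary is the fallback, matching the final else branch.
    return "raw_binary_transcoder", buffer
```

As in the C++ change, `Binary` acts as the catch-all branch, so any value that is neither `Json` nor `String` is stored with the binary transcoder.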

extensions/couchbase/controllerservices/CouchbaseClusterService.h

Lines changed: 10 additions & 6 deletions
@@ -29,8 +29,6 @@
 #include "couchbase/cluster.hxx"
 #include "core/ProcessContext.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "couchbase/codec/raw_binary_transcoder.hxx"
-#include "couchbase/error.hxx"

 namespace org::apache::nifi::minifi::couchbase {

@@ -48,6 +46,12 @@ struct CouchbaseUpsertResult {
   std::uint16_t partition_id{0};
 };

+enum class CouchbaseValueType {
+  Json,
+  Binary,
+  String
+};
+
 enum class CouchbaseErrorType {
   FATAL,
   TEMPORARY,
@@ -59,8 +63,8 @@ class CouchbaseClient {
       : connection_string_(std::move(connection_string)), username_(std::move(username)), password_(std::move(password)), logger_(logger) {
   }

-  nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, const std::string& document_id, const std::vector<std::byte>& buffer,
-      const ::couchbase::upsert_options& options);
+  nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id,
+      const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options);
   std::optional<CouchbaseErrorType> establishConnection();
   void close();


@@ -142,10 +146,10 @@ class CouchbaseClusterService : public core::controller::ControllerService {
     }
   }

-  virtual nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection,
+  virtual nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type,
       const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
     gsl_Expects(client_);
-    return client_->upsert(collection, document_id, buffer, options);
+    return client_->upsert(collection, document_type, document_id, buffer, options);
   }

   static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property);

extensions/couchbase/processors/PutCouchbaseKey.cpp

Lines changed: 2 additions & 1 deletion
@@ -25,6 +25,7 @@ namespace org::apache::nifi::minifi::couchbase::processors {

 void PutCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
   couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, PutCouchbaseKey::CouchbaseClusterControllerService);
+  document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, PutCouchbaseKey::DocumentType);
   persist_to_ = utils::parseEnumProperty<::couchbase::persist_to>(context, PutCouchbaseKey::PersistTo);
   replicate_to_ = utils::parseEnumProperty<::couchbase::replicate_to>(context, PutCouchbaseKey::ReplicateTo);
 }
@@ -61,7 +62,7 @@ void PutCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSess
   ::couchbase::upsert_options options;
   options.durability(persist_to_, replicate_to_);
   auto result = session.readBuffer(flow_file);
-  if (auto upsert_result = couchbase_cluster_service_->upsert(collection, document_id, result.buffer, options)) {
+  if (auto upsert_result = couchbase_cluster_service_->upsert(collection, document_type_, document_id, result.buffer, options)) {
     session.putAttribute(*flow_file, "couchbase.bucket", upsert_result->bucket_name);
     session.putAttribute(*flow_file, "couchbase.doc.id", document_id);
     session.putAttribute(*flow_file, "couchbase.doc.cas", std::to_string(upsert_result->cas));
